From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from lists.gentoo.org (pigeon.gentoo.org [208.92.234.80]) by finch.gentoo.org (Postfix) with ESMTP id 0B6A1138200 for ; Thu, 22 Aug 2013 09:01:43 +0000 (UTC) Received: from pigeon.gentoo.org (localhost [127.0.0.1]) by pigeon.gentoo.org (Postfix) with SMTP id 58861E0B22; Thu, 22 Aug 2013 09:01:42 +0000 (UTC) Received: from smtp.gentoo.org (smtp.gentoo.org [140.211.166.183]) (using TLSv1 with cipher AECDH-AES256-SHA (256/256 bits)) (No client certificate requested) by pigeon.gentoo.org (Postfix) with ESMTPS id 9709EE0B97 for ; Thu, 22 Aug 2013 09:01:41 +0000 (UTC) Received: from hornbill.gentoo.org (hornbill.gentoo.org [94.100.119.163]) (using TLSv1 with cipher AECDH-AES256-SHA (256/256 bits)) (No client certificate requested) by smtp.gentoo.org (Postfix) with ESMTPS id 76DEF33EC8D for ; Thu, 22 Aug 2013 09:01:40 +0000 (UTC) Received: from localhost.localdomain (localhost [127.0.0.1]) by hornbill.gentoo.org (Postfix) with ESMTP id 304C8E5461 for ; Thu, 22 Aug 2013 09:01:39 +0000 (UTC) From: "André Erdmann" To: gentoo-commits@lists.gentoo.org Content-Transfer-Encoding: 8bit Content-type: text/plain; charset=UTF-8 Reply-To: gentoo-dev@lists.gentoo.org, "André Erdmann" Message-ID: <1377161428.2f5e40896c821d2a52c3ef53163a953217b0a366.dywi@gentoo> Subject: [gentoo-commits] proj/R_overlay:master commit in: roverlay/util/ X-VCS-Repository: proj/R_overlay X-VCS-Files: roverlay/util/hashpool.py X-VCS-Directories: roverlay/util/ X-VCS-Committer: dywi X-VCS-Committer-Name: André Erdmann X-VCS-Revision: 2f5e40896c821d2a52c3ef53163a953217b0a366 X-VCS-Branch: master Date: Thu, 22 Aug 2013 09:01:39 +0000 (UTC) Precedence: bulk List-Post: List-Help: List-Unsubscribe: List-Subscribe: List-Id: Gentoo Linux mail X-BeenThere: gentoo-commits@lists.gentoo.org X-Archives-Salt: 2a993385-4af6-4aa0-88d5-49647f627204 X-Archives-Hash: 415731272233de62c317c86884f7b320 commit: 2f5e40896c821d2a52c3ef53163a953217b0a366 Author: André 
Erdmann mailerd de> AuthorDate: Thu Aug 22 08:50:28 2013 +0000 Commit: André Erdmann mailerd de> CommitDate: Thu Aug 22 08:50:28 2013 +0000 URL: http://git.overlays.gentoo.org/gitweb/?p=proj/R_overlay.git;a=commit;h=2f5e4089 hashpool: fixup * run properly in no-thread/multiproc mode * check if thread/proc executor has been aborted * HashFunction can now be pickled * run_as_completed() function for getting results as soon as they're available --- roverlay/util/hashpool.py | 103 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 84 insertions(+), 19 deletions(-) diff --git a/roverlay/util/hashpool.py b/roverlay/util/hashpool.py index 70d3d55..61d8336 100644 --- a/roverlay/util/hashpool.py +++ b/roverlay/util/hashpool.py @@ -5,6 +5,12 @@ # either version 2 of the License, or (at your option) any later version. try: + import copyreg +except ImportError: + # python 2 + import copy_reg as copyreg + +try: import concurrent.futures except ImportError: import sys @@ -20,11 +26,31 @@ else: import roverlay.digest -def _calculate_hashes ( hash_job, hashes ): - hash_job.hashdict.update ( - roverlay.digest.multihash_file ( hash_job.filepath, hashes ) - ) -# --- end of _calculate_hashes (...) --- +class HashFunction ( object ): + + def __init__ ( self, hashes ): + super ( HashFunction, self ).__init__() + self.hashes = frozenset ( hashes ) + # --- end of __init__ (...) --- + + def multihash_file ( self, filepath ): + return roverlay.digest.multihash_file ( filepath, self.hashes ) + # --- end of multihash_file (...) --- + + def calculate ( self, hash_job ): + hash_job.hashdict.update ( self.multihash_file ( hash_job.filepath ) ) + return hash_job + # --- end of calculate (...) --- + + __call__ = calculate + + def pack ( self ): + return ( self.__class__, ( self.hashes, ) ) + # --- end of pickle (...) 
--- + +# --- end of HashFunction --- + +copyreg.pickle ( HashFunction, HashFunction.pack ) class HashJob ( object ): @@ -37,30 +63,69 @@ class HashJob ( object ): # --- end of HashJob --- + class HashPool ( object ): - def __init__ ( self, hashes, max_workers ): + def __init__ ( self, hashes, max_workers, use_threads=None ): super ( HashPool, self ).__init__() - self.hashes = frozenset ( hashes ) + self.hashfunc = HashFunction ( hashes ) self._jobs = dict() - self.max_workers = int ( max_workers ) + self.max_workers = ( + int ( max_workers ) if max_workers is not None else max_workers + ) + + if use_threads or use_threads is None: + self.executor_cls = concurrent.futures.ThreadPoolExecutor + else: + self.executor_cls = concurrent.futures.ProcessPoolExecutor # --- end of __init__ (...) --- def add ( self, backref, filepath, hashdict=None ): self._jobs [backref] = HashJob ( filepath, hashdict ) # --- end of add (...) --- + def get_executor ( self ): + return self.executor_cls ( self.max_workers ) + # --- end of get_executor (...) --- + + def is_concurrent ( self ): + return HAVE_CONCURRENT_FUTURES and ( + self.max_workers is None or self.max_workers > 0 + ) + # --- end of is_concurrent (...) --- + + def run_as_completed ( self ): + if self.is_concurrent(): + with self.get_executor() as exe: + for backref, hash_job in zip ( + self._jobs.keys(), + exe.map ( self.hashfunc, self._jobs.values() ) + ): + yield ( backref, hash_job.hashdict ) + else: + for backref, hash_job in self._jobs.items(): + self.hashfunc.calculate ( hash_job ) + yield ( backref, hash_job.hashdict ) + # --- end of run_as_completed (...) 
--- + def run ( self ): - #with concurrent.futures.ProcessPoolExecutor ( self.max_workers ) as exe: - with concurrent.futures.ThreadPoolExecutor ( self.max_workers ) as exe: - running_jobs = frozenset ( - exe.submit ( _calculate_hashes, job, self.hashes ) - for job in self._jobs.values() - ) - - # wait - for finished_job in concurrent.futures.as_completed ( running_jobs ): - if finished_job.exception() is not None: - raise finished_job.exception() + if self.is_concurrent(): + with self.get_executor() as exe: + running_jobs = frozenset ( + exe.submit ( self.hashfunc, job ) + for job in self._jobs.values() + ) + + # wait + for finished_job in ( + concurrent.futures.as_completed ( running_jobs ) + ): + if finished_job.exception() is not None: + raise finished_job.exception() + elif finished_job.cancelled(): + break + else: + for hash_job in self._jobs.values(): + self.hashfunc.calculate ( hash_job ) # --- end of run (...) --- def reset ( self ):