From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from pigeon.gentoo.org ([208.92.234.80] helo=lists.gentoo.org) by finch.gentoo.org with esmtp (Exim 4.77) (envelope-from ) id 1Sqnyt-00016L-Re for garchives@archives.gentoo.org; Mon, 16 Jul 2012 16:17:08 +0000 Received: from pigeon.gentoo.org (localhost [127.0.0.1]) by pigeon.gentoo.org (Postfix) with SMTP id C9A3CE0783; Mon, 16 Jul 2012 16:15:50 +0000 (UTC) Received: from smtp.gentoo.org (smtp.gentoo.org [140.211.166.183]) by pigeon.gentoo.org (Postfix) with ESMTP id 8C2B1E077F for ; Mon, 16 Jul 2012 16:15:50 +0000 (UTC) Received: from hornbill.gentoo.org (hornbill.gentoo.org [94.100.119.163]) (using TLSv1 with cipher AECDH-AES256-SHA (256/256 bits)) (No client certificate requested) by smtp.gentoo.org (Postfix) with ESMTPS id B34AB1B40BD for ; Mon, 16 Jul 2012 16:15:49 +0000 (UTC) Received: from localhost.localdomain (localhost [127.0.0.1]) by hornbill.gentoo.org (Postfix) with ESMTP id DD116E5436 for ; Mon, 16 Jul 2012 16:15:46 +0000 (UTC) From: "André Erdmann" To: gentoo-commits@lists.gentoo.org Content-type: text/plain; charset=UTF-8 Reply-To: gentoo-dev@lists.gentoo.org, "André Erdmann" Message-ID: <1342430205.313ea3d9c2ab34d4bb72423e0bf9a99a0d094b9f.dywi@gentoo> Subject: [gentoo-commits] proj/R_overlay:master commit in: roverlay/depres/ X-VCS-Repository: proj/R_overlay X-VCS-Files: roverlay/depres/depresolver.py X-VCS-Directories: roverlay/depres/ X-VCS-Committer: dywi X-VCS-Committer-Name: André Erdmann X-VCS-Revision: 313ea3d9c2ab34d4bb72423e0bf9a99a0d094b9f X-VCS-Branch: master Date: Mon, 16 Jul 2012 16:15:46 +0000 (UTC) Precedence: bulk List-Post: List-Help: List-Unsubscribe: List-Subscribe: List-Id: Gentoo Linux mail X-BeenThere: gentoo-commits@lists.gentoo.org Content-Transfer-Encoding: quoted-printable X-Archives-Salt: f4f886c8-182d-4f9b-ae3b-36f9f22bbc84 X-Archives-Hash: 764d59ca06e1e93d5a31e92301e79465 commit: 313ea3d9c2ab34d4bb72423e0bf9a99a0d094b9f Author: Andr=C3=A9 Erdmann mailerd de> AuthorDate: Mon Jul 16 09:14:13 2012 +0000 Commit: Andr=C3=A9 Erdmann mailerd de> CommitDate: Mon Jul 16 09:16:45 2012 +0000 URL: http://git.overlays.gentoo.org/gitweb/?p=3Dproj/R_overlay.git= ;a=3Dcommit;h=3D313ea3d9 depresolver: separate run method for threads #2 Re-added threaded resolving which runs async now (as "daemon" - start once and ask for depres at any time). ge=C3=A4ndert: roverlay/depres/depresolver.py --- roverlay/depres/depresolver.py | 182 +++++++++++++++++++---------------= ----- 1 files changed, 89 insertions(+), 93 deletions(-) diff --git a/roverlay/depres/depresolver.py b/roverlay/depres/depresolver= .py index 5f85ad1..40d696e 100644 --- a/roverlay/depres/depresolver.py +++ b/roverlay/depres/depresolver.py @@ -39,14 +39,17 @@ class DependencyResolver ( object ): 'RESOLVED', 'UNRESOLVABLE' ) =20 - self._jobs =3D config.get ( "DEPRES.jobcount", 0 ) + self._jobs =3D config.get ( "DEPRES.jobcount", 1 ) =20 # used to lock the run methods, self._runlock =3D threading.Lock() =20 - if self._jobs > 0: + + + if self._jobs > 1: # the dep res main thread self._mainthread =3D None + self._thread_close =3D False =20 =20 self.err_queue =3D err_queue @@ -270,7 +273,7 @@ class DependencyResolver ( object ): # --- end of _queue_previously_failed (...) --- =20 def start ( self ): - if self._jobs =3D=3D 0: + if self._jobs < 2: if not self._depqueue.empty(): self._run_resolver() if not self.err_queue.really_empty(): @@ -357,7 +360,6 @@ class DependencyResolver ( object ): break # -- =20 - # -- done with resolving =20 if is_resolved !=3D 2: @@ -412,9 +414,89 @@ class DependencyResolver ( object ): # --- end of _run_resolver (...) --- =20 def _thread_run_resolver ( self ): - raise Exception ( "method stub" ) + """master thread""" + try: + self.logger.debug ( + "Running in concurrent mode with {} worker threads.".format ( + self._jobs + ) + ) + send_queues =3D tuple ( + queue.Queue ( maxsize=3D1 ) for k in range ( self._jobs ) + ) + rec_queues =3D tuple ( + queue.Queue ( maxsize=3D1 ) for k in range ( self._jobs ) + ) + threads =3D tuple ( + threading.Thread ( + target=3Dself._thread_resolve, + # this thread's send queue is the worker thread's receive queue + # and vice versa + kwargs=3D{ 'recq' : send_queues [n], 'sendq' : rec_queues [n] } + ) for n in range ( self._jobs ) + ) + + try: + for t in threads: t.start() + + # *loop forever* + # wait for the resolver threads to process the dep queue, + # mark remaining deps as unresolvable and + # tell the threads to continue + while self.err_queue.really_empty() and not self._thread_close: + for q in rec_queues: + if q.get() !=3D 0: + self._thread_close =3D True + break + else: + self._process_unresolvable_queue() + # tell the threads to continue + for q in send_queues: q.put_nowait ( 0 ) + + except ( Exception, KeyboardInterrupt ) as e: + self.err_queue.push ( context=3Did ( self ), error=3De ) + + self._thread_close =3D True + + # on-error code (self.err_queue not empty or close requested) + try: + for q in send_queues: q.put_nowait ( 2 ) + except: + pass + + for t in threads: t.join() + + finally: + self._runlock.release() # --- end of _thread_run_resolver (...) --- =20 + def _thread_resolve ( self, sendq=3D0, recq=3D0 ): + """worker thread""" + try: + while not self._thread_close and self.err_queue.empty: + try: + # process remaining deps + while not self._thread_close and self.err_queue.empty: + self._process_dep ( self._depqueue.get_nowait() ) + except queue.Empty: + pass + + # dep queue has been processed, + # let the master thread process all unresolvable deps + # only 0 means continue, anything else stops this thread + sendq.put_nowait ( 0 ) + if recq.get() !=3D 0: break + except ( Exception, KeyboardInterrupt ) as e: + self.err_queue.push ( id ( self ), e ) + + # this is on-error code (err_queue is not empty or close requested) + self._thread_close =3D True + try: + sendq.put_nowait ( 2 ) + except queue.Full: + pass + # --- end of _thread_resolve (...) --- + def enqueue ( self, dep_env, channel_id ): """Adds a DepEnv to the queue of deps to resolve. =20 @@ -425,11 +507,11 @@ class DependencyResolver ( object ): returns: None (implicit) """ self._depqueue.put ( ( channel_id, dep_env ) ) - # --- end of enqueue (...) --- =20 def close ( self ): - if self._jobs > 0: + if self._jobs > 1: + self._thread_close =3D True if self._mainthread: self._mainthread.join() for lis in self.listeners: lis.close() @@ -439,89 +521,3 @@ class DependencyResolver ( object ): "{} channels were in use.".format ( len ( self.all_channel_ids ) ) ) # --- end of close (...) --- - - - def _thread_run_main ( self ): - """Tells the resolver to run.""" - raise Exception ( "to be removed" ) - jobcount =3D self.__class__.NUMTHREADS - - try: - if jobcount < 1: - ( self.logger.warning if jobcount < 0 else self.logger.debug ) ( - "Running in sequential mode." - ) - self._thread_run_resolve() - else: - - # wait for old threads - if not self._threads is None: - self.logger.warning ( "Waiting for old threads..." ) - for t in self._threads: t.join() - - self.logger.debug ( - "Running in concurrent mode with %i jobs." % jobcount - ) - - # create threads, - self._threads =3D tuple ( - threading.Thread ( target=3Dself._thread_run_resolve ) - for n in range (jobcount) - ) - # run them - for t in self._threads: t.start() - # and wait until done - for t in self._threads: t.join() - - # finally remove them - del self._threads - self._threads =3D None - - # iterate over _depqueue_failed and report unresolved - ## todo can thread this - self._process_unresolvable_queue() - - if not self.err_queue.really_empty(): - self.err_queue.unblock_queues() - - except ( Exception, KeyboardInterrupt ) as e: - if jobcount > 0: - self.err_queue.push ( id ( self ), e ) - return - else: - raise e - - finally: - # release the lock - self._runlock.release() - - # --- end of _thread_run_main (...) --- - - def _thread_run_resolve ( self ): - """Resolves dependencies (thread target). - - returns: None (implicit) - """ - raise Exception ( "to be removed" ) - try: - while self.err_queue.empty and not self._depqueue.empty(): - try: - to_resolve =3D self._depqueue.get_nowait() - self._process_dep ( queue_item=3Dto_resolve ) - self._depqueue.task_done() - except queue.Empty: - # this thread is done when the queue is empty, so this is - # no error, but just the result of the race condition between - # queue.empty() and queue.get(False) - return - # --- end while - - except ( Exception, KeyboardInterrupt ) as e: - if self.__class__.NUMTHREADS > 0: - self.err_queue.push ( id ( self ), e ) - return - else: - raise e - - - # --- end of _thread_run_resolve (...) --- From mboxrd@z Thu Jan 1 00:00:00 1970 Received: from pigeon.gentoo.org ([208.92.234.80] helo=lists.gentoo.org) by finch.gentoo.org with esmtp (Exim 4.77) (envelope-from ) id 1SqnyH-0000yA-8O for garchives@archives.gentoo.org; Mon, 16 Jul 2012 16:16:29 +0000 Received: from pigeon.gentoo.org (localhost [127.0.0.1]) by pigeon.gentoo.org (Postfix) with SMTP id 26BF6E0761; Mon, 16 Jul 2012 16:15:47 +0000 (UTC) Received: from smtp.gentoo.org (smtp.gentoo.org [140.211.166.183]) by pigeon.gentoo.org (Postfix) with ESMTP id DD5CDE0761 for ; Mon, 16 Jul 2012 16:15:46 +0000 (UTC) Received: from hornbill.gentoo.org (hornbill.gentoo.org [94.100.119.163]) (using TLSv1 with cipher AECDH-AES256-SHA (256/256 bits)) (No client certificate requested) by smtp.gentoo.org (Postfix) with ESMTPS id 0E87F1B404E for ; Mon, 16 Jul 2012 16:15:46 +0000 (UTC) Received: from localhost.localdomain (localhost [127.0.0.1]) by hornbill.gentoo.org (Postfix) with ESMTP id C41FDE5434 for ; Mon, 16 Jul 2012 16:15:44 +0000 (UTC) From: "André Erdmann" To: gentoo-commits@lists.gentoo.org Content-type: text/plain; charset=UTF-8 Reply-To: gentoo-dev@lists.gentoo.org, "André Erdmann" Message-ID: <1342430205.313ea3d9c2ab34d4bb72423e0bf9a99a0d094b9f.dywi@gentoo> Subject: [gentoo-commits] proj/R_overlay:depres_wip commit in: roverlay/depres/ X-VCS-Repository: proj/R_overlay X-VCS-Files: roverlay/depres/depresolver.py X-VCS-Directories: roverlay/depres/ X-VCS-Committer: dywi X-VCS-Committer-Name: André Erdmann X-VCS-Revision: 313ea3d9c2ab34d4bb72423e0bf9a99a0d094b9f X-VCS-Branch: depres_wip Date: Mon, 16 Jul 2012 16:15:44 +0000 (UTC) Precedence: bulk List-Post: List-Help: List-Unsubscribe: List-Subscribe: List-Id: Gentoo Linux mail X-BeenThere: gentoo-commits@lists.gentoo.org Content-Transfer-Encoding: quoted-printable X-Archives-Salt: 8575e2d0-38fc-4f06-a9b6-57c79990c96d X-Archives-Hash: 89422d4a236f716af8eb435ff398b724 Message-ID: <20120716161544.3vy4gtSLbpAHZpJNRBc940xOIhldvODPHPpMJLGI6-0@z> commit: 313ea3d9c2ab34d4bb72423e0bf9a99a0d094b9f Author: Andr=C3=A9 Erdmann mailerd de> AuthorDate: Mon Jul 16 09:14:13 2012 +0000 Commit: Andr=C3=A9 Erdmann mailerd de> CommitDate: Mon Jul 16 09:16:45 2012 +0000 URL: http://git.overlays.gentoo.org/gitweb/?p=3Dproj/R_overlay.git= ;a=3Dcommit;h=3D313ea3d9 depresolver: separate run method for threads #2 Re-added threaded resolving which runs async now (as "daemon" - start once and ask for depres at any time). ge=C3=A4ndert: roverlay/depres/depresolver.py --- roverlay/depres/depresolver.py | 182 +++++++++++++++++++---------------= ----- 1 files changed, 89 insertions(+), 93 deletions(-) diff --git a/roverlay/depres/depresolver.py b/roverlay/depres/depresolver= .py index 5f85ad1..40d696e 100644 --- a/roverlay/depres/depresolver.py +++ b/roverlay/depres/depresolver.py @@ -39,14 +39,17 @@ class DependencyResolver ( object ): 'RESOLVED', 'UNRESOLVABLE' ) =20 - self._jobs =3D config.get ( "DEPRES.jobcount", 0 ) + self._jobs =3D config.get ( "DEPRES.jobcount", 1 ) =20 # used to lock the run methods, self._runlock =3D threading.Lock() =20 - if self._jobs > 0: + + + if self._jobs > 1: # the dep res main thread self._mainthread =3D None + self._thread_close =3D False =20 =20 self.err_queue =3D err_queue @@ -270,7 +273,7 @@ class DependencyResolver ( object ): # --- end of _queue_previously_failed (...) --- =20 def start ( self ): - if self._jobs =3D=3D 0: + if self._jobs < 2: if not self._depqueue.empty(): self._run_resolver() if not self.err_queue.really_empty(): @@ -357,7 +360,6 @@ class DependencyResolver ( object ): break # -- =20 - # -- done with resolving =20 if is_resolved !=3D 2: @@ -412,9 +414,89 @@ class DependencyResolver ( object ): # --- end of _run_resolver (...) --- =20 def _thread_run_resolver ( self ): - raise Exception ( "method stub" ) + """master thread""" + try: + self.logger.debug ( + "Running in concurrent mode with {} worker threads.".format ( + self._jobs + ) + ) + send_queues =3D tuple ( + queue.Queue ( maxsize=3D1 ) for k in range ( self._jobs ) + ) + rec_queues =3D tuple ( + queue.Queue ( maxsize=3D1 ) for k in range ( self._jobs ) + ) + threads =3D tuple ( + threading.Thread ( + target=3Dself._thread_resolve, + # this thread's send queue is the worker thread's receive queue + # and vice versa + kwargs=3D{ 'recq' : send_queues [n], 'sendq' : rec_queues [n] } + ) for n in range ( self._jobs ) + ) + + try: + for t in threads: t.start() + + # *loop forever* + # wait for the resolver threads to process the dep queue, + # mark remaining deps as unresolvable and + # tell the threads to continue + while self.err_queue.really_empty() and not self._thread_close: + for q in rec_queues: + if q.get() !=3D 0: + self._thread_close =3D True + break + else: + self._process_unresolvable_queue() + # tell the threads to continue + for q in send_queues: q.put_nowait ( 0 ) + + except ( Exception, KeyboardInterrupt ) as e: + self.err_queue.push ( context=3Did ( self ), error=3De ) + + self._thread_close =3D True + + # on-error code (self.err_queue not empty or close requested) + try: + for q in send_queues: q.put_nowait ( 2 ) + except: + pass + + for t in threads: t.join() + + finally: + self._runlock.release() # --- end of _thread_run_resolver (...) --- =20 + def _thread_resolve ( self, sendq=3D0, recq=3D0 ): + """worker thread""" + try: + while not self._thread_close and self.err_queue.empty: + try: + # process remaining deps + while not self._thread_close and self.err_queue.empty: + self._process_dep ( self._depqueue.get_nowait() ) + except queue.Empty: + pass + + # dep queue has been processed, + # let the master thread process all unresolvable deps + # only 0 means continue, anything else stops this thread + sendq.put_nowait ( 0 ) + if recq.get() !=3D 0: break + except ( Exception, KeyboardInterrupt ) as e: + self.err_queue.push ( id ( self ), e ) + + # this is on-error code (err_queue is not empty or close requested) + self._thread_close =3D True + try: + sendq.put_nowait ( 2 ) + except queue.Full: + pass + # --- end of _thread_resolve (...) --- + def enqueue ( self, dep_env, channel_id ): """Adds a DepEnv to the queue of deps to resolve. =20 @@ -425,11 +507,11 @@ class DependencyResolver ( object ): returns: None (implicit) """ self._depqueue.put ( ( channel_id, dep_env ) ) - # --- end of enqueue (...) --- =20 def close ( self ): - if self._jobs > 0: + if self._jobs > 1: + self._thread_close =3D True if self._mainthread: self._mainthread.join() for lis in self.listeners: lis.close() @@ -439,89 +521,3 @@ class DependencyResolver ( object ): "{} channels were in use.".format ( len ( self.all_channel_ids ) ) ) # --- end of close (...) --- - - - def _thread_run_main ( self ): - """Tells the resolver to run.""" - raise Exception ( "to be removed" ) - jobcount =3D self.__class__.NUMTHREADS - - try: - if jobcount < 1: - ( self.logger.warning if jobcount < 0 else self.logger.debug ) ( - "Running in sequential mode." - ) - self._thread_run_resolve() - else: - - # wait for old threads - if not self._threads is None: - self.logger.warning ( "Waiting for old threads..." ) - for t in self._threads: t.join() - - self.logger.debug ( - "Running in concurrent mode with %i jobs." % jobcount - ) - - # create threads, - self._threads =3D tuple ( - threading.Thread ( target=3Dself._thread_run_resolve ) - for n in range (jobcount) - ) - # run them - for t in self._threads: t.start() - # and wait until done - for t in self._threads: t.join() - - # finally remove them - del self._threads - self._threads =3D None - - # iterate over _depqueue_failed and report unresolved - ## todo can thread this - self._process_unresolvable_queue() - - if not self.err_queue.really_empty(): - self.err_queue.unblock_queues() - - except ( Exception, KeyboardInterrupt ) as e: - if jobcount > 0: - self.err_queue.push ( id ( self ), e ) - return - else: - raise e - - finally: - # release the lock - self._runlock.release() - - # --- end of _thread_run_main (...) --- - - def _thread_run_resolve ( self ): - """Resolves dependencies (thread target). - - returns: None (implicit) - """ - raise Exception ( "to be removed" ) - try: - while self.err_queue.empty and not self._depqueue.empty(): - try: - to_resolve =3D self._depqueue.get_nowait() - self._process_dep ( queue_item=3Dto_resolve ) - self._depqueue.task_done() - except queue.Empty: - # this thread is done when the queue is empty, so this is - # no error, but just the result of the race condition between - # queue.empty() and queue.get(False) - return - # --- end while - - except ( Exception, KeyboardInterrupt ) as e: - if self.__class__.NUMTHREADS > 0: - self.err_queue.push ( id ( self ), e ) - return - else: - raise e - - - # --- end of _thread_run_resolve (...) ---