public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
* [gentoo-commits] proj/R_overlay:master commit in: roverlay/ebuild/, roverlay/depres/, roverlay/overlay/
@ 2012-07-02 16:52 André Erdmann
  0 siblings, 0 replies; only message in thread
From: André Erdmann @ 2012-07-02 16:52 UTC (permalink / raw
  To: gentoo-commits

commit:     8a174d86bba8fa67d38935da2b7b7c51f53606fa
Author:     André Erdmann <dywi <AT> mailerd <DOT> de>
AuthorDate: Mon Jul  2 16:48:31 2012 +0000
Commit:     André Erdmann <dywi <AT> mailerd <DOT> de>
CommitDate: Mon Jul  2 16:48:31 2012 +0000
URL:        http://git.overlays.gentoo.org/gitweb/?p=proj/R_overlay.git;a=commit;h=8a174d86

threaded overlay writing; fix some threading issues

* the overlay produced can now be written to fs using per-package dir threads
* fixed some thread locking issues
* added an error queue used by threads to pass exceptions to the main module

	modified:   roverlay/depres/channels.py
	modified:   roverlay/depres/depresolver.py
	modified:   roverlay/ebuild/creation.py
	modified:   roverlay/ebuild/depres.py
	modified:   roverlay/overlay/__init__.py
	modified:   roverlay/overlay/category.py
	modified:   roverlay/overlay/creator.py
	modified:   roverlay/overlay/package.py
	modified:   roverlay/overlay/worker.py

---
 roverlay/depres/channels.py    |   16 ++-
 roverlay/depres/depresolver.py |  296 +++++++++++++++++++++++++---------------
 roverlay/ebuild/creation.py    |   17 ++-
 roverlay/ebuild/depres.py      |    9 +-
 roverlay/overlay/__init__.py   |   18 ++-
 roverlay/overlay/category.py   |   76 +++++++++--
 roverlay/overlay/creator.py    |  119 ++++++++++++----
 roverlay/overlay/package.py    |  165 +++++++++++------------
 roverlay/overlay/worker.py     |   81 ++++++++----
 9 files changed, 510 insertions(+), 287 deletions(-)

diff --git a/roverlay/depres/channels.py b/roverlay/depres/channels.py
index 65d4d45..56491f4 100644
--- a/roverlay/depres/channels.py
+++ b/roverlay/depres/channels.py
@@ -19,7 +19,7 @@ class EbuildJobChannel ( DependencyResolverChannel ):
 	  add deps, then satisfy_request(): collect/lookup
 	"""
 
-	def __init__ ( self, name=None, logger=None ):
+	def __init__ ( self, err_queue, name=None, logger=None ):
 		"""EbuildJobChannel
 
 		arguments:
@@ -32,6 +32,8 @@ class EbuildJobChannel ( DependencyResolverChannel ):
 		# in the join()-method
 		self._depdone = 0
 
+		self.err_queue = err_queue
+
 		# set of portage packages (resolved deps)
 		#  this is None unless all deps have been successfully resolved
 		self._collected_deps = None
@@ -170,12 +172,15 @@ class EbuildJobChannel ( DependencyResolverChannel ):
 		# DEPEND/RDEPEND/.. later, seewave requires sci-libs/fftw
 		# in both DEPEND and RDEPEND for example
 		dep_collected = set()
-		satisfiable   = True
+		satisfiable   = self.err_queue.empty()
 
 
 		def handle_queue_item ( dep_env ):
 			self._depdone += 1
-			if dep_env.is_resolved():
+			if dep_env is None:
+				# could be used to unblock the queue
+				return self.err_queue.empty()
+			elif dep_env.is_resolved():
 				### and dep_env in self.dep_env_list
 				# successfully resolved
 				dep_collected.add ( dep_env.get_result() [1] )
@@ -190,7 +195,8 @@ class EbuildJobChannel ( DependencyResolverChannel ):
 
 		# loop until
 		#  (a) at least one dependency could not be resolved or
-		#  (b) all deps processed
+		#  (b) all deps processed or
+		#  (c) error queue not empty
 		while self._depdone < len ( self.dep_env_list ) and satisfiable:
 			# tell the resolver to start
 			self._depres_master.start()
@@ -204,7 +210,7 @@ class EbuildJobChannel ( DependencyResolverChannel ):
 		# --- end while
 
 		if satisfiable:
-			self._collected_deps = dep_collected
+			self._collected_deps = frozenset ( dep_collected )
 			return self._collected_deps
 		else:
 			if close_if_unresolvable: self.close()

diff --git a/roverlay/depres/depresolver.py b/roverlay/depres/depresolver.py
index 26040c0..7f91983 100644
--- a/roverlay/depres/depresolver.py
+++ b/roverlay/depres/depresolver.py
@@ -29,12 +29,11 @@ class DependencyResolver ( object ):
 	"""Main object for dependency resolution."""
 
 
-	NUMTHREADS = config.get ( "DEPRES.jobcount", 0 )
+	NUMTHREADS = config.get ( "DEPRES.jobcount", 15 )
 
 	def __init__ ( self ):
 		"""Initializes a DependencyResolver."""
 
-		# these loggers are temporary helpers
 		self.logger              = logging.getLogger ( self.__class__.__name__ )
 		self.logger_unresolvable = self.logger.getChild ( "UNRESOLVABLE" )
 		self.logger_resolved     = self.logger.getChild ( "RESOLVED" )
@@ -46,6 +45,7 @@ class DependencyResolver ( object ):
 
 		# this lock tells whether a dep res 'master' thread is running (locked)
 		self.runlock     = threading.Lock()
+		self.startlock   = threading.Lock()
 		# the dep res main thread
 		self._mainthread = None
 		# the dep res worker threads
@@ -86,6 +86,9 @@ class DependencyResolver ( object ):
 			self.all_channel_ids = set()
 	# --- end of __init__ (...) ---
 
+	def set_exception_queue ( self, equeue ):
+		self.err_queue = equeue
+
 	def _sort ( self ):
 		"""Sorts the rule pools of this resolver."""
 		for pool in self.static_rule_pools: pool.sort()
@@ -188,7 +191,7 @@ class DependencyResolver ( object ):
 		* channel -- channel to be registered
 		             automatically sets channel's resolver to self if it is None
 
-		raises: Exception if channels is already registered with this resolver
+		raises: Exception if channel is already registered with this resolver
 
 		returns: channel
 		 """
@@ -242,7 +245,6 @@ class DependencyResolver ( object ):
 		except KeyError as expected:
 			# ok
 			pass
-
 	# --- end of channel_closed (...) ---
 
 	def _queue_previously_failed ( self ):
@@ -259,81 +261,138 @@ class DependencyResolver ( object ):
 	# --- end of _queue_previously_failed (...) ---
 
 	def start ( self ):
-		if not self.runlock.acquire ( False ):
-			# already running
-			return True
-		# --
+		# -- verify whether resolver has to be started
+		if self._depqueue.empty():
+			# nothing to resolve
+			return
+
+		if not self.startlock.acquire ( False ):
+			# another channel/.. is starting the resolver
+			return
+		elif self._depqueue.empty():
+			self.startlock.release()
+			return
+
+		# -- verify...
+
+		# acquire the run lock (that locks _run_main)
+		try:
+			self.runlock.acquire()
+		finally:
+			self.startlock.release()
+
+		if self._depqueue.empty():
+			self.runlock.release()
+			return
 
 		if DependencyResolver.NUMTHREADS > 0:
 			# no need to wait for the old thread
+			# FIXME: could remove the following block
+			if self._mainthread is not None:
+				self._mainthread.join()
+				del self._mainthread
+
 			self._mainthread = threading.Thread ( target=self._thread_run_main )
 			self._mainthread.start()
+
 		else:
 			self._thread_run_main()
 
 		# self.runlock is released when _thread_run_main is done
 	# --- end of start (...) ---
 
+	def unblock_channels ( self ):
+		# unblock all channels by processing all remaining deps as
+		# unresolved
+		## other option: select channel, but this may interfere with
+		## channel_closed()
+		channel_gone = set()
+		while not self._depqueue.empty():
+			chan, dep_env = self._depqueue.get_nowait()
+			dep_env.set_unresolvable()
+
+			if chan not in channel_gone:
+				try:
+					self._depqueue_done [chan].put_nowait ( dep_env )
+				except KeyError:
+					channel_gone.add ( chan )
+	# --- end of unblock_channels (...) ---
+
 	def _thread_run_main ( self ):
 		"""Tells the resolver to run."""
-
 		jobcount = DependencyResolver.NUMTHREADS
 
-		if jobcount < 1:
-			( self.logger.warning if jobcount < 0 else self.logger.debug ) (
-				"Running in sequential mode."
-			)
-			self._thread_run_resolve()
-		else:
+		try:
+			if jobcount < 1:
+				( self.logger.warning if jobcount < 0 else self.logger.debug ) (
+					"Running in sequential mode."
+				)
+				self._thread_run_resolve()
+			else:
 
-			# wait for old threads
-			if not self._threads is None:
-				self.logger.warning ( "Waiting for old threads..." )
+				# wait for old threads
+				if not self._threads is None:
+					self.logger.warning ( "Waiting for old threads..." )
+					for t in self._threads: t.join()
+
+				self.logger.debug (
+					"Running in concurrent mode with %i jobs." % jobcount
+				)
+
+				# create threads,
+				self._threads = tuple (
+					threading.Thread ( target=self._thread_run_resolve )
+					for n in range (jobcount)
+				)
+				# run them
+				for t in self._threads: t.start()
+				# and wait until done
 				for t in self._threads: t.join()
 
-			self.logger.warning (
-				"Running in concurrent mode with %i jobs." % jobcount
-			)
+				# finally remove them
+				del self._threads
+				self._threads = None
 
-			# create threads,
-			self._threads = [
-				threading.Thread ( target=self._thread_run_resolve )
-				for n in range (jobcount)
-			]
-			# run them
-			for t in self._threads: t.start()
-			# and wait until done
-			for t in self._threads: t.join()
 
-			# finally remove them
-			del self._threads
-			self._threads = None
+			# iterate over _depqueue_failed and report unresolved
+			## todo can thread this
+			while not self._depqueue_failed.empty():
+				try:
+					channel_id, dep_env = self._depqueue_failed.get_nowait()
 
+				except queue.Empty:
+					# race cond empty() <-> get_nowait()
+					return
 
-		# iterate over _depqueue_failed and report unresolved
-		## todo can thread this
-		while not self._depqueue_failed.empty():
-			try:
-				channel_id, dep_env = self._depqueue_failed.get_nowait()
 
 				dep_env.set_unresolvable()
 
 				self._report_event ( 'UNRESOLVABLE', dep_env )
 
-				if channel_id in self._depqueue_done:
-					## todo/fixme/whatever: this 'if' can filter out channels
-					## that have been added again
-					self._depqueue_done [channel_id].put ( dep_env )
 
-			except queue.Empty:
-				# race cond empty() <-> get_nowait()
-				break
-			except KeyError:
-				# channel has been closed before calling put, ignore this err
-				pass
+				try:
+					if channel_id in self._depqueue_done:
+						self._depqueue_done [channel_id].put_nowait ( dep_env )
+				except KeyError:
+					# channel has been closed before calling put, ignore this
+					pass
+
+				if not self.err_queue.empty():
+					self.unblock_channels()
+
+		except ( Exception, KeyboardInterrupt ) as e:
+
+			self.unblock_channels()
 
-		# release the lock
-		self.runlock.release()
+			if jobcount > 0 and hasattr ( self, 'err_queue' ):
+				self.err_queue.put_nowait ( id ( self ), e )
+				return
+			else:
+				raise e
+
+		finally:
+			# release the lock
+			self.runlock.release()
 
 	# --- end of _thread_run_main (...) ---
 
@@ -342,77 +401,90 @@ class DependencyResolver ( object ):
 
 		returns: None (implicit)
 		"""
+		try:
+			while self.err_queue.empty() and not self._depqueue.empty():
 
-		while not self._depqueue.empty():
-
-			try:
-				to_resolve = self._depqueue.get_nowait()
-			except queue.Empty:
-				# this thread is done when the queue is empty, so this is
-				# no error, but just the result of the race condition between
-				# queue.empty() and queue.get(False)
-				return None
-
-			channel_id, dep_env = to_resolve
-
-			if channel_id in self._depqueue_done:
-				# else channel has been closed, drop dep
-
-				self.logger.debug ( "Trying to resolve '%s'." % dep_env.dep_str )
-
-				#have_new_rule = False
+				try:
+					to_resolve = self._depqueue.get_nowait()
+				except queue.Empty:
+					# this thread is done when the queue is empty, so this is
+					# no error, but just the result of the race condition between
+					# queue.empty() and queue.get(False)
+					return
 
-				resolved    = None
-				# resolved can be None, so use a tri-state int for checking
-				#  0 -> unresolved, but resolvable
-				#  1 -> unresolved and (currently, new rules may change this)
-				#        not resolvable
-				#  2 -> resolved
-				is_resolved = 0
+				channel_id, dep_env = to_resolve
 
-				# TODO:
-				#  (threading: could search the pools in parallel)
+				if channel_id in self._depqueue_done:
+					# else channel has been closed, drop dep
 
-				if USING_DEPRES_CACHE:
-					if dep_env.dep_str_low in self._dep_unresolvable:
-						# cannot resolve
-						is_resolved = 1
+					self.logger.debug (
+						"Trying to resolve '%s'." % dep_env.dep_str
+					)
 
-				if is_resolved == 0:
-					# search for a match in the rule pools
-					for rulepool in self.static_rule_pools:
-						result = rulepool.matches ( dep_env )
-						if not result is None and result [0] > 0:
-							resolved    = result [1]
-							is_resolved = 2
-							break
+					#have_new_rule = False
 
+					resolved = None
+					# resolved can be None, so use a tri-state int for checking
+					#  0 -> unresolved, but resolvable
+					#  1 -> unresolved and (currently, new rules may change this)
+					#        not resolvable
+					#  2 -> resolved
+					is_resolved = 0
 
+					# TODO:
+					#  (threading: could search the pools in parallel)
 
-				if is_resolved == 2:
-					dep_env.set_resolved ( resolved, append=False )
-					self._report_event ( 'RESOLVED', dep_env )
-					self._depqueue_done [channel_id].put ( dep_env )
-				else:
-					self._depqueue_failed.put ( to_resolve )
-
-					if USING_DEPRES_CACHE:
-						# does not work when adding new rules is possible
-						self._dep_unresolvable.add ( dep_env.dep_str_low )
-
-				"""
-				## only useful if new rules can be created
-				# new rule found, requeue all previously failed dependency searches
-				if have_new_rule:
-					self._queue_previously_failed
 					if USING_DEPRES_CACHE:
-						self._dep_unresolvable.clear() #?
-				"""
-			# --- end if channel_id in self._depqueue_done
-
-			self._depqueue.task_done()
-		# --- end while
-
+						if dep_env.dep_str_low in self._dep_unresolvable:
+							# cannot resolve
+							is_resolved = 1
+
+					if is_resolved == 0:
+						# search for a match in the rule pools
+						for rulepool in self.static_rule_pools:
+							result = rulepool.matches ( dep_env )
+							if not result is None and result [0] > 0:
+								resolved    = result [1]
+								is_resolved = 2
+								break
+
+
+
+					if is_resolved == 2:
+						dep_env.set_resolved ( resolved, append=False )
+						self._report_event ( 'RESOLVED', dep_env )
+						try:
+							self._depqueue_done [channel_id].put ( dep_env )
+						except KeyError:
+							# channel gone while resolving
+							pass
+					else:
+						self._depqueue_failed.put ( to_resolve )
+
+						if USING_DEPRES_CACHE:
+							# does not work when adding new rules is possible
+							self._dep_unresolvable.add ( dep_env.dep_str_low )
+
+					"""
+					## only useful if new rules can be created
+					# new rule found, requeue all previously
+					#  failed dependency searches
+					if have_new_rule:
+						self._queue_previously_failed
+						if USING_DEPRES_CACHE:
+							self._dep_unresolvable.clear() #?
+					"""
+				# --- end if channel_id in self._depqueue_done
+
+				self._depqueue.task_done()
+			# --- end while
+
+		except ( Exception, KeyboardInterrupt ) as e:
+			if jobcount > 0 and hasattr ( self, 'err_queue' ):
+				self.err_queue.put_nowait ( id ( self ), e )
+				return
+			else:
+				raise e
 
 
 	# --- end of _thread_run_resolve (...) ---

diff --git a/roverlay/ebuild/creation.py b/roverlay/ebuild/creation.py
index 056b1a2..b606ae9 100644
--- a/roverlay/ebuild/creation.py
+++ b/roverlay/ebuild/creation.py
@@ -22,7 +22,7 @@ FALLBACK_DESCRIPTION = "<none>"
 class EbuildCreation ( object ):
 	"""Used to create an ebuild using DESCRIPTION data."""
 
-	def __init__ ( self, package_info, depres_channel_spawner=None ):
+	def __init__ ( self, package_info, err_queue, depres_channel_spawner=None ):
 		"""Initializes the creation of an ebuild.
 
 		arguments:
@@ -39,6 +39,8 @@ class EbuildCreation ( object ):
 
 		self.depres_channel_spawner = depres_channel_spawner
 
+		self.err_queue = err_queue
+
 		self.package_info.set_readonly()
 	# --- end of __init__ (...) ---
 
@@ -64,10 +66,10 @@ class EbuildCreation ( object ):
 				self.logger.info ( "Cannot create an ebuild for this package." )
 				self.status = -1
 
-		except Exception as e:
-			# log this and set status to fail
+		except ( Exception, KeyboardInterrupt ):
+			# set status to fail
 			self.status = -10
-			self.logger.exception ( e )
+			raise
 	# --- end of run (...) ---
 
 	def _lazyimport_desc_data ( self ):
@@ -82,8 +84,8 @@ class EbuildCreation ( object ):
 				logger=self.logger,
 				read_now=True
 			)
-			self.package_info.set_writeable()
-			self.package_info.update (
+
+			self.package_info.update_now (
 				desc_data=reader.get_desc ( run_if_unset=False )
 			)
 			del reader
@@ -133,7 +135,8 @@ class EbuildCreation ( object ):
 		_dep_resolution = depres.EbuildDepRes (
 			self.package_info, self.logger,
 			create_iuse=True, run_now=True,
-			depres_channel_spawner=self.depres_channel_spawner
+			depres_channel_spawner=self.depres_channel_spawner,
+			err_queue=self.err_queue
 		)
 		if not _dep_resolution.success():
 			# log here? (FIXME)

diff --git a/roverlay/ebuild/depres.py b/roverlay/ebuild/depres.py
index 383831e..1c19a00 100644
--- a/roverlay/ebuild/depres.py
+++ b/roverlay/ebuild/depres.py
@@ -22,8 +22,8 @@ class EbuildDepRes ( object ):
 	"""Handles dependency resolution for a single ebuild."""
 
 	def __init__ (
-		self, package_info, logger, depres_channel_spawner,
-		create_iuse=True, run_now=True
+		self, package_info, logger, depres_channel_spawner, err_queue,
+		create_iuse=True, run_now=True,
 	):
 		"""Initializes an EbuildDepRes.
 
@@ -45,6 +45,8 @@ class EbuildDepRes ( object ):
 		self.has_suggests = None
 		self.create_iuse  = create_iuse
 
+		self.err_queue    = err_queue
+
 		self._channels    = None
 
 		if run_now:
@@ -90,7 +92,8 @@ class EbuildDepRes ( object ):
 		if dependency_type not in self._channels:
 			self._channels [dependency_type] = self.request_resolver (
 				name=dependency_type,
-				logger=self.logger
+				logger=self.logger,
+				err_queue=self.err_queue
 			)
 		return self._channels [dependency_type]
 	# --- end of get_channel (...) ---

diff --git a/roverlay/overlay/__init__.py b/roverlay/overlay/__init__.py
index 9ca6abe..a8a939d 100644
--- a/roverlay/overlay/__init__.py
+++ b/roverlay/overlay/__init__.py
@@ -77,14 +77,16 @@ class Overlay ( object ):
 		"""
 		if not category in self._categories:
 			self._catlock.acquire()
-			if not category in self._categories:
-				self._categories [category] = Category (
-					category,
-					self.logger,
-					None if self.physical_location is None else \
-						os.path.join ( self.physical_location, category )
-				)
-			self._catlock.release()
+			try:
+				if not category in self._categories:
+					self._categories [category] = Category (
+						category,
+						self.logger,
+						None if self.physical_location is None else \
+							os.path.join ( self.physical_location, category )
+					)
+			finally:
+				self._catlock.release()
 
 		return self._categories [category]
 	# --- end of _get_category (...) ---

diff --git a/roverlay/overlay/category.py b/roverlay/overlay/category.py
index 910f194..e938dbf 100644
--- a/roverlay/overlay/category.py
+++ b/roverlay/overlay/category.py
@@ -5,12 +5,19 @@
 import threading
 import os.path
 
+try:
+	import queue
+except ImportError:
+	import Queue as queue
+
 from roverlay.overlay.package import PackageDir
 
 import roverlay.util
 
 class Category ( object ):
 
+	WRITE_JOBCOUNT = 3
+
 	def __init__ ( self, name, logger, directory ):
 		"""Initializes a overlay/portage category (such as 'app-text', 'sci-R').
 
@@ -45,14 +52,16 @@ class Category ( object ):
 
 		if not pkg_name in self._subdirs:
 			self._lock.acquire()
-			if not pkg_name in self._subdirs:
-				self._subdirs [pkg_name] = PackageDir (
-					pkg_name,
-					self.logger,
-					None if self.physical_location is None else \
-						os.path.join ( self.physical_location, pkg_name )
-				)
-			self._lock.release()
+			try:
+				if not pkg_name in self._subdirs:
+					self._subdirs [pkg_name] = PackageDir (
+						pkg_name,
+						self.logger,
+						None if self.physical_location is None else \
+							os.path.join ( self.physical_location, pkg_name )
+					)
+			finally:
+				self._lock.release()
 
 		self._subdirs [pkg_name].add ( package_info )
 	# --- end of add (...) ---
@@ -92,13 +101,56 @@ class Category ( object ):
 			package.show ( **show_kw )
 	# --- end of show (...) ---
 
+	def _run_write_queue ( self, q, write_kw ):
+		try:
+			while not q.empty():
+				pkg = q.get_nowait()
+				pkg.write ( **write_kw )
+
+		except queue.Empty:
+			pass
+		except ( Exception, KeyboardInterrupt ) as e:
+			self.RERAISE_EXCEPTION = e
+
+	# --- end of _run_write_queue (...) ---
+
 	def write ( self, **write_kw ):
 		"""Writes this category to its filesystem location.
 
 		returns: None (implicit)
 		"""
-		for package in self._subdirs.values():
-			if package.physical_location and not package.empty():
-				roverlay.util.dodir ( package.physical_location )
-				package.write ( **write_kw )
+
+		max_jobs = self.__class__.WRITE_JOBCOUNT
+
+		# todo len.. > 3: what's a reasonable number of min package dirs to
+		#                  start threaded writing?
+		if max_jobs > 1 and len ( self._subdirs ) > 3:
+
+			# writing 1..self.__class__.WRITE_JOBCOUNT package dirs at once
+
+			write_queue = queue.Queue()
+			for package in self._subdirs.values():
+				if package.physical_location and not package.empty():
+					roverlay.util.dodir ( package.physical_location )
+					write_queue.put_nowait ( package )
+
+
+			if not write_queue.empty():
+				workers = (
+					threading.Thread (
+						target=self._run_write_queue,
+						args=( write_queue, write_kw )
+					) for n in range ( max_jobs )
+				)
+
+				for w in workers: w.start()
+				for w in workers: w.join()
+
+				if hasattr ( self, 'RERAISE_EXCEPTION' ):
+					raise self.RERAISE_EXCEPTION
+		else:
+			for package in self._subdirs.values():
+				if package.physical_location and not package.empty():
+					roverlay.util.dodir ( package.physical_location )
+					package.write ( **write_kw )
 	# --- end of write (...) ---

diff --git a/roverlay/overlay/creator.py b/roverlay/overlay/creator.py
index 608ba77..aa1a3a8 100644
--- a/roverlay/overlay/creator.py
+++ b/roverlay/overlay/creator.py
@@ -5,6 +5,9 @@
 import time
 import logging
 import threading
+import signal
+import traceback
+import sys
 
 try:
 	import queue
@@ -40,18 +43,26 @@ class OverlayCreator ( object ):
 
 		self.depresolver = easyresolver.setup()
 
-		self.NUMTHREADS  = config.get ( 'EBUILD.jobcount', 0 )
+		self.NUMTHREADS  = config.get ( 'EBUILD.jobcount', 10 )
 
 		# --
-		self._pkg_queue  = queue.Queue()
-		self._workers    = None
-		self._runlock    = threading.RLock()
+		self._pkg_queue = queue.Queue()
+
+		# this queue is used to propagate exceptions from threads
+		#  it's
+		self._err_queue = queue.Queue()
+
+
+		self._workers   = None
+		self._runlock   = threading.RLock()
 
 		self.can_write_overlay = OVERLAY_WRITE_ALLOWED
 
 		# this is a method that adds PackageInfo objects to the pkg queue
 		self.add_package = self._pkg_queue.put
 
+		self.depresolver.set_exception_queue ( self._err_queue )
+
 	# --- end of __init__ (...) ---
 
 	def _timestamp ( self, description, start, stop=None ):
@@ -109,17 +120,24 @@ class OverlayCreator ( object ):
 	def run ( self ):
 		"""Starts ebuild creation and waits until done."""
 		self._runlock.acquire()
-		self.start()
-		self.join()
-		self._runlock.release()
+		try:
+			self.start()
+			self.join()
+		finally:
+			self._runlock.release()
 	# --- end of run (...) ---
 
 	def start ( self ):
 		"""Starts ebuild creation."""
 		self._runlock.acquire()
-		self.join()
-		self._make_workers()
-		self._runlock.release()
+		try:
+			self.join()
+			self._make_workers()
+		except:
+			self._err_queue.put_nowait ( ( -1, None ) )
+			raise
+		finally:
+			self._runlock.release()
 	# --- end of start (...) ---
 
 	def join ( self ):
@@ -142,13 +160,14 @@ class OverlayCreator ( object ):
 		if self.depresolver is None: return
 
 		self._runlock.acquire()
-		if self.depresolver is None: return
 
-		self.depresolver.close()
-		del self.depresolver
-		self.depresolver = None
-
-		self._runlock.release()
+		try:
+			if self.depresolver is not None:
+				self.depresolver.close()
+				del self.depresolver
+				self.depresolver = None
+		finally:
+			self._runlock.release()
 	# --- end of _close_resolver (...) ---
 
 	def _waitfor_workers ( self, do_close ):
@@ -157,26 +176,65 @@ class OverlayCreator ( object ):
 		arguments:
 		* do_close -- close (exit) workers if True, else wait until done.
 		"""
-		if self._workers is None: return
 
-		self._runlock.acquire()
 		if self._workers is None: return
+		start = None
+		self._runlock.acquire()
 
-		if self.NUMTHREADS > 0: start = time.time()
+		try:
+			if self._workers is not None:
+				if self.NUMTHREADS > 0: start = time.time()
+
+				if do_close:
+					self._err_queue.put_nowait ( ( -1, None ) )
+					# fixme: remove enabled?
+					for w in self._workers: w.enabled = False
+				else:
+					for w in self._workers: w.stop_when_empty()
+
+				while True in ( w.active() for w in self._workers ):
+					self._pkg_queue.put ( None )
+
+				del self._workers
+				self._workers = None
+
+				while not self._err_queue.empty():
+					e = self._err_queue.get_nowait()
+					self._err_queue.put_nowait ( ( -2, None ) )
+					if isinstance ( e [1], ( Exception, KeyboardInterrupt ) ):
+						self._err_queue.put ( e )
+						self.logger.warning ( "Reraising thread exception." )
+						raise e [1]
+
+
+
+		except ( Exception, KeyboardInterrupt ) as err:
+			# catch interrupt here: still wait until all workers have been closed
+			# and reraise after that
+#			SIGINT_RESTORE = signal.signal (
+#				signal.SIGINT,
+#				lambda sig, frame : sys.stderr.write ( "Please wait ...\n" )
+#			)
+
+			try:
+				self._err_queue.put_nowait ( ( -1, None ) )
+				self.depresolver.unblock_channels()
+				while hasattr ( self, '_workers' ) and self._workers is not None:
 
-		if do_close:
-			for w in self._workers: w.enabled = False
-		else:
-			for w in self._workers: w.stop_when_empty()
 
-		while True in ( w.active() for w in self._workers ):
-			self._pkg_queue.put ( None )
+					if True in ( w.active() for w in self._workers ):
+						self._pkg_queue.put_nowait ( None )
+					else:
+						del self._workers
+						self._workers = None
+			finally:
+#				signal.signal ( signal.SIGINT, SIGINT_RESTORE )
+				raise
 
-		del self._workers
-		self._workers = None
+		finally:
+			self._runlock.release()
 
-		self._runlock.release()
-		if self.NUMTHREADS > 0: return start
+		return start
 	# --- end of _waitfor_workers (...) ---
 
 	def _join_workers ( self ):
@@ -221,7 +279,8 @@ class OverlayCreator ( object ):
 		"""
 		w = OverlayWorker (
 			self._pkg_queue, self.depresolver, self.logger, self._pkg_done,
-			use_threads=use_threads
+			use_threads=use_threads,
+			err_queue=self._err_queue
 		)
 		if start_now: w.start()
 		return w

diff --git a/roverlay/overlay/package.py b/roverlay/overlay/package.py
index e77d4a2..77fcbee 100644
--- a/roverlay/overlay/package.py
+++ b/roverlay/overlay/package.py
@@ -69,64 +69,60 @@ class PackageDir ( object ):
 			raise Exception ( "cannot write - no directory assigned!" )
 
 		self._lock.acquire()
-		self._regen_metadata()
+		try:
+			self._regen_metadata()
 
-		# mkdir not required here, overlay.Category does this
+			# mkdir not required here, overlay.Category does this
 
-		# write ebuilds
-		for ver, p_info in self._packages.items():
-			fh = None
-			try:
-				efile  = self._get_ebuild_filepath ( ver )
+			# write ebuilds
+			for ver, p_info in self._packages.items():
+				fh = None
+				try:
+					efile  = self._get_ebuild_filepath ( ver )
 
-				ebuild = p_info ['ebuild']
+					ebuild = p_info ['ebuild']
 
-				fh = open ( efile, 'w' )
-				if isinstance ( ebuild, str ):
-					if default_header is not None:
-						fh.write ( default_header )
-						fh.write ( '\n\n' )
-					fh.write ( ebuild )
+					fh = open ( efile, 'w' )
+					fh.write ( default_header )
+					fh.write ( '\n\n' )
+					fh.write ( str ( ebuild ) )
 					fh.write ( '\n' )
-				else:
-					ebuild.write (
-						fh,
-						header=default_header, header_is_fallback=True
-					)
-				if fh: fh.close()
 
-				# adjust owner/perm? TODO
-				# chmod 0644 or 0444
-				# chown 250.250
+					if fh: fh.close()
 
-				# this marks the package as 'written to fs'
-				p_info.set_writeable()
-				p_info ['ebuild_file'] = efile
-				p_info.set_readonly()
+					# adjust owner/perm? TODO
+					# chmod 0644 or 0444
+					# chown 250.250
 
-				self.logger.info ( "Wrote ebuild %s." % efile )
-			except IOError as e:
-				if fh: fh.close()
-				self.logger.error ( "Couldn't write ebuild %s." % efile )
-				self.logger.exception ( e )
+					# this marks the package as 'written to fs'
+					p_info.set_writeable()
+					p_info ['ebuild_file'] = efile
+					p_info.set_readonly()
 
-		# write metadata
-		fh = None
-		try:
-			mfile = self._get_metadata_filepath()
+					self.logger.info ( "Wrote ebuild %s." % efile )
+				except IOError as e:
+					if fh: fh.close()
+					self.logger.error ( "Couldn't write ebuild %s." % efile )
+					self.logger.exception ( e )
 
-			fh    = open ( mfile, 'w' )
-			self._metadata.write ( fh )
-			if fh: fh.close()
+			# write metadata
+			fh = None
+			try:
+				mfile = self._get_metadata_filepath()
 
-		except IOError as e:
-			if fh: fh.close()
-			self.logger.error ( "Failed to write metadata at %s." % mfile )
-			self.logger.exception ( e )
+				fh    = open ( mfile, 'w' )
+				self._metadata.write ( fh )
+				if fh: fh.close()
 
-		self.generate_manifest()
+			except IOError as e:
+				if fh: fh.close()
+				self.logger.error ( "Failed to write metadata at %s." % mfile )
+				self.logger.exception ( e )
 
-		self._lock.release()
+			self.generate_manifest()
+
+		finally:
+			self._lock.release()
 	# --- end of write (...) ---
 
 	def show ( self, stream=sys.stderr, default_header=None ):
@@ -141,35 +137,31 @@ class PackageDir ( object ):
 		* IOError
 		"""
 		self._lock.acquire()
-		self._regen_metadata()
+		try:
+			self._regen_metadata()
 
 
-		for ver, p_info in self._packages.items():
-			efile  = self._get_ebuild_filepath ( ver )
-			ebuild = p_info ['ebuild']
+			for ver, p_info in self._packages.items():
+				efile  = self._get_ebuild_filepath ( ver )
+				ebuild = p_info ['ebuild']
 
-			stream.write ( "[BEGIN ebuild %s]\n" % efile )
-			if isinstance ( ebuild, str ):
-				if default_header is not None:
-					stream.write ( default_header )
-					stream.write ( '\n\n' )
-				stream.write ( ebuild )
+				stream.write ( "[BEGIN ebuild %s]\n" % efile )
+
+				stream.write ( default_header )
+				stream.write ( '\n\n' )
+				stream.write ( str ( ebuild ) )
 				stream.write ( '\n' )
-			else:
-				ebuild.write (
-					stream,
-					header=default_header, header_is_fallback=True
-				)
-			stream.write ( "[END ebuild %s]\n" % efile )
 
-		mfile = self._get_metadata_filepath()
+				stream.write ( "[END ebuild %s]\n" % efile )
 
-		stream.write ( "[BEGIN %s]\n" % mfile )
-		self._metadata.write ( stream )
-		stream.write ( "[END %s]\n" % mfile )
+			mfile = self._get_metadata_filepath()
 
+			stream.write ( "[BEGIN %s]\n" % mfile )
+			self._metadata.write ( stream )
+			stream.write ( "[END %s]\n" % mfile )
 
-		self._lock.release()
+		finally:
+			self._lock.release()
 	# --- end of show (...) ---
 
 	def _latest_package ( self, pkg_filter=None, use_lock=False ):
@@ -186,15 +178,16 @@ class PackageDir ( object ):
 		retpkg = None
 
 		if use_lock: self._lock.acquire()
-		for p in self._packages.values():
-			if pkg_filter is None or pkg_filter ( p ):
-				newver = p ['version']
-				if first or newver > retver:
-					retver = newver
-					retpkg = p
-					first  = False
-
-		if use_lock: self._lock.release()
+		try:
+			for p in self._packages.values():
+				if pkg_filter is None or pkg_filter ( p ):
+					newver = p ['version']
+					if first or newver > retver:
+						retver = newver
+						retpkg = p
+						first  = False
+		finally:
+			if use_lock: self._lock.release()
 		return retpkg
 	# --- end of _latest_package (...) ---
 
@@ -219,7 +212,7 @@ class PackageDir ( object ):
 					self.name, shortver
 				)
 				if SUPPRESS_EXCEPTIONS:
-					self.logger.warning ( msg )
+					self.logger.info ( msg )
 				else:
 					raise Exception ( msg )
 
@@ -265,18 +258,20 @@ class PackageDir ( object ):
 			return
 
 		self._lock.acquire()
+		try:
 
-		if self._metadata is None or not use_old_metadata:
-			del self._metadata
-			self._metadata = MetadataJob ( self.logger )
+			if self._metadata is None or not use_old_metadata:
+				del self._metadata
+				self._metadata = MetadataJob ( self.logger )
 
-		if use_all_packages:
-			for p_info in self._packages:
-				self._metadata.update ( p_info )
-		else:
-			self._metadata.update ( self._latest_package() )
+			if use_all_packages:
+				for p_info in self._packages:
+					self._metadata.update ( p_info )
+			else:
+				self._metadata.update ( self._latest_package() )
 
-		self._lock.release()
+		finally:
+			self._lock.release()
 	# --- end of generate_metadata (...) ---
 
 	def generate_manifest ( self ):

diff --git a/roverlay/overlay/worker.py b/roverlay/overlay/worker.py
index 471cb94..541de7f 100644
--- a/roverlay/overlay/worker.py
+++ b/roverlay/overlay/worker.py
@@ -3,16 +3,21 @@
 # Distributed under the terms of the GNU General Public License v2
 
 #import time
+import sys
 import threading
 
 from roverlay.depres.channels    import EbuildJobChannel
 from roverlay.ebuild.creation    import EbuildCreation
 
+# this controls whether debug messages from OverlayWorker.run() are printed
+# to stderr or suppressed
+DEBUG = True
+
 class OverlayWorker ( object ):
 	"""Overlay package queue worker."""
 
 	def __init__ ( self,
-		pkg_queue, depresolver, logger, pkg_done, use_threads
+		pkg_queue, depresolver, logger, pkg_done, use_threads, err_queue
 	):
 		"""Initializes a worker.
 
@@ -29,11 +34,12 @@ class OverlayWorker ( object ):
 		self.pkg_done    = pkg_done
 		self.depresolver = depresolver
 
+		self.err_queue   = err_queue
+
 		self._use_thread = use_threads
 		self._thread     = None
-
-		self.enabled     = True
 		self.running     = False
+		self.enabled     = True
 		self.halting     = False
 	# --- end of __init__ (...) ---
 
@@ -81,7 +87,8 @@ class OverlayWorker ( object ):
 		"""
 		job = EbuildCreation (
 			package_info,
-			depres_channel_spawner=self._get_resolver_channel
+			depres_channel_spawner=self._get_resolver_channel,
+			err_queue=self.err_queue
 		)
 		job.run()
 		self.pkg_done ( package_info )
@@ -89,42 +96,66 @@ class OverlayWorker ( object ):
 
 	def _run ( self ):
 		"""Runs the worker (thread mode)."""
-		self.running = True
-		self.halting = False
-		while self.enabled or (
-			self.halting and not self.pkg_queue.empty()
-		):
-			if not self.running:
-				# exit now
-				break
-			p = self.pkg_queue.get()
 
-			# drop empty requests that are used to unblock get()
-			if p is not None:
-				self._process ( p )
-			elif self.halting:
-				# receiving an empty request while halting means 'stop now',
-				self.enabled = False
-				self.halting = False
+		def debug ( msg ):
+			if DEBUG:
+				sys.stderr.write (
+					"%i WORKER: %s\n" % ( id ( self ), msg )
+				)
+
+		try:
+			self.running = True
+			self.halting = False
+			while self.enabled or (
+				self.halting and not self.pkg_queue.empty()
+			):
+				if not self.err_queue.empty():
+					# other workers died (or exit request sent)
+					debug ( "STOPPING #1" )
+					break
+
+				debug ( "WAITING" )
+				p = self.pkg_queue.get()
+				debug ( "RECEIVED A TASK, " + str ( p ) )
+
+				if not self.err_queue.empty():
+					debug ( "STOPPING #2" )
+					break
+
+				# drop empty requests that are used to unblock get()
+				if p is not None:
+					debug ( "ENTER PROC" )
+					if self.err_queue.empty():
+						debug ( "__ empty exception/error queue!" )
+					self._process ( p )
+				elif self.halting:
+					# receiving an empty request while halting means 'stop now',
+					self.enabled = False
+					self.halting = False
+
+				self.pkg_queue.task_done()
+
+			debug ( "STOPPING - DONE" )
+		except ( Exception, KeyboardInterrupt ) as e:
+			self.logger.exception ( e )
+			self.err_queue.put_nowait ( ( id ( self ), e ) )
 
-			self.pkg_queue.task_done()
 		self.running = False
+
 	# --- end of run (...) ---
 
 	def _run_nothread ( self ):
 		"""Runs the worker (no-thread mode)."""
 		self.running = True
 		while self.enabled and not self.pkg_queue.empty():
-			if not self.running:
-				# exit now
-				break
 
-			p = self.pkg_queue.get()
+			p = self.pkg_queue.get_nowait()
 
 			# drop empty requests that are used to unblock get()
 			if p is not None:
 				self._process ( p )
 
 			self.pkg_queue.task_done()
+
 		self.running = False
 	# --- end of _run_nothread (...) ---



^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2012-07-02 16:52 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2012-07-02 16:52 [gentoo-commits] proj/R_overlay:master commit in: roverlay/ebuild/, roverlay/depres/, roverlay/overlay/ André Erdmann

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox