public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
From: "André Erdmann" <dywi@mailerd.de>
To: gentoo-commits@lists.gentoo.org
Subject: [gentoo-commits] proj/R_overlay:master commit in: roverlay/depres/
Date: Mon, 16 Jul 2012 16:15:46 +0000 (UTC)	[thread overview]
Message-ID: <1342175981.bedc1a5d0029f17c255422310bd5ef98aa7d3fb8.dywi@gentoo> (raw)
Message-ID: <20120716161546.WmYmc5hFsXsj_pYVzKVh7HxFyGXpzfl0VEk_MO5bSZs@z> (raw)

commit:     bedc1a5d0029f17c255422310bd5ef98aa7d3fb8
Author:     André Erdmann <dywi <AT> mailerd <DOT> de>
AuthorDate: Fri Jul 13 10:39:41 2012 +0000
Commit:     André Erdmann <dywi <AT> mailerd <DOT> de>
CommitDate: Fri Jul 13 10:39:41 2012 +0000
URL:        http://git.overlays.gentoo.org/gitweb/?p=proj/R_overlay.git;a=commit;h=bedc1a5d

depresolver: separate run method for threads

the run-method code for the depresolver is being split up into
a threaded and not-threaded variant, to increase readability,
stability and performance.

* threaded resolving temporarily removed

---
 roverlay/depres/depresolver.py |  355 ++++++++++++++++++++--------------------
 1 files changed, 179 insertions(+), 176 deletions(-)

diff --git a/roverlay/depres/depresolver.py b/roverlay/depres/depresolver.py
index bbe58b4..5f85ad1 100644
--- a/roverlay/depres/depresolver.py
+++ b/roverlay/depres/depresolver.py
@@ -16,10 +16,8 @@ from roverlay        import config
 from roverlay.depres import communication, deptype, events
 #from roverlay.depres import simpledeprule
 
-#from roverlay.depres.depenv import DepEnv (implicit)
 
-
-# if false: do not using the "negative" result caching which stores
+# if false: do not use the "negative" result caching which stores
 # unresolvable deps in a set for should-be faster lookups
 USING_DEPRES_CACHE = True
 
@@ -29,9 +27,6 @@ SAFE_CHANNEL_IDS = True
 class DependencyResolver ( object ):
 	"""Main object for dependency resolution."""
 
-
-	NUMTHREADS = config.get ( "DEPRES.jobcount", 0 )
-
 	def __init__ ( self, err_queue ):
 		"""Initializes a DependencyResolver."""
 
@@ -44,16 +39,17 @@ class DependencyResolver ( object ):
 			'RESOLVED', 'UNRESOLVABLE'
 		)
 
-		# this lock tells whether a dep res 'master' thread is running (locked)
-		self.runlock     = threading.Lock()
-		self.startlock   = threading.Lock()
-		# the dep res main thread
-		self._mainthread = None
-		# the dep res worker threads
-		self._threads    = None
+		self._jobs = config.get ( "DEPRES.jobcount", 0 )
 
-		self.err_queue   = err_queue
+		# used to lock the run methods,
+		self._runlock = threading.Lock()
 
+		if self._jobs > 0:
+			# the dep res main thread
+			self._mainthread = None
+
+
+		self.err_queue = err_queue
 
 		# the list of registered listener modules
 		self.listeners = list ()
@@ -139,13 +135,13 @@ class DependencyResolver ( object ):
 			# log this event
 			if event_type == events.DEPRES_EVENTS ['RESOLVED']:
 				self.logger_resolved.info (
-					"'%s' as '%s'" % ( dep_env.dep_str, dep_env.resolved_by )
+					"{!r} as {!r}".format ( dep_env.dep_str, dep_env.resolved_by )
 				)
 			elif event_type == events.DEPRES_EVENTS ['UNRESOLVABLE']:
-				self.logger_unresolvable.info ( "'%s'" % dep_env.dep_str )
+				self.logger_unresolvable.info ( "{!r}".format ( dep_env.dep_str ) )
 			else:
 				# "generic" event, expects that kw msg is set
-				self.logger.debug ( "event %s : %s" % ( event, msg ) )
+				self.logger.debug ( "event {}: {}".format ( event, msg ) )
 		# --- if
 
 		if self.listenermask & event_type:
@@ -274,48 +270,180 @@ class DependencyResolver ( object ):
 	# --- end of _queue_previously_failed (...) ---
 
 	def start ( self ):
-		# -- verify whether resolver has to be started
-		if self._depqueue.empty():
-			# nothing to resolve
-			return
+		if self._jobs == 0:
+			if not self._depqueue.empty():
+				self._run_resolver()
+			if not self.err_queue.really_empty():
+				self.err_queue.unblock_queues()
+		else:
+			# new resolver threads run async and
+			# can be started with an empty depqueue
+			if self._runlock.acquire ( False ):
+				# else resolver is running
+
+				self._mainthread = threading.Thread (
+					target=self._thread_run_resolver
+				)
+				self._mainthread.start()
+				# _thread_run_resolver has to release the lock when done
+	# --- end of start (...) ---
+
+	def _process_unresolvable_queue ( self ):
+		# iterate over _depqueue_failed and report unresolved
+		while not self._depqueue_failed.empty() and self.err_queue.empty:
+			try:
+				channel_id, dep_env = self._depqueue_failed.get_nowait()
+			except queue.Empty:
+				# race cond empty() <-> get_nowait()
+				return
+
+			dep_env.set_unresolvable()
+			self._report_event ( 'UNRESOLVABLE', dep_env )
+
+			try:
+				if channel_id in self._depqueue_done:
+					self._depqueue_done [channel_id].put_nowait ( dep_env )
+			except KeyError:
+				# channel has been closed before calling put, ignore this
+				pass
+	# --- end of _process_unresolvable_queue (...) ---
+
+	def _process_dep ( self, queue_item ):
+		channel_id, dep_env = queue_item
+
+		# drop dep if channel closed
+		if not channel_id in self._depqueue_done: return
+
+		self.logger.debug (
+			"Trying to resolve {!r}.".format ( dep_env.dep_str )
+		)
 
-		if not self.startlock.acquire ( False ):
-			# another channel/.. is starting the resolver
-			return
-		elif self._depqueue.empty():
-			self.startlock.release()
-			return
+		resolved = None
+		# resolved can be None, so use a tri-state int for checking
+		#  0 -> unresolved, but resolvable
+		#  1 -> unresolved and (currently, new rules may change this)
+		#        not resolvable
+		#  2 -> resolved
+		is_resolved = 0
 
-		# -- verify...
+		if USING_DEPRES_CACHE and dep_env.dep_str_low in self._dep_unresolvable:
+			# cannot resolve
+			is_resolved = 1
 
-		# acquire the run lock (that locks _run_main)
+		else:
+			# search for a match in the rule pools that accept the dep type
+			for rulepool in (
+				p for p in self.static_rule_pools \
+					if p.deptype_mask & dep_env.deptype_mask
+			):
+				result = rulepool.matches ( dep_env )
+				if result [0] > 0:
+					resolved    = result [1]
+					is_resolved = 2
+					break
+
+			if is_resolved == 0 and dep_env.deptype_mask & deptype.try_other:
+				## TRY_OTHER bit is set
+				# search for a match in the rule pools
+				#  that (normally) don't accept the dep type
+				for rulepool in (
+					p for p in self.static_rule_pools \
+						if p.deptype_mask & ~dep_env.deptype_mask
+				):
+					result = rulepool.matches ( dep_env )
+					if result [0] > 0:
+						resolved    = result [1]
+						is_resolved = 2
+						break
+			# --
+
+
+		# -- done with resolving
+
+		if is_resolved != 2:
+			# could not resolve dep_env
+			self._depqueue_failed.put ( queue_item )
+			if USING_DEPRES_CACHE:
+				# does not work when adding new rules is possible
+				self._dep_unresolvable.add ( dep_env.dep_str_low )
+		else:
+			# successfully resolved
+			dep_env.set_resolved ( resolved, append=False )
+			self._report_event ( 'RESOLVED', dep_env )
+			try:
+				self._depqueue_done [channel_id].put ( dep_env )
+			except KeyError:
+				# channel gone while resolving
+				pass
+
+			"""
+			## only useful if new rules can be created
+			# new rule found, requeue all previously
+			#  failed dependency searches
+			if have_new_rule:
+				self._queue_previously_failed
+				if USING_DEPRES_CACHE:
+					self._dep_unresolvable.clear() #?
+			"""
+	# --- end of _process_dep (...) ---
+
+	def _run_resolver ( self ):
+		# single-threaded variant of run
+		#  still checking err_queue 'cause other modules
+		#  could be run with threads
+		if self._depqueue.empty(): return
 		try:
-			self.runlock.acquire()
+			self._runlock.acquire()
+			while not self._depqueue.empty() and self.err_queue.empty:
+				to_resolve = self._depqueue.get_nowait()
+				self._process_dep ( queue_item=to_resolve )
+				self._depqueue.task_done()
+
+			self._process_unresolvable_queue()
+		except ( Exception, KeyboardInterrupt ) as e:
+			# single-threaded exception catcher:
+			# * push exception to inform other threads (if any)
+			# * unblock queues (automatically when calling push)
+			# * reraise
+			self.err_queue.push ( id ( self ), e )
+			raise e
 		finally:
-			self.startlock.release()
+			self._runlock.release()
+	# --- end of _run_resolver (...) ---
 
-		if self._depqueue.empty():
-			self.runlock.release()
-			return
+	def _thread_run_resolver ( self ):
+		raise Exception ( "method stub" )
+	# --- end of _thread_run_resolver (...) ---
 
-		if DependencyResolver.NUMTHREADS > 0:
-			# no need to wait for the old thread
-			# FIXME: could remove the following block
-			if self._mainthread is not None:
-				self._mainthread.join()
-				del self._mainthread
+	def enqueue ( self, dep_env, channel_id ):
+		"""Adds a DepEnv to the queue of deps to resolve.
 
-			self._mainthread = threading.Thread ( target=self._thread_run_main )
-			self._mainthread.start()
+		arguments:
+		* dep_env -- to add
+		* channel_id -- identifier of the channel associated with the dep_env
 
-		else:
-			self._thread_run_main()
+		returns: None (implicit)
+		"""
+		self._depqueue.put ( ( channel_id, dep_env ) )
+
+	# --- end of enqueue (...) ---
+
+	def close ( self ):
+		if self._jobs > 0:
+			if self._mainthread:
+				self._mainthread.join()
+		for lis in self.listeners: lis.close()
+		del self.listeners
+		if SAFE_CHANNEL_IDS:
+			self.logger.debug (
+				"{} channels were in use.".format ( len ( self.all_channel_ids ) )
+			)
+	# --- end of close (...) ---
 
-		# self.runlock is released when _thread_run_main is done
-	# --- end of start (...) ---
 
 	def _thread_run_main ( self ):
 		"""Tells the resolver to run."""
+		raise Exception ( "to be removed" )
 		jobcount = self.__class__.NUMTHREADS
 
 		try:
@@ -351,26 +479,7 @@ class DependencyResolver ( object ):
 
 			# iterate over _depqueue_failed and report unresolved
 			## todo can thread this
-			while not self._depqueue_failed.empty() and self.err_queue.empty:
-				try:
-					channel_id, dep_env = self._depqueue_failed.get_nowait()
-
-				except queue.Empty:
-					# race cond empty() <-> get_nowait()
-					return
-
-
-				dep_env.set_unresolvable()
-
-				self._report_event ( 'UNRESOLVABLE', dep_env )
-
-
-				try:
-					if channel_id in self._depqueue_done:
-						self._depqueue_done [channel_id].put_nowait ( dep_env )
-				except KeyError:
-					# channel has been closed before calling put, ignore this
-					pass
+			self._process_unresolvable_queue()
 
 			if not self.err_queue.really_empty():
 				self.err_queue.unblock_queues()
@@ -384,7 +493,7 @@ class DependencyResolver ( object ):
 
 		finally:
 			# release the lock
-			self.runlock.release()
+			self._runlock.release()
 
 	# --- end of _thread_run_main (...) ---
 
@@ -393,100 +502,18 @@ class DependencyResolver ( object ):
 
 		returns: None (implicit)
 		"""
+		raise Exception ( "to be removed" )
 		try:
 			while self.err_queue.empty and not self._depqueue.empty():
 				try:
 					to_resolve = self._depqueue.get_nowait()
+					self._process_dep ( queue_item=to_resolve )
+					self._depqueue.task_done()
 				except queue.Empty:
 					# this thread is done when the queue is empty, so this is
 					# no error, but just the result of the race condition between
 					# queue.empty() and queue.get(False)
 					return
-
-				channel_id, dep_env = to_resolve
-
-				if channel_id in self._depqueue_done:
-					# else channel has been closed, drop dep
-
-					self.logger.debug (
-						"Trying to resolve '%s'." % dep_env.dep_str
-					)
-
-					#have_new_rule = False
-
-					resolved = None
-					# resolved can be None, so use a tri-state int for checking
-					#  0 -> unresolved, but resolvable
-					#  1 -> unresolved and (currently, new rules may change this)
-					#        not resolvable
-					#  2 -> resolved
-					is_resolved = 0
-
-					# TODO:
-					#  (threading: could search the pools in parallel)
-
-					if USING_DEPRES_CACHE:
-						if dep_env.dep_str_low in self._dep_unresolvable:
-							# cannot resolve
-							is_resolved = 1
-
-					if is_resolved == 0:
-						# search for a match in the rule pools that accept
-						# the dep type
-						for rulepool in ( p for p in self.static_rule_pools \
-							if p.deptype_mask & dep_env.deptype_mask
-						):
-							result = rulepool.matches ( dep_env )
-							if not result is None and result [0] > 0:
-								resolved    = result [1]
-								is_resolved = 2
-								break
-
-					if is_resolved == 0 and \
-						dep_env.deptype_mask & deptype.try_other \
-					:
-						# search for a match in the rule pools that (normally)
-						# don't accept the dep type
-						for rulepool in ( p for p in self.static_rule_pools \
-							if p.deptype_mask & ~dep_env.deptype_mask
-						):
-							result = rulepool.matches ( dep_env )
-							if not result is None and result [0] > 0:
-								resolved    = result [1]
-								is_resolved = 2
-								break
-
-
-
-
-
-					if is_resolved == 2:
-						dep_env.set_resolved ( resolved, append=False )
-						self._report_event ( 'RESOLVED', dep_env )
-						try:
-							self._depqueue_done [channel_id].put ( dep_env )
-						except KeyError:
-							# channel gone while resolving
-							pass
-					else:
-						self._depqueue_failed.put ( to_resolve )
-
-						if USING_DEPRES_CACHE:
-							# does not work when adding new rules is possible
-							self._dep_unresolvable.add ( dep_env.dep_str_low )
-
-					"""
-					## only useful if new rules can be created
-					# new rule found, requeue all previously
-					#  failed dependency searches
-					if have_new_rule:
-						self._queue_previously_failed
-						if USING_DEPRES_CACHE:
-							self._dep_unresolvable.clear() #?
-					"""
-				# --- end if channel_id in self._depqueue_done
-
-				self._depqueue.task_done()
 			# --- end while
 
 		except ( Exception, KeyboardInterrupt ) as e:
@@ -498,27 +525,3 @@ class DependencyResolver ( object ):
 
 
 	# --- end of _thread_run_resolve (...) ---
-
-	def enqueue ( self, dep_env, channel_id ):
-		"""Adds a DepEnv to the queue of deps to resolve.
-
-		arguments:
-		* dep_env -- to add
-		* channel_id -- identifier of the channel associated with the dep_env
-
-		returns: None (implicit)
-		"""
-		self._depqueue.put ( ( channel_id, dep_env ) )
-
-	# --- end of enqueue (...) ---
-
-	def close ( self ):
-		if isinstance ( self._mainthread, threading.Thread ):
-			self._mainthread.join()
-		for lis in self.listeners: lis.close()
-		del self.listeners
-		if SAFE_CHANNEL_IDS:
-			self.logger.debug (
-				"%i channels were in use." % len ( self.all_channel_ids )
-			)
-	# --- end of close (...) ---



             reply	other threads:[~2012-07-16 16:17 UTC|newest]

Thread overview: 4+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2012-07-13 10:44 André Erdmann [this message]
2012-07-16 16:15 ` [gentoo-commits] proj/R_overlay:master commit in: roverlay/depres/ André Erdmann
  -- strict thread matches above, loose matches on Subject: below --
2012-07-16 16:15 André Erdmann
2012-07-16 16:15 ` [gentoo-commits] proj/R_overlay:depres_wip " André Erdmann
2012-07-13 10:44 André Erdmann

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1342175981.bedc1a5d0029f17c255422310bd5ef98aa7d3fb8.dywi@gentoo \
    --to=dywi@mailerd.de \
    --cc=gentoo-commits@lists.gentoo.org \
    --cc=gentoo-dev@lists.gentoo.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox