public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
* [gentoo-commits] portage r10918 - main/trunk/pym/_emerge
@ 2008-07-04  0:11 Zac Medico (zmedico)
  0 siblings, 0 replies; only message in thread
From: Zac Medico (zmedico) @ 2008-07-04  0:11 UTC (permalink / raw
  To: gentoo-commits

Author: zmedico
Date: 2008-07-04 00:11:03 +0000 (Fri, 04 Jul 2008)
New Revision: 10918

Modified:
   main/trunk/pym/_emerge/__init__.py
Log:
* Split out a SequentialTaskQueue class to encapsulate the parallel-fetch
  prefetcher queue.

* Fix broken return value status handling in Scheduler.merge().


Modified: main/trunk/pym/_emerge/__init__.py
===================================================================
--- main/trunk/pym/_emerge/__init__.py	2008-07-03 22:48:21 UTC (rev 10917)
+++ main/trunk/pym/_emerge/__init__.py	2008-07-04 00:11:03 UTC (rev 10918)
@@ -6966,6 +6966,49 @@
 			poll_events.append((fd, select.POLLIN))
 		return poll_events
 
+class SequentialTaskQueue(SlotObject):
+
+	__slots__ = ("max_jobs", "running_tasks", "_task_queue")
+
+	def __init__(self, **kwargs):
+		SlotObject.__init__(self, **kwargs)
+		from collections import deque
+		self._task_queue = deque()
+		self.running_tasks = set()
+		if self.max_jobs is None:
+			self.max_jobs = 1
+
+	def add(self, task):
+		self._task_queue.append(task)
+
+	def schedule(self):
+		task_queue = self._task_queue
+		running_tasks = self.running_tasks
+		max_jobs = self.max_jobs
+		state_changed = False
+
+		for task in list(running_tasks):
+			if not task.registered and task.poll() is not None:
+				running_tasks.remove(task)
+				state_changed = True
+
+		while task_queue and (len(running_tasks) < max_jobs):
+			task = task_queue.popleft()
+			cancelled = getattr(task, "cancelled", None)
+			if not cancelled:
+				task.start()
+				running_tasks.add(task)
+			state_changed = True
+
+		return state_changed
+
+	def clear(self):
+		self._task_queue.clear()
+		running_tasks = self.running_tasks
+		while running_tasks:
+			task = running_tasks.pop()
+			task.cancel()
+
 class Scheduler(object):
 
 	_opts_ignore_blockers = \
@@ -7038,10 +7081,10 @@
 		except AttributeError:
 			self._poll = PollSelectFallback()
 
-		from collections import deque
-		self._task_queue = deque()
-		self._running_tasks = set()
-		self._max_jobs = 1
+		self._prefetch_queue = SequentialTaskQueue()
+		self._add_task = self._prefetch_queue.add
+		self._schedule_tasks = self._prefetch_queue.schedule
+
 		self._prefetchers = weakref.WeakValueDictionary()
 		self._failed_fetches = []
 		self._parallel_fetch = False
@@ -7071,9 +7114,6 @@
 				except EnvironmentError:
 					pass
 
-	def _add_task(self, task):
-		self._task_queue.append(task)
-
 	class _pkg_failure(portage.exception.PortageException):
 		"""
 		An instance of this class is raised by unmerge() when
@@ -7282,7 +7322,7 @@
 		mtimedb = self._mtimedb
 
 		while True:
-			self._merge()
+			rval = self._merge()
 			self._show_failed_fetches()
 			del self._failed_fetches[:]
 
@@ -7350,11 +7390,7 @@
 					return e.status
 		finally:
 			# clean up child process if necessary
-			self._task_queue.clear()
-			running_tasks = self._running_tasks
-			while running_tasks:
-				task = running_tasks.pop()
-				task.cancel()
+			self._prefetch_queue.clear()
 		return os.EX_OK
 
 	def _save_resume_list(self):
@@ -7411,7 +7447,7 @@
 
 	def _schedule(self):
 		event_handlers = self._poll_event_handlers
-		running_tasks = self._running_tasks
+		running_tasks = self._prefetch_queue.running_tasks
 		poll = self._poll.poll
 
 		self._schedule_tasks()
@@ -7426,27 +7462,6 @@
 				# handler, so it's time to yield.
 				break
 
-	def _schedule_tasks(self):
-		task_queue = self._task_queue
-		running_tasks = self._running_tasks
-		max_jobs = self._max_jobs
-		state_changed = False
-
-		for task in list(running_tasks):
-			if not task.registered and task.poll() is not None:
-				running_tasks.remove(task)
-				state_changed = True
-
-		while task_queue and (len(running_tasks) < max_jobs):
-			task = task_queue.popleft()
-			cancelled = getattr(task, "cancelled", None)
-			if not cancelled:
-				task.start()
-				running_tasks.add(task)
-			state_changed = True
-
-		return state_changed
-
 	def _world_atom(self, pkg):
 		"""
 		Add the package to the world file, but only if

-- 
gentoo-commits@lists.gentoo.org mailing list



^ permalink raw reply	[flat|nested] only message in thread

only message in thread, other threads:[~2008-07-04  0:11 UTC | newest]

Thread overview: (only message) (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2008-07-04  0:11 [gentoo-commits] portage r10918 - main/trunk/pym/_emerge Zac Medico (zmedico)

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox