* [gentoo-commits] portage r10918 - main/trunk/pym/_emerge
@ 2008-07-04 0:11 Zac Medico (zmedico)
0 siblings, 0 replies; only message in thread
From: Zac Medico (zmedico) @ 2008-07-04 0:11 UTC (permalink / raw)
To: gentoo-commits
Author: zmedico
Date: 2008-07-04 00:11:03 +0000 (Fri, 04 Jul 2008)
New Revision: 10918
Modified:
main/trunk/pym/_emerge/__init__.py
Log:
* Split out a SequentialTaskQueue class to encapsulate the parallel-fetch
prefetcher queue.
* Fix broken return value status handling in Scheduler.merge().
Modified: main/trunk/pym/_emerge/__init__.py
===================================================================
--- main/trunk/pym/_emerge/__init__.py 2008-07-03 22:48:21 UTC (rev 10917)
+++ main/trunk/pym/_emerge/__init__.py 2008-07-04 00:11:03 UTC (rev 10918)
@@ -6966,6 +6966,49 @@
poll_events.append((fd, select.POLLIN))
return poll_events
+class SequentialTaskQueue(SlotObject):
+
+ __slots__ = ("max_jobs", "running_tasks", "_task_queue")
+
+ def __init__(self, **kwargs):
+ SlotObject.__init__(self, **kwargs)
+ from collections import deque
+ self._task_queue = deque()
+ self.running_tasks = set()
+ if self.max_jobs is None:
+ self.max_jobs = 1
+
+ def add(self, task):
+ self._task_queue.append(task)
+
+ def schedule(self):
+ task_queue = self._task_queue
+ running_tasks = self.running_tasks
+ max_jobs = self.max_jobs
+ state_changed = False
+
+ for task in list(running_tasks):
+ if not task.registered and task.poll() is not None:
+ running_tasks.remove(task)
+ state_changed = True
+
+ while task_queue and (len(running_tasks) < max_jobs):
+ task = task_queue.popleft()
+ cancelled = getattr(task, "cancelled", None)
+ if not cancelled:
+ task.start()
+ running_tasks.add(task)
+ state_changed = True
+
+ return state_changed
+
+ def clear(self):
+ self._task_queue.clear()
+ running_tasks = self.running_tasks
+ while running_tasks:
+ task = running_tasks.pop()
+ task.cancel()
+
class Scheduler(object):
_opts_ignore_blockers = \
@@ -7038,10 +7081,10 @@
except AttributeError:
self._poll = PollSelectFallback()
- from collections import deque
- self._task_queue = deque()
- self._running_tasks = set()
- self._max_jobs = 1
+ self._prefetch_queue = SequentialTaskQueue()
+ self._add_task = self._prefetch_queue.add
+ self._schedule_tasks = self._prefetch_queue.schedule
+
self._prefetchers = weakref.WeakValueDictionary()
self._failed_fetches = []
self._parallel_fetch = False
@@ -7071,9 +7114,6 @@
except EnvironmentError:
pass
- def _add_task(self, task):
- self._task_queue.append(task)
-
class _pkg_failure(portage.exception.PortageException):
"""
An instance of this class is raised by unmerge() when
@@ -7282,7 +7322,7 @@
mtimedb = self._mtimedb
while True:
- self._merge()
+ rval = self._merge()
self._show_failed_fetches()
del self._failed_fetches[:]
@@ -7350,11 +7390,7 @@
return e.status
finally:
# clean up child process if necessary
- self._task_queue.clear()
- running_tasks = self._running_tasks
- while running_tasks:
- task = running_tasks.pop()
- task.cancel()
+ self._prefetch_queue.clear()
return os.EX_OK
def _save_resume_list(self):
@@ -7411,7 +7447,7 @@
def _schedule(self):
event_handlers = self._poll_event_handlers
- running_tasks = self._running_tasks
+ running_tasks = self._prefetch_queue.running_tasks
poll = self._poll.poll
self._schedule_tasks()
@@ -7426,27 +7462,6 @@
# handler, so it's time to yield.
break
- def _schedule_tasks(self):
- task_queue = self._task_queue
- running_tasks = self._running_tasks
- max_jobs = self._max_jobs
- state_changed = False
-
- for task in list(running_tasks):
- if not task.registered and task.poll() is not None:
- running_tasks.remove(task)
- state_changed = True
-
- while task_queue and (len(running_tasks) < max_jobs):
- task = task_queue.popleft()
- cancelled = getattr(task, "cancelled", None)
- if not cancelled:
- task.start()
- running_tasks.add(task)
- state_changed = True
-
- return state_changed
-
def _world_atom(self, pkg):
"""
Add the package to the world file, but only if
--
gentoo-commits@lists.gentoo.org mailing list
^ permalink raw reply [flat|nested] only message in thread
only message in thread, other threads:[~2008-07-04 0:11 UTC | newest]
Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2008-07-04 0:11 [gentoo-commits] portage r10918 - main/trunk/pym/_emerge Zac Medico (zmedico)
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.