public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
* [gentoo-commits] proj/portage:master commit in: pym/portage/tests/process/, pym/portage/util/_async/, pym/portage/tests/ebuild/, ...
@ 2012-10-05 20:49 Zac Medico
  0 siblings, 0 replies; only message in thread
From: Zac Medico @ 2012-10-05 20:49 UTC (permalink / raw
  To: gentoo-commits

commit:     e4b64dd7dc7c2217055f110990b2496b71976681
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Fri Oct  5 20:48:53 2012 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Oct  5 20:48:53 2012 +0000
URL:        http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=e4b64dd7

TaskScheduler: inherit AsyncScheduler

This allows the QueueScheduler class to be eliminated.

---
 pym/_emerge/FifoIpcDaemon.py                |    2 +
 pym/_emerge/QueueScheduler.py               |  105 ---------------------------
 pym/_emerge/TaskScheduler.py                |   26 -------
 pym/portage/tests/ebuild/test_ipc_daemon.py |   58 +++++++++------
 pym/portage/tests/process/test_poll.py      |   17 ++---
 pym/portage/util/_async/TaskScheduler.py    |   22 ++++++
 6 files changed, 68 insertions(+), 162 deletions(-)

diff --git a/pym/_emerge/FifoIpcDaemon.py b/pym/_emerge/FifoIpcDaemon.py
index fcc4ab4..de9dc67 100644
--- a/pym/_emerge/FifoIpcDaemon.py
+++ b/pym/_emerge/FifoIpcDaemon.py
@@ -47,6 +47,8 @@ class FifoIpcDaemon(AbstractPollTask):
 		if self.returncode is None:
 			self.returncode = 1
 		self._unregister()
+		# notify exit listeners
+		self.wait()
 
 	def _wait(self):
 		if self.returncode is not None:

diff --git a/pym/_emerge/QueueScheduler.py b/pym/_emerge/QueueScheduler.py
deleted file mode 100644
index 206087c..0000000
--- a/pym/_emerge/QueueScheduler.py
+++ /dev/null
@@ -1,105 +0,0 @@
-# Copyright 1999-2012 Gentoo Foundation
-# Distributed under the terms of the GNU General Public License v2
-
-from _emerge.PollScheduler import PollScheduler
-
-class QueueScheduler(PollScheduler):
-
-	"""
-	Add instances of SequentialTaskQueue and then call run(). The
-	run() method returns when no tasks remain.
-	"""
-
-	def __init__(self, main=True, max_jobs=None, max_load=None):
-		PollScheduler.__init__(self, main=main)
-
-		if max_jobs is None:
-			max_jobs = 1
-
-		self._max_jobs = max_jobs
-		self._max_load = max_load
-
-		self._queues = []
-		self._schedule_listeners = []
-
-	def add(self, q):
-		self._queues.append(q)
-
-	def remove(self, q):
-		self._queues.remove(q)
-
-	def clear(self):
-		for q in self._queues:
-			q.clear()
-
-	def run(self, timeout=None):
-
-		timeout_callback = None
-		if timeout is not None:
-			def timeout_callback():
-				timeout_callback.timed_out = True
-				return False
-			timeout_callback.timed_out = False
-			timeout_callback.timeout_id = self.sched_iface.timeout_add(
-				timeout, timeout_callback)
-
-		term_check_id = self.sched_iface.idle_add(self._termination_check)
-		try:
-			while not (timeout_callback is not None and
-				timeout_callback.timed_out):
-				# We don't have any callbacks to trigger _schedule(),
-				# so we have to call it explicitly here.
-				self._schedule()
-				if self._keep_scheduling():
-					self.sched_iface.iteration()
-				else:
-					break
-
-			while self._is_work_scheduled() and \
-				not (timeout_callback is not None and
-				timeout_callback.timed_out):
-				self.sched_iface.iteration()
-		finally:
-			self.sched_iface.source_remove(term_check_id)
-			if timeout_callback is not None:
-				self.sched_iface.unregister(timeout_callback.timeout_id)
-
-	def _schedule_tasks(self):
-		"""
-		@rtype: bool
-		@return: True if there may be remaining tasks to schedule,
-			False otherwise.
-		"""
-		if self._terminated_tasks:
-			return
-
-		while self._can_add_job():
-			n = self._max_jobs - self._running_job_count()
-			if n < 1:
-				break
-
-			if not self._start_next_job(n):
-				return
-
-	def _keep_scheduling(self):
-		return not self._terminated_tasks and any(self._queues)
-
-	def _running_job_count(self):
-		job_count = 0
-		for q in self._queues:
-			job_count += len(q.running_tasks)
-		self._jobs = job_count
-		return job_count
-
-	def _start_next_job(self, n=1):
-		started_count = 0
-		for q in self._queues:
-			initial_job_count = len(q.running_tasks)
-			q.schedule()
-			final_job_count = len(q.running_tasks)
-			if final_job_count > initial_job_count:
-				started_count += (final_job_count - initial_job_count)
-			if started_count >= n:
-				break
-		return started_count
-

diff --git a/pym/_emerge/TaskScheduler.py b/pym/_emerge/TaskScheduler.py
deleted file mode 100644
index 583bfe3..0000000
--- a/pym/_emerge/TaskScheduler.py
+++ /dev/null
@@ -1,26 +0,0 @@
-# Copyright 1999-2012 Gentoo Foundation
-# Distributed under the terms of the GNU General Public License v2
-
-from _emerge.QueueScheduler import QueueScheduler
-from _emerge.SequentialTaskQueue import SequentialTaskQueue
-
-class TaskScheduler(object):
-
-	"""
-	A simple way to handle scheduling of AsynchrousTask instances. Simply
-	add tasks and call run(). The run() method returns when no tasks remain.
-	"""
-
-	def __init__(self, main=True, max_jobs=None, max_load=None):
-		self._queue = SequentialTaskQueue(max_jobs=max_jobs)
-		self._scheduler = QueueScheduler(main=main,
-			max_jobs=max_jobs, max_load=max_load)
-		self.sched_iface = self._scheduler.sched_iface
-		self.run = self._scheduler.run
-		self.clear = self._scheduler.clear
-		self.wait = self._queue.wait
-		self._scheduler.add(self._queue)
-
-	def add(self, task):
-		self._queue.add(task)
-

diff --git a/pym/portage/tests/ebuild/test_ipc_daemon.py b/pym/portage/tests/ebuild/test_ipc_daemon.py
index 77277fe..d4328a1 100644
--- a/pym/portage/tests/ebuild/test_ipc_daemon.py
+++ b/pym/portage/tests/ebuild/test_ipc_daemon.py
@@ -1,4 +1,4 @@
-# Copyright 2010-2011 Gentoo Foundation
+# Copyright 2010-2012 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 import tempfile
@@ -14,10 +14,12 @@ from portage.locks import hardlock_cleanup
 from portage.package.ebuild._ipc.ExitCommand import ExitCommand
 from portage.util import ensure_dirs
 from portage.util._async.ForkProcess import ForkProcess
+from portage.util._async.TaskScheduler import TaskScheduler
+from portage.util._eventloop.global_event_loop import global_event_loop
+from _emerge.PollScheduler import PollScheduler
 from _emerge.SpawnProcess import SpawnProcess
 from _emerge.EbuildBuildDir import EbuildBuildDir
 from _emerge.EbuildIpcDaemon import EbuildIpcDaemon
-from _emerge.TaskScheduler import TaskScheduler
 
 class SleepProcess(ForkProcess):
 	"""
@@ -33,6 +35,7 @@ class IpcDaemonTestCase(TestCase):
 	_SCHEDULE_TIMEOUT = 40000 # 40 seconds
 
 	def testIpcDaemon(self):
+		event_loop = global_event_loop()
 		tmpdir = tempfile.mkdtemp()
 		build_dir = None
 		try:
@@ -54,9 +57,8 @@ class IpcDaemonTestCase(TestCase):
 				env["__PORTAGE_TEST_HARDLINK_LOCKS"] = \
 					os.environ["__PORTAGE_TEST_HARDLINK_LOCKS"]
 
-			task_scheduler = TaskScheduler(max_jobs=2)
 			build_dir = EbuildBuildDir(
-				scheduler=task_scheduler.sched_iface,
+				scheduler=PollScheduler(event_loop=event_loop).sched_iface,
 				settings=env)
 			build_dir.lock()
 			ensure_dirs(env['PORTAGE_BUILDDIR'])
@@ -71,26 +73,23 @@ class IpcDaemonTestCase(TestCase):
 				commands = {'exit' : exit_command}
 				daemon = EbuildIpcDaemon(commands=commands,
 					input_fifo=input_fifo,
-					output_fifo=output_fifo,
-					scheduler=task_scheduler.sched_iface)
+					output_fifo=output_fifo)
 				proc = SpawnProcess(
 					args=[BASH_BINARY, "-c",
 					'"$PORTAGE_BIN_PATH"/ebuild-ipc exit %d' % exitcode],
-					env=env, scheduler=task_scheduler.sched_iface)
+					env=env)
+				task_scheduler = TaskScheduler(iter([daemon, proc]),
+					max_jobs=2, event_loop=event_loop)
 
 				self.received_command = False
 				def exit_command_callback():
 					self.received_command = True
-					task_scheduler.clear()
-					task_scheduler.wait()
+					task_scheduler.cancel()
 
 				exit_command.reply_hook = exit_command_callback
 				start_time = time.time()
-				task_scheduler.add(daemon)
-				task_scheduler.add(proc)
-				task_scheduler.run(timeout=self._SCHEDULE_TIMEOUT)
-				task_scheduler.clear()
-				task_scheduler.wait()
+				self._run(event_loop, task_scheduler, self._SCHEDULE_TIMEOUT)
+
 				hardlock_cleanup(env['PORTAGE_BUILDDIR'],
 					remove_all_locks=True)
 
@@ -101,7 +100,7 @@ class IpcDaemonTestCase(TestCase):
 				self.assertEqual(daemon.isAlive(), False)
 				self.assertEqual(exit_command.exitcode, exitcode)
 
-			# Intentionally short timeout test for QueueScheduler.run()
+			# Intentionally short timeout test for EventLoop/AsyncScheduler.
 			# Use a ridiculously long sleep_time_s in case the user's
 			# system is heavily loaded (see bug #436334).
 			sleep_time_s = 600     #600.000 seconds
@@ -116,20 +115,18 @@ class IpcDaemonTestCase(TestCase):
 					scheduler=task_scheduler.sched_iface)
 				proc = SleepProcess(seconds=sleep_time_s,
 					scheduler=task_scheduler.sched_iface)
+				task_scheduler = TaskScheduler(iter([daemon, proc]),
+					max_jobs=2, event_loop=event_loop)
 
 				self.received_command = False
 				def exit_command_callback():
 					self.received_command = True
-					task_scheduler.clear()
-					task_scheduler.wait()
+					task_scheduler.cancel()
 
 				exit_command.reply_hook = exit_command_callback
 				start_time = time.time()
-				task_scheduler.add(daemon)
-				task_scheduler.add(proc)
-				task_scheduler.run(timeout=short_timeout_ms)
-				task_scheduler.clear()
-				task_scheduler.wait()
+				self._run(event_loop, task_scheduler, short_timeout_ms)
+
 				hardlock_cleanup(env['PORTAGE_BUILDDIR'],
 					remove_all_locks=True)
 
@@ -144,3 +141,20 @@ class IpcDaemonTestCase(TestCase):
 			if build_dir is not None:
 				build_dir.unlock()
 			shutil.rmtree(tmpdir)
+
+	def _timeout_callback(self):
+		self._timed_out = True
+
+	def _run(self, event_loop, task_scheduler, timeout):
+		self._timed_out = False
+		timeout_id = event_loop.timeout_add(timeout, self._timeout_callback)
+
+		try:
+			task_scheduler.start()
+			while not self._timed_out and task_scheduler.poll() is None:
+				event_loop.iteration()
+			if self._timed_out:
+				task_scheduler.cancel()
+			task_scheduler.wait()
+		finally:
+			event_loop.source_remove(timeout_id)

diff --git a/pym/portage/tests/process/test_poll.py b/pym/portage/tests/process/test_poll.py
index d6667b4..3772d79 100644
--- a/pym/portage/tests/process/test_poll.py
+++ b/pym/portage/tests/process/test_poll.py
@@ -4,7 +4,7 @@
 from portage import os
 from portage.tests import TestCase
 from portage.util._pty import _create_pty_or_pipe
-from _emerge.TaskScheduler import TaskScheduler
+from portage.util._async.TaskScheduler import TaskScheduler
 from _emerge.PipeReader import PipeReader
 from _emerge.SpawnProcess import SpawnProcess
 
@@ -37,25 +37,24 @@ class PipeReaderTestCase(TestCase):
 		# in order to avoid issue 5380 with python3.
 		master_file = os.fdopen(master_fd, 'rb', 0)
 		slave_file = os.fdopen(slave_fd, 'wb', 0)
-		task_scheduler = TaskScheduler(max_jobs=2)
 		producer = SpawnProcess(
 			args=["bash", "-c", self._echo_cmd % test_string],
-			env=os.environ, fd_pipes={1:slave_fd},
-			scheduler=task_scheduler.sched_iface)
-		task_scheduler.add(producer)
-		slave_file.close()
+			env=os.environ, fd_pipes={1:slave_fd})
 
 		consumer = PipeReader(
 			input_files={"producer" : master_file},
-			scheduler=task_scheduler.sched_iface, _use_array=self._use_array)
+			_use_array=self._use_array)
 
-		task_scheduler.add(consumer)
+		task_scheduler = TaskScheduler(iter([producer, consumer]), max_jobs=2)
 
 		# This will ensure that both tasks have exited, which
 		# is necessary to avoid "ResourceWarning: unclosed file"
 		# warnings since Python 3.2 (and also ensures that we
 		# don't leave any zombie child processes).
-		task_scheduler.run()
+		task_scheduler.start()
+		slave_file.close()
+		task_scheduler.wait()
+
 		self.assertEqual(producer.returncode, os.EX_OK)
 		self.assertEqual(consumer.returncode, os.EX_OK)
 

diff --git a/pym/portage/util/_async/TaskScheduler.py b/pym/portage/util/_async/TaskScheduler.py
new file mode 100644
index 0000000..b0ec7af
--- /dev/null
+++ b/pym/portage/util/_async/TaskScheduler.py
@@ -0,0 +1,22 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from .AsyncScheduler import AsyncScheduler
+
+class TaskScheduler(AsyncScheduler):
+
+	__slots__ = ('_task_iter',)
+
+	"""
+	A simple way to handle scheduling of AbstractPollTask instances. Simply
+	pass a task iterator into the constructor and call start(). Use the
+	poll, wait, or addExitListener methods to be notified when all of the
+	tasks have completed.
+	"""
+
+	def __init__(self, task_iter, **kwargs):
+		AsyncScheduler.__init__(self, **kwargs)
+		self._task_iter = task_iter
+
+	def _next_task(self):
+		return next(self._task_iter)


^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2012-10-05 20:49 UTC | newest]

Thread overview: (only message) (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2012-10-05 20:49 [gentoo-commits] proj/portage:master commit in: pym/portage/tests/process/, pym/portage/util/_async/, pym/portage/tests/ebuild/, Zac Medico

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox