public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/ebuild/, ...
@ 2020-02-20  9:42 Zac Medico
  0 siblings, 0 replies; only message in thread
From: Zac Medico @ 2020-02-20  9:42 UTC (permalink / raw)
  To: gentoo-commits

commit:     8f47d3fe1190d4476ae9eebfafcebdfb1794fc05
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Feb 18 07:43:12 2020 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Thu Feb 20 09:15:52 2020 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=8f47d3fe

AsyncScheduler: use async_start method

Convert AsyncScheduler to use the async_start method, since
eventually this method will need to be a coroutine in order to write
messages to the build log as discussed in bug 709746.

Also fix async_iter_completed to be compatible with callback
scheduling differences introduced by migration to the async_start
method.

Bug: https://bugs.gentoo.org/709746
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/tests/ebuild/test_doebuild_fd_pipes.py |  8 ++---
 .../tests/util/futures/test_iter_completed.py      |  2 ++
 lib/portage/util/_async/AsyncScheduler.py          | 20 ++++++++++--
 lib/portage/util/futures/iter_completed.py         | 38 +++++++++++++++++-----
 4 files changed, 53 insertions(+), 15 deletions(-)

diff --git a/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py b/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py
index 05ea24c4b..50fc5fe1c 100644
--- a/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py
+++ b/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py
@@ -109,18 +109,16 @@ class DoebuildFdPipesTestCase(TestCase):
 							output_fd: pw,
 						},
 						"prev_mtimes": {}})
+				producer.addStartListener(lambda producer: os.close(pw))
 
+				# PipeReader closes pr
 				consumer = PipeReader(
 					input_files={"producer" : pr})
 
 				task_scheduler = TaskScheduler(iter([producer, consumer]),
 					max_jobs=2)
 
-				try:
-					loop.run_until_complete(task_scheduler.async_start())
-				finally:
-					# PipeReader closes pr
-					os.close(pw)
+				loop.run_until_complete(task_scheduler.async_start())
 
 				task_scheduler.wait()
 				output = portage._unicode_decode(

diff --git a/lib/portage/tests/util/futures/test_iter_completed.py b/lib/portage/tests/util/futures/test_iter_completed.py
index aa24f5685..03ace915a 100644
--- a/lib/portage/tests/util/futures/test_iter_completed.py
+++ b/lib/portage/tests/util/futures/test_iter_completed.py
@@ -76,6 +76,8 @@ class IterCompletedTestCase(TestCase):
 
 		for future_done_set in async_iter_completed(future_generator(),
 			max_jobs=True, max_load=True, loop=loop):
+			while not input_futures:
+				loop.run_until_complete(asyncio.sleep(0, loop=loop))
 			future_done_set.cancel()
 			break
 

diff --git a/lib/portage/util/_async/AsyncScheduler.py b/lib/portage/util/_async/AsyncScheduler.py
index c6b523eaa..b9070061a 100644
--- a/lib/portage/util/_async/AsyncScheduler.py
+++ b/lib/portage/util/_async/AsyncScheduler.py
@@ -1,7 +1,11 @@
 # Copyright 2012-2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
+import functools
+
 from portage import os
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine
 from _emerge.AsynchronousTask import AsynchronousTask
 from _emerge.PollScheduler import PollScheduler
 
@@ -62,8 +66,8 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
 			else:
 				self._running_tasks.add(task)
 				task.scheduler = self._sched_iface
-				task.addExitListener(self._task_exit)
-				task.start()
+				future = asyncio.ensure_future(self._task_coroutine(task), loop=self._sched_iface)
+				future.add_done_callback(functools.partial(self._task_coroutine_done, task))
 
 		if self._loadavg_check_id is not None:
 			self._loadavg_check_id.cancel()
@@ -73,6 +77,18 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
 		# Triggers cleanup and exit listeners if there's nothing left to do.
 		self.poll()
 
+	@coroutine
+	def _task_coroutine(self, task):
+		yield task.async_start()
+		yield task.async_wait()
+
+	def _task_coroutine_done(self, task, future):
+		try:
+			future.result()
+		except asyncio.CancelledError:
+			self.cancel()
+		self._task_exit(task)
+
 	def _task_exit(self, task):
 		self._running_tasks.discard(task)
 		if task.returncode != os.EX_OK:

diff --git a/lib/portage/util/futures/iter_completed.py b/lib/portage/util/futures/iter_completed.py
index 9554b4338..1fb30eb70 100644
--- a/lib/portage/util/futures/iter_completed.py
+++ b/lib/portage/util/futures/iter_completed.py
@@ -6,6 +6,7 @@ import functools
 from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
 from portage.util._async.TaskScheduler import TaskScheduler
 from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
 from portage.util.cpuinfo import get_cpu_count
 
 
@@ -90,21 +91,42 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
 		if future_done_set.cancelled() and not wait_result.done():
 			wait_result.cancel()
 
+	@coroutine
+	def fetch_wait_result(scheduler, first, loop=None):
+		if first:
+			yield scheduler.async_start()
+
+		# If the current coroutine awakens just after a call to
+		# done_callback but before scheduler has been notified of
+		# corresponding done future(s), then wait here until scheduler
+		# is notified (which will cause future_map to populate).
+		while not future_map and scheduler.poll() is None:
+			yield asyncio.sleep(0, loop=loop)
+
+		if not future_map:
+			if scheduler.poll() is not None:
+				coroutine_return((set(), set()))
+			else:
+				raise AssertionError('expected non-empty future_map')
+
+		wait_result = yield asyncio.wait(list(future_map.values()),
+			return_when=asyncio.FIRST_COMPLETED, loop=loop)
+
+		coroutine_return(wait_result)
+
+	first = True
 	try:
-		scheduler.start()
-
-		# scheduler should ensure that future_map is non-empty until
-		# task_generator is exhausted
-		while future_map:
-			wait_result = asyncio.ensure_future(
-				asyncio.wait(list(future_map.values()),
-				return_when=asyncio.FIRST_COMPLETED, loop=loop), loop=loop)
+		while True:
+			wait_result = asyncio.ensure_future(fetch_wait_result(scheduler, first, loop=loop), loop=loop)
+			first = False
 			future_done_set = loop.create_future()
 			future_done_set.add_done_callback(
 				functools.partial(cancel_callback, wait_result))
 			wait_result.add_done_callback(
 				functools.partial(done_callback, future_done_set))
 			yield future_done_set
+			if not future_map and scheduler.poll() is not None:
+				break
 	finally:
 		# cleanup in case of interruption by SIGINT, etc
 		scheduler.cancel()


^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2020-02-20  9:42 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2020-02-20  9:42 [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/ebuild/, Zac Medico

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.