public inbox for gentoo-portage-dev@lists.gentoo.org
 help / color / mirror / Atom feed
* [gentoo-portage-dev] [PATCH] Add async_iter_completed for asyncio migration (bug 591760)
@ 2018-04-17 10:05 Zac Medico
  0 siblings, 0 replies; only message in thread
From: Zac Medico @ 2018-04-17 10:05 UTC (permalink / raw)
  To: gentoo-portage-dev; +Cc: Zac Medico

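Add async_iter_completed, an asynchronous variant of iter_completed
that wraps portage's internal TaskScheduler class behind standard
asyncio interfaces, so that TaskScheduler API consumers can be migrated
to asyncio. iter_completed is now implemented on top of it.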

Bug: https://bugs.gentoo.org/591760
---
 .../tests/util/futures/test_iter_completed.py      | 37 ++++++++++++-
 pym/portage/util/futures/iter_completed.py         | 61 +++++++++++++++++++---
 2 files changed, 91 insertions(+), 7 deletions(-)

diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py
index 9c23aefb1..1344523c6 100644
--- a/pym/portage/tests/util/futures/test_iter_completed.py
+++ b/pym/portage/tests/util/futures/test_iter_completed.py
@@ -5,7 +5,11 @@ import time
 from portage.tests import TestCase
 from portage.util._async.ForkProcess import ForkProcess
 from portage.util._eventloop.global_event_loop import global_event_loop
-from portage.util.futures.iter_completed import iter_completed
+from portage.util.futures import asyncio
+from portage.util.futures.iter_completed import (
+	iter_completed,
+	async_iter_completed,
+)
 
 
 class SleepProcess(ForkProcess):
@@ -48,3 +52,34 @@ class IterCompletedTestCase(TestCase):
 		for seconds, future in zip(expected_order, iter_completed(future_generator(),
 			max_jobs=True, max_load=None, loop=loop)):
 			self.assertEqual(seconds, future.result())
+
+	def testAsyncCancel(self):
+		"""Cancel the first future_done_set and abandon the iterator; every input future must still resolve."""
+		loop = global_event_loop()._asyncio_wrapper
+		input_futures = set()  # every future handed to async_iter_completed
+		future_count = 3
+
+		def future_generator():
+			for i in range(future_count):
+				future = loop.create_future()
+				loop.call_soon(lambda future: None if future.done()
+					else future.set_result(None), future)  # set a None result soon, unless already done (e.g. cancelled)
+				input_futures.add(future)
+				yield future
+
+		for future_done_set in async_iter_completed(future_generator(),
+			max_jobs=True, max_load=None, loop=loop):
+			future_done_set.cancel()  # cancel_callback then cancels the pending wait
+			break
+
+		# With max_jobs=True, async_iter_completed should have executed
+		# the generator until it raised StopIteration.
+		self.assertEqual(future_count, len(input_futures))
+
+		loop.run_until_complete(asyncio.wait(input_futures, loop=loop))
+
+		# The futures may have results or they may have been cancelled
+		# by TaskScheduler, and behavior varies depending on the python
+		# interpreter.
+		for future in input_futures:
+			future.cancelled() or future.result()  # result() re-raises if the future failed
diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py
index 8d324de84..5ad075305 100644
--- a/pym/portage/util/futures/iter_completed.py
+++ b/pym/portage/util/futures/iter_completed.py
@@ -1,6 +1,7 @@
 # Copyright 2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
+import functools
 import multiprocessing
 
 from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
@@ -31,6 +32,38 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
 	"""
 	loop = loop or global_event_loop()
 	loop = getattr(loop, '_asyncio_wrapper', loop)
+
+	for future_done_set in async_iter_completed(futures,
+		max_jobs=max_jobs, max_load=max_load, loop=loop):
+		for future in loop.run_until_complete(future_done_set):
+			yield future
+
+
+def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
+	"""
+	An asynchronous version of iter_completed. This yields futures, which
+	when done, result in a set of input futures that are done. This serves
+	as a wrapper around portage's internal TaskScheduler class, using
+	standard asyncio interfaces.
+
+	@param futures: iterator of asyncio.Future (or compatible)
+	@type futures: iterator
+	@param max_jobs: max number of futures to process concurrently (default
+		is multiprocessing.cpu_count())
+	@type max_jobs: int
+	@param max_load: max load allowed when scheduling a new future,
+		otherwise schedule no more than 1 future at a time (default
+		is multiprocessing.cpu_count())
+	@type max_load: int or float
+	@param loop: event loop
+	@type loop: EventLoop
+	@return: iterator of futures, which when done, result in a set of
+		input futures that are done
+	@rtype: iterator
+	"""
+	loop = loop or global_event_loop()
+	loop = getattr(loop, '_asyncio_wrapper', loop)
+
 	max_jobs = max_jobs or multiprocessing.cpu_count()
 	max_load = max_load or multiprocessing.cpu_count()
 
@@ -46,19 +79,35 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
 		max_load=max_load,
 		event_loop=loop._loop)
 
+	def done_callback(future_done_set, wait_result):
+		"""Propagate wait_result's done set to future_done_set, or cancel it if the wait was cancelled."""
+		if future_done_set.done() or wait_result.cancelled():
+			return future_done_set.cancel()  # unblock awaiters; no-op if already done
+		done, pending = wait_result.result()
+		for future in done:
+			future_map.pop(id(future), None)  # overlapping waits may report the same future twice
+		future_done_set.set_result(done)
+
+	def cancel_callback(wait_result, future_done_set):
+		"""Stop the pending wait once the consumer has cancelled future_done_set."""
+		if not wait_result.done() and future_done_set.cancelled():
+			wait_result.cancel()
+
 	try:
 		scheduler.start()
 
 		# scheduler should ensure that future_map is non-empty until
 		# task_generator is exhausted
 		while future_map:
-			done, pending = loop.run_until_complete(
+			wait_result = asyncio.ensure_future(
 				asyncio.wait(list(future_map.values()),
-				return_when=asyncio.FIRST_COMPLETED, loop=loop))
-			for future in done:
-				del future_map[id(future)]
-				yield future
-
+				return_when=asyncio.FIRST_COMPLETED, loop=loop), loop=loop)
+			future_done_set = loop.create_future()
+			future_done_set.add_done_callback(
+				functools.partial(cancel_callback, wait_result))
+			wait_result.add_done_callback(
+				functools.partial(done_callback, future_done_set))
+			yield future_done_set
 	finally:
 		# cleanup in case of interruption by SIGINT, etc
 		scheduler.cancel()
-- 
2.13.6



^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2018-04-17 10:06 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-04-17 10:05 [gentoo-portage-dev] [PATCH] Add async_iter_completed for asyncio migration (bug 591760) Zac Medico

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox