From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from lists.gentoo.org (pigeon.gentoo.org [208.92.234.80]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by finch.gentoo.org (Postfix) with ESMTPS id 351E01382C5 for ; Tue, 17 Apr 2018 10:06:18 +0000 (UTC) Received: from pigeon.gentoo.org (localhost [127.0.0.1]) by pigeon.gentoo.org (Postfix) with SMTP id B0ABBE0954; Tue, 17 Apr 2018 10:06:16 +0000 (UTC) Received: from smtp.gentoo.org (mail.gentoo.org [IPv6:2001:470:ea4a:1:5054:ff:fec7:86e4]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by pigeon.gentoo.org (Postfix) with ESMTPS id 6A53DE0953 for ; Tue, 17 Apr 2018 10:06:16 +0000 (UTC) Received: from localhost.localdomain (unknown [100.42.98.196]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) (Authenticated sender: zmedico) by smtp.gentoo.org (Postfix) with ESMTPSA id 9E2E5335C2C; Tue, 17 Apr 2018 10:06:14 +0000 (UTC) From: Zac Medico To: gentoo-portage-dev@lists.gentoo.org Cc: Zac Medico Subject: [gentoo-portage-dev] [PATCH] Add async_iter_completed for asyncio migration (bug 591760) Date: Tue, 17 Apr 2018 03:05:07 -0700 Message-Id: <20180417100507.1248-1-zmedico@gentoo.org> X-Mailer: git-send-email 2.13.6 Precedence: bulk List-Post: List-Help: List-Unsubscribe: List-Subscribe: List-Id: Gentoo Linux mail X-BeenThere: gentoo-portage-dev@lists.gentoo.org Reply-to: gentoo-portage-dev@lists.gentoo.org X-Archives-Salt: 65226d4a-65de-470c-b059-8cf3cd836618 X-Archives-Hash: 89eaf3d2cf654ad6b6a3300a3d97ffca This serves as a wrapper around portage's internal TaskScheduler class, allowing TaskScheduler API consumers to be migrated to use asyncio interfaces. Bug: https://bugs.gentoo.org/591760 --- .../tests/util/futures/test_iter_completed.py | 37 ++++++++++++- pym/portage/util/futures/iter_completed.py | 61 +++++++++++++++++++--- 2 files changed, 91 insertions(+), 7 deletions(-) diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py index 9c23aefb1..1344523c6 100644 --- a/pym/portage/tests/util/futures/test_iter_completed.py +++ b/pym/portage/tests/util/futures/test_iter_completed.py @@ -5,7 +5,11 @@ import time from portage.tests import TestCase from portage.util._async.ForkProcess import ForkProcess from portage.util._eventloop.global_event_loop import global_event_loop -from portage.util.futures.iter_completed import iter_completed +from portage.util.futures import asyncio +from portage.util.futures.iter_completed import ( + iter_completed, + async_iter_completed, +) class SleepProcess(ForkProcess): @@ -48,3 +52,34 @@ class IterCompletedTestCase(TestCase): for seconds, future in zip(expected_order, iter_completed(future_generator(), max_jobs=True, max_load=None, loop=loop)): self.assertEqual(seconds, future.result()) + + def testAsyncCancel(self): + + loop = global_event_loop()._asyncio_wrapper + input_futures = set() + future_count = 3 + + def future_generator(): + for i in range(future_count): + future = loop.create_future() + loop.call_soon(lambda future: None if future.done() + else future.set_result(None), future) + input_futures.add(future) + yield future + + for future_done_set in async_iter_completed(future_generator(), + max_jobs=True, max_load=None, loop=loop): + future_done_set.cancel() + break + + # With max_jobs=True, async_iter_completed should have executed + # the generator until it raised StopIteration. + self.assertEqual(future_count, len(input_futures)) + + loop.run_until_complete(asyncio.wait(input_futures, loop=loop)) + + # The futures may have results or they may have been cancelled + # by TaskScheduler, and behavior varies depending on the python + # interpreter. + for future in input_futures: + future.cancelled() or future.result() diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py index 8d324de84..5ad075305 100644 --- a/pym/portage/util/futures/iter_completed.py +++ b/pym/portage/util/futures/iter_completed.py @@ -1,6 +1,7 @@ # Copyright 2018 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +import functools import multiprocessing from portage.util._async.AsyncTaskFuture import AsyncTaskFuture @@ -31,6 +32,38 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None): """ loop = loop or global_event_loop() loop = getattr(loop, '_asyncio_wrapper', loop) + + for future_done_set in async_iter_completed(futures, + max_jobs=max_jobs, max_load=max_load, loop=loop): + for future in loop.run_until_complete(future_done_set): + yield future + + +def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None): + """ + An asynchronous version of iter_completed. This yields futures, which + when done, result in a set of input futures that are done. This serves + as a wrapper around portage's internal TaskScheduler class, using + standard asyncio interfaces. + + @param futures: iterator of asyncio.Future (or compatible) + @type futures: iterator + @param max_jobs: max number of futures to process concurrently (default + is multiprocessing.cpu_count()) + @type max_jobs: int + @param max_load: max load allowed when scheduling a new future, + otherwise schedule no more than 1 future at a time (default + is multiprocessing.cpu_count()) + @type max_load: int or float + @param loop: event loop + @type loop: EventLoop + @return: iterator of futures, which when done, result in a set of + input futures that are done + @rtype: iterator + """ + loop = loop or global_event_loop() + loop = getattr(loop, '_asyncio_wrapper', loop) + max_jobs = max_jobs or multiprocessing.cpu_count() max_load = max_load or multiprocessing.cpu_count() @@ -46,19 +79,35 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None): max_load=max_load, event_loop=loop._loop) + def done_callback(future_done_set, wait_result): + """Propagate results from wait_result to future_done_set.""" + if future_done_set.cancelled(): + return + done, pending = wait_result.result() + for future in done: + del future_map[id(future)] + future_done_set.set_result(done) + + def cancel_callback(wait_result, future_done_set): + """Cancel wait_result if future_done_set has been cancelled.""" + if future_done_set.cancelled() and not wait_result.done(): + wait_result.cancel() + try: scheduler.start() # scheduler should ensure that future_map is non-empty until # task_generator is exhausted while future_map: - done, pending = loop.run_until_complete( + wait_result = asyncio.ensure_future( asyncio.wait(list(future_map.values()), - return_when=asyncio.FIRST_COMPLETED, loop=loop)) - for future in done: - del future_map[id(future)] - yield future - + return_when=asyncio.FIRST_COMPLETED, loop=loop), loop=loop) + future_done_set = loop.create_future() + future_done_set.add_done_callback( + functools.partial(cancel_callback, wait_result)) + wait_result.add_done_callback( + functools.partial(done_callback, future_done_set)) + yield future_done_set finally: # cleanup in case of interruption by SIGINT, etc scheduler.cancel() -- 2.13.6