From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from lists.gentoo.org (pigeon.gentoo.org [208.92.234.80]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by finch.gentoo.org (Postfix) with ESMTPS id 52689138350 for ; Thu, 20 Feb 2020 09:42:16 +0000 (UTC) Received: from pigeon.gentoo.org (localhost [127.0.0.1]) by pigeon.gentoo.org (Postfix) with SMTP id 93B18E0921; Thu, 20 Feb 2020 09:42:15 +0000 (UTC) Received: from smtp.gentoo.org (smtp.gentoo.org [140.211.166.183]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by pigeon.gentoo.org (Postfix) with ESMTPS id 65A5CE0921 for ; Thu, 20 Feb 2020 09:42:15 +0000 (UTC) Received: from oystercatcher.gentoo.org (unknown [IPv6:2a01:4f8:202:4333:225:90ff:fed9:fc84]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by smtp.gentoo.org (Postfix) with ESMTPS id F3D6C34EE47 for ; Thu, 20 Feb 2020 09:42:13 +0000 (UTC) Received: from localhost.localdomain (localhost [IPv6:::1]) by oystercatcher.gentoo.org (Postfix) with ESMTP id 7CC3A109 for ; Thu, 20 Feb 2020 09:42:11 +0000 (UTC) From: "Zac Medico" To: gentoo-commits@lists.gentoo.org Content-Transfer-Encoding: 8bit Content-type: text/plain; charset=UTF-8 Reply-To: gentoo-dev@lists.gentoo.org, "Zac Medico" Message-ID: <1582190152.8f47d3fe1190d4476ae9eebfafcebdfb1794fc05.zmedico@gentoo> Subject: [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/ebuild/, ... X-VCS-Repository: proj/portage X-VCS-Files: lib/portage/tests/ebuild/test_doebuild_fd_pipes.py lib/portage/tests/util/futures/test_iter_completed.py lib/portage/util/_async/AsyncScheduler.py lib/portage/util/futures/iter_completed.py X-VCS-Directories: lib/portage/util/futures/ lib/portage/util/_async/ lib/portage/tests/ebuild/ lib/portage/tests/util/futures/ X-VCS-Committer: zmedico X-VCS-Committer-Name: Zac Medico X-VCS-Revision: 8f47d3fe1190d4476ae9eebfafcebdfb1794fc05 X-VCS-Branch: master Date: Thu, 20 Feb 2020 09:42:11 +0000 (UTC) Precedence: bulk List-Post: List-Help: List-Unsubscribe: List-Subscribe: List-Id: Gentoo Linux mail X-BeenThere: gentoo-commits@lists.gentoo.org X-Auto-Response-Suppress: DR, RN, NRN, OOF, AutoReply X-Archives-Salt: d5c14327-1a94-4c02-8fd9-7031a506cf4f X-Archives-Hash: 677d8932bd2ff168ba5e2dc3f536c601 commit: 8f47d3fe1190d4476ae9eebfafcebdfb1794fc05 Author: Zac Medico gentoo org> AuthorDate: Tue Feb 18 07:43:12 2020 +0000 Commit: Zac Medico gentoo org> CommitDate: Thu Feb 20 09:15:52 2020 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=8f47d3fe AsyncScheduler: use async_start method Convert AsyncScheduler to use the async_start method, since eventually this method will need to be a coroutine in order to write messages to the build log as discussed in bug 709746. Also fix async_iter_completed to be compatible with callback scheduling differences introduced by migration to the async_start method. Bug: https://bugs.gentoo.org/709746 Signed-off-by: Zac Medico gentoo.org> lib/portage/tests/ebuild/test_doebuild_fd_pipes.py | 8 ++--- .../tests/util/futures/test_iter_completed.py | 2 ++ lib/portage/util/_async/AsyncScheduler.py | 20 ++++++++++-- lib/portage/util/futures/iter_completed.py | 38 +++++++++++++++++----- 4 files changed, 53 insertions(+), 15 deletions(-) diff --git a/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py b/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py index 05ea24c4b..50fc5fe1c 100644 --- a/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py +++ b/lib/portage/tests/ebuild/test_doebuild_fd_pipes.py @@ -109,18 +109,16 @@ class DoebuildFdPipesTestCase(TestCase): output_fd: pw, }, "prev_mtimes": {}}) + producer.addStartListener(lambda producer: os.close(pw)) + # PipeReader closes pr consumer = PipeReader( input_files={"producer" : pr}) task_scheduler = TaskScheduler(iter([producer, consumer]), max_jobs=2) - try: - loop.run_until_complete(task_scheduler.async_start()) - finally: - # PipeReader closes pr - os.close(pw) + loop.run_until_complete(task_scheduler.async_start()) task_scheduler.wait() output = portage._unicode_decode( diff --git a/lib/portage/tests/util/futures/test_iter_completed.py b/lib/portage/tests/util/futures/test_iter_completed.py index aa24f5685..03ace915a 100644 --- a/lib/portage/tests/util/futures/test_iter_completed.py +++ b/lib/portage/tests/util/futures/test_iter_completed.py @@ -76,6 +76,8 @@ class IterCompletedTestCase(TestCase): for future_done_set in async_iter_completed(future_generator(), max_jobs=True, max_load=True, loop=loop): + while not input_futures: + loop.run_until_complete(asyncio.sleep(0, loop=loop)) future_done_set.cancel() break diff --git a/lib/portage/util/_async/AsyncScheduler.py b/lib/portage/util/_async/AsyncScheduler.py index c6b523eaa..b9070061a 100644 --- a/lib/portage/util/_async/AsyncScheduler.py +++ b/lib/portage/util/_async/AsyncScheduler.py @@ -1,7 +1,11 @@ # Copyright 2012-2018 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +import functools + from portage import os +from portage.util.futures import asyncio +from portage.util.futures.compat_coroutine import coroutine from _emerge.AsynchronousTask import AsynchronousTask from _emerge.PollScheduler import PollScheduler @@ -62,8 +66,8 @@ class AsyncScheduler(AsynchronousTask, PollScheduler): else: self._running_tasks.add(task) task.scheduler = self._sched_iface - task.addExitListener(self._task_exit) - task.start() + future = asyncio.ensure_future(self._task_coroutine(task), loop=self._sched_iface) + future.add_done_callback(functools.partial(self._task_coroutine_done, task)) if self._loadavg_check_id is not None: self._loadavg_check_id.cancel() @@ -73,6 +77,18 @@ class AsyncScheduler(AsynchronousTask, PollScheduler): # Triggers cleanup and exit listeners if there's nothing left to do. self.poll() + @coroutine + def _task_coroutine(self, task): + yield task.async_start() + yield task.async_wait() + + def _task_coroutine_done(self, task, future): + try: + future.result() + except asyncio.CancelledError: + self.cancel() + self._task_exit(task) + def _task_exit(self, task): self._running_tasks.discard(task) if task.returncode != os.EX_OK: diff --git a/lib/portage/util/futures/iter_completed.py b/lib/portage/util/futures/iter_completed.py index 9554b4338..1fb30eb70 100644 --- a/lib/portage/util/futures/iter_completed.py +++ b/lib/portage/util/futures/iter_completed.py @@ -6,6 +6,7 @@ import functools from portage.util._async.AsyncTaskFuture import AsyncTaskFuture from portage.util._async.TaskScheduler import TaskScheduler from portage.util.futures import asyncio +from portage.util.futures.compat_coroutine import coroutine, coroutine_return from portage.util.cpuinfo import get_cpu_count @@ -90,21 +91,42 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None): if future_done_set.cancelled() and not wait_result.done(): wait_result.cancel() + @coroutine + def fetch_wait_result(scheduler, first, loop=None): + if first: + yield scheduler.async_start() + + # If the current coroutine awakens just after a call to + # done_callback but before scheduler has been notified of + # corresponding done future(s), then wait here until scheduler + # is notified (which will cause future_map to populate). + while not future_map and scheduler.poll() is None: + yield asyncio.sleep(0, loop=loop) + + if not future_map: + if scheduler.poll() is not None: + coroutine_return((set(), set())) + else: + raise AssertionError('expected non-empty future_map') + + wait_result = yield asyncio.wait(list(future_map.values()), + return_when=asyncio.FIRST_COMPLETED, loop=loop) + + coroutine_return(wait_result) + + first = True try: - scheduler.start() - - # scheduler should ensure that future_map is non-empty until - # task_generator is exhausted - while future_map: - wait_result = asyncio.ensure_future( - asyncio.wait(list(future_map.values()), - return_when=asyncio.FIRST_COMPLETED, loop=loop), loop=loop) + while True: + wait_result = asyncio.ensure_future(fetch_wait_result(scheduler, first, loop=loop), loop=loop) + first = False future_done_set = loop.create_future() future_done_set.add_done_callback( functools.partial(cancel_callback, wait_result)) wait_result.add_done_callback( functools.partial(done_callback, future_done_set)) yield future_done_set + if not future_map and scheduler.poll() is not None: + break finally: # cleanup in case of interruption by SIGINT, etc scheduler.cancel()