From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from lists.gentoo.org (pigeon.gentoo.org [208.92.234.80]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by finch.gentoo.org (Postfix) with ESMTPS id C19011382C5 for ; Tue, 24 Apr 2018 08:09:56 +0000 (UTC) Received: from pigeon.gentoo.org (localhost [127.0.0.1]) by pigeon.gentoo.org (Postfix) with SMTP id C20ECE0924; Tue, 24 Apr 2018 08:09:55 +0000 (UTC) Received: from smtp.gentoo.org (smtp.gentoo.org [140.211.166.183]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by pigeon.gentoo.org (Postfix) with ESMTPS id 854A7E0923 for ; Tue, 24 Apr 2018 08:09:55 +0000 (UTC) Received: from localhost.localdomain (unknown [100.42.98.196]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) (Authenticated sender: zmedico) by smtp.gentoo.org (Postfix) with ESMTPSA id 6E3C8335C43; Tue, 24 Apr 2018 08:09:54 +0000 (UTC) From: Zac Medico To: gentoo-portage-dev@lists.gentoo.org Cc: Zac Medico Subject: [gentoo-portage-dev] [PATCH] ManifestScheduler: async fetchlist_dict (bug 653946) Date: Tue, 24 Apr 2018 01:09:28 -0700 Message-Id: <20180424080928.12067-1-zmedico@gentoo.org> X-Mailer: git-send-email 2.13.6 Precedence: bulk List-Post: List-Help: List-Unsubscribe: List-Subscribe: List-Id: Gentoo Linux mail X-BeenThere: gentoo-portage-dev@lists.gentoo.org Reply-to: gentoo-portage-dev@lists.gentoo.org X-Archives-Salt: 866df14a-dcac-47ad-b101-39337d6d2286 X-Archives-Hash: d1725e1b736a504ff70dc34030546352 In order to avoid event loop recursion, pass fetchlist_dict to ManifestTask as a Future. Bug: https://bugs.gentoo.org/653946 --- .../ebuild/_parallel_manifest/ManifestScheduler.py | 70 +++++++++++++++++----- .../ebuild/_parallel_manifest/ManifestTask.py | 22 +++++++ pym/portage/tests/dbapi/test_portdb_cache.py | 1 + 3 files changed, 79 insertions(+), 14 deletions(-) diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py index 38ac4825e..42854b05b 100644 --- a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py +++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py @@ -4,9 +4,9 @@ import portage from portage import os from portage.dep import _repo_separator -from portage.exception import InvalidDependString from portage.localization import _ from portage.util._async.AsyncScheduler import AsyncScheduler +from portage.util.futures import asyncio from .ManifestTask import ManifestTask class ManifestScheduler(AsyncScheduler): @@ -63,21 +63,11 @@ class ManifestScheduler(AsyncScheduler): cpv_list = portdb.cp_list(cp, mytree=[repo_config.location]) if not cpv_list: continue - fetchlist_dict = {} - try: - for cpv in cpv_list: - fetchlist_dict[cpv] = \ - list(portdb.getFetchMap(cpv, mytree=mytree)) - except InvalidDependString as e: - portage.writemsg( - _("!!! %s%s%s: SRC_URI: %s\n") % - (cp, _repo_separator, repo_config.name, e), - noiselevel=-1) - self._error_count += 1 - continue yield ManifestTask(cp=cp, distdir=distdir, - fetchlist_dict=fetchlist_dict, repo_config=repo_config, + fetchlist_dict=_future_fetchlist( + self._event_loop, portdb, repo_config, cp, cpv_list), + repo_config=repo_config, gpg_cmd=self._gpg_cmd, gpg_vars=self._gpg_vars, force_sign_key=self._force_sign_key) @@ -91,3 +81,55 @@ class ManifestScheduler(AsyncScheduler): noiselevel=-1) AsyncScheduler._task_exit(self, task) + + +def _future_fetchlist(loop, portdb, repo_config, cp, cpv_list): + """ + Asynchronous form of FetchlistDict. + + @param loop: event loop + @type loop: EventLoop + @param portdb: portdbapi instance + @type portdb: portdbapi + @param repo_config: repository configuration for a Manifest + @type repo_config: RepoConfig + @param cp: cp for a Manifest + @type cp: str + @param cpv_list: list of ebuild cpv values for a Manifest + @type cpv_list: list + @return: a Future resulting in a Mapping compatible with FetchlistDict + @rtype: asyncio.Future (or compatible) + """ + loop = getattr(loop, '_asyncio_wrapper', loop) + result = loop.create_future() + futures = {} + for cpv in cpv_list: + futures[cpv] = portdb.async_fetch_map( + cpv, mytree=repo_config.location, loop=loop) + + def futures_done(wait_result): + if result.cancelled(): + return + e = None + for future in futures.values(): + if (future.done() and future.exception() is not None): + # Retrieve exceptions from all futures in order to + # avoid triggering the event loop's error handler. + e = future.exception() + + if e is None: + result.set_result(dict((k, list(v.result())) + for k, v in futures.items())) + else: + result.set_exception(e) + + wait_result = asyncio.ensure_future( + asyncio.wait(list(futures.values()), + loop=loop), + loop=loop) + + wait_result.add_done_callback(futures_done) + result.add_done_callback(lambda result: + wait_result.cancel() if result.cancelled() else None) + + return result diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py index 0ee2b910d..6f5fe5b16 100644 --- a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py +++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py @@ -8,8 +8,12 @@ import subprocess from portage import os from portage import _unicode_encode, _encodings from portage.const import MANIFEST2_IDENTIFIERS +from portage.dep import _repo_separator +from portage.exception import InvalidDependString +from portage.localization import _ from portage.util import (atomic_ofstream, grablines, shlex_split, varexpand, writemsg) +from portage.util._async.AsyncTaskFuture import AsyncTaskFuture from portage.util._async.PipeLogger import PipeLogger from portage.util._async.PopenProcess import PopenProcess from _emerge.CompositeTask import CompositeTask @@ -29,6 +33,24 @@ class ManifestTask(CompositeTask): def _start(self): self._manifest_path = os.path.join(self.repo_config.location, self.cp, "Manifest") + + self._start_task( + AsyncTaskFuture(future=self.fetchlist_dict), + self._start_with_fetchlist) + + def _start_with_fetchlist(self, fetchlist_task): + if self._default_exit(fetchlist_task) != os.EX_OK: + if not self.fetchlist_dict.cancelled(): + try: + self.fetchlist_dict.result() + except InvalidDependString as e: + writemsg( + _("!!! %s%s%s: SRC_URI: %s\n") % + (self.cp, _repo_separator, self.repo_config.name, e), + noiselevel=-1) + self._async_wait() + return + self.fetchlist_dict = self.fetchlist_dict.result() manifest_proc = ManifestProcess(cp=self.cp, distdir=self.distdir, fetchlist_dict=self.fetchlist_dict, repo_config=self.repo_config, scheduler=self.scheduler) diff --git a/pym/portage/tests/dbapi/test_portdb_cache.py b/pym/portage/tests/dbapi/test_portdb_cache.py index bd934460a..1f139b256 100644 --- a/pym/portage/tests/dbapi/test_portdb_cache.py +++ b/pym/portage/tests/dbapi/test_portdb_cache.py @@ -38,6 +38,7 @@ class PortdbCacheTestCase(TestCase): portage_python = portage._python_interpreter egencache_cmd = (portage_python, "-b", "-Wd", os.path.join(self.bindir, "egencache"), + "--update-manifests", "--sign-manifests=n", "--repo", "test_repo", "--repositories-configuration", settings.repositories.config_string()) python_cmd = (portage_python, "-b", "-Wd", "-c") -- 2.13.6