* [gentoo-portage-dev] [PATCH] FetchIterator: fix event loop recursion (bug 654038)
@ 2018-04-25 9:20 99% Zac Medico
0 siblings, 0 replies; 1+ results
From: Zac Medico @ 2018-04-25 9:20 UTC (permalink / raw)
To: gentoo-portage-dev; +Cc: Zac Medico
Since construction of FetchTask instances requires results from
aux_get calls that would trigger event loop recursion when executed
synchronously, add a _fetch_tasks_future function to construct
FetchTask instances asynchronously and return a Future. Use an
_EbuildFetchTasks class to wait for the FetchTask instances to
become available, and then execute them.
Bug: https://bugs.gentoo.org/654038
---
pym/portage/_emirrordist/FetchIterator.py | 311 ++++++++++++++++++++----------
1 file changed, 206 insertions(+), 105 deletions(-)
diff --git a/pym/portage/_emirrordist/FetchIterator.py b/pym/portage/_emirrordist/FetchIterator.py
index 38419799d..bd3b98cd0 100644
--- a/pym/portage/_emirrordist/FetchIterator.py
+++ b/pym/portage/_emirrordist/FetchIterator.py
@@ -7,14 +7,18 @@ from portage import os
from portage.checksum import (_apply_hash_filter,
_filter_unaccelarated_hashes, _hash_filter)
from portage.dep import use_reduce
-from portage.exception import PortageException
+from portage.exception import PortageException, PortageKeyError
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
+from portage.util._async.TaskScheduler import TaskScheduler
+from portage.util.futures.iter_completed import iter_gather
from .FetchTask import FetchTask
+from _emerge.CompositeTask import CompositeTask
+
class FetchIterator(object):
def __init__(self, config):
self._config = config
- self._log_failure = config.log_failure
self._terminated = threading.Event()
def terminate(self):
@@ -41,9 +45,6 @@ class FetchIterator(object):
portdb = self._config.portdb
get_repo_for_location = portdb.repositories.get_repo_for_location
- file_owners = self._config.file_owners
- file_failures = self._config.file_failures
- restrict_mirror_exemptions = self._config.restrict_mirror_exemptions
hash_filter = _hash_filter(
portdb.settings.get("PORTAGE_CHECKSUM_FILTER", ""))
@@ -59,110 +60,210 @@ class FetchIterator(object):
# Reset state so the Manifest is pulled once
# for this cp / tree combination.
- digests = None
repo_config = get_repo_for_location(tree)
+ digests_future = portdb._event_loop.create_future()
for cpv in portdb.cp_list(cp, mytree=tree):
if self._terminated.is_set():
return
- try:
- restrict, = portdb.aux_get(cpv, ("RESTRICT",),
- mytree=tree)
- except (KeyError, PortageException) as e:
- self._log_failure("%s\t\taux_get exception %s" %
- (cpv, e))
- continue
-
- # Here we use matchnone=True to ignore conditional parts
- # of RESTRICT since they don't apply unconditionally.
- # Assume such conditionals only apply on the client side.
- try:
- restrict = frozenset(use_reduce(restrict,
- flat=True, matchnone=True))
- except PortageException as e:
- self._log_failure("%s\t\tuse_reduce exception %s" %
- (cpv, e))
- continue
-
- if "fetch" in restrict:
- continue
-
- try:
- uri_map = portdb.getFetchMap(cpv)
- except PortageException as e:
- self._log_failure("%s\t\tgetFetchMap exception %s" %
- (cpv, e))
- continue
-
- if not uri_map:
- continue
-
- if "mirror" in restrict:
- skip = False
- if restrict_mirror_exemptions is not None:
- new_uri_map = {}
- for filename, uri_tuple in uri_map.items():
- for uri in uri_tuple:
- if uri[:9] == "mirror://":
- i = uri.find("/", 9)
- if i != -1 and uri[9:i].strip("/") in \
- restrict_mirror_exemptions:
- new_uri_map[filename] = uri_tuple
- break
- if new_uri_map:
- uri_map = new_uri_map
- else:
- skip = True
- else:
- skip = True
-
- if skip:
- continue
-
- # Parse Manifest for this cp if we haven't yet.
- if digests is None:
- try:
- digests = repo_config.load_manifest(
- os.path.join(repo_config.location, cp)
- ).getTypeDigests("DIST")
- except (EnvironmentError, PortageException) as e:
- for filename in uri_map:
- self._log_failure(
- "%s\t%s\tManifest exception %s" %
- (cpv, filename, e))
- file_failures[filename] = cpv
- continue
-
- if not digests:
- for filename in uri_map:
- self._log_failure("%s\t%s\tdigest entry missing" %
- (cpv, filename))
- file_failures[filename] = cpv
- continue
-
- for filename, uri_tuple in uri_map.items():
- file_digests = digests.get(filename)
- if file_digests is None:
- self._log_failure("%s\t%s\tdigest entry missing" %
- (cpv, filename))
- file_failures[filename] = cpv
- continue
- if filename in file_owners:
- continue
- file_owners[filename] = cpv
-
- file_digests = \
- _filter_unaccelarated_hashes(file_digests)
- if hash_filter is not None:
- file_digests = _apply_hash_filter(
- file_digests, hash_filter)
-
- yield FetchTask(cpv=cpv,
- background=True,
- digests=file_digests,
- distfile=filename,
- restrict=restrict,
- uri_tuple=uri_tuple,
- config=self._config)
+ yield _EbuildFetchTasks(
+ fetch_tasks_future=_fetch_tasks_future(
+ self._config,
+ hash_filter,
+ repo_config,
+ digests_future,
+ cpv)
+ )
+
+
+class _EbuildFetchTasks(CompositeTask):
+ """
+ This executes asynchronously constructed FetchTask instances for
+ each of the files referenced by an ebuild.
+ """
+ __slots__ = ('fetch_tasks_future',)
+ def _start(self):
+ self._start_task(AsyncTaskFuture(future=self.fetch_tasks_future),
+ self._start_fetch_tasks)
+
+ def _start_fetch_tasks(self, task):
+ if self._default_exit(task) != os.EX_OK:
+ self._async_wait()
+ return
+
+ self._start_task(
+ TaskScheduler(
+ iter(self.fetch_tasks_future.result()),
+ max_jobs=1,
+ event_loop=self.scheduler),
+ self._default_final_exit)
+
+
+def _fetch_tasks_future(config, hash_filter, repo_config, digests_future, cpv):
+ """
+ Asynchronously construct FetchTask instances for each of the files
+ referenced by an ebuild.
+
+ @param config: emirrordist config
+ @type config: portage._emirrordist.Config.Config
+ @param hash_filter: PORTAGE_CHECKSUM_FILTER settings
+ @type hash_filter: portage.checksum._hash_filter
+ @param repo_config: repository configuration
+ @type repo_config: RepoConfig
+ @param digests_future: future that contains cached distfiles digests
+ for the current cp if available
+ @type digests_future: asyncio.Future
+ @param cpv: current ebuild cpv
+ @type cpv: portage.versions._pkg_str
+
+ @return: A future that results in a list containing FetchTask
+ instances for each of the files referenced by an ebuild.
+ @rtype: asyncio.Future (or compatible)
+ """
+
+ loop = config.portdb._event_loop
+ result = loop.create_future()
+ fetch_tasks = []
+
+ def aux_get_done(gather_result):
+ if result.cancelled():
+ return
+
+ aux_get_result, fetch_map_result = gather_result.result()
+ try:
+ restrict, = aux_get_result.result()
+ except (PortageKeyError, PortageException) as e:
+ config.log_failure("%s\t\taux_get exception %s" %
+ (cpv, e))
+ result.set_result(fetch_tasks)
+ return
+
+ # Here we use matchnone=True to ignore conditional parts
+ # of RESTRICT since they don't apply unconditionally.
+ # Assume such conditionals only apply on the client side.
+ try:
+ restrict = frozenset(use_reduce(restrict,
+ flat=True, matchnone=True))
+ except PortageException as e:
+ config.log_failure("%s\t\tuse_reduce exception %s" %
+ (cpv, e))
+ result.set_result(fetch_tasks)
+ return
+
+ if "fetch" in restrict:
+ result.set_result(fetch_tasks)
+ return
+
+ try:
+ uri_map = fetch_map_result.result()
+ except PortageException as e:
+ config.log_failure("%s\t\tgetFetchMap exception %s" %
+ (cpv, e))
+ result.set_result(fetch_tasks)
+ return
+
+ if not uri_map:
+ result.set_result(fetch_tasks)
+ return
+
+ if "mirror" in restrict:
+ skip = False
+ if config.restrict_mirror_exemptions is not None:
+ new_uri_map = {}
+ for filename, uri_tuple in uri_map.items():
+ for uri in uri_tuple:
+ if uri[:9] == "mirror://":
+ i = uri.find("/", 9)
+ if i != -1 and uri[9:i].strip("/") in \
+ config.restrict_mirror_exemptions:
+ new_uri_map[filename] = uri_tuple
+ break
+ if new_uri_map:
+ uri_map = new_uri_map
+ else:
+ skip = True
+ else:
+ skip = True
+
+ if skip:
+ result.set_result(fetch_tasks)
+ return
+
+ # Parse Manifest for this cp if we haven't yet.
+ if not digests_future.done():
+ try:
+ digests = repo_config.load_manifest(
+ os.path.join(repo_config.location, cpv.cp)).\
+ getTypeDigests("DIST")
+ except (EnvironmentError, PortageException) as e:
+ digests_future.set_exception(e)
+ for filename in uri_map:
+ config.log_failure(
+ "%s\t%s\tManifest exception %s" %
+ (cpv, filename, e))
+ config.file_failures[filename] = cpv
+ result.set_result(fetch_tasks)
+ return
+ else:
+ digests_future.set_result(digests)
+
+ digests = digests_future.result()
+ if not digests:
+ for filename in uri_map:
+ config.log_failure("%s\t%s\tdigest entry missing" %
+ (cpv, filename))
+ config.file_failures[filename] = cpv
+ result.set_result(fetch_tasks)
+ return
+
+ for filename, uri_tuple in uri_map.items():
+ file_digests = digests.get(filename)
+ if file_digests is None:
+ config.log_failure("%s\t%s\tdigest entry missing" %
+ (cpv, filename))
+ config.file_failures[filename] = cpv
+ continue
+ if filename in config.file_owners:
+ continue
+ config.file_owners[filename] = cpv
+
+ file_digests = \
+ _filter_unaccelarated_hashes(file_digests)
+ if hash_filter is not None:
+ file_digests = _apply_hash_filter(
+ file_digests, hash_filter)
+
+ fetch_tasks.append(FetchTask(
+ cpv=cpv,
+ background=True,
+ digests=file_digests,
+ distfile=filename,
+ restrict=restrict,
+ uri_tuple=uri_tuple,
+ config=config))
+
+ result.set_result(fetch_tasks)
+
+ def future_generator():
+ yield config.portdb.async_aux_get(cpv, ("RESTRICT",),
+ myrepo=repo_config.name, loop=loop)
+ yield config.portdb.async_fetch_map(cpv,
+ mytree=repo_config.location, loop=loop)
+
+ # Use iter_gather(max_jobs=1) to limit the number of processes per
+ # _EbuildFetchTasks instance, and also to avoid spawning two bash
+ # processes for the same cpv simultaneously (the second one can
+ # use metadata cached by the first one).
+ gather_result = iter_gather(
+ future_generator(),
+ max_jobs=1,
+ loop=loop,
+ )
+ gather_result.add_done_callback(aux_get_done)
+ result.add_done_callback(lambda result:
+ gather_result.cancel() if result.cancelled() and
+ not gather_result.done() else None)
+
+ return result
--
2.13.6
^ permalink raw reply related [relevance 99%]
Results 1-1 of 1 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2018-04-25 9:20 99% [gentoo-portage-dev] [PATCH] FetchIterator: fix event loop recursion (bug 654038) Zac Medico
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox