From: "Zac Medico" <zmedico@gentoo.org>
To: gentoo-commits@lists.gentoo.org
Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/_emirrordist/
Date: Sat, 28 Apr 2018 14:01:58 +0000 (UTC)
Message-ID: <1524923802.7c652d1b967f9c4c6a7fbd9fc5e46a0e57438a16.zmedico@gentoo>
commit: 7c652d1b967f9c4c6a7fbd9fc5e46a0e57438a16
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Wed Apr 25 06:42:10 2018 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sat Apr 28 13:56:42 2018 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=7c652d1b
FetchIterator: fix event loop recursion (bug 654038)
Since construction of FetchTask instances requires results from
aux_get calls that would trigger event loop recursion when executed
synchronously, add an _async_fetch_tasks function to construct
FetchTask instances asynchronously and return a Future. Use an
_EbuildFetchTasks class to wait for the FetchTask instances to
become available, and then execute them.
Bug: https://bugs.gentoo.org/654038
pym/portage/_emirrordist/FetchIterator.py | 324 ++++++++++++++++++++----------
1 file changed, 218 insertions(+), 106 deletions(-)
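For readers unfamiliar with the underlying problem: the recursion happens when code that is already running on the event loop calls a synchronous helper which internally re-enters that same loop. The following is a minimal plain-asyncio sketch, not Portage code (fetch_metadata and build_tasks are hypothetical stand-ins for aux_get and _async_fetch_tasks); it shows the failing pattern and the Future-plus-callback pattern that the commit switches to.

import asyncio

async def fetch_metadata():
    # Stand-in for an aux_get call that has to run on the event loop
    # (in Portage it ultimately drives an ebuild/bash subprocess).
    await asyncio.sleep(0)
    return {"RESTRICT": ""}

def fetch_metadata_blocking(loop):
    # The problematic pattern: re-entering a loop that is already
    # running raises "RuntimeError: This event loop is already running".
    return loop.run_until_complete(fetch_metadata())

def build_tasks(loop):
    # The pattern the commit adopts: return a Future immediately and
    # build the task list in a done-callback once the metadata future
    # resolves, so the running loop is never re-entered.
    result = loop.create_future()

    def metadata_done(future):
        if result.cancelled():
            return
        if future.exception() is not None:
            result.set_exception(future.exception())
            return
        metadata = future.result()
        result.set_result([("fetch-task", metadata["RESTRICT"])])

    loop.create_task(fetch_metadata()).add_done_callback(metadata_done)
    return result

async def main():
    loop = asyncio.get_running_loop()
    tasks = await build_tasks(loop)  # awaited, not run synchronously
    print(tasks)                     # [('fetch-task', '')]

asyncio.run(main())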
diff --git a/pym/portage/_emirrordist/FetchIterator.py b/pym/portage/_emirrordist/FetchIterator.py
index 38419799d..366453c12 100644
--- a/pym/portage/_emirrordist/FetchIterator.py
+++ b/pym/portage/_emirrordist/FetchIterator.py
@@ -1,4 +1,4 @@
-# Copyright 2013 Gentoo Foundation
+# Copyright 2013-2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
import threading
@@ -7,14 +7,18 @@ from portage import os
from portage.checksum import (_apply_hash_filter,
_filter_unaccelarated_hashes, _hash_filter)
from portage.dep import use_reduce
-from portage.exception import PortageException
+from portage.exception import PortageException, PortageKeyError
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
+from portage.util._async.TaskScheduler import TaskScheduler
+from portage.util.futures.iter_completed import iter_gather
from .FetchTask import FetchTask
+from _emerge.CompositeTask import CompositeTask
+
class FetchIterator(object):
def __init__(self, config):
self._config = config
- self._log_failure = config.log_failure
self._terminated = threading.Event()
def terminate(self):
@@ -41,9 +45,6 @@ class FetchIterator(object):
portdb = self._config.portdb
get_repo_for_location = portdb.repositories.get_repo_for_location
- file_owners = self._config.file_owners
- file_failures = self._config.file_failures
- restrict_mirror_exemptions = self._config.restrict_mirror_exemptions
hash_filter = _hash_filter(
portdb.settings.get("PORTAGE_CHECKSUM_FILTER", ""))
@@ -59,110 +60,221 @@ class FetchIterator(object):
# Reset state so the Manifest is pulled once
# for this cp / tree combination.
- digests = None
repo_config = get_repo_for_location(tree)
+ digests_future = portdb._event_loop.create_future()
for cpv in portdb.cp_list(cp, mytree=tree):
if self._terminated.is_set():
return
- try:
- restrict, = portdb.aux_get(cpv, ("RESTRICT",),
- mytree=tree)
- except (KeyError, PortageException) as e:
- self._log_failure("%s\t\taux_get exception %s" %
- (cpv, e))
- continue
-
- # Here we use matchnone=True to ignore conditional parts
- # of RESTRICT since they don't apply unconditionally.
- # Assume such conditionals only apply on the client side.
- try:
- restrict = frozenset(use_reduce(restrict,
- flat=True, matchnone=True))
- except PortageException as e:
- self._log_failure("%s\t\tuse_reduce exception %s" %
- (cpv, e))
- continue
-
- if "fetch" in restrict:
- continue
-
- try:
- uri_map = portdb.getFetchMap(cpv)
- except PortageException as e:
- self._log_failure("%s\t\tgetFetchMap exception %s" %
- (cpv, e))
- continue
-
- if not uri_map:
- continue
-
- if "mirror" in restrict:
- skip = False
- if restrict_mirror_exemptions is not None:
- new_uri_map = {}
- for filename, uri_tuple in uri_map.items():
- for uri in uri_tuple:
- if uri[:9] == "mirror://":
- i = uri.find("/", 9)
- if i != -1 and uri[9:i].strip("/") in \
- restrict_mirror_exemptions:
- new_uri_map[filename] = uri_tuple
- break
- if new_uri_map:
- uri_map = new_uri_map
- else:
- skip = True
- else:
- skip = True
-
- if skip:
- continue
-
- # Parse Manifest for this cp if we haven't yet.
- if digests is None:
- try:
- digests = repo_config.load_manifest(
- os.path.join(repo_config.location, cp)
- ).getTypeDigests("DIST")
- except (EnvironmentError, PortageException) as e:
- for filename in uri_map:
- self._log_failure(
- "%s\t%s\tManifest exception %s" %
- (cpv, filename, e))
- file_failures[filename] = cpv
- continue
-
- if not digests:
- for filename in uri_map:
- self._log_failure("%s\t%s\tdigest entry missing" %
- (cpv, filename))
- file_failures[filename] = cpv
- continue
-
- for filename, uri_tuple in uri_map.items():
- file_digests = digests.get(filename)
- if file_digests is None:
- self._log_failure("%s\t%s\tdigest entry missing" %
- (cpv, filename))
- file_failures[filename] = cpv
- continue
- if filename in file_owners:
- continue
- file_owners[filename] = cpv
-
- file_digests = \
- _filter_unaccelarated_hashes(file_digests)
- if hash_filter is not None:
- file_digests = _apply_hash_filter(
- file_digests, hash_filter)
-
- yield FetchTask(cpv=cpv,
- background=True,
- digests=file_digests,
- distfile=filename,
- restrict=restrict,
- uri_tuple=uri_tuple,
- config=self._config)
+ yield _EbuildFetchTasks(
+ fetch_tasks_future=_async_fetch_tasks(
+ self._config,
+ hash_filter,
+ repo_config,
+ digests_future,
+ cpv,
+ portdb._event_loop)
+ )
+
+
+class _EbuildFetchTasks(CompositeTask):
+ """
+ Executes FetchTask instances (which are asynchronously constructed)
+ for each of the files referenced by an ebuild.
+ """
+ __slots__ = ('fetch_tasks_future',)
+ def _start(self):
+ self._start_task(AsyncTaskFuture(future=self.fetch_tasks_future),
+ self._start_fetch_tasks)
+
+ def _start_fetch_tasks(self, task):
+ if self._default_exit(task) != os.EX_OK:
+ self._async_wait()
+ return
+
+ self._start_task(
+ TaskScheduler(
+ iter(self.fetch_tasks_future.result()),
+ max_jobs=1,
+ event_loop=self.scheduler),
+ self._default_final_exit)
+
+
+def _async_fetch_tasks(config, hash_filter, repo_config, digests_future, cpv,
+ loop):
+ """
+ Asynchronously construct FetchTask instances for each of the files
+ referenced by an ebuild.
+
+ @param config: emirrordist config
+ @type config: portage._emirrordist.Config.Config
+ @param hash_filter: PORTAGE_CHECKSUM_FILTER settings
+ @type hash_filter: portage.checksum._hash_filter
+ @param repo_config: repository configuration
+ @type repo_config: RepoConfig
+ @param digests_future: future that contains cached distfiles digests
+ for the current cp if available
+ @type digests_future: asyncio.Future
+ @param cpv: current ebuild cpv
+ @type cpv: portage.versions._pkg_str
+ @param loop: event loop
+ @type loop: EventLoop
+ @return: A future that results in a list containing FetchTask
+ instances for each of the files referenced by an ebuild.
+ @rtype: asyncio.Future (or compatible)
+ """
+ loop = getattr(loop, '_asyncio_wrapper', loop)
+ result = loop.create_future()
+ fetch_tasks = []
+
+ def aux_get_done(gather_result):
+ # All exceptions must be consumed from gather_result before this
+ # function returns, in order to avoid triggering the event loop's
+ # exception handler.
+ if not gather_result.cancelled():
+ list(future.exception() for future in gather_result.result()
+ if not future.cancelled())
+
+ if result.cancelled():
+ return
+
+ aux_get_result, fetch_map_result = gather_result.result()
+ try:
+ restrict, = aux_get_result.result()
+ except (PortageKeyError, PortageException) as e:
+ config.log_failure("%s\t\taux_get exception %s" %
+ (cpv, e))
+ result.set_result(fetch_tasks)
+ return
+
+ # Here we use matchnone=True to ignore conditional parts
+ # of RESTRICT since they don't apply unconditionally.
+ # Assume such conditionals only apply on the client side.
+ try:
+ restrict = frozenset(use_reduce(restrict,
+ flat=True, matchnone=True))
+ except PortageException as e:
+ config.log_failure("%s\t\tuse_reduce exception %s" %
+ (cpv, e))
+ result.set_result(fetch_tasks)
+ return
+
+ if "fetch" in restrict:
+ result.set_result(fetch_tasks)
+ return
+
+ try:
+ uri_map = fetch_map_result.result()
+ except PortageException as e:
+ config.log_failure("%s\t\tgetFetchMap exception %s" %
+ (cpv, e))
+ result.set_result(fetch_tasks)
+ return
+
+ if not uri_map:
+ result.set_result(fetch_tasks)
+ return
+
+ if "mirror" in restrict:
+ skip = False
+ if config.restrict_mirror_exemptions is not None:
+ new_uri_map = {}
+ for filename, uri_tuple in uri_map.items():
+ for uri in uri_tuple:
+ if uri[:9] == "mirror://":
+ i = uri.find("/", 9)
+ if i != -1 and uri[9:i].strip("/") in \
+ config.restrict_mirror_exemptions:
+ new_uri_map[filename] = uri_tuple
+ break
+ if new_uri_map:
+ uri_map = new_uri_map
+ else:
+ skip = True
+ else:
+ skip = True
+
+ if skip:
+ result.set_result(fetch_tasks)
+ return
+
+ # Parse Manifest for this cp if we haven't yet.
+ try:
+ if digests_future.done():
+ # If there's an exception then raise it.
+ digests = digests_future.result()
+ else:
+ digests = repo_config.load_manifest(
+ os.path.join(repo_config.location, cpv.cp)).\
+ getTypeDigests("DIST")
+ except (EnvironmentError, PortageException) as e:
+ digests_future.done() or digests_future.set_exception(e)
+ for filename in uri_map:
+ config.log_failure(
+ "%s\t%s\tManifest exception %s" %
+ (cpv, filename, e))
+ config.file_failures[filename] = cpv
+ result.set_result(fetch_tasks)
+ return
+ else:
+ digests_future.done() or digests_future.set_result(digests)
+
+ if not digests:
+ for filename in uri_map:
+ config.log_failure("%s\t%s\tdigest entry missing" %
+ (cpv, filename))
+ config.file_failures[filename] = cpv
+ result.set_result(fetch_tasks)
+ return
+
+ for filename, uri_tuple in uri_map.items():
+ file_digests = digests.get(filename)
+ if file_digests is None:
+ config.log_failure("%s\t%s\tdigest entry missing" %
+ (cpv, filename))
+ config.file_failures[filename] = cpv
+ continue
+ if filename in config.file_owners:
+ continue
+ config.file_owners[filename] = cpv
+
+ file_digests = \
+ _filter_unaccelarated_hashes(file_digests)
+ if hash_filter is not None:
+ file_digests = _apply_hash_filter(
+ file_digests, hash_filter)
+
+ fetch_tasks.append(FetchTask(
+ cpv=cpv,
+ background=True,
+ digests=file_digests,
+ distfile=filename,
+ restrict=restrict,
+ uri_tuple=uri_tuple,
+ config=config))
+
+ result.set_result(fetch_tasks)
+
+ def future_generator():
+ yield config.portdb.async_aux_get(cpv, ("RESTRICT",),
+ myrepo=repo_config.name, loop=loop)
+ yield config.portdb.async_fetch_map(cpv,
+ mytree=repo_config.location, loop=loop)
+
+ # Use iter_gather(max_jobs=1) to limit the number of processes per
+ # _EbuildFetchTask instance, and also to avoid spawning two bash
+ # processes for the same cpv simultaneously (the second one can
+ # use metadata cached by the first one).
+ gather_result = iter_gather(
+ future_generator(),
+ max_jobs=1,
+ loop=loop,
+ )
+ gather_result.add_done_callback(aux_get_done)
+ result.add_done_callback(lambda result:
+ gather_result.cancel() if result.cancelled() and
+ not gather_result.done() else None)
+
+ return result
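A note on the iter_gather(max_jobs=1) comment above: iter_gather comes from portage.util.futures.iter_completed (see the import at the top of the diff), and max_jobs=1 means the gathered awaitables run one at a time instead of concurrently, which is what lets the second portdb call reuse metadata cached by the first. A rough plain-asyncio analogue of that behaviour, offered only as a sketch (gather_serially, aux_get and fetch_map are hypothetical stand-ins, not Portage APIs):

import asyncio

async def gather_serially(awaitables):
    # Run the awaitables one at a time (never concurrently) and return
    # the finished futures, so the caller can inspect each result or
    # exception individually, as aux_get_done() does in the commit.
    done = []
    for aw in awaitables:
        future = asyncio.ensure_future(aw)
        try:
            await future
        except Exception:
            # The exception stays attached to the future; reporting it
            # is left to the caller.
            pass
        done.append(future)
    return done

async def main():
    async def aux_get():     # hypothetical stand-in for async_aux_get
        return ("RESTRICT-value",)

    async def fetch_map():   # hypothetical stand-in for async_fetch_map
        return {"foo-1.0.tar.gz": ("https://example.org/foo-1.0.tar.gz",)}

    aux, fmap = await gather_serially([aux_get(), fetch_map()])
    print(aux.result(), fmap.result())

asyncio.run(main())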