public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
From: "Zac Medico" <zmedico@gentoo.org>
To: gentoo-commits@lists.gentoo.org
Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/_emirrordist/
Date: Sat, 28 Apr 2018 14:01:58 +0000 (UTC)	[thread overview]
Message-ID: <1524923802.7c652d1b967f9c4c6a7fbd9fc5e46a0e57438a16.zmedico@gentoo> (raw)

commit:     7c652d1b967f9c4c6a7fbd9fc5e46a0e57438a16
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Wed Apr 25 06:42:10 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sat Apr 28 13:56:42 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=7c652d1b

FetchIterator: fix event loop recursion (bug 654038)

Since construction of FetchTask instances requires results from
aux_get calls that would trigger event loop recursion when executed
synchronously, add an _async_fetch_tasks function to construct
FetchTask instances asynchronously and return a Future. Use an
_EbuildFetchTasks class to wait for the FetchTask instances to
become available, and then execute them.

Bug: https://bugs.gentoo.org/654038

 pym/portage/_emirrordist/FetchIterator.py | 324 ++++++++++++++++++++----------
 1 file changed, 218 insertions(+), 106 deletions(-)

diff --git a/pym/portage/_emirrordist/FetchIterator.py b/pym/portage/_emirrordist/FetchIterator.py
index 38419799d..366453c12 100644
--- a/pym/portage/_emirrordist/FetchIterator.py
+++ b/pym/portage/_emirrordist/FetchIterator.py
@@ -1,4 +1,4 @@
-# Copyright 2013 Gentoo Foundation
+# Copyright 2013-2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 import threading
@@ -7,14 +7,18 @@ from portage import os
 from portage.checksum import (_apply_hash_filter,
 	_filter_unaccelarated_hashes, _hash_filter)
 from portage.dep import use_reduce
-from portage.exception import PortageException
+from portage.exception import PortageException, PortageKeyError
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
+from portage.util._async.TaskScheduler import TaskScheduler
+from portage.util.futures.iter_completed import iter_gather
 from .FetchTask import FetchTask
+from _emerge.CompositeTask import CompositeTask
+
 
 class FetchIterator(object):
 
 	def __init__(self, config):
 		self._config = config
-		self._log_failure = config.log_failure
 		self._terminated = threading.Event()
 
 	def terminate(self):
@@ -41,9 +45,6 @@ class FetchIterator(object):
 
 		portdb = self._config.portdb
 		get_repo_for_location = portdb.repositories.get_repo_for_location
-		file_owners = self._config.file_owners
-		file_failures = self._config.file_failures
-		restrict_mirror_exemptions = self._config.restrict_mirror_exemptions
 
 		hash_filter = _hash_filter(
 			portdb.settings.get("PORTAGE_CHECKSUM_FILTER", ""))
@@ -59,110 +60,221 @@ class FetchIterator(object):
 
 				# Reset state so the Manifest is pulled once
 				# for this cp / tree combination.
-				digests = None
 				repo_config = get_repo_for_location(tree)
+				digests_future = portdb._event_loop.create_future()
 
 				for cpv in portdb.cp_list(cp, mytree=tree):
 
 					if self._terminated.is_set():
 						return
 
-					try:
-						restrict, = portdb.aux_get(cpv, ("RESTRICT",),
-							mytree=tree)
-					except (KeyError, PortageException) as e:
-						self._log_failure("%s\t\taux_get exception %s" %
-							(cpv, e))
-						continue
-
-					# Here we use matchnone=True to ignore conditional parts
-					# of RESTRICT since they don't apply unconditionally.
-					# Assume such conditionals only apply on the client side.
-					try:
-						restrict = frozenset(use_reduce(restrict,
-							flat=True, matchnone=True))
-					except PortageException as e:
-						self._log_failure("%s\t\tuse_reduce exception %s" %
-							(cpv, e))
-						continue
-
-					if "fetch" in restrict:
-						continue
-
-					try:
-						uri_map = portdb.getFetchMap(cpv)
-					except PortageException as e:
-						self._log_failure("%s\t\tgetFetchMap exception %s" %
-							(cpv, e))
-						continue
-
-					if not uri_map:
-						continue
-
-					if "mirror" in restrict:
-						skip = False
-						if restrict_mirror_exemptions is not None:
-							new_uri_map = {}
-							for filename, uri_tuple in uri_map.items():
-								for uri in uri_tuple:
-									if uri[:9] == "mirror://":
-										i = uri.find("/", 9)
-										if i != -1 and uri[9:i].strip("/") in \
-											restrict_mirror_exemptions:
-											new_uri_map[filename] = uri_tuple
-											break
-							if new_uri_map:
-								uri_map = new_uri_map
-							else:
-								skip = True
-						else:
-							skip = True
-
-						if skip:
-							continue
-
-					# Parse Manifest for this cp if we haven't yet.
-					if digests is None:
-						try:
-							digests = repo_config.load_manifest(
-								os.path.join(repo_config.location, cp)
-								).getTypeDigests("DIST")
-						except (EnvironmentError, PortageException) as e:
-							for filename in uri_map:
-								self._log_failure(
-									"%s\t%s\tManifest exception %s" %
-									(cpv, filename, e))
-								file_failures[filename] = cpv
-							continue
-
-					if not digests:
-						for filename in uri_map:
-							self._log_failure("%s\t%s\tdigest entry missing" %
-								(cpv, filename))
-							file_failures[filename] = cpv
-						continue
-
-					for filename, uri_tuple in uri_map.items():
-						file_digests = digests.get(filename)
-						if file_digests is None:
-							self._log_failure("%s\t%s\tdigest entry missing" %
-								(cpv, filename))
-							file_failures[filename] = cpv
-							continue
-						if filename in file_owners:
-							continue
-						file_owners[filename] = cpv
-
-						file_digests = \
-							_filter_unaccelarated_hashes(file_digests)
-						if hash_filter is not None:
-							file_digests = _apply_hash_filter(
-								file_digests, hash_filter)
-
-						yield FetchTask(cpv=cpv,
-							background=True,
-							digests=file_digests,
-							distfile=filename,
-							restrict=restrict,
-							uri_tuple=uri_tuple,
-							config=self._config)
+					yield _EbuildFetchTasks(
+						fetch_tasks_future=_async_fetch_tasks(
+							self._config,
+							hash_filter,
+							repo_config,
+							digests_future,
+							cpv,
+							portdb._event_loop)
+					)
+
+
+class _EbuildFetchTasks(CompositeTask):
+	"""
+	Wait for fetch_tasks_future to yield this ebuild's FetchTask list,
+	then execute those tasks one at a time via a TaskScheduler.
+	"""
+	__slots__ = ('fetch_tasks_future',)
+	def _start(self):
+		self._start_task(AsyncTaskFuture(future=self.fetch_tasks_future),
+			self._start_fetch_tasks)
+
+	def _start_fetch_tasks(self, task):
+		if self._default_exit(task) != os.EX_OK:
+			self._async_wait()  # non-OK exit: finish without running any FetchTask
+			return
+
+		self._start_task(
+			TaskScheduler(
+				iter(self.fetch_tasks_future.result()),
+				max_jobs=1,  # at most one FetchTask at a time
+				event_loop=self.scheduler),
+			self._default_final_exit)
+
+
+def _async_fetch_tasks(config, hash_filter, repo_config, digests_future, cpv,
+	loop):
+	"""
+	Asynchronously construct FetchTask instances for each of the files
+	referenced by an ebuild.
+
+	@param config: emirrordist config
+	@type config: portage._emirrordist.Config.Config
+	@param hash_filter: PORTAGE_CHECKSUM_FILTER settings, or None
+	@type hash_filter: portage.checksum._hash_filter
+	@param repo_config: repository configuration
+	@type repo_config: RepoConfig
+	@param digests_future: future shared by all cpvs of the current cp;
+		caches the Manifest DIST digests (or the error loading them)
+	@type digests_future: asyncio.Future
+	@param cpv: current ebuild cpv
+	@type cpv: portage.versions._pkg_str
+	@param loop: event loop
+	@type loop: EventLoop
+	@return: A future resolving to a (possibly empty) list of FetchTask
+		instances, one per distfile this ebuild newly claims in file_owners.
+	@rtype: asyncio.Future (or compatible)
+	"""
+	loop = getattr(loop, '_asyncio_wrapper', loop)
+	result = loop.create_future()
+	fetch_tasks = []
+
+	def aux_get_done(gather_result):
+		# All exceptions must be consumed from gather_result before this
+		# function returns, in order to avoid triggering the event loop's
+		# exception handler.
+		if not gather_result.cancelled():
+			list(future.exception() for future in gather_result.result()
+				if not future.cancelled())
+
+		if result.cancelled():  # result was cancelled; it must not be set
+			return
+
+		aux_get_result, fetch_map_result = gather_result.result()
+		try:
+			restrict, = aux_get_result.result()
+		except (PortageKeyError, PortageException) as e:
+			config.log_failure("%s\t\taux_get exception %s" %
+				(cpv, e))
+			result.set_result(fetch_tasks)
+			return
+
+		# Here we use matchnone=True to ignore conditional parts
+		# of RESTRICT since they don't apply unconditionally.
+		# Assume such conditionals only apply on the client side.
+		try:
+			restrict = frozenset(use_reduce(restrict,
+				flat=True, matchnone=True))
+		except PortageException as e:
+			config.log_failure("%s\t\tuse_reduce exception %s" %
+				(cpv, e))
+			result.set_result(fetch_tasks)
+			return
+
+		if "fetch" in restrict:
+			result.set_result(fetch_tasks)
+			return
+
+		try:
+			uri_map = fetch_map_result.result()
+		except PortageException as e:
+			config.log_failure("%s\t\tgetFetchMap exception %s" %
+				(cpv, e))
+			result.set_result(fetch_tasks)
+			return
+
+		if not uri_map:
+			result.set_result(fetch_tasks)
+			return
+
+		if "mirror" in restrict:
+			skip = False
+			if config.restrict_mirror_exemptions is not None:
+				new_uri_map = {}  # files with a mirror:// URI on an exempt mirror
+				for filename, uri_tuple in uri_map.items():
+					for uri in uri_tuple:
+						if uri[:9] == "mirror://":
+							i = uri.find("/", 9)  # end of the mirror name
+							if i != -1 and uri[9:i].strip("/") in \
+								config.restrict_mirror_exemptions:
+								new_uri_map[filename] = uri_tuple
+								break
+				if new_uri_map:
+					uri_map = new_uri_map
+				else:
+					skip = True
+			else:
+				skip = True
+
+			if skip:
+				result.set_result(fetch_tasks)
+				return
+
+		# Parse the Manifest once per cp; digests_future is shared by its cpvs.
+		try:
+			if digests_future.done():
+				# Cached by an earlier cpv; re-raises a cached Manifest error.
+				digests = digests_future.result()
+			else:
+				digests = repo_config.load_manifest(
+					os.path.join(repo_config.location, cpv.cp)).\
+					getTypeDigests("DIST")
+		except (EnvironmentError, PortageException) as e:
+			digests_future.done() or digests_future.set_exception(e)  # cache for later cpvs
+			for filename in uri_map:
+				config.log_failure(
+					"%s\t%s\tManifest exception %s" %
+					(cpv, filename, e))
+				config.file_failures[filename] = cpv
+			result.set_result(fetch_tasks)
+			return
+		else:
+			digests_future.done() or digests_future.set_result(digests)  # cache for later cpvs
+
+		if not digests:
+			for filename in uri_map:
+				config.log_failure("%s\t%s\tdigest entry missing" %
+					(cpv, filename))
+				config.file_failures[filename] = cpv
+			result.set_result(fetch_tasks)
+			return
+
+		for filename, uri_tuple in uri_map.items():
+			file_digests = digests.get(filename)
+			if file_digests is None:
+				config.log_failure("%s\t%s\tdigest entry missing" %
+					(cpv, filename))
+				config.file_failures[filename] = cpv
+				continue
+			if filename in config.file_owners:  # already claimed by another cpv
+				continue
+			config.file_owners[filename] = cpv
+
+			file_digests = \
+				_filter_unaccelarated_hashes(file_digests)
+			if hash_filter is not None:
+				file_digests = _apply_hash_filter(
+					file_digests, hash_filter)
+
+			fetch_tasks.append(FetchTask(
+				cpv=cpv,
+				background=True,
+				digests=file_digests,
+				distfile=filename,
+				restrict=restrict,
+				uri_tuple=uri_tuple,
+				config=config))
+
+		result.set_result(fetch_tasks)
+
+	def future_generator():
+		yield config.portdb.async_aux_get(cpv, ("RESTRICT",),
+			myrepo=repo_config.name, loop=loop)
+		yield config.portdb.async_fetch_map(cpv,
+			mytree=repo_config.location, loop=loop)
+
+	# Use iter_gather(max_jobs=1) to limit the number of processes per
+	# _EbuildFetchTasks instance, and also to avoid spawning two bash
+	# processes for the same cpv simultaneously (the second one can
+	# use metadata cached by the first one).
+	gather_result = iter_gather(
+		future_generator(),
+		max_jobs=1,
+		loop=loop,
+	)
+	gather_result.add_done_callback(aux_get_done)
+	result.add_done_callback(lambda result:
+		gather_result.cancel() if result.cancelled() and
+		not gather_result.done() else None)  # propagate cancellation of result
+
+	return result


             reply	other threads:[~2018-04-28 14:02 UTC|newest]

Thread overview: 20+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-04-28 14:01 Zac Medico [this message]
  -- strict thread matches above, loose matches on Subject: below --
2018-04-28 23:08 [gentoo-commits] proj/portage:master commit in: pym/portage/_emirrordist/ Zac Medico
2018-04-28 21:57 Zac Medico
2018-04-26  8:46 Zac Medico
2018-04-25  7:30 Zac Medico
2017-03-24 20:33 Zac Medico
2017-03-24 20:33 Zac Medico
2016-01-07 16:50 Zac Medico
2014-02-02  3:15 Arfrever Frehtes Taifersar Arahesis
2013-08-12 23:09 Zac Medico
2013-08-02 23:27 Zac Medico
2013-06-20  1:23 Zac Medico
2013-01-10 10:35 Zac Medico
2013-01-10  9:40 Zac Medico
2013-01-10  9:18 Zac Medico
2013-01-10  9:05 Zac Medico
2013-01-10  8:41 Zac Medico
2013-01-10  7:54 Zac Medico
2013-01-10  3:57 Zac Medico
2013-01-09 22:23 Zac Medico

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1524923802.7c652d1b967f9c4c6a7fbd9fc5e46a0e57438a16.zmedico@gentoo \
    --to=zmedico@gentoo.org \
    --cc=gentoo-commits@lists.gentoo.org \
    --cc=gentoo-dev@lists.gentoo.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.