public inbox for gentoo-portage-dev@lists.gentoo.org
Subject: [gentoo-portage-dev] [PATCH] FetchIterator: fix event loop recursion (bug 654038)
From: Zac Medico @ 2018-04-25  9:20 UTC
  To: gentoo-portage-dev; +Cc: Zac Medico

Constructing FetchTask instances requires results from aux_get
calls, which would trigger event loop recursion if executed
synchronously. Therefore, add a _fetch_tasks_future function that
constructs the FetchTask instances asynchronously and returns a
Future, and add an _EbuildFetchTasks class that waits for the
FetchTask instances to become available and then executes them.
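
For reference, here is a minimal, self-contained sketch of the same
pattern using plain asyncio rather than portage's internal event loop
and task classes. The names fake_aux_get, build_tasks_future and
run_tasks are illustrative stand-ins for portdb.async_aux_get,
_fetch_tasks_future and _EbuildFetchTasks, not portage APIs:

import asyncio

async def fake_aux_get(cpv):
	# Stand-in for portdb.async_aux_get(cpv, ("RESTRICT",)); the real
	# call is what would recurse if it ran synchronously.
	await asyncio.sleep(0)
	return "mirror"

def build_tasks_future(loop, cpv):
	# Analogue of _fetch_tasks_future: start the async metadata lookup
	# and immediately return a future that will hold the task list.
	result = loop.create_future()

	def metadata_done(metadata_future):
		if result.cancelled():
			return
		restrict = metadata_future.result()
		# Construct the "tasks" only once the metadata is available.
		result.set_result(["fetch %s (RESTRICT=%s)" % (cpv, restrict)])

	metadata_future = asyncio.ensure_future(fake_aux_get(cpv), loop=loop)
	metadata_future.add_done_callback(metadata_done)
	return result

async def run_tasks(cpv):
	# Analogue of _EbuildFetchTasks: wait for the construction future,
	# then execute the resulting tasks one at a time (max_jobs=1).
	loop = asyncio.get_running_loop()
	for task in await build_tasks_future(loop, cpv):
		print("executing:", task)

asyncio.run(run_tasks("app-misc/foo-1.0"))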

Bug: https://bugs.gentoo.org/654038
---
 pym/portage/_emirrordist/FetchIterator.py | 311 ++++++++++++++++++++----------
 1 file changed, 206 insertions(+), 105 deletions(-)

diff --git a/pym/portage/_emirrordist/FetchIterator.py b/pym/portage/_emirrordist/FetchIterator.py
index 38419799d..bd3b98cd0 100644
--- a/pym/portage/_emirrordist/FetchIterator.py
+++ b/pym/portage/_emirrordist/FetchIterator.py
@@ -7,14 +7,18 @@ from portage import os
 from portage.checksum import (_apply_hash_filter,
 	_filter_unaccelarated_hashes, _hash_filter)
 from portage.dep import use_reduce
-from portage.exception import PortageException
+from portage.exception import PortageException, PortageKeyError
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
+from portage.util._async.TaskScheduler import TaskScheduler
+from portage.util.futures.iter_completed import iter_gather
 from .FetchTask import FetchTask
+from _emerge.CompositeTask import CompositeTask
+
 
 class FetchIterator(object):
 
 	def __init__(self, config):
 		self._config = config
-		self._log_failure = config.log_failure
 		self._terminated = threading.Event()
 
 	def terminate(self):
@@ -41,9 +45,6 @@ class FetchIterator(object):
 
 		portdb = self._config.portdb
 		get_repo_for_location = portdb.repositories.get_repo_for_location
-		file_owners = self._config.file_owners
-		file_failures = self._config.file_failures
-		restrict_mirror_exemptions = self._config.restrict_mirror_exemptions
 
 		hash_filter = _hash_filter(
 			portdb.settings.get("PORTAGE_CHECKSUM_FILTER", ""))
@@ -59,110 +60,210 @@ class FetchIterator(object):
 
 				# Reset state so the Manifest is pulled once
 				# for this cp / tree combination.
-				digests = None
 				repo_config = get_repo_for_location(tree)
+				digests_future = portdb._event_loop.create_future()
 
 				for cpv in portdb.cp_list(cp, mytree=tree):
 
 					if self._terminated.is_set():
 						return
 
-					try:
-						restrict, = portdb.aux_get(cpv, ("RESTRICT",),
-							mytree=tree)
-					except (KeyError, PortageException) as e:
-						self._log_failure("%s\t\taux_get exception %s" %
-							(cpv, e))
-						continue
-
-					# Here we use matchnone=True to ignore conditional parts
-					# of RESTRICT since they don't apply unconditionally.
-					# Assume such conditionals only apply on the client side.
-					try:
-						restrict = frozenset(use_reduce(restrict,
-							flat=True, matchnone=True))
-					except PortageException as e:
-						self._log_failure("%s\t\tuse_reduce exception %s" %
-							(cpv, e))
-						continue
-
-					if "fetch" in restrict:
-						continue
-
-					try:
-						uri_map = portdb.getFetchMap(cpv)
-					except PortageException as e:
-						self._log_failure("%s\t\tgetFetchMap exception %s" %
-							(cpv, e))
-						continue
-
-					if not uri_map:
-						continue
-
-					if "mirror" in restrict:
-						skip = False
-						if restrict_mirror_exemptions is not None:
-							new_uri_map = {}
-							for filename, uri_tuple in uri_map.items():
-								for uri in uri_tuple:
-									if uri[:9] == "mirror://":
-										i = uri.find("/", 9)
-										if i != -1 and uri[9:i].strip("/") in \
-											restrict_mirror_exemptions:
-											new_uri_map[filename] = uri_tuple
-											break
-							if new_uri_map:
-								uri_map = new_uri_map
-							else:
-								skip = True
-						else:
-							skip = True
-
-						if skip:
-							continue
-
-					# Parse Manifest for this cp if we haven't yet.
-					if digests is None:
-						try:
-							digests = repo_config.load_manifest(
-								os.path.join(repo_config.location, cp)
-								).getTypeDigests("DIST")
-						except (EnvironmentError, PortageException) as e:
-							for filename in uri_map:
-								self._log_failure(
-									"%s\t%s\tManifest exception %s" %
-									(cpv, filename, e))
-								file_failures[filename] = cpv
-							continue
-
-					if not digests:
-						for filename in uri_map:
-							self._log_failure("%s\t%s\tdigest entry missing" %
-								(cpv, filename))
-							file_failures[filename] = cpv
-						continue
-
-					for filename, uri_tuple in uri_map.items():
-						file_digests = digests.get(filename)
-						if file_digests is None:
-							self._log_failure("%s\t%s\tdigest entry missing" %
-								(cpv, filename))
-							file_failures[filename] = cpv
-							continue
-						if filename in file_owners:
-							continue
-						file_owners[filename] = cpv
-
-						file_digests = \
-							_filter_unaccelarated_hashes(file_digests)
-						if hash_filter is not None:
-							file_digests = _apply_hash_filter(
-								file_digests, hash_filter)
-
-						yield FetchTask(cpv=cpv,
-							background=True,
-							digests=file_digests,
-							distfile=filename,
-							restrict=restrict,
-							uri_tuple=uri_tuple,
-							config=self._config)
+					yield _EbuildFetchTasks(
+						fetch_tasks_future=_fetch_tasks_future(
+							self._config,
+							hash_filter,
+							repo_config,
+							digests_future,
+							cpv)
+					)
+
+
+class _EbuildFetchTasks(CompositeTask):
+	"""
+	This executes asynchronously constructed FetchTask instances for
+	each of the files referenced by an ebuild.
+	"""
+	__slots__ = ('fetch_tasks_future',)
+	def _start(self):
+		self._start_task(AsyncTaskFuture(future=self.fetch_tasks_future),
+			self._start_fetch_tasks)
+
+	def _start_fetch_tasks(self, task):
+		if self._default_exit(task) != os.EX_OK:
+			self._async_wait()
+			return
+
+		self._start_task(
+			TaskScheduler(
+				iter(self.fetch_tasks_future.result()),
+				max_jobs=1,
+				event_loop=self.scheduler),
+			self._default_final_exit)
+
+
+def _fetch_tasks_future(config, hash_filter, repo_config, digests_future, cpv):
+	"""
+	Asynchronously construct FetchTask instances for each of the files
+	referenced by an ebuild.
+
+	@param config: emirrordist config
+	@type config: portage._emirrordist.Config.Config
+	@param hash_filter: PORTAGE_CHECKSUM_FILTER settings
+	@type hash_filter: portage.checksum._hash_filter
+	@param repo_config: repository configuration
+	@type repo_config: RepoConfig
+	@param digests_future: future that contains cached distfiles digests
+		for the current cp if available
+	@type digests_future: asyncio.Future
+	@param cpv: current ebuild cpv
+	@type cpv: portage.versions._pkg_str
+
+	@return: A future that results in a list containing FetchTask
+		instances for each of the files referenced by an ebuild.
+	@rtype: asyncio.Future (or compatible)
+	"""
+
+	loop = config.portdb._event_loop
+	result = loop.create_future()
+	fetch_tasks = []
+
+	def aux_get_done(gather_result):
+		if result.cancelled():
+			return
+
+		aux_get_result, fetch_map_result = gather_result.result()
+		try:
+			restrict, = aux_get_result.result()
+		except (PortageKeyError, PortageException) as e:
+			config.log_failure("%s\t\taux_get exception %s" %
+				(cpv, e))
+			result.set_result(fetch_tasks)
+			return
+
+		# Here we use matchnone=True to ignore conditional parts
+		# of RESTRICT since they don't apply unconditionally.
+		# Assume such conditionals only apply on the client side.
+		try:
+			restrict = frozenset(use_reduce(restrict,
+				flat=True, matchnone=True))
+		except PortageException as e:
+			config.log_failure("%s\t\tuse_reduce exception %s" %
+				(cpv, e))
+			result.set_result(fetch_tasks)
+			return
+
+		if "fetch" in restrict:
+			result.set_result(fetch_tasks)
+			return
+
+		try:
+			uri_map = fetch_map_result.result()
+		except PortageException as e:
+			config.log_failure("%s\t\tgetFetchMap exception %s" %
+				(cpv, e))
+			result.set_result(fetch_tasks)
+			return
+
+		if not uri_map:
+			result.set_result(fetch_tasks)
+			return
+
+		if "mirror" in restrict:
+			skip = False
+			if config.restrict_mirror_exemptions is not None:
+				new_uri_map = {}
+				for filename, uri_tuple in uri_map.items():
+					for uri in uri_tuple:
+						if uri[:9] == "mirror://":
+							i = uri.find("/", 9)
+							if i != -1 and uri[9:i].strip("/") in \
+								config.restrict_mirror_exemptions:
+								new_uri_map[filename] = uri_tuple
+								break
+				if new_uri_map:
+					uri_map = new_uri_map
+				else:
+					skip = True
+			else:
+				skip = True
+
+			if skip:
+				result.set_result(fetch_tasks)
+				return
+
+		# Parse Manifest for this cp if we haven't yet.
+		if not digests_future.done():
+			try:
+				digests = repo_config.load_manifest(
+					os.path.join(repo_config.location, cpv.cp)).\
+					getTypeDigests("DIST")
+			except (EnvironmentError, PortageException) as e:
+				digests_future.set_exception(e)
+				for filename in uri_map:
+					config.log_failure(
+						"%s\t%s\tManifest exception %s" %
+						(cpv, filename, e))
+					config.file_failures[filename] = cpv
+				result.set_result(fetch_tasks)
+				return
+			else:
+				digests_future.set_result(digests)
+
+		digests = digests_future.result()
+		if not digests:
+			for filename in uri_map:
+				config.log_failure("%s\t%s\tdigest entry missing" %
+					(cpv, filename))
+				config.file_failures[filename] = cpv
+			result.set_result(fetch_tasks)
+			return
+
+		for filename, uri_tuple in uri_map.items():
+			file_digests = digests.get(filename)
+			if file_digests is None:
+				config.log_failure("%s\t%s\tdigest entry missing" %
+					(cpv, filename))
+				config.file_failures[filename] = cpv
+				continue
+			if filename in config.file_owners:
+				continue
+			config.file_owners[filename] = cpv
+
+			file_digests = \
+				_filter_unaccelarated_hashes(file_digests)
+			if hash_filter is not None:
+				file_digests = _apply_hash_filter(
+					file_digests, hash_filter)
+
+			fetch_tasks.append(FetchTask(
+				cpv=cpv,
+				background=True,
+				digests=file_digests,
+				distfile=filename,
+				restrict=restrict,
+				uri_tuple=uri_tuple,
+				config=config))
+
+		result.set_result(fetch_tasks)
+
+	def future_generator():
+		yield config.portdb.async_aux_get(cpv, ("RESTRICT",),
+			myrepo=repo_config.name, loop=loop)
+		yield config.portdb.async_fetch_map(cpv,
+			mytree=repo_config.location, loop=loop)
+
+	# Use iter_gather(max_jobs=1) to limit the number of processes per
+	# _EbuildFetchTasks instance, and also to avoid spawning two bash
+	# processes for the same cpv simultaneously (the second one can
+	# use metadata cached by the first one).
+	gather_result = iter_gather(
+		future_generator(),
+		max_jobs=1,
+		loop=loop,
+	)
+	gather_result.add_done_callback(aux_get_done)
+	result.add_done_callback(lambda result:
+		gather_result.cancel() if result.cancelled() and
+		not gather_result.done() else None)
+
+	return result
-- 
2.13.6


