public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
* [gentoo-commits] proj/portage:master commit in: pym/portage/tests/dbapi/, pym/portage/package/ebuild/_parallel_manifest/, ...
@ 2018-04-27 23:42 Zac Medico
  0 siblings, 0 replies; only message in thread
From: Zac Medico @ 2018-04-27 23:42 UTC (permalink / raw)
  To: gentoo-commits

commit:     3e77f0199cb401acf974089fb6aa378fd45d0e90
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Apr 24 06:54:05 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Apr 27 22:56:02 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=3e77f019

ManifestScheduler: async fetchlist_dict (bug 653946)

In order to avoid event loop recursion, pass fetchlist_dict to
ManifestTask as a Future.

Bug: https://bugs.gentoo.org/653946

 pym/portage/dbapi/porttree.py                      | 70 ++++++++++++++++++++++
 .../ebuild/_parallel_manifest/ManifestScheduler.py | 25 ++++----
 .../ebuild/_parallel_manifest/ManifestTask.py      | 24 +++++++-
 pym/portage/tests/dbapi/test_portdb_cache.py       |  3 +-
 4 files changed, 105 insertions(+), 17 deletions(-)

diff --git a/pym/portage/dbapi/porttree.py b/pym/portage/dbapi/porttree.py
index 975f03d5e..3ce214cd7 100644
--- a/pym/portage/dbapi/porttree.py
+++ b/pym/portage/dbapi/porttree.py
@@ -37,6 +37,7 @@ from portage import _unicode_encode
 from portage import OrderedDict
 from portage.util._eventloop.EventLoop import EventLoop
 from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util.futures.iter_completed import iter_gather
 from _emerge.EbuildMetadataPhase import EbuildMetadataPhase
 
 import os as _os
@@ -1393,6 +1394,75 @@ class FetchlistDict(Mapping):
 	if sys.hexversion >= 0x3000000:
 		keys = __iter__
 
+
+def _async_manifest_fetchlist(portdb, repo_config, cp, cpv_list=None,
+	max_jobs=None, max_load=None, loop=None):
+	"""
+	Asynchronous form of FetchlistDict, with max_jobs and max_load
+	parameters in order to control async_aux_get concurrency.
+
+	@param portdb: portdbapi instance
+	@type portdb: portdbapi
+	@param repo_config: repository configuration for a Manifest
+	@type repo_config: RepoConfig
+	@param cp: cp for a Manifest
+	@type cp: str
+	@param cpv_list: list of ebuild cpv values for a Manifest
+	@type cpv_list: list
+	@param max_jobs: max number of futures to process concurrently (default
+		is multiprocessing.cpu_count())
+	@type max_jobs: int
+	@param max_load: max load allowed when scheduling a new future,
+		otherwise schedule no more than 1 future at a time (default
+		is multiprocessing.cpu_count())
+	@type max_load: int or float
+	@param loop: event loop
+	@type loop: EventLoop
+	@return: a Future resulting in a Mapping compatible with FetchlistDict
+	@rtype: asyncio.Future (or compatible)
+	"""
+	loop = loop or global_event_loop()
+	loop = getattr(loop, '_asyncio_wrapper', loop)
+	result = loop.create_future()
+	cpv_list = (portdb.cp_list(cp, mytree=repo_config.location)
+		if cpv_list is None else cpv_list)
+
+	def gather_done(gather_result):
+		# All exceptions must be consumed from gather_result before this
+		# function returns, in order to avoid triggering the event loop's
+		# exception handler.
+		e = None
+		if not gather_result.cancelled():
+			for future in gather_result.result():
+				if (future.done() and not future.cancelled() and
+					future.exception() is not None):
+					e = future.exception()
+
+		if result.cancelled():
+			return
+		elif e is None:
+			result.set_result(dict((k, list(v.result()))
+				for k, v in zip(cpv_list, gather_result.result())))
+		else:
+			result.set_exception(e)
+
+	gather_result = iter_gather(
+		# Use a generator expression for lazy evaluation, so that iter_gather
+		# controls the number of concurrent async_fetch_map calls.
+		(portdb.async_fetch_map(cpv, mytree=repo_config.location, loop=loop)
+			for cpv in cpv_list),
+		max_jobs=max_jobs,
+		max_load=max_load,
+		loop=loop,
+	)
+
+	gather_result.add_done_callback(gather_done)
+	result.add_done_callback(lambda result:
+		gather_result.cancel() if result.cancelled() else None)
+
+	return result
+
+
 def _parse_uri_map(cpv, metadata, use=None):
 
 	myuris = use_reduce(metadata.get('SRC_URI', ''),

diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
index 38ac4825e..fabea9bc1 100644
--- a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
@@ -1,10 +1,10 @@
-# Copyright 2012-2013 Gentoo Foundation
+# Copyright 2012-2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 import portage
 from portage import os
+from portage.dbapi.porttree import _async_manifest_fetchlist
 from portage.dep import _repo_separator
-from portage.exception import InvalidDependString
 from portage.localization import _
 from portage.util._async.AsyncScheduler import AsyncScheduler
 from .ManifestTask import ManifestTask
@@ -63,21 +63,14 @@ class ManifestScheduler(AsyncScheduler):
 				cpv_list = portdb.cp_list(cp, mytree=[repo_config.location])
 				if not cpv_list:
 					continue
-				fetchlist_dict = {}
-				try:
-					for cpv in cpv_list:
-						fetchlist_dict[cpv] = \
-							list(portdb.getFetchMap(cpv, mytree=mytree))
-				except InvalidDependString as e:
-					portage.writemsg(
-						_("!!! %s%s%s: SRC_URI: %s\n") %
-						(cp, _repo_separator, repo_config.name, e),
-						noiselevel=-1)
-					self._error_count += 1
-					continue
 
+				# Use _async_manifest_fetchlist(max_jobs=1), since we
+				# spawn concurrent ManifestTask instances.
 				yield ManifestTask(cp=cp, distdir=distdir,
-					fetchlist_dict=fetchlist_dict, repo_config=repo_config,
+					fetchlist_dict=_async_manifest_fetchlist(
+						portdb, repo_config, cp, cpv_list=cpv_list,
+						max_jobs=1, loop=self._event_loop),
+					repo_config=repo_config,
 					gpg_cmd=self._gpg_cmd, gpg_vars=self._gpg_vars,
 					force_sign_key=self._force_sign_key)
 
@@ -91,3 +84,5 @@ class ManifestScheduler(AsyncScheduler):
 					noiselevel=-1)
 
 		AsyncScheduler._task_exit(self, task)
+
+

diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
index 0ee2b910d..6bf5e82ef 100644
--- a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2013 Gentoo Foundation
+# Copyright 2012-2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 import errno
@@ -8,8 +8,12 @@ import subprocess
 from portage import os
 from portage import _unicode_encode, _encodings
 from portage.const import MANIFEST2_IDENTIFIERS
+from portage.dep import _repo_separator
+from portage.exception import InvalidDependString
+from portage.localization import _
 from portage.util import (atomic_ofstream, grablines,
 	shlex_split, varexpand, writemsg)
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
 from portage.util._async.PipeLogger import PipeLogger
 from portage.util._async.PopenProcess import PopenProcess
 from _emerge.CompositeTask import CompositeTask
@@ -29,6 +33,24 @@ class ManifestTask(CompositeTask):
 	def _start(self):
 		self._manifest_path = os.path.join(self.repo_config.location,
 			self.cp, "Manifest")
+
+		self._start_task(
+			AsyncTaskFuture(future=self.fetchlist_dict),
+			self._start_with_fetchlist)
+
+	def _start_with_fetchlist(self, fetchlist_task):
+		if self._default_exit(fetchlist_task) != os.EX_OK:
+			if not self.fetchlist_dict.cancelled():
+				try:
+					self.fetchlist_dict.result()
+				except InvalidDependString as e:
+					writemsg(
+						_("!!! %s%s%s: SRC_URI: %s\n") %
+						(self.cp, _repo_separator, self.repo_config.name, e),
+						noiselevel=-1)
+			self._async_wait()
+			return
+		self.fetchlist_dict = self.fetchlist_dict.result()
 		manifest_proc = ManifestProcess(cp=self.cp, distdir=self.distdir,
 			fetchlist_dict=self.fetchlist_dict, repo_config=self.repo_config,
 			scheduler=self.scheduler)

diff --git a/pym/portage/tests/dbapi/test_portdb_cache.py b/pym/portage/tests/dbapi/test_portdb_cache.py
index bd934460a..d3101b120 100644
--- a/pym/portage/tests/dbapi/test_portdb_cache.py
+++ b/pym/portage/tests/dbapi/test_portdb_cache.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2015 Gentoo Foundation
+# Copyright 2012-2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 import subprocess
@@ -38,6 +38,7 @@ class PortdbCacheTestCase(TestCase):
 		portage_python = portage._python_interpreter
 		egencache_cmd = (portage_python, "-b", "-Wd",
 			os.path.join(self.bindir, "egencache"),
+			"--update-manifests", "--sign-manifests=n",
 			"--repo", "test_repo",
 			"--repositories-configuration", settings.repositories.config_string())
 		python_cmd = (portage_python, "-b", "-Wd", "-c")


^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2018-04-27 23:42 UTC | newest]

Thread overview: (only message) (download: mbox.gz follow: Atom feed)
-- links below jump to the message on this page --
2018-04-27 23:42 [gentoo-commits] proj/portage:master commit in: pym/portage/tests/dbapi/, pym/portage/package/ebuild/_parallel_manifest/, ... Zac Medico

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.