public inbox for gentoo-portage-dev@lists.gentoo.org
 help / color / mirror / Atom feed
From: Zac Medico <zmedico@gentoo.org>
To: gentoo-portage-dev@lists.gentoo.org
Cc: Zac Medico <zmedico@gentoo.org>
Subject: [gentoo-portage-dev] [PATCH v2] emerge: enable parallel-fetch during pkg_pretend (bug 710432)
Date: Sun, 20 Sep 2020 22:06:56 -0700	[thread overview]
Message-ID: <20200921050656.460304-1-zmedico@gentoo.org> (raw)
In-Reply-To: <20200920020527.355385-1-zmedico@gentoo.org>

Execute pkg_pretend phases in a coroutine while parallel-fetch
is running concurrently. When it's time to execute the pkg_pretend
phase for a remote binary package, use the Scheduler._get_prefetcher
method to get a running prefetcher if one is available; otherwise,
start a new fetcher.

Since pkg_pretend phases now run inside of the --keep-going retry
loop, --keep-going is now able to recover from pkg_pretend
failures, which fixes bug 404157.

Bug: https://bugs.gentoo.org/404157
Bug: https://bugs.gentoo.org/710432
Signed-off-by: Zac Medico <zmedico@gentoo.org>
---
[PATCH v2] records failed packages for correct interaction with
emerge --keep-going, which fixes bug 404157

 lib/_emerge/Scheduler.py | 142 +++++++++++++++++++++++++++------------
 1 file changed, 99 insertions(+), 43 deletions(-)

diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py
index a69421288..465f928a0 100644
--- a/lib/_emerge/Scheduler.py
+++ b/lib/_emerge/Scheduler.py
@@ -25,6 +25,7 @@ from portage._sets import SETPREFIX
 from portage._sets.base import InternalPackageSet
 from portage.util import ensure_dirs, writemsg, writemsg_level
 from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
 from portage.util.SlotObject import SlotObject
 from portage.util._async.SchedulerInterface import SchedulerInterface
 from portage.package.ebuild.digestcheck import digestcheck
@@ -766,7 +767,8 @@ class Scheduler(PollScheduler):
 
 		return prefetcher
 
-	def _run_pkg_pretend(self):
+	@coroutine
+	def _run_pkg_pretend(self, loop=None):
 		"""
 		Since pkg_pretend output may be important, this method sends all
 		output directly to stdout (regardless of options like --quiet or
@@ -774,7 +776,7 @@ class Scheduler(PollScheduler):
 		"""
 
 		failures = 0
-		sched_iface = self._sched_iface
+		sched_iface = loop = asyncio._wrap_loop(loop or self._sched_iface)
 
 		for x in self._mergelist:
 			if not isinstance(x, Package):
@@ -789,18 +791,28 @@ class Scheduler(PollScheduler):
 			if "pretend" not in x.defined_phases:
 				continue
 
-			out_str =">>> Running pre-merge checks for " + colorize("INFORM", x.cpv) + "\n"
-			portage.util.writemsg_stdout(out_str, noiselevel=-1)
+			out_str = "Running pre-merge checks for " + colorize("INFORM", x.cpv)
+			self._status_msg(out_str)
 
 			root_config = x.root_config
-			settings = self.pkgsettings[root_config.root]
+			settings = self._allocate_config(root_config.root)
 			settings.setcpv(x)
+			if not x.built:
+				# Get required SRC_URI metadata (it's not cached in x.metadata
+				# because some packages have an extremely large SRC_URI value).
+				portdb = root_config.trees["porttree"].dbapi
+				(settings.configdict["pkg"]["SRC_URI"],) = yield portdb.async_aux_get(
+					x.cpv, ["SRC_URI"], myrepo=x.repo, loop=loop
+				)
 
 			# setcpv/package.env allows for per-package PORTAGE_TMPDIR so we
 			# have to validate it for each package
 			rval = _check_temp_dir(settings)
 			if rval != os.EX_OK:
-				return rval
+				failures += 1
+				self._record_pkg_failure(x, settings, FAILURE)
+				self._deallocate_config(settings)
+				continue
 
 			build_dir_path = os.path.join(
 				os.path.realpath(settings["PORTAGE_TMPDIR"]),
@@ -809,7 +821,7 @@ class Scheduler(PollScheduler):
 			settings["PORTAGE_BUILDDIR"] = build_dir_path
 			build_dir = EbuildBuildDir(scheduler=sched_iface,
 				settings=settings)
-			sched_iface.run_until_complete(build_dir.async_lock())
+			yield build_dir.async_lock()
 			current_task = None
 
 			try:
@@ -835,7 +847,7 @@ class Scheduler(PollScheduler):
 						phase='clean', scheduler=sched_iface, settings=settings)
 					current_task = clean_phase
 					clean_phase.start()
-					clean_phase.wait()
+					yield clean_phase.async_wait()
 
 				if x.built:
 					tree = "bintree"
@@ -845,13 +857,19 @@ class Scheduler(PollScheduler):
 					# Display fetch on stdout, so that it's always clear what
 					# is consuming time here.
 					if bintree.isremote(x.cpv):
-						fetcher = BinpkgFetcher(pkg=x,
-							scheduler=sched_iface)
-						fetcher.start()
-						if fetcher.wait() != os.EX_OK:
+						fetcher = self._get_prefetcher(x)
+						if fetcher is None:
+							fetcher = BinpkgFetcher(pkg=x, scheduler=loop)
+							fetcher.start()
+							# We only set the fetched value when fetcher
+							# is a BinpkgFetcher, since BinpkgPrefetcher
+							# handles fetch, verification, and the
+							# bintree.inject call which moves the file.
+							fetched = fetcher.pkg_path
+						if (yield fetcher.async_wait()) != os.EX_OK:
 							failures += 1
+							self._record_pkg_failure(x, settings, fetcher.returncode)
 							continue
-						fetched = fetcher.pkg_path
 
 					if fetched is False:
 						filename = bintree.getname(x.cpv)
@@ -861,8 +879,9 @@ class Scheduler(PollScheduler):
 						scheduler=sched_iface, _pkg_path=filename)
 					current_task = verifier
 					verifier.start()
-					if verifier.wait() != os.EX_OK:
+					if (yield verifier.async_wait()) != os.EX_OK:
 						failures += 1
+						self._record_pkg_failure(x, settings, verifier.returncode)
 						continue
 
 					if fetched:
@@ -870,8 +889,7 @@ class Scheduler(PollScheduler):
 
 					infloc = os.path.join(build_dir_path, "build-info")
 					ensure_dirs(infloc)
-					self._sched_iface.run_until_complete(
-						bintree.dbapi.unpack_metadata(settings, infloc, loop=self._sched_iface))
+					yield bintree.dbapi.unpack_metadata(settings, infloc, loop=loop)
 					ebuild_path = os.path.join(infloc, x.pf + ".ebuild")
 					settings.configdict["pkg"]["EMERGE_FROM"] = "binary"
 					settings.configdict["pkg"]["MERGE_TYPE"] = "binary"
@@ -905,28 +923,45 @@ class Scheduler(PollScheduler):
 
 				current_task = pretend_phase
 				pretend_phase.start()
-				ret = pretend_phase.wait()
+				ret = yield pretend_phase.async_wait()
 				if ret != os.EX_OK:
 					failures += 1
+					self._record_pkg_failure(x, settings, ret)
 				portage.elog.elog_process(x.cpv, settings)
 			finally:
 
 				if current_task is not None:
 					if current_task.isAlive():
 						current_task.cancel()
-						current_task.wait()
 					if current_task.returncode == os.EX_OK:
 						clean_phase = EbuildPhase(background=False,
 							phase='clean', scheduler=sched_iface,
 							settings=settings)
 						clean_phase.start()
-						clean_phase.wait()
+						yield clean_phase.async_wait()
 
-				sched_iface.run_until_complete(build_dir.async_unlock())
+				yield build_dir.async_unlock()
+				self._deallocate_config(settings)
 
 		if failures:
-			return FAILURE
-		return os.EX_OK
+			return coroutine_return(FAILURE)
+		coroutine_return(os.EX_OK)
+
+	def _record_pkg_failure(self, pkg, settings, ret):
+		"""Record a package failure. This eliminates the package
+		from the --keep-going merge list, and immediately calls
+		_failed_pkg_msg if we have not been terminated."""
+		self._failed_pkgs.append(
+			self._failed_pkg(
+				build_dir=settings.get("PORTAGE_BUILDDIR"),
+				build_log=settings.get("PORTAGE_LOG_FILE"),
+				pkg=pkg,
+				returncode=ret,
+			)
+		)
+		if not self._terminated_tasks:
+			self._failed_pkg_msg(self._failed_pkgs[-1], "emerge", "for")
+			self._status_display.failed = len(self._failed_pkgs)
 
 	def merge(self):
 		if "--resume" in self.myopts:
@@ -988,11 +1023,6 @@ class Scheduler(PollScheduler):
 		if rval != os.EX_OK and not keep_going:
 			return rval
 
-		if not fetchonly:
-			rval = self._run_pkg_pretend()
-			if rval != os.EX_OK:
-				return rval
-
 		while True:
 
 			received_signal = []
@@ -1389,8 +1419,6 @@ class Scheduler(PollScheduler):
 		if self._opts_no_background.intersection(self.myopts):
 			self._set_max_jobs(1)
 
-		self._add_prefetchers()
-		self._add_packages()
 		failed_pkgs = self._failed_pkgs
 		portage.locks._quiet = self._background
 		portage.elog.add_listener(self._elog_listener)
@@ -1406,6 +1434,30 @@ class Scheduler(PollScheduler):
 		rval = os.EX_OK
 
 		try:
+			self._add_prefetchers()
+			if not self._build_opts.fetchonly:
+				# Run pkg_pretend concurrently with parallel-fetch, and be careful
+				# to respond appropriately to termination, so that we don't start
+				# any new tasks after we've been terminated. Temporarily make the
+				# status display quiet so that its output is not interleaved with
+				# pkg_pretend output.
+				status_quiet = self._status_display.quiet
+				self._status_display.quiet = True
+				try:
+					rval = self._sched_iface.run_until_complete(
+						self._run_pkg_pretend(loop=self._sched_iface)
+					)
+				except asyncio.CancelledError:
+					self.terminate()
+				finally:
+					self._status_display.quiet = status_quiet
+				self._termination_check()
+				if self._terminated_tasks:
+					rval = 128 + signal.SIGINT
+				if rval != os.EX_OK:
+					return rval
+
+			self._add_packages()
 			self._main_loop()
 		finally:
 			self._main_loop_cleanup()
@@ -1742,6 +1794,23 @@ class Scheduler(PollScheduler):
 
 		return bool(state_change)
 
+	def _get_prefetcher(self, pkg):
+		try:
+			prefetcher = self._prefetchers.pop(pkg, None)
+		except KeyError:
+			# KeyError observed with PyPy 1.8, despite None given as default.
+			# Note that PyPy 1.8 has the same WeakValueDictionary code as
+			# CPython 2.7, so it may be possible for CPython to raise KeyError
+			# here as well.
+			prefetcher = None
+		if prefetcher is not None and not prefetcher.isAlive():
+			try:
+				self._task_queues.fetch._task_queue.remove(prefetcher)
+			except ValueError:
+				pass
+			prefetcher = None
+		return prefetcher
+
 	def _task(self, pkg):
 
 		pkg_to_replace = None
@@ -1758,20 +1827,7 @@ class Scheduler(PollScheduler):
 					"installed", pkg.root_config, installed=True,
 					operation="uninstall")
 
-		try:
-			prefetcher = self._prefetchers.pop(pkg, None)
-		except KeyError:
-			# KeyError observed with PyPy 1.8, despite None given as default.
-			# Note that PyPy 1.8 has the same WeakValueDictionary code as
-			# CPython 2.7, so it may be possible for CPython to raise KeyError
-			# here as well.
-			prefetcher = None
-		if prefetcher is not None and not prefetcher.isAlive():
-			try:
-				self._task_queues.fetch._task_queue.remove(prefetcher)
-			except ValueError:
-				pass
-			prefetcher = None
+		prefetcher = self._get_prefetcher(pkg)
 
 		task = MergeListItem(args_set=self._args_set,
 			background=self._background, binpkg_opts=self._binpkg_opts,
-- 
2.25.3



      reply	other threads:[~2020-09-21  5:09 UTC|newest]

Thread overview: 2+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2020-09-20  2:05 [gentoo-portage-dev] [PATCH] emerge: enable parallel-fetch during pkg_pretend (bug 710432) Zac Medico
2020-09-21  5:06 ` Zac Medico [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20200921050656.460304-1-zmedico@gentoo.org \
    --to=zmedico@gentoo.org \
    --cc=gentoo-portage-dev@lists.gentoo.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.

Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.