public inbox for gentoo-portage-dev@lists.gentoo.org
* [gentoo-portage-dev] [PATCH 0/2] Support PORTAGE_LOG_FILTER_FILE_CMD (bug 709746)
@ 2020-06-19 20:39 Zac Medico
  2020-06-19 20:39 ` [gentoo-portage-dev] [PATCH 1/2] PipeLogger: non-blocking write to pipe " Zac Medico
  2020-06-19 20:39 ` [gentoo-portage-dev] [PATCH 2/2] Support PORTAGE_LOG_FILTER_FILE_CMD " Zac Medico
  0 siblings, 2 replies; 6+ messages in thread
From: Zac Medico @ 2020-06-19 20:39 UTC
  To: gentoo-portage-dev; +Cc: Zac Medico

This variable specifies a command that filters build log output to a
log file. The plan is to extend this to support a separate filter for
tty output in the future.
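Here is a minimal Python sketch of what such a filter command might look like. It is loosely in the spirit of ansifilter(1) and is hypothetical: the function names are illustrative and do not belong to Portage. It copies binary build output from stdin to stdout and drops ANSI CSI escape sequences (ESC '[' ... final byte). Other escape types are assumed to be rare and are not handled.

```python
import re
import sys

# ANSI CSI sequence: ESC '[', parameter bytes 0x30-0x3F, intermediate
# bytes 0x20-0x2F, then one final byte 0x40-0x7E (e.g. b"\x1b[01;32m").
# OSC and other escape types are deliberately not handled here.
_CSI_RE = re.compile(rb"\x1b\[[0-?]*[ -/]*[@-~]")


def strip_ansi(data):
    """Return the bytes object data with CSI escape sequences removed."""
    return _CSI_RE.sub(b"", data)


def filter_stream(src, dst):
    """Copy binary stream src to dst line by line, stripping escapes.

    A CSI sequence cannot contain a newline byte, so per-line filtering
    does not split sequences; flushing keeps the log file current.
    """
    for line in src:
        dst.write(strip_ansi(line))
        dst.flush()


def main():
    """Hypothetical entry point for use as a log filter command."""
    filter_stream(sys.stdin.buffer, sys.stdout.buffer)
```

A small script that calls main() could then be named in the variable. The script's location is up to the user. Working on bytes rather than text lets non-UTF-8 build output pass through unchanged.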

Previous versions of these patches were affected by bug 716636, which
was caused by unsafe remove_reader and remove_writer calls in the finally
clauses of the PipeLogger _io_loop coroutine. The remove_reader and
remove_writer calls are now skipped if the corresponding file object
has already been closed (which is normal if the coroutine has been
cancelled). Since this kind of bug is hard to reproduce, Rick Farina
<zerochaos@gentoo.org> is currently testing the patches to verify that
they do not trigger emerge hangs like those in bug 716636.
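The guard described above can be sketched with the standard-library asyncio event loop. This is a hypothetical helper, not PipeLogger's actual _io_loop. It waits once for a file object to become readable. It calls remove_reader only when neither the loop nor the file object has been closed, so that it cannot unregister a reader that a concurrent coroutine registered for a reused descriptor number.

```python
import asyncio
import os


async def wait_readable(file_obj):
    """Wait until file_obj is readable, then unregister the reader.

    remove_reader is skipped if the loop or file_obj has been closed,
    since the fd number may already belong to another coroutine's file.
    """
    loop = asyncio.get_running_loop()
    fd = file_obj.fileno()
    future = loop.create_future()
    loop.add_reader(fd, future.set_result, None)
    try:
        await future
    finally:
        if not loop.is_closed():
            if not future.done():
                future.cancel()
            if not file_obj.closed:
                loop.remove_reader(fd)


async def demo():
    read_fd, write_fd = os.pipe()
    with os.fdopen(read_fd, "rb", 0) as reader:
        os.write(write_fd, b"hello")
        os.close(write_fd)
        await wait_readable(reader)
        return reader.read(5)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The guard checks the file object rather than the integer fd. Once a descriptor is closed, its number can be handed out again, so the integer alone cannot tell whose registration it is.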

Bug: https://bugs.gentoo.org/709746
Bug: https://bugs.gentoo.org/716636

Zac Medico (2):
  PipeLogger: non-blocking write to pipe (bug 709746)
  Support PORTAGE_LOG_FILTER_FILE_CMD (bug 709746)

 lib/_emerge/AbstractEbuildProcess.py          |   3 +-
 lib/_emerge/BinpkgFetcher.py                  |   3 +-
 lib/_emerge/EbuildFetcher.py                  |   3 +-
 lib/_emerge/EbuildPhase.py                    |  47 ++++++--
 lib/_emerge/SpawnProcess.py                   |  58 +++++++---
 lib/portage/dbapi/_MergeProcess.py            |   3 +-
 .../ebuild/_config/special_env_vars.py        |   8 +-
 lib/portage/tests/process/test_PipeLogger.py  |  58 ++++++++++
 lib/portage/util/_async/BuildLogger.py        | 109 ++++++++++++++++++
 lib/portage/util/_async/PipeLogger.py         |  73 +++++++++---
 lib/portage/util/_async/SchedulerInterface.py |  32 ++++-
 man/make.conf.5                               |   7 +-
 12 files changed, 358 insertions(+), 46 deletions(-)
 create mode 100644 lib/portage/tests/process/test_PipeLogger.py
 create mode 100644 lib/portage/util/_async/BuildLogger.py

-- 
2.25.3




* [gentoo-portage-dev] [PATCH 1/2] PipeLogger: non-blocking write to pipe (bug 709746)
  2020-06-19 20:39 [gentoo-portage-dev] [PATCH 0/2] Support PORTAGE_LOG_FILTER_FILE_CMD (bug 709746) Zac Medico
@ 2020-06-19 20:39 ` Zac Medico
  2020-06-22 14:35   ` Brian Dolbec
  2020-06-19 20:39 ` [gentoo-portage-dev] [PATCH 2/2] Support PORTAGE_LOG_FILTER_FILE_CMD " Zac Medico
  1 sibling, 1 reply; 6+ messages in thread
From: Zac Medico @ 2020-06-19 20:39 UTC
  To: gentoo-portage-dev; +Cc: Zac Medico

Add support for writing to a non-blocking pipe instead of a
log file. This is needed for bug 709746, where PipeLogger
will write to a pipe that is drained by another PipeLogger
instance running in the same process.
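The deadlock this avoids can be sketched with plain asyncio. The helper names are hypothetical, and this is not Portage's _writer. When a writer and a reader share one event loop, a blocking write of more data than the pipe can hold never returns, because the reader only runs after the writer yields. The patch's comment notes that log_file.write can lose data on EAGAIN. This sketch therefore calls os.write directly, keeps any unwritten tail after a partial write, and waits for writability on BlockingIOError. Payloads above the default Linux pipe capacity (64 KiB) exercise that path, like the 2**17 + 1 byte case in test_PipeLogger.py.

```python
import asyncio
import os


async def wait_for_fd(add, remove, fd):
    """Wait once for fd readiness using loop.add_reader/add_writer."""
    ready = asyncio.get_running_loop().create_future()
    add(fd, ready.set_result, None)
    try:
        await ready
    finally:
        remove(fd)


async def write_all(fd, data):
    """Write all of data to non-blocking fd, waiting on EAGAIN.

    os.write may accept only part of the buffer; the rest is kept and
    retried, so nothing is dropped when the pipe fills up.
    """
    loop = asyncio.get_running_loop()
    view = memoryview(data)
    while view:
        try:
            view = view[os.write(fd, view):]
        except BlockingIOError:  # EAGAIN: pipe buffer is full
            await wait_for_fd(loop.add_writer, loop.remove_writer, fd)


async def read_all(fd):
    """Drain non-blocking fd until EOF."""
    loop = asyncio.get_running_loop()
    chunks = []
    while True:
        try:
            chunk = os.read(fd, 65536)
        except BlockingIOError:  # EAGAIN: no data yet
            await wait_for_fd(loop.add_reader, loop.remove_reader, fd)
            continue
        if not chunk:
            return b"".join(chunks)
        chunks.append(chunk)


async def demo(size):
    """Writer and reader share one loop, as two PipeLoggers would."""
    read_fd, write_fd = os.pipe()
    os.set_blocking(read_fd, False)
    os.set_blocking(write_fd, False)

    async def writer():
        try:
            await write_all(write_fd, b"a" * size)
        finally:
            os.close(write_fd)  # EOF for the reader

    try:
        _, data = await asyncio.gather(writer(), read_all(read_fd))
    finally:
        os.close(read_fd)
    return data


if __name__ == "__main__":
    print(len(asyncio.run(demo(2**17 + 1))))
```

Because both sides yield to the loop whenever they would block, the pipe is drained while it is being filled. No extra thread or process is needed.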

Bug: https://bugs.gentoo.org/709746
Signed-off-by: Zac Medico <zmedico@gentoo.org>
---
 lib/portage/tests/process/test_PipeLogger.py | 58 ++++++++++++++++
 lib/portage/util/_async/PipeLogger.py        | 73 +++++++++++++++-----
 2 files changed, 115 insertions(+), 16 deletions(-)
 create mode 100644 lib/portage/tests/process/test_PipeLogger.py

diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py
new file mode 100644
index 000000000..2bd94cf39
--- /dev/null
+++ b/lib/portage/tests/process/test_PipeLogger.py
@@ -0,0 +1,58 @@
+# Copyright 2020 Gentoo Authors
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage.tests import TestCase
+from portage.util._async.PipeLogger import PipeLogger
+from portage.util.futures import asyncio
+from portage.util.futures._asyncio.streams import _reader, _writer
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
+from portage.util.futures.unix_events import _set_nonblocking
+
+
+class PipeLoggerTestCase(TestCase):
+
+	@coroutine
+	def _testPipeLoggerToPipe(self, test_string, loop=None):
+		"""
+		Test PipeLogger writing to a pipe connected to a PipeReader.
+		This verifies that PipeLogger does not deadlock when writing
+		to a pipe that's drained by a PipeReader running in the same
+		process (requires non-blocking write).
+		"""
+
+		input_fd, writer_pipe = os.pipe()
+		_set_nonblocking(writer_pipe)
+		writer_pipe = os.fdopen(writer_pipe, 'wb', 0)
+		writer = asyncio.ensure_future(_writer(writer_pipe, test_string.encode('ascii'), loop=loop), loop=loop)
+		writer.add_done_callback(lambda writer: writer_pipe.close())
+
+		pr, pw = os.pipe()
+
+		consumer = PipeLogger(background=True,
+			input_fd=input_fd,
+			log_file_path=os.fdopen(pw, 'wb', 0),
+			scheduler=loop)
+		consumer.start()
+
+		# Before starting the reader, wait here for a moment, in order
+		# to exercise PipeLogger's handling of EAGAIN during write.
+		yield asyncio.wait([writer], timeout=0.01)
+
+		reader = _reader(pr, loop=loop)
+		yield writer
+		content = yield reader
+		yield consumer.async_wait()
+
+		self.assertEqual(consumer.returncode, os.EX_OK)
+
+		coroutine_return(content.decode('ascii', 'replace'))
+
+	def testPipeLogger(self):
+		loop = asyncio._wrap_loop()
+
+		for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**17, 2**17 + 1):
+			test_string = x * "a"
+			output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
+			self.assertEqual(test_string, output,
+				"x = %s, len(output) = %s" % (x, len(output)))
diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
index a4258f350..ce8afb846 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -8,6 +8,10 @@ import sys
 
 import portage
 from portage import os, _encodings, _unicode_encode
+from portage.util.futures import asyncio
+from portage.util.futures._asyncio.streams import _writer
+from portage.util.futures.compat_coroutine import coroutine
+from portage.util.futures.unix_events import _set_nonblocking
 from _emerge.AbstractPollTask import AbstractPollTask
 
 class PipeLogger(AbstractPollTask):
@@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask):
 	"""
 
 	__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
-		("_log_file", "_log_file_real")
+		("_io_loop_task", "_log_file", "_log_file_nb", "_log_file_real")
 
 	def _start(self):
 
 		log_file_path = self.log_file_path
-		if log_file_path is not None:
-
+		if hasattr(log_file_path, 'write'):
+			self._log_file_nb = True
+			self._log_file = log_file_path
+			_set_nonblocking(self._log_file.fileno())
+		elif log_file_path is not None:
 			self._log_file = open(_unicode_encode(log_file_path,
 				encoding=_encodings['fs'], errors='strict'), mode='ab')
 			if log_file_path.endswith('.gz'):
@@ -40,9 +47,9 @@ class PipeLogger(AbstractPollTask):
 				mode=0o660)
 
 		if isinstance(self.input_fd, int):
-			fd = self.input_fd
-		else:
-			fd = self.input_fd.fileno()
+			self.input_fd = os.fdopen(self.input_fd, 'rb', 0)
+
+		fd = self.input_fd.fileno()
 
 		fcntl.fcntl(fd, fcntl.F_SETFL,
 			fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
@@ -57,7 +64,8 @@ class PipeLogger(AbstractPollTask):
 				fcntl.fcntl(fd, fcntl.F_SETFD,
 					fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
 
-		self.scheduler.add_reader(fd, self._output_handler, fd)
+		self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd), loop=self.scheduler)
+		self._io_loop_task.add_done_callback(self._io_loop_done)
 		self._registered = True
 
 	def _cancel(self):
@@ -65,25 +73,36 @@ class PipeLogger(AbstractPollTask):
 		if self.returncode is None:
 			self.returncode = self._cancelled_returncode
 
-	def _output_handler(self, fd):
-
+	@coroutine
+	def _io_loop(self, input_file):
 		background = self.background
 		stdout_fd = self.stdout_fd
 		log_file = self._log_file 
+		fd = input_file.fileno()
 
 		while True:
 			buf = self._read_buf(fd)
 
 			if buf is None:
 				# not a POLLIN event, EAGAIN, etc...
-				break
+				future = self.scheduler.create_future()
+				self.scheduler.add_reader(fd, future.set_result, None)
+				try:
+					yield future
+				finally:
+					# The loop and input file may have been closed.
+					if not self.scheduler.is_closed():
+						future.done() or future.cancel()
+						# Do not call remove_reader in cases where fd has
+						# been closed and then re-allocated to a concurrent
+						# coroutine as in bug 716636.
+						if not input_file.closed:
+							self.scheduler.remove_reader(fd)
+				continue
 
 			if not buf:
 				# EOF
-				self._unregister()
-				self.returncode = self.returncode or os.EX_OK
-				self._async_wait()
-				break
+				return
 
 			else:
 				if not background and stdout_fd is not None:
@@ -120,8 +139,25 @@ class PipeLogger(AbstractPollTask):
 								fcntl.F_GETFL) ^ os.O_NONBLOCK)
 
 				if log_file is not None:
-					log_file.write(buf)
-					log_file.flush()
+					if self._log_file_nb:
+						# Use the _writer function which uses os.write, since the
+						# log_file.write method loses data when an EAGAIN occurs.
+						yield _writer(log_file, buf, loop=self.scheduler)
+					else:
+						# For gzip.GzipFile instances, the above _writer function
+						# will not work because data written directly to the file
+						# descriptor bypasses compression.
+						log_file.write(buf)
+						log_file.flush()
+
+	def _io_loop_done(self, future):
+		try:
+			future.result()
+		except asyncio.CancelledError:
+			self.cancel()
+			self._was_cancelled()
+		self.returncode = self.returncode or os.EX_OK
+		self._async_wait()
 
 	def _unregister(self):
 		if self.input_fd is not None:
@@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask):
 				self.input_fd.close()
 			self.input_fd = None
 
+		if self._io_loop_task is not None:
+			self._io_loop_task.done() or self._io_loop_task.cancel()
+			self._io_loop_task = None
+
 		if self.stdout_fd is not None:
 			os.close(self.stdout_fd)
 			self.stdout_fd = None
 
 		if self._log_file is not None:
+			self.scheduler.remove_writer(self._log_file.fileno())
 			self._log_file.close()
 			self._log_file = None
 
-- 
2.25.3




* [gentoo-portage-dev] [PATCH 2/2] Support PORTAGE_LOG_FILTER_FILE_CMD (bug 709746)
  2020-06-19 20:39 [gentoo-portage-dev] [PATCH 0/2] Support PORTAGE_LOG_FILTER_FILE_CMD (bug 709746) Zac Medico
  2020-06-19 20:39 ` [gentoo-portage-dev] [PATCH 1/2] PipeLogger: non-blocking write to pipe " Zac Medico
@ 2020-06-19 20:39 ` Zac Medico
  2020-06-22 14:46   ` Brian Dolbec
  1 sibling, 1 reply; 6+ messages in thread
From: Zac Medico @ 2020-06-19 20:39 UTC
  To: gentoo-portage-dev; +Cc: Zac Medico

This variable specifies a command that filters build log output to a
log file. The plan is to extend this to support a separate filter for
tty output in the future.

In order to enable the EbuildPhase class to write elog messages to
the build log with PORTAGE_LOG_FILTER_FILE_CMD support, convert its
_elog method to a coroutine, and add a SchedulerInterface async_output
method for it to use.

Use a new BuildLogger class to manage log output (with or without a
filter command), with compression support provided by PipeLogger.
BuildLogger has a stdin property that provides a writable binary
file stream (the write end of a pipe) to which log content is written.
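The data flow described above can be sketched synchronously with the standard library: writer, then optional filter command, then log file. The sketch includes a fallback for when the command is blank or cannot be started. This is not Portage's BuildLogger, which uses PopenProcess, PipeLogger, and the event loop. The helpers open_log_sink and close_log_sink are hypothetical, compression is omitted, and the filter's output goes straight to the file. The OSError fallback mirrors the EnvironmentError handling in BuildLogger._start.

```python
import os
import shlex
import subprocess
import sys
import tempfile


def open_log_sink(log_path, filter_cmd=None, env=None):
    """Return (stream, proc) for writing build output to log_path.

    If filter_cmd splits into a non-empty argv and the command starts, the
    stream feeds the filter's stdin and the filter's stdout/stderr are
    appended to log_path. If filter_cmd is unset, blank, or fails to start,
    the stream appends to log_path directly and proc is None.
    """
    argv = shlex.split(filter_cmd) if filter_cmd else []
    log_file = open(log_path, "ab")
    if argv:
        try:
            proc = subprocess.Popen(argv, env=env, stdin=subprocess.PIPE,
                                    stdout=log_file, stderr=log_file)
        except OSError:
            pass  # Missing or broken command: log unfiltered output.
        else:
            log_file.close()  # The child holds its own copy of the fd.
            return proc.stdin, proc
    return log_file, None


def close_log_sink(stream, proc):
    """Close the stream (EOF for the filter) and return the exit status."""
    stream.close()
    return proc.wait() if proc is not None else 0


def demo():
    upcase = " ".join(shlex.quote(arg) for arg in (
        sys.executable, "-c",
        "import sys; sys.stdout.write(sys.stdin.read().upper())"))
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, "build.log")
        for cmd, line in ((upcase, b"filtered\n"),
                          ("   ", b"blank command\n"),
                          ("/nonexistent/filter", b"missing command\n")):
            stream, proc = open_log_sink(path, cmd)
            stream.write(line)
            close_log_sink(stream, proc)
        with open(path, "rb") as f:
            return f.read()
```

Closing the stream is what signals EOF to the filter. A caller must close it before waiting on the filter process, or the wait would never return.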

Bug: https://bugs.gentoo.org/709746
Signed-off-by: Zac Medico <zmedico@gentoo.org>
---
 lib/_emerge/AbstractEbuildProcess.py          |   3 +-
 lib/_emerge/BinpkgFetcher.py                  |   3 +-
 lib/_emerge/EbuildFetcher.py                  |   3 +-
 lib/_emerge/EbuildPhase.py                    |  47 ++++++--
 lib/_emerge/SpawnProcess.py                   |  58 +++++++---
 lib/portage/dbapi/_MergeProcess.py            |   3 +-
 .../ebuild/_config/special_env_vars.py        |   8 +-
 lib/portage/util/_async/BuildLogger.py        | 109 ++++++++++++++++++
 lib/portage/util/_async/SchedulerInterface.py |  32 ++++-
 man/make.conf.5                               |   7 +-
 10 files changed, 243 insertions(+), 30 deletions(-)
 create mode 100644 lib/portage/util/_async/BuildLogger.py

diff --git a/lib/_emerge/AbstractEbuildProcess.py b/lib/_emerge/AbstractEbuildProcess.py
index 1c1955cfe..ae1aae55f 100644
--- a/lib/_emerge/AbstractEbuildProcess.py
+++ b/lib/_emerge/AbstractEbuildProcess.py
@@ -1,4 +1,4 @@
-# Copyright 1999-2019 Gentoo Foundation
+# Copyright 1999-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 import errno
@@ -196,6 +196,7 @@ class AbstractEbuildProcess(SpawnProcess):
 			null_fd = os.open('/dev/null', os.O_RDONLY)
 			self.fd_pipes[0] = null_fd
 
+		self.log_filter_file = self.settings.get('PORTAGE_LOG_FILTER_FILE_CMD')
 		try:
 			SpawnProcess._start(self)
 		finally:
diff --git a/lib/_emerge/BinpkgFetcher.py b/lib/_emerge/BinpkgFetcher.py
index 36d027de3..2e5861cc1 100644
--- a/lib/_emerge/BinpkgFetcher.py
+++ b/lib/_emerge/BinpkgFetcher.py
@@ -1,4 +1,4 @@
-# Copyright 1999-2018 Gentoo Foundation
+# Copyright 1999-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 import functools
@@ -158,6 +158,7 @@ class _BinpkgFetcherProcess(SpawnProcess):
 		self.env = fetch_env
 		if settings.selinux_enabled():
 			self._selinux_type = settings["PORTAGE_FETCH_T"]
+		self.log_filter_file = settings.get('PORTAGE_LOG_FILTER_FILE_CMD')
 		SpawnProcess._start(self)
 
 	def _pipe(self, fd_pipes):
diff --git a/lib/_emerge/EbuildFetcher.py b/lib/_emerge/EbuildFetcher.py
index 1e40994fb..55349c33c 100644
--- a/lib/_emerge/EbuildFetcher.py
+++ b/lib/_emerge/EbuildFetcher.py
@@ -1,4 +1,4 @@
-# Copyright 1999-2018 Gentoo Foundation
+# Copyright 1999-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 import copy
@@ -225,6 +225,7 @@ class _EbuildFetcherProcess(ForkProcess):
 			settings["NOCOLOR"] = nocolor
 
 		self._settings = settings
+		self.log_filter_file = settings.get('PORTAGE_LOG_FILTER_FILE_CMD')
 		ForkProcess._start(self)
 
 		# Free settings now since it's no longer needed in
diff --git a/lib/_emerge/EbuildPhase.py b/lib/_emerge/EbuildPhase.py
index 477e0ba97..ddb3dc719 100644
--- a/lib/_emerge/EbuildPhase.py
+++ b/lib/_emerge/EbuildPhase.py
@@ -26,6 +26,8 @@ from portage.package.ebuild.prepare_build_dirs import (_prepare_workdir,
 from portage.util.futures.compat_coroutine import coroutine
 from portage.util import writemsg
 from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
+from portage.util._async.BuildLogger import BuildLogger
+from portage.util.futures import asyncio
 from portage.util.futures.executor.fork import ForkExecutor
 
 try:
@@ -69,6 +71,11 @@ class EbuildPhase(CompositeTask):
 	_locked_phases = ("setup", "preinst", "postinst", "prerm", "postrm")
 
 	def _start(self):
+		future = asyncio.ensure_future(self._async_start(), loop=self.scheduler)
+		self._start_task(AsyncTaskFuture(future=future), self._async_start_exit)
+
+	@coroutine
+	def _async_start(self):
 
 		need_builddir = self.phase not in EbuildProcess._phases_without_builddir
 
@@ -126,7 +133,7 @@ class EbuildPhase(CompositeTask):
 			# Force background=True for this header since it's intended
 			# for the log and it doesn't necessarily need to be visible
 			# elsewhere.
-			self._elog('einfo', msg, background=True)
+			yield self._elog('einfo', msg, background=True)
 
 		if self.phase == 'package':
 			if 'PORTAGE_BINPKG_TMPFILE' not in self.settings:
@@ -134,6 +141,12 @@ class EbuildPhase(CompositeTask):
 					os.path.join(self.settings['PKGDIR'],
 					self.settings['CATEGORY'], self.settings['PF']) + '.tbz2'
 
+	def _async_start_exit(self, task):
+		task.future.cancelled() or task.future.result()
+		if self._default_exit(task) != os.EX_OK:
+			self.wait()
+			return
+
 		if self.phase in ("pretend", "prerm"):
 			env_extractor = BinpkgEnvExtractor(background=self.background,
 				scheduler=self.scheduler, settings=self.settings)
@@ -391,6 +404,7 @@ class EbuildPhase(CompositeTask):
 		self.returncode = 1
 		self.wait()
 
+	@coroutine
 	def _elog(self, elog_funcname, lines, background=None):
 		if background is None:
 			background = self.background
@@ -407,11 +421,30 @@ class EbuildPhase(CompositeTask):
 			portage.output.havecolor = global_havecolor
 		msg = out.getvalue()
 		if msg:
-			log_path = None
-			if self.settings.get("PORTAGE_BACKGROUND") != "subprocess":
-				log_path = self.settings.get("PORTAGE_LOG_FILE")
-			self.scheduler.output(msg, log_path=log_path,
-				background=background)
+			build_logger = None
+			try:
+				log_file = None
+				log_path = None
+				if self.settings.get("PORTAGE_BACKGROUND") != "subprocess":
+					log_path = self.settings.get("PORTAGE_LOG_FILE")
+				if log_path:
+					build_logger = BuildLogger(env=self.settings.environ(),
+						log_path=log_path,
+						log_filter_file=self.settings.get('PORTAGE_LOG_FILTER_FILE_CMD'),
+						scheduler=self.scheduler)
+					build_logger.start()
+					log_file = build_logger.stdin
+
+				yield self.scheduler.async_output(msg, log_file=log_file,
+					background=background)
+
+				if build_logger is not None:
+					build_logger.stdin.close()
+					yield build_logger.async_wait()
+			except asyncio.CancelledError:
+				if build_logger is not None:
+					build_logger.cancel()
+				raise
 
 
 class _PostPhaseCommands(CompositeTask):
@@ -480,4 +513,4 @@ class _PostPhaseCommands(CompositeTask):
 			qa_msg.extend("\t%s: %s" % (filename, " ".join(sorted(soname_deps)))
 				for filename, soname_deps in unresolved)
 			qa_msg.append("")
-			self.elog("eqawarn", qa_msg)
+			yield self.elog("eqawarn", qa_msg)
diff --git a/lib/_emerge/SpawnProcess.py b/lib/_emerge/SpawnProcess.py
index 395d66bb9..f96911571 100644
--- a/lib/_emerge/SpawnProcess.py
+++ b/lib/_emerge/SpawnProcess.py
@@ -1,4 +1,4 @@
-# Copyright 2008-2018 Gentoo Foundation
+# Copyright 2008-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 try:
@@ -19,7 +19,10 @@ from portage.const import BASH_BINARY
 from portage.localization import _
 from portage.output import EOutput
 from portage.util import writemsg_level
+from portage.util._async.BuildLogger import BuildLogger
 from portage.util._async.PipeLogger import PipeLogger
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine
 
 class SpawnProcess(SubProcess):
 
@@ -34,8 +37,8 @@ class SpawnProcess(SubProcess):
 		"path_lookup", "pre_exec", "close_fds", "cgroup",
 		"unshare_ipc", "unshare_mount", "unshare_pid", "unshare_net")
 
-	__slots__ = ("args",) + \
-		_spawn_kwarg_names + ("_pipe_logger", "_selinux_type",)
+	__slots__ = ("args", "log_filter_file") + \
+		_spawn_kwarg_names + ("_main_task", "_selinux_type",)
 
 	# Max number of attempts to kill the processes listed in cgroup.procs,
 	# given that processes may fork before they can be killed.
@@ -137,13 +140,43 @@ class SpawnProcess(SubProcess):
 						fcntl.fcntl(stdout_fd,
 						fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
 
-		self._pipe_logger = PipeLogger(background=self.background,
+		build_logger = BuildLogger(env=self.env,
+			log_path=log_file_path,
+			log_filter_file=self.log_filter_file,
+			scheduler=self.scheduler)
+		build_logger.start()
+
+		pipe_logger = PipeLogger(background=self.background,
 			scheduler=self.scheduler, input_fd=master_fd,
-			log_file_path=log_file_path,
+			log_file_path=build_logger.stdin,
 			stdout_fd=stdout_fd)
-		self._pipe_logger.addExitListener(self._pipe_logger_exit)
-		self._pipe_logger.start()
+
+		pipe_logger.start()
+
 		self._registered = True
+		self._main_task = asyncio.ensure_future(self._main(build_logger, pipe_logger), loop=self.scheduler)
+		self._main_task.add_done_callback(self._main_exit)
+
+	@coroutine
+	def _main(self, build_logger, pipe_logger):
+		try:
+			if pipe_logger.poll() is None:
+				yield pipe_logger.async_wait()
+			if build_logger.poll() is None:
+				yield build_logger.async_wait()
+		except asyncio.CancelledError:
+			if pipe_logger.poll() is None:
+				pipe_logger.cancel()
+			if build_logger.poll() is None:
+				build_logger.cancel()
+			raise
+
+	def _main_exit(self, main_task):
+		try:
+			main_task.result()
+		except asyncio.CancelledError:
+			self.cancel()
+		self._async_waitpid()
 
 	def _can_log(self, slave_fd):
 		return True
@@ -167,20 +200,17 @@ class SpawnProcess(SubProcess):
 
 		return spawn_func(args, **kwargs)
 
-	def _pipe_logger_exit(self, pipe_logger):
-		self._pipe_logger = None
-		self._async_waitpid()
-
 	def _unregister(self):
 		SubProcess._unregister(self)
 		if self.cgroup is not None:
 			self._cgroup_cleanup()
 			self.cgroup = None
-		if self._pipe_logger is not None:
-			self._pipe_logger.cancel()
-			self._pipe_logger = None
+		if self._main_task is not None:
+			self._main_task.done() or self._main_task.cancel()
 
 	def _cancel(self):
+		if self._main_task is not None:
+			self._main_task.done() or self._main_task.cancel()
 		SubProcess._cancel(self)
 		self._cgroup_cleanup()
 
diff --git a/lib/portage/dbapi/_MergeProcess.py b/lib/portage/dbapi/_MergeProcess.py
index 371550079..236d1a255 100644
--- a/lib/portage/dbapi/_MergeProcess.py
+++ b/lib/portage/dbapi/_MergeProcess.py
@@ -1,4 +1,4 @@
-# Copyright 2010-2018 Gentoo Foundation
+# Copyright 2010-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 import io
@@ -57,6 +57,7 @@ class MergeProcess(ForkProcess):
 			self.fd_pipes = self.fd_pipes.copy()
 		self.fd_pipes.setdefault(0, portage._get_stdin().fileno())
 
+		self.log_filter_file = self.settings.get('PORTAGE_LOG_FILTER_FILE_CMD')
 		super(MergeProcess, self)._start()
 
 	def _lock_vdb(self):
diff --git a/lib/portage/package/ebuild/_config/special_env_vars.py b/lib/portage/package/ebuild/_config/special_env_vars.py
index 440dd00b2..f44cb9b1b 100644
--- a/lib/portage/package/ebuild/_config/special_env_vars.py
+++ b/lib/portage/package/ebuild/_config/special_env_vars.py
@@ -1,4 +1,4 @@
-# Copyright 2010-2019 Gentoo Authors
+# Copyright 2010-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 from __future__ import unicode_literals
@@ -175,7 +175,7 @@ environ_filter += [
 	"PORTAGE_RO_DISTDIRS",
 	"PORTAGE_RSYNC_EXTRA_OPTS", "PORTAGE_RSYNC_OPTS",
 	"PORTAGE_RSYNC_RETRIES", "PORTAGE_SSH_OPTS", "PORTAGE_SYNC_STALE",
-	"PORTAGE_USE",
+	"PORTAGE_USE", "PORTAGE_LOG_FILTER_FILE_CMD",
 	"PORTAGE_LOGDIR", "PORTAGE_LOGDIR_CLEAN",
 	"QUICKPKG_DEFAULT_OPTS", "REPOMAN_DEFAULT_OPTS",
 	"RESUMECOMMAND", "RESUMECOMMAND_FTP",
@@ -204,7 +204,9 @@ default_globals = {
 	'PORTAGE_BZIP2_COMMAND':    'bzip2',
 }
 
-validate_commands = ('PORTAGE_BZIP2_COMMAND', 'PORTAGE_BUNZIP2_COMMAND',)
+validate_commands = ('PORTAGE_BZIP2_COMMAND', 'PORTAGE_BUNZIP2_COMMAND',
+	'PORTAGE_LOG_FILTER_FILE_CMD',
+)
 
 # To enhance usability, make some vars case insensitive
 # by forcing them to lower case.
diff --git a/lib/portage/util/_async/BuildLogger.py b/lib/portage/util/_async/BuildLogger.py
new file mode 100644
index 000000000..f5fea77ea
--- /dev/null
+++ b/lib/portage/util/_async/BuildLogger.py
@@ -0,0 +1,109 @@
+# Copyright 2020 Gentoo Authors
+# Distributed under the terms of the GNU General Public License v2
+
+import subprocess
+
+from portage import os
+from portage.util import shlex_split
+from _emerge.AsynchronousTask import AsynchronousTask
+from portage.util._async.PipeLogger import PipeLogger
+from portage.util._async.PopenProcess import PopenProcess
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine
+
+
+class BuildLogger(AsynchronousTask):
+	"""
+	Write to a log file, with compression support provided by PipeLogger.
+	If the log_filter_file parameter is specified, then it is interpreted
+	as a command to execute which filters log output (see the
+	PORTAGE_LOG_FILTER_FILE_CMD variable in make.conf(5)). The stdin property
+	provides access to a writable binary file stream (refers to a pipe)
+	that log content should be written to (usually redirected from
+	subprocess stdout and stderr streams).
+	"""
+
+	__slots__ = ('env', 'log_path', 'log_filter_file', '_main_task', '_stdin')
+
+	@property
+	def stdin(self):
+		return self._stdin
+
+	def _start(self):
+		filter_proc = None
+		log_input = None
+		if self.log_path is not None:
+			log_filter_file = self.log_filter_file
+			if log_filter_file is not None:
+				split_value = shlex_split(log_filter_file)
+				log_filter_file = split_value if split_value else None
+			if log_filter_file:
+				filter_input, stdin = os.pipe()
+				log_input, filter_output = os.pipe()
+				try:
+					filter_proc = PopenProcess(
+						proc=subprocess.Popen(
+							log_filter_file,
+							env=self.env,
+							stdin=filter_input,
+							stdout=filter_output,
+							stderr=filter_output,
+						),
+						scheduler=self.scheduler,
+					)
+					filter_proc.start()
+				except EnvironmentError:
+					# Maybe the command is missing or broken somehow...
+					os.close(filter_input)
+					os.close(stdin)
+					os.close(log_input)
+					os.close(filter_output)
+				else:
+					self._stdin = os.fdopen(stdin, 'wb', 0)
+					os.close(filter_input)
+					os.close(filter_output)
+
+		if self._stdin is None:
+			# Since log_filter_file is unspecified or refers to a file
+			# that is missing or broken somehow, create a pipe that
+			# logs directly to pipe_logger.
+			log_input, stdin = os.pipe()
+			self._stdin = os.fdopen(stdin, 'wb', 0)
+
+		# Set background=True so that pipe_logger does not log to stdout.
+		pipe_logger = PipeLogger(background=True,
+			scheduler=self.scheduler, input_fd=log_input,
+			log_file_path=self.log_path)
+		pipe_logger.start()
+
+		self._main_task = asyncio.ensure_future(self._main(filter_proc, pipe_logger), loop=self.scheduler)
+		self._main_task.add_done_callback(self._main_exit)
+
+	@coroutine
+	def _main(self, filter_proc, pipe_logger):
+		try:
+			if pipe_logger.poll() is None:
+				yield pipe_logger.async_wait()
+			if filter_proc is not None and filter_proc.poll() is None:
+				yield filter_proc.async_wait()
+		except asyncio.CancelledError:
+			if pipe_logger.poll() is None:
+				pipe_logger.cancel()
+			if filter_proc is not None and filter_proc.poll() is None:
+				filter_proc.cancel()
+			raise
+
+	def _cancel(self):
+		if self._main_task is not None:
+			self._main_task.done() or self._main_task.cancel()
+		if self._stdin is not None and not self._stdin.closed:
+			self._stdin.close()
+
+	def _main_exit(self, main_task):
+		try:
+			main_task.result()
+		except asyncio.CancelledError:
+			self.cancel()
+			self._was_cancelled()
+		self.returncode = self.returncode or 0
+		self._async_wait()
diff --git a/lib/portage/util/_async/SchedulerInterface.py b/lib/portage/util/_async/SchedulerInterface.py
index ec6417da1..3ff250d1d 100644
--- a/lib/portage/util/_async/SchedulerInterface.py
+++ b/lib/portage/util/_async/SchedulerInterface.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2018 Gentoo Foundation
+# Copyright 2012-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 import gzip
@@ -7,6 +7,8 @@ import errno
 from portage import _encodings
 from portage import _unicode_encode
 from portage.util import writemsg_level
+from portage.util.futures._asyncio.streams import _writer
+from portage.util.futures.compat_coroutine import coroutine
 from ..SlotObject import SlotObject
 
 class SchedulerInterface(SlotObject):
@@ -53,6 +55,34 @@ class SchedulerInterface(SlotObject):
 	def _return_false():
 		return False
 
+	@coroutine
+	def async_output(self, msg, log_file=None, background=None,
+		level=0, noiselevel=-1):
+		"""
+		Output a msg to stdio (if not in background) and to a log file
+		if provided.
+
+		@param msg: a message string, including newline if appropriate
+		@type msg: str
+		@param log_file: log file in binary mode
+		@type log_file: file
+		@param background: send messages only to log (not to stdio)
+		@type background: bool
+		@param level: a numeric logging level (see the logging module)
+		@type level: int
+		@param noiselevel: passed directly to writemsg
+		@type noiselevel: int
+		"""
+		global_background = self._is_background()
+		if background is None or global_background:
+			background = global_background
+
+		if not background:
+			writemsg_level(msg, level=level, noiselevel=noiselevel)
+
+		if log_file is not None:
+			yield _writer(log_file, _unicode_encode(msg))
+
 	def output(self, msg, log_path=None, background=None,
 		level=0, noiselevel=-1):
 		"""
diff --git a/man/make.conf.5 b/man/make.conf.5
index a3bd662ae..eb812150f 100644
--- a/man/make.conf.5
+++ b/man/make.conf.5
@@ -1,4 +1,4 @@
-.TH "MAKE.CONF" "5" "May 2020" "Portage VERSION" "Portage"
+.TH "MAKE.CONF" "5" "Jun 2020" "Portage VERSION" "Portage"
 .SH "NAME"
 make.conf \- custom settings for Portage
 .SH "SYNOPSIS"
@@ -979,6 +979,11 @@ with an integer pid. For example, a value of "ionice \-c 3 \-p \\${PID}"
 will set idle io priority. For more information about ionice, see
 \fBionice\fR(1). This variable is unset by default.
 .TP
+.B PORTAGE_LOG_FILTER_FILE_CMD
+This variable specifies a command that filters build log output to a
+log file. In order to filter ANSI escape codes from build logs,
+\fBansifilter\fR(1) is a convenient setting for this variable.
+.TP
 .B PORTAGE_LOGDIR
 This variable defines the directory in which per\-ebuild logs are kept.
 Logs are created only when this is set. They are stored as
-- 
2.25.3




* Re: [gentoo-portage-dev] [PATCH 1/2] PipeLogger: non-blocking write to pipe (bug 709746)
  2020-06-19 20:39 ` [gentoo-portage-dev] [PATCH 1/2] PipeLogger: non-blocking write to pipe " Zac Medico
@ 2020-06-22 14:35   ` Brian Dolbec
  0 siblings, 0 replies; 6+ messages in thread
From: Brian Dolbec @ 2020-06-22 14:35 UTC
  To: gentoo-portage-dev

On Fri, 19 Jun 2020 13:39:18 -0700
Zac Medico <zmedico@gentoo.org> wrote:

> Add support to write to a non-blocking pipe instead of a
> log file. This is needed for the purposes of bug 709746,
> where PipeLogger will write to a pipe that is drained
> by another PipeLogger instance which is running in the same
> process.
> 
> Bug: https://bugs.gentoo.org/709746
> Signed-off-by: Zac Medico <zmedico@gentoo.org>
> ---
>  lib/portage/tests/process/test_PipeLogger.py | 58 ++++++++++++++++
>  lib/portage/util/_async/PipeLogger.py        | 73 +++++++++++++++-----
>  2 files changed, 115 insertions(+), 16 deletions(-)
>  create mode 100644 lib/portage/tests/process/test_PipeLogger.py
> 
> diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py
> new file mode 100644
> index 000000000..2bd94cf39
> --- /dev/null
> +++ b/lib/portage/tests/process/test_PipeLogger.py
> @@ -0,0 +1,58 @@
> +# Copyright 2020 Gentoo Authors
> +# Distributed under the terms of the GNU General Public License v2
> +
> +from portage import os
> +from portage.tests import TestCase
> +from portage.util._async.PipeLogger import PipeLogger
> +from portage.util.futures import asyncio
> +from portage.util.futures._asyncio.streams import _reader, _writer
> +from portage.util.futures.compat_coroutine import coroutine, coroutine_return
> +from portage.util.futures.unix_events import _set_nonblocking
> +
> +
> +class PipeLoggerTestCase(TestCase):
> +
> +	@coroutine
> +	def _testPipeLoggerToPipe(self, test_string, loop=None):
> +		"""
> +		Test PipeLogger writing to a pipe connected to a PipeReader.
> +		This verifies that PipeLogger does not deadlock when writing
> +		to a pipe that's drained by a PipeReader running in the same
> +		process (requires non-blocking write).
> +		"""
> +
> +		input_fd, writer_pipe = os.pipe()
> +		_set_nonblocking(writer_pipe)
> +		writer_pipe = os.fdopen(writer_pipe, 'wb', 0)
> +		writer = asyncio.ensure_future(_writer(writer_pipe, test_string.encode('ascii'), loop=loop), loop=loop)
> +		writer.add_done_callback(lambda writer: writer_pipe.close())
> +
> +		pr, pw = os.pipe()
> +
> +		consumer = PipeLogger(background=True,
> +			input_fd=input_fd,
> +			log_file_path=os.fdopen(pw, 'wb', 0),
> +			scheduler=loop)
> +		consumer.start()
> +
> +		# Before starting the reader, wait here for a moment, in order
> +		# to exercise PipeLogger's handling of EAGAIN during write.
> +		yield asyncio.wait([writer], timeout=0.01)
> +
> +		reader = _reader(pr, loop=loop)
> +		yield writer
> +		content = yield reader
> +		yield consumer.async_wait()
> +
> +		self.assertEqual(consumer.returncode, os.EX_OK)
> +
> +		coroutine_return(content.decode('ascii', 'replace'))
> +
> +	def testPipeLogger(self):
> +		loop = asyncio._wrap_loop()
> +
> +		for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**17, 2**17 + 1):
> +			test_string = x * "a"
> +			output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
> +			self.assertEqual(test_string, output,
> +				"x = %s, len(output) = %s" % (x, len(output)))
> diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
> index a4258f350..ce8afb846 100644
> --- a/lib/portage/util/_async/PipeLogger.py
> +++ b/lib/portage/util/_async/PipeLogger.py
> @@ -8,6 +8,10 @@ import sys
>  
>  import portage
>  from portage import os, _encodings, _unicode_encode
> +from portage.util.futures import asyncio
> +from portage.util.futures._asyncio.streams import _writer
> +from portage.util.futures.compat_coroutine import coroutine
> +from portage.util.futures.unix_events import _set_nonblocking
>  from _emerge.AbstractPollTask import AbstractPollTask
>  
>  class PipeLogger(AbstractPollTask):
> @@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask):
>  	"""
>  
>  	__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
> -		("_log_file", "_log_file_real")
> +		("_io_loop_task", "_log_file", "_log_file_nb", "_log_file_real")
>  
>  	def _start(self):
>  
>  		log_file_path = self.log_file_path
> -		if log_file_path is not None:
> -
> +		if hasattr(log_file_path, 'write'):
> +			self._log_file_nb = True
> +			self._log_file = log_file_path
> +			_set_nonblocking(self._log_file.fileno())
> +		elif log_file_path is not None:
>  			self._log_file = open(_unicode_encode(log_file_path,
>  				encoding=_encodings['fs'], errors='strict'), mode='ab')
>  			if log_file_path.endswith('.gz'):
> @@ -40,9 +47,9 @@ class PipeLogger(AbstractPollTask):
>  				mode=0o660)
>  
>  		if isinstance(self.input_fd, int):
> -			fd = self.input_fd
> -		else:
> -			fd = self.input_fd.fileno()
> +			self.input_fd = os.fdopen(self.input_fd, 'rb', 0)
> +
> +		fd = self.input_fd.fileno()
>  
>  		fcntl.fcntl(fd, fcntl.F_SETFL,
>  			fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
> @@ -57,7 +64,8 @@ class PipeLogger(AbstractPollTask):
>  				fcntl.fcntl(fd, fcntl.F_SETFD,
>  					fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
>  
> -		self.scheduler.add_reader(fd, self._output_handler, fd)
> +		self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd), loop=self.scheduler)
> +		self._io_loop_task.add_done_callback(self._io_loop_done)
>  		self._registered = True
>  
>  	def _cancel(self):
> @@ -65,25 +73,36 @@ class PipeLogger(AbstractPollTask):
>  		if self.returncode is None:
>  			self.returncode = self._cancelled_returncode
>  
> -	def _output_handler(self, fd):
> -
> +	@coroutine
> +	def _io_loop(self, input_file):
>  		background = self.background
>  		stdout_fd = self.stdout_fd
>  		log_file = self._log_file 
> +		fd = input_file.fileno()
>  
>  		while True:
>  			buf = self._read_buf(fd)
>  
>  			if buf is None:
>  				# not a POLLIN event, EAGAIN, etc...
> -				break
> +				future = self.scheduler.create_future()
> +				self.scheduler.add_reader(fd, future.set_result, None)
> +				try:
> +					yield future
> +				finally:
> +					# The loop and input file may have been closed.
> +					if not self.scheduler.is_closed():
> +						future.done() or future.cancel()
> +						# Do not call remove_reader in cases where fd has
> +						# been closed and then re-allocated to a concurrent
> +						# coroutine as in bug 716636.
> +						if not input_file.closed:
> +							self.scheduler.remove_reader(fd)
> +				continue
>  
>  			if not buf:
>  				# EOF
> -				self._unregister()
> -				self.returncode = self.returncode or os.EX_OK
> -				self._async_wait()
> -				break
> +				return
>  
>  			else:
>  				if not background and stdout_fd is not None:
> @@ -120,8 +139,25 @@ class PipeLogger(AbstractPollTask):
>  								fcntl.F_GETFL) ^ os.O_NONBLOCK)
>  
>  				if log_file is not None:
> -					log_file.write(buf)
> -					log_file.flush()
> +					if self._log_file_nb:
> +						# Use the _writer function which uses os.write, since the
> +						# log_file.write method loses data when an EAGAIN occurs.
> +						yield _writer(log_file, buf, loop=self.scheduler)
> +					else:
> +						# For gzip.GzipFile instances, the above _writer function
> +						# will not work because data written directly to the file
> +						# descriptor bypasses compression.
> +						log_file.write(buf)
> +						log_file.flush()
> +
> +	def _io_loop_done(self, future):
> +		try:
> +			future.result()
> +		except asyncio.CancelledError:
> +			self.cancel()
> +			self._was_cancelled()
> +		self.returncode = self.returncode or os.EX_OK
> +		self._async_wait()
>  
>  	def _unregister(self):
>  		if self.input_fd is not None:
> @@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask):
>  				self.input_fd.close()
>  			self.input_fd = None
>  
> +		if self._io_loop_task is not None:
> +			self._io_loop_task.done() or self._io_loop_task.cancel()
> +			self._io_loop_task = None
> +
>  		if self.stdout_fd is not None:
>  			os.close(self.stdout_fd)
>  			self.stdout_fd = None
>  
>  		if self._log_file is not None:
> +			self.scheduler.remove_writer(self._log_file.fileno())
>  			self._log_file.close()
>  			self._log_file = None
>  
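The finally clause in _io_loop skips remove_reader() once input_file is closed because descriptor numbers are recycled. On POSIX systems a newly opened descriptor takes the lowest number not currently in use, so after close() the same number can immediately belong to a pipe owned by a concurrent coroutine, and unregistering by number would detach that coroutine's reader (the failure mode the patch comment attributes to bug 716636). A minimal sketch, assuming a POSIX system, a selector-based asyncio loop and a single-threaded caller; fd_number_reused, remove_reader_if_open and demo_guard are hypothetical names, and the helper only mirrors the idea behind the patch's check.

```python
import asyncio
import os


def fd_number_reused():
    """Return True if a new pipe gets the numbers of a just-closed one."""
    old_r, old_w = os.pipe()
    os.close(old_r)
    os.close(old_w)
    # New descriptors take the lowest numbers not in use, which are
    # exactly the ones released above (in a single-threaded process).
    new_r, new_w = os.pipe()
    try:
        return (new_r, new_w) == (old_r, old_w)
    finally:
        os.close(new_r)
        os.close(new_w)


def remove_reader_if_open(loop, file_obj, fd):
    """Hypothetical helper: unregister fd only while its owner is open.

    After file_obj.close(), the number fd may already belong to a
    concurrent coroutine, so removing it could detach that coroutine's
    reader instead of ours.
    """
    if loop.is_closed() or file_obj.closed:
        return False
    return loop.remove_reader(fd)


def demo_guard():
    loop = asyncio.new_event_loop()
    read_fd, write_fd = os.pipe()
    input_file = os.fdopen(read_fd, "rb", 0)
    try:
        loop.add_reader(read_fd, lambda: None)
        removed_while_open = remove_reader_if_open(loop, input_file, read_fd)
        input_file.close()
        removed_after_close = remove_reader_if_open(loop, input_file, read_fd)
        return removed_while_open, removed_after_close
    finally:
        input_file.close()  # harmless if already closed
        os.close(write_fd)
        loop.close()
```

demo_guard() returns (True, False): the first call unregisters the reader while the file is open, and the second call, made after close(), leaves the loop untouched.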

Looks good



* Re: [gentoo-portage-dev] [PATCH 2/2] Support PORTAGE_LOG_FILTER_FILE_CMD (bug 709746)
  2020-06-19 20:39 ` [gentoo-portage-dev] [PATCH 2/2] Support PORTAGE_LOG_FILTER_FILE_CMD " Zac Medico
@ 2020-06-22 14:46   ` Brian Dolbec
  2020-06-23  4:22     ` Zac Medico
  0 siblings, 1 reply; 6+ messages in thread
From: Brian Dolbec @ 2020-06-22 14:46 UTC
  To: gentoo-portage-dev

On Fri, 19 Jun 2020 13:39:19 -0700
Zac Medico <zmedico@gentoo.org> wrote:

> This variable specifies a command that filters build log output to a
> log file. The plan is to extend this to support a separate filter for
> tty output in the future.
> 
> In order to enable the EbuildPhase class to write elog messages to
> the build log with PORTAGE_LOG_FILTER_FILE_CMD support, convert its
> _elog method to a coroutine, and add a SchedulerInterface async_output
> method for it to use.
> 
> Use a new BuildLogger class to manage log output (with or without a
> filter command), with compression support provided by PipeLogger.
> BuildLogger has a stdin property which provides access to a writable
> binary file stream (refers to a pipe) that log content is written to.
> 
> Bug: https://bugs.gentoo.org/709746
> Signed-off-by: Zac Medico <zmedico@gentoo.org>
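As the message above describes it, BuildLogger places the filter command between a pipe that receives log content and the PipeLogger that writes (and, for a ".gz" path, compresses) the log file, and it falls back to an unfiltered pipe when the command is empty or cannot be started. That data flow can be sketched synchronously with the standard subprocess module. This is a hypothetical simplification (write_filtered_log and open_log are illustrative names), not Portage's asynchronous implementation, which streams output while the build runs instead of filtering a complete buffer.

```python
import gzip
import shlex
import subprocess


def open_log(log_path):
    """Open the log for appending; compress if the name ends in .gz."""
    if log_path.endswith(".gz"):
        return gzip.open(log_path, "ab")
    return open(log_path, "ab")


def write_filtered_log(data, log_path, filter_cmd=None, env=None):
    """Append bytes to log_path, passing them through filter_cmd if usable.

    filter_cmd is a shell-like string such as a PORTAGE_LOG_FILTER_FILE_CMD
    value; it is split with shlex and executed directly, not by a shell.
    """
    argv = shlex.split(filter_cmd) if filter_cmd else []
    if argv:
        try:
            proc = subprocess.Popen(
                argv,
                env=env,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                # The filter's stderr also goes to the log, as in BuildLogger.
                stderr=subprocess.STDOUT,
            )
        except OSError:
            # Missing or broken command: log the data unfiltered.
            pass
        else:
            data, _ = proc.communicate(data)
    with open_log(log_path) as log_file:
        log_file.write(data)
```

For example, passing "tr a-z A-Z" as filter_cmd appends upper-cased text, while an empty or unstartable command appends the bytes unchanged.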
> ---
>  lib/_emerge/AbstractEbuildProcess.py          |   3 +-
>  lib/_emerge/BinpkgFetcher.py                  |   3 +-
>  lib/_emerge/EbuildFetcher.py                  |   3 +-
>  lib/_emerge/EbuildPhase.py                    |  47 ++++++--
>  lib/_emerge/SpawnProcess.py                   |  58 +++++++---
>  lib/portage/dbapi/_MergeProcess.py            |   3 +-
>  .../ebuild/_config/special_env_vars.py        |   8 +-
>  lib/portage/util/_async/BuildLogger.py        | 109 ++++++++++++++++++
>  lib/portage/util/_async/SchedulerInterface.py |  32 ++++-
>  man/make.conf.5                               |   7 +-
>  10 files changed, 243 insertions(+), 30 deletions(-)
>  create mode 100644 lib/portage/util/_async/BuildLogger.py
> 
> diff --git a/lib/_emerge/AbstractEbuildProcess.py b/lib/_emerge/AbstractEbuildProcess.py
> index 1c1955cfe..ae1aae55f 100644
> --- a/lib/_emerge/AbstractEbuildProcess.py
> +++ b/lib/_emerge/AbstractEbuildProcess.py
> @@ -1,4 +1,4 @@
> -# Copyright 1999-2019 Gentoo Foundation
> +# Copyright 1999-2020 Gentoo Authors
>  # Distributed under the terms of the GNU General Public License v2
>  
>  import errno
> @@ -196,6 +196,7 @@ class AbstractEbuildProcess(SpawnProcess):
>  			null_fd = os.open('/dev/null', os.O_RDONLY)
>  			self.fd_pipes[0] = null_fd
>  
> +		self.log_filter_file = self.settings.get('PORTAGE_LOG_FILTER_FILE_CMD')
>  		try:
>  			SpawnProcess._start(self)
>  		finally:
> diff --git a/lib/_emerge/BinpkgFetcher.py b/lib/_emerge/BinpkgFetcher.py
> index 36d027de3..2e5861cc1 100644
> --- a/lib/_emerge/BinpkgFetcher.py
> +++ b/lib/_emerge/BinpkgFetcher.py
> @@ -1,4 +1,4 @@
> -# Copyright 1999-2018 Gentoo Foundation
> +# Copyright 1999-2020 Gentoo Authors
>  # Distributed under the terms of the GNU General Public License v2
>  
>  import functools
> @@ -158,6 +158,7 @@ class _BinpkgFetcherProcess(SpawnProcess):
>  		self.env = fetch_env
>  		if settings.selinux_enabled():
>  			self._selinux_type = settings["PORTAGE_FETCH_T"]
> +		self.log_filter_file = settings.get('PORTAGE_LOG_FILTER_FILE_CMD')
>  		SpawnProcess._start(self)
>  
>  	def _pipe(self, fd_pipes):
> diff --git a/lib/_emerge/EbuildFetcher.py b/lib/_emerge/EbuildFetcher.py
> index 1e40994fb..55349c33c 100644
> --- a/lib/_emerge/EbuildFetcher.py
> +++ b/lib/_emerge/EbuildFetcher.py
> @@ -1,4 +1,4 @@
> -# Copyright 1999-2018 Gentoo Foundation
> +# Copyright 1999-2020 Gentoo Authors
>  # Distributed under the terms of the GNU General Public License v2
>  
>  import copy
> @@ -225,6 +225,7 @@ class _EbuildFetcherProcess(ForkProcess):
>  			settings["NOCOLOR"] = nocolor
>  
>  		self._settings = settings
> +		self.log_filter_file = settings.get('PORTAGE_LOG_FILTER_FILE_CMD')
>  		ForkProcess._start(self)
>  
>  		# Free settings now since it's no longer needed in
> diff --git a/lib/_emerge/EbuildPhase.py b/lib/_emerge/EbuildPhase.py
> index 477e0ba97..ddb3dc719 100644
> --- a/lib/_emerge/EbuildPhase.py
> +++ b/lib/_emerge/EbuildPhase.py
> @@ -26,6 +26,8 @@ from portage.package.ebuild.prepare_build_dirs import (_prepare_workdir,
>  from portage.util.futures.compat_coroutine import coroutine
>  from portage.util import writemsg
>  from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
> +from portage.util._async.BuildLogger import BuildLogger
> +from portage.util.futures import asyncio
>  from portage.util.futures.executor.fork import ForkExecutor
>  
>  try:
> @@ -69,6 +71,11 @@ class EbuildPhase(CompositeTask):
>  	_locked_phases = ("setup", "preinst", "postinst", "prerm", "postrm")
>  
>  	def _start(self):
> +		future = asyncio.ensure_future(self._async_start(), loop=self.scheduler)
> +		self._start_task(AsyncTaskFuture(future=future), self._async_start_exit)
> +
> +	@coroutine
> +	def _async_start(self):
>  
>  		need_builddir = self.phase not in EbuildProcess._phases_without_builddir
>  
> @@ -126,7 +133,7 @@ class EbuildPhase(CompositeTask):
>  			# Force background=True for this header since it's intended
>  			# for the log and it doesn't necessarily need to be visible
>  			# elsewhere.
> -			self._elog('einfo', msg, background=True)
> +			yield self._elog('einfo', msg, background=True)
>  
>  		if self.phase == 'package':
>  			if 'PORTAGE_BINPKG_TMPFILE' not in self.settings:
> @@ -134,6 +141,12 @@ class EbuildPhase(CompositeTask):
>  					os.path.join(self.settings['PKGDIR'],
>  					self.settings['CATEGORY'], self.settings['PF']) + '.tbz2'
>  
> +	def _async_start_exit(self, task):
> +		task.future.cancelled() or task.future.result()
> +		if self._default_exit(task) != os.EX_OK:
> +			self.wait()
> +			return
> +
>  		if self.phase in ("pretend", "prerm"):
>  			env_extractor = BinpkgEnvExtractor(background=self.background,
>  				scheduler=self.scheduler, settings=self.settings)
> @@ -391,6 +404,7 @@ class EbuildPhase(CompositeTask):
>  			self.returncode = 1
>  		self.wait()
>  
> +	@coroutine
>  	def _elog(self, elog_funcname, lines, background=None):
>  		if background is None:
>  			background = self.background
> @@ -407,11 +421,30 @@ class EbuildPhase(CompositeTask):
>  			portage.output.havecolor = global_havecolor
>  		msg = out.getvalue()
>  		if msg:
> -			log_path = None
> -			if self.settings.get("PORTAGE_BACKGROUND") != "subprocess":
> -				log_path = self.settings.get("PORTAGE_LOG_FILE")
> -			self.scheduler.output(msg, log_path=log_path,
> -				background=background)
> +			build_logger = None
> +			try:
> +				log_file = None
> +				log_path = None
> +				if self.settings.get("PORTAGE_BACKGROUND") != "subprocess":
> +					log_path = self.settings.get("PORTAGE_LOG_FILE")
> +				if log_path:
> +					build_logger = BuildLogger(env=self.settings.environ(),
> +						log_path=log_path,
> +						log_filter_file=self.settings.get('PORTAGE_LOG_FILTER_FILE_CMD'),
> +						scheduler=self.scheduler)
> +					build_logger.start()
> +					log_file = build_logger.stdin
> +
> +				yield self.scheduler.async_output(msg, log_file=log_file,
> +					background=background)
> +
> +				if build_logger is not None:
> +					build_logger.stdin.close()
> +					yield build_logger.async_wait()
> +			except asyncio.CancelledError:
> +				if build_logger is not None:
> +					build_logger.cancel()
> +				raise
>  
>  
>  class _PostPhaseCommands(CompositeTask):
> @@ -480,4 +513,4 @@ class _PostPhaseCommands(CompositeTask):
>  			qa_msg.extend("\t%s: %s" % (filename, " ".join(sorted(soname_deps)))
>  				for filename, soname_deps in unresolved)
>  			qa_msg.append("")
> -			self.elog("eqawarn", qa_msg)
> +			yield self.elog("eqawarn", qa_msg)
> diff --git a/lib/_emerge/SpawnProcess.py b/lib/_emerge/SpawnProcess.py
> index 395d66bb9..f96911571 100644
> --- a/lib/_emerge/SpawnProcess.py
> +++ b/lib/_emerge/SpawnProcess.py
> @@ -1,4 +1,4 @@
> -# Copyright 2008-2018 Gentoo Foundation
> +# Copyright 2008-2020 Gentoo Authors
>  # Distributed under the terms of the GNU General Public License v2
>  
>  try:
> @@ -19,7 +19,10 @@ from portage.const import BASH_BINARY
>  from portage.localization import _
>  from portage.output import EOutput
>  from portage.util import writemsg_level
> +from portage.util._async.BuildLogger import BuildLogger
>  from portage.util._async.PipeLogger import PipeLogger
> +from portage.util.futures import asyncio
> +from portage.util.futures.compat_coroutine import coroutine
>  
>  class SpawnProcess(SubProcess):
>  
> @@ -34,8 +37,8 @@ class SpawnProcess(SubProcess):
>  		"path_lookup", "pre_exec", "close_fds", "cgroup",
>  		"unshare_ipc", "unshare_mount", "unshare_pid", "unshare_net")
>  
> -	__slots__ = ("args",) + \
> -		_spawn_kwarg_names + ("_pipe_logger", "_selinux_type",)
> +	__slots__ = ("args", "log_filter_file") + \
> +		_spawn_kwarg_names + ("_main_task", "_selinux_type",)
>  
>  	# Max number of attempts to kill the processes listed in cgroup.procs,
>  	# given that processes may fork before they can be killed.
> @@ -137,13 +140,43 @@ class SpawnProcess(SubProcess):
>  						fcntl.fcntl(stdout_fd,
>  						fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
>  
> -		self._pipe_logger = PipeLogger(background=self.background,
> +		build_logger = BuildLogger(env=self.env,
> +			log_path=log_file_path,
> +			log_filter_file=self.log_filter_file,
> +			scheduler=self.scheduler)
> +		build_logger.start()
> +
> +		pipe_logger = PipeLogger(background=self.background,
>  			scheduler=self.scheduler, input_fd=master_fd,
> -			log_file_path=log_file_path,
> +			log_file_path=build_logger.stdin,
>  			stdout_fd=stdout_fd)
> -		self._pipe_logger.addExitListener(self._pipe_logger_exit)
> -		self._pipe_logger.start()
> +
> +		pipe_logger.start()
> +
>  		self._registered = True
> +		self._main_task = asyncio.ensure_future(self._main(build_logger, pipe_logger), loop=self.scheduler)
> +		self._main_task.add_done_callback(self._main_exit)
> +
> +	@coroutine
> +	def _main(self, build_logger, pipe_logger):
> +		try:
> +			if pipe_logger.poll() is None:
> +				yield pipe_logger.async_wait()
> +			if build_logger.poll() is None:
> +				yield build_logger.async_wait()
> +		except asyncio.CancelledError:
> +			if pipe_logger.poll() is None:
> +				pipe_logger.cancel()
> +			if build_logger.poll() is None:
> +				build_logger.cancel()
> +			raise
> +
> +	def _main_exit(self, main_task):
> +		try:
> +			main_task.result()
> +		except asyncio.CancelledError:
> +			self.cancel()
> +		self._async_waitpid()
>  
>  	def _can_log(self, slave_fd):
>  		return True
> @@ -167,20 +200,17 @@ class SpawnProcess(SubProcess):
>  
>  		return spawn_func(args, **kwargs)
>  
> -	def _pipe_logger_exit(self, pipe_logger):
> -		self._pipe_logger = None
> -		self._async_waitpid()
> -
>  	def _unregister(self):
>  		SubProcess._unregister(self)
>  		if self.cgroup is not None:
>  			self._cgroup_cleanup()
>  			self.cgroup = None
> -		if self._pipe_logger is not None:
> -			self._pipe_logger.cancel()
> -			self._pipe_logger = None
> +		if self._main_task is not None:
> +			self._main_task.done() or self._main_task.cancel()
>  
>  	def _cancel(self):
> +		if self._main_task is not None:
> +			self._main_task.done() or self._main_task.cancel()
>  		SubProcess._cancel(self)
>  		self._cgroup_cleanup()
>  
> diff --git a/lib/portage/dbapi/_MergeProcess.py b/lib/portage/dbapi/_MergeProcess.py
> index 371550079..236d1a255 100644
> --- a/lib/portage/dbapi/_MergeProcess.py
> +++ b/lib/portage/dbapi/_MergeProcess.py
> @@ -1,4 +1,4 @@
> -# Copyright 2010-2018 Gentoo Foundation
> +# Copyright 2010-2020 Gentoo Authors
>  # Distributed under the terms of the GNU General Public License v2
>  
>  import io
> @@ -57,6 +57,7 @@ class MergeProcess(ForkProcess):
>  			self.fd_pipes = self.fd_pipes.copy()
>  		self.fd_pipes.setdefault(0, portage._get_stdin().fileno())
>  
> +		self.log_filter_file = self.settings.get('PORTAGE_LOG_FILTER_FILE_CMD')
>  		super(MergeProcess, self)._start()
>  
>  	def _lock_vdb(self):
> diff --git a/lib/portage/package/ebuild/_config/special_env_vars.py b/lib/portage/package/ebuild/_config/special_env_vars.py
> index 440dd00b2..f44cb9b1b 100644
> --- a/lib/portage/package/ebuild/_config/special_env_vars.py
> +++ b/lib/portage/package/ebuild/_config/special_env_vars.py
> @@ -1,4 +1,4 @@
> -# Copyright 2010-2019 Gentoo Authors
> +# Copyright 2010-2020 Gentoo Authors
>  # Distributed under the terms of the GNU General Public License v2
>  
>  from __future__ import unicode_literals
> @@ -175,7 +175,7 @@ environ_filter += [
>  	"PORTAGE_RO_DISTDIRS",
>  	"PORTAGE_RSYNC_EXTRA_OPTS", "PORTAGE_RSYNC_OPTS",
>  	"PORTAGE_RSYNC_RETRIES", "PORTAGE_SSH_OPTS", "PORTAGE_SYNC_STALE",
> -	"PORTAGE_USE",
> +	"PORTAGE_USE", "PORTAGE_LOG_FILTER_FILE_CMD",
>  	"PORTAGE_LOGDIR", "PORTAGE_LOGDIR_CLEAN",
>  	"QUICKPKG_DEFAULT_OPTS", "REPOMAN_DEFAULT_OPTS",
>  	"RESUMECOMMAND", "RESUMECOMMAND_FTP",
> @@ -204,7 +204,9 @@ default_globals = {
>  	'PORTAGE_BZIP2_COMMAND':    'bzip2',
>  }
>  
> -validate_commands = ('PORTAGE_BZIP2_COMMAND', 'PORTAGE_BUNZIP2_COMMAND',)
> +validate_commands = ('PORTAGE_BZIP2_COMMAND', 'PORTAGE_BUNZIP2_COMMAND',
> +	'PORTAGE_LOG_FILTER_FILE_CMD',
> +)
>  
>  # To enhance usability, make some vars case insensitive
>  # by forcing them to lower case.
> diff --git a/lib/portage/util/_async/BuildLogger.py b/lib/portage/util/_async/BuildLogger.py
> new file mode 100644
> index 000000000..f5fea77ea
> --- /dev/null
> +++ b/lib/portage/util/_async/BuildLogger.py
> @@ -0,0 +1,109 @@
> +# Copyright 2020 Gentoo Authors
> +# Distributed under the terms of the GNU General Public License v2
> +
> +import subprocess
> +
> +from portage import os
> +from portage.util import shlex_split
> +from _emerge.AsynchronousTask import AsynchronousTask
> +from portage.util._async.PipeLogger import PipeLogger
> +from portage.util._async.PopenProcess import PopenProcess
> +from portage.util.futures import asyncio
> +from portage.util.futures.compat_coroutine import coroutine
> +
> +
> +class BuildLogger(AsynchronousTask):
> +	"""
> +	Write to a log file, with compression support provided by PipeLogger.
> +	If the log_filter_file parameter is specified, then it is interpreted
> +	as a command to execute which filters log output (see the
> +	PORTAGE_LOG_FILTER_FILE_CMD variable in make.conf(5)). The stdin property
> +	provides access to a writable binary file stream (refers to a pipe)
> +	that log content should be written to (usually redirected from
> +	subprocess stdout and stderr streams).
> +	"""
> +
> +	__slots__ = ('env', 'log_path', 'log_filter_file', '_main_task', '_stdin')
> +
> +	@property
> +	def stdin(self):
> +		return self._stdin
> +
> +	def _start(self):
> +		filter_proc = None
> +		log_input = None
> +		if self.log_path is not None:
> +			log_filter_file = self.log_filter_file
> +			if log_filter_file is not None:
> +				split_value = shlex_split(log_filter_file)
> +				log_filter_file = split_value if split_value else None
> +			if log_filter_file:
> +				filter_input, stdin = os.pipe()
> +				log_input, filter_output = os.pipe()
> +				try:
> +					filter_proc = PopenProcess(
> +						proc=subprocess.Popen(
> +							log_filter_file,
> +							env=self.env,
> +							stdin=filter_input,
> +							stdout=filter_output,
> +							stderr=filter_output,
> +						),
> +						scheduler=self.scheduler,
> +					)
> +					filter_proc.start()
> +				except EnvironmentError:
> +					# Maybe the command is missing or broken somehow...
> +					os.close(filter_input)
> +					os.close(stdin)
> +					os.close(log_input)
> +					os.close(filter_output)
> +				else:
> +					self._stdin = os.fdopen(stdin, 'wb', 0)
> +					os.close(filter_input)
> +					os.close(filter_output)
> +
> +		if self._stdin is None:
> +			# Since log_filter_file is unspecified or refers to a file
> +			# that is missing or broken somehow, create a pipe that
> +			# logs directly to pipe_logger.
> +			log_input, stdin = os.pipe()
> +			self._stdin = os.fdopen(stdin, 'wb', 0)
> +
> +		# Set background=True so that pipe_logger does not log to stdout.
> +		pipe_logger = PipeLogger(background=True,
> +			scheduler=self.scheduler, input_fd=log_input,
> +			log_file_path=self.log_path)
> +		pipe_logger.start()
> +
> +		self._main_task = asyncio.ensure_future(self._main(filter_proc, pipe_logger), loop=self.scheduler)
> +		self._main_task.add_done_callback(self._main_exit)
> +
> +	@coroutine
> +	def _main(self, filter_proc, pipe_logger):
> +		try:
> +			if pipe_logger.poll() is None:
> +				yield pipe_logger.async_wait()
> +			if filter_proc is not None and filter_proc.poll() is None:
> +				yield filter_proc.async_wait()
> +		except asyncio.CancelledError:
> +			if pipe_logger.poll() is None:
> +				pipe_logger.cancel()
> +			if filter_proc is not None and filter_proc.poll() is None:
> +				filter_proc.cancel()
> +			raise
> +
> +	def _cancel(self):
> +		if self._main_task is not None:
> +			self._main_task.done() or self._main_task.cancel()
> +		if self._stdin is not None and not self._stdin.closed:
> +			self._stdin.close()
> +
> +	def _main_exit(self, main_task):
> +		try:
> +			main_task.result()
> +		except asyncio.CancelledError:
> +			self.cancel()
> +			self._was_cancelled()
> +		self.returncode = self.returncode or 0
> +		self._async_wait()
> diff --git a/lib/portage/util/_async/SchedulerInterface.py b/lib/portage/util/_async/SchedulerInterface.py
> index ec6417da1..3ff250d1d 100644
> --- a/lib/portage/util/_async/SchedulerInterface.py
> +++ b/lib/portage/util/_async/SchedulerInterface.py
> @@ -1,4 +1,4 @@
> -# Copyright 2012-2018 Gentoo Foundation
> +# Copyright 2012-2020 Gentoo Authors
>  # Distributed under the terms of the GNU General Public License v2
>  
>  import gzip
> @@ -7,6 +7,8 @@ import errno
>  from portage import _encodings
>  from portage import _unicode_encode
>  from portage.util import writemsg_level
> +from portage.util.futures._asyncio.streams import _writer
> +from portage.util.futures.compat_coroutine import coroutine
>  from ..SlotObject import SlotObject
>  
>  class SchedulerInterface(SlotObject):
> @@ -53,6 +55,34 @@ class SchedulerInterface(SlotObject):
>  	def _return_false():
>  		return False
>  
> +	@coroutine
> +	def async_output(self, msg, log_file=None, background=None,
> +		level=0, noiselevel=-1):
> +		"""
> +		Output a msg to stdio (if not in background) and to a log file
> +		if provided.
> +
> +		@param msg: a message string, including newline if appropriate
> +		@type msg: str
> +		@param log_file: log file in binary mode
> +		@type log_file: file
> +		@param background: send messages only to log (not to stdio)
> +		@type background: bool
> +		@param level: a numeric logging level (see the logging module)
> +		@type level: int
> +		@param noiselevel: passed directly to writemsg
> +		@type noiselevel: int
> +		"""
> +		global_background = self._is_background()
> +		if background is None or global_background:
> +			background = global_background
> +
> +		if not background:
> +			writemsg_level(msg, level=level, noiselevel=noiselevel)
> +
> +		if log_file is not None:
> +			yield _writer(log_file, _unicode_encode(msg))
> +
>  	def output(self, msg, log_path=None, background=None,
>  		level=0, noiselevel=-1):
>  		"""
> diff --git a/man/make.conf.5 b/man/make.conf.5
> index a3bd662ae..eb812150f 100644
> --- a/man/make.conf.5
> +++ b/man/make.conf.5
> @@ -1,4 +1,4 @@
> -.TH "MAKE.CONF" "5" "May 2020" "Portage VERSION" "Portage"
> +.TH "MAKE.CONF" "5" "Jun 2020" "Portage VERSION" "Portage"
>  .SH "NAME"
>  make.conf \- custom settings for Portage
>  .SH "SYNOPSIS"
> @@ -979,6 +979,11 @@ with an integer pid. For example, a value of
>  "ionice \-c 3 \-p \\${PID}" will set idle io priority. For more
>  information about ionice, see \fBionice\fR(1). This variable is unset
>  by default.
>  .TP
> +.B PORTAGE_LOG_FILTER_FILE_CMD
> +This variable specifies a command that filters build log output to a
> +log file. In order to filter ANSI escape codes from build logs,
> +\fBansifilter\fR(1) is a convenient setting for this variable.
> +.TP
>  .B PORTAGE_LOGDIR
>  This variable defines the directory in which per\-ebuild logs are
> kept. Logs are created only when this is set. They are stored as
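ansifilter(1) is an external program. To show what any such command has to do under this interface (log content arrives on its stdin, and whatever it writes to stdout or stderr ends up in the log), here is a minimal hypothetical stand-in in Python that removes ECMA-48 escape sequences. The pattern covers CSI sequences such as color codes, OSC strings such as terminal-title updates, and other two-byte ESC sequences; it is a sketch and does not attempt to reproduce ansifilter's behaviour or options.

```python
import re
import sys

# ECMA-48 escape sequences commonly emitted by build tools:
#   CSI: ESC [ parameter bytes, intermediate bytes, final byte (colors, cursor)
#   OSC: ESC ] ... terminated by BEL or ESC \ (e.g. terminal title updates)
#   Fe:  ESC followed by one byte in 0x40-0x5F other than '['
ANSI_ESCAPE = re.compile(
    rb"\x1b\[[0-?]*[ -/]*[@-~]"
    rb"|\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)"
    rb"|\x1b[@-Z\\-_]"
)


def strip_ansi(data):
    """Return data (bytes) with escape sequences removed."""
    return ANSI_ESCAPE.sub(b"", data)


def main(stdin=None, stdout=None):
    """Copy stdin to stdout, stripping escape sequences line by line."""
    stdin = stdin if stdin is not None else sys.stdin.buffer
    stdout = stdout if stdout is not None else sys.stdout.buffer
    # Flushing after each line keeps the log current during the build.
    for line in stdin:
        stdout.write(strip_ansi(line))
        stdout.flush()
```

Saved as a script that calls main() under the usual __main__ guard, at a hypothetical path such as /usr/local/bin/strip-ansi.py, it could be selected with PORTAGE_LOG_FILTER_FILE_CMD="python3 /usr/local/bin/strip-ansi.py" in make.conf.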

That's a lot of code...but I couldn't spot anything wrong, so looks good



* Re: [gentoo-portage-dev] [PATCH 2/2] Support PORTAGE_LOG_FILTER_FILE_CMD (bug 709746)
  2020-06-22 14:46   ` Brian Dolbec
@ 2020-06-23  4:22     ` Zac Medico
  0 siblings, 0 replies; 6+ messages in thread
From: Zac Medico @ 2020-06-23  4:22 UTC
  To: gentoo-portage-dev, Brian Dolbec



On 6/22/20 7:46 AM, Brian Dolbec wrote:
> 
> That's a lot of code...but I couldn't spot anything wrong, so looks good

Thanks, merged:

https://gitweb.gentoo.org/proj/portage.git/commit/?id=dd69ce742c62b9515cf7ae37e46bcf7f178777db
-- 
Thanks,
Zac

