public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/
@ 2020-02-24 10:51 Zac Medico
  0 siblings, 0 replies; 6+ messages in thread
From: Zac Medico @ 2020-02-24 10:51 UTC (permalink / raw
  To: gentoo-commits

commit:     27712651aa7014a960b012dc89457df09677edc1
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Feb 24 08:06:11 2020 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Feb 24 10:26:33 2020 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=27712651

PipeLogger: non-blocking write to pipe (bug 709746)

Add support to write to a non-blocking pipe instead of a
log file. This is needed for the purposes of bug 709746,
where PipeLogger will write to a pipe that is drained
by anoher PipeLogger instance which is running in the same
process.

Bug: https://bugs.gentoo.org/709746
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/tests/process/test_PopenProcess.py | 41 +++++++++++++++-
 lib/portage/util/_async/PipeLogger.py          | 67 +++++++++++++++++++++-----
 2 files changed, 94 insertions(+), 14 deletions(-)

diff --git a/lib/portage/tests/process/test_PopenProcess.py b/lib/portage/tests/process/test_PopenProcess.py
index ed506b814..d4e97f210 100644
--- a/lib/portage/tests/process/test_PopenProcess.py
+++ b/lib/portage/tests/process/test_PopenProcess.py
@@ -9,6 +9,8 @@ from portage.tests import TestCase
 from portage.util._async.PipeLogger import PipeLogger
 from portage.util._async.PopenProcess import PopenProcess
 from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util.futures._asyncio.streams import _reader
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
 from _emerge.PipeReader import PipeReader
 
 class PopenPipeTestCase(TestCase):
@@ -73,8 +75,41 @@ class PopenPipeTestCase(TestCase):
 
 		return content.decode('ascii', 'replace')
 
+	@coroutine
+	def _testPipeLoggerToPipe(self, test_string, loop=None):
+		"""
+		Test PipeLogger writing to a pipe connected to a PipeReader.
+		This verifies that PipeLogger does not deadlock when writing
+		to a pipe that's drained by a PipeReader running in the same
+		process (requires non-blocking write).
+		"""
+
+		producer = PopenProcess(proc=subprocess.Popen(
+			["bash", "-c", self._echo_cmd % test_string],
+			stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
+			scheduler=loop)
+
+		pr, pw = os.pipe()
+
+		consumer = producer.pipe_reader = PipeLogger(background=True,
+			input_fd=producer.proc.stdout,
+			log_file_path=os.fdopen(pw, 'wb', 0))
+
+		reader = _reader(pr, loop=loop)
+		yield producer.async_start()
+		content = yield reader
+		yield producer.async_wait()
+		yield consumer.async_wait()
+
+		self.assertEqual(producer.returncode, os.EX_OK)
+		self.assertEqual(consumer.returncode, os.EX_OK)
+
+		coroutine_return(content.decode('ascii', 'replace'))
+
 	def testPopenPipe(self):
-		for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
+		loop = global_event_loop()
+
+		for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**15, 2**16):
 			test_string = x * "a"
 			output = self._testPipeReader(test_string)
 			self.assertEqual(test_string, output,
@@ -83,3 +118,7 @@ class PopenPipeTestCase(TestCase):
 			output = self._testPipeLogger(test_string)
 			self.assertEqual(test_string, output,
 				"x = %s, len(output) = %s" % (x, len(output)))
+
+			output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
+			self.assertEqual(test_string, output,
+				"x = %s, len(output) = %s" % (x, len(output)))

diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
index a4258f350..6b03988a1 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -8,6 +8,9 @@ import sys
 
 import portage
 from portage import os, _encodings, _unicode_encode
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine
+from portage.util.futures.unix_events import _set_nonblocking
 from _emerge.AbstractPollTask import AbstractPollTask
 
 class PipeLogger(AbstractPollTask):
@@ -21,13 +24,15 @@ class PipeLogger(AbstractPollTask):
 	"""
 
 	__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
-		("_log_file", "_log_file_real")
+		("_io_loop_task", "_log_file", "_log_file_real")
 
 	def _start(self):
 
 		log_file_path = self.log_file_path
-		if log_file_path is not None:
-
+		if hasattr(log_file_path, 'write'):
+			self._log_file = log_file_path
+			_set_nonblocking(self._log_file.fileno())
+		elif log_file_path is not None:
 			self._log_file = open(_unicode_encode(log_file_path,
 				encoding=_encodings['fs'], errors='strict'), mode='ab')
 			if log_file_path.endswith('.gz'):
@@ -57,7 +62,8 @@ class PipeLogger(AbstractPollTask):
 				fcntl.fcntl(fd, fcntl.F_SETFD,
 					fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
 
-		self.scheduler.add_reader(fd, self._output_handler, fd)
+		self._io_loop_task = asyncio.ensure_future(self._io_loop(fd), loop=self.scheduler)
+		self._io_loop_task.add_done_callback(self._io_loop_done)
 		self._registered = True
 
 	def _cancel(self):
@@ -65,8 +71,8 @@ class PipeLogger(AbstractPollTask):
 		if self.returncode is None:
 			self.returncode = self._cancelled_returncode
 
-	def _output_handler(self, fd):
-
+	@coroutine
+	def _io_loop(self, fd):
 		background = self.background
 		stdout_fd = self.stdout_fd
 		log_file = self._log_file 
@@ -76,14 +82,18 @@ class PipeLogger(AbstractPollTask):
 
 			if buf is None:
 				# not a POLLIN event, EAGAIN, etc...
-				break
+				future = self.scheduler.create_future()
+				self.scheduler.add_reader(fd, future.set_result, None)
+				try:
+					yield future
+				finally:
+					self.scheduler.remove_reader(fd)
+					future.done() or future.cancel()
+				continue
 
 			if not buf:
 				# EOF
-				self._unregister()
-				self.returncode = self.returncode or os.EX_OK
-				self._async_wait()
-				break
+				return
 
 			else:
 				if not background and stdout_fd is not None:
@@ -120,8 +130,34 @@ class PipeLogger(AbstractPollTask):
 								fcntl.F_GETFL) ^ os.O_NONBLOCK)
 
 				if log_file is not None:
-					log_file.write(buf)
-					log_file.flush()
+					write_buf = buf
+					while True:
+						try:
+							if write_buf is not None:
+								log_file.write(write_buf)
+								write_buf = None
+							log_file.flush()
+						except EnvironmentError as e:
+							if e.errno != errno.EAGAIN:
+								raise
+							future = self.scheduler.create_future()
+							self.scheduler.add_writer(self._log_file.fileno(), future.set_result, None)
+							try:
+								yield future
+							finally:
+								self.scheduler.remove_writer(self._log_file.fileno())
+								future.done() or future.cancel()
+						else:
+							break
+
+	def _io_loop_done(self, future):
+		try:
+			future.result()
+		except asyncio.CancelledError:
+			self.cancel()
+			self._was_cancelled()
+		self.returncode = self.returncode or os.EX_OK
+		self._async_wait()
 
 	def _unregister(self):
 		if self.input_fd is not None:
@@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask):
 				self.input_fd.close()
 			self.input_fd = None
 
+		if self._io_loop_task is not None:
+			self._io_loop_task.done() or self._io_loop_task.cancel()
+			self._io_loop_task = None
+
 		if self.stdout_fd is not None:
 			os.close(self.stdout_fd)
 			self.stdout_fd = None
 
 		if self._log_file is not None:
+			self.scheduler.remove_writer(self._log_file.fileno())
 			self._log_file.close()
 			self._log_file = None
 


^ permalink raw reply related	[flat|nested] 6+ messages in thread

* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/
@ 2020-04-08  5:56 Zac Medico
  0 siblings, 0 replies; 6+ messages in thread
From: Zac Medico @ 2020-04-08  5:56 UTC (permalink / raw
  To: gentoo-commits

commit:     f1e9389d64b6ded41d0dac99979a049cfa27e75c
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Wed Apr  8 05:00:11 2020 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Apr  8 05:29:48 2020 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=f1e9389d

Revert "PipeLogger: non-blocking write to pipe (bug 709746)"

This reverts commit 27712651aa7014a960b012dc89457df09677edc1.

Bug: https://bugs.gentoo.org/716636
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/tests/process/test_PopenProcess.py | 41 +---------------
 lib/portage/util/_async/PipeLogger.py          | 67 +++++---------------------
 2 files changed, 14 insertions(+), 94 deletions(-)

diff --git a/lib/portage/tests/process/test_PopenProcess.py b/lib/portage/tests/process/test_PopenProcess.py
index d4e97f210..ed506b814 100644
--- a/lib/portage/tests/process/test_PopenProcess.py
+++ b/lib/portage/tests/process/test_PopenProcess.py
@@ -9,8 +9,6 @@ from portage.tests import TestCase
 from portage.util._async.PipeLogger import PipeLogger
 from portage.util._async.PopenProcess import PopenProcess
 from portage.util._eventloop.global_event_loop import global_event_loop
-from portage.util.futures._asyncio.streams import _reader
-from portage.util.futures.compat_coroutine import coroutine, coroutine_return
 from _emerge.PipeReader import PipeReader
 
 class PopenPipeTestCase(TestCase):
@@ -75,41 +73,8 @@ class PopenPipeTestCase(TestCase):
 
 		return content.decode('ascii', 'replace')
 
-	@coroutine
-	def _testPipeLoggerToPipe(self, test_string, loop=None):
-		"""
-		Test PipeLogger writing to a pipe connected to a PipeReader.
-		This verifies that PipeLogger does not deadlock when writing
-		to a pipe that's drained by a PipeReader running in the same
-		process (requires non-blocking write).
-		"""
-
-		producer = PopenProcess(proc=subprocess.Popen(
-			["bash", "-c", self._echo_cmd % test_string],
-			stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
-			scheduler=loop)
-
-		pr, pw = os.pipe()
-
-		consumer = producer.pipe_reader = PipeLogger(background=True,
-			input_fd=producer.proc.stdout,
-			log_file_path=os.fdopen(pw, 'wb', 0))
-
-		reader = _reader(pr, loop=loop)
-		yield producer.async_start()
-		content = yield reader
-		yield producer.async_wait()
-		yield consumer.async_wait()
-
-		self.assertEqual(producer.returncode, os.EX_OK)
-		self.assertEqual(consumer.returncode, os.EX_OK)
-
-		coroutine_return(content.decode('ascii', 'replace'))
-
 	def testPopenPipe(self):
-		loop = global_event_loop()
-
-		for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**15, 2**16):
+		for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
 			test_string = x * "a"
 			output = self._testPipeReader(test_string)
 			self.assertEqual(test_string, output,
@@ -118,7 +83,3 @@ class PopenPipeTestCase(TestCase):
 			output = self._testPipeLogger(test_string)
 			self.assertEqual(test_string, output,
 				"x = %s, len(output) = %s" % (x, len(output)))
-
-			output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
-			self.assertEqual(test_string, output,
-				"x = %s, len(output) = %s" % (x, len(output)))

diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
index 6b03988a1..a4258f350 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -8,9 +8,6 @@ import sys
 
 import portage
 from portage import os, _encodings, _unicode_encode
-from portage.util.futures import asyncio
-from portage.util.futures.compat_coroutine import coroutine
-from portage.util.futures.unix_events import _set_nonblocking
 from _emerge.AbstractPollTask import AbstractPollTask
 
 class PipeLogger(AbstractPollTask):
@@ -24,15 +21,13 @@ class PipeLogger(AbstractPollTask):
 	"""
 
 	__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
-		("_io_loop_task", "_log_file", "_log_file_real")
+		("_log_file", "_log_file_real")
 
 	def _start(self):
 
 		log_file_path = self.log_file_path
-		if hasattr(log_file_path, 'write'):
-			self._log_file = log_file_path
-			_set_nonblocking(self._log_file.fileno())
-		elif log_file_path is not None:
+		if log_file_path is not None:
+
 			self._log_file = open(_unicode_encode(log_file_path,
 				encoding=_encodings['fs'], errors='strict'), mode='ab')
 			if log_file_path.endswith('.gz'):
@@ -62,8 +57,7 @@ class PipeLogger(AbstractPollTask):
 				fcntl.fcntl(fd, fcntl.F_SETFD,
 					fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
 
-		self._io_loop_task = asyncio.ensure_future(self._io_loop(fd), loop=self.scheduler)
-		self._io_loop_task.add_done_callback(self._io_loop_done)
+		self.scheduler.add_reader(fd, self._output_handler, fd)
 		self._registered = True
 
 	def _cancel(self):
@@ -71,8 +65,8 @@ class PipeLogger(AbstractPollTask):
 		if self.returncode is None:
 			self.returncode = self._cancelled_returncode
 
-	@coroutine
-	def _io_loop(self, fd):
+	def _output_handler(self, fd):
+
 		background = self.background
 		stdout_fd = self.stdout_fd
 		log_file = self._log_file 
@@ -82,18 +76,14 @@ class PipeLogger(AbstractPollTask):
 
 			if buf is None:
 				# not a POLLIN event, EAGAIN, etc...
-				future = self.scheduler.create_future()
-				self.scheduler.add_reader(fd, future.set_result, None)
-				try:
-					yield future
-				finally:
-					self.scheduler.remove_reader(fd)
-					future.done() or future.cancel()
-				continue
+				break
 
 			if not buf:
 				# EOF
-				return
+				self._unregister()
+				self.returncode = self.returncode or os.EX_OK
+				self._async_wait()
+				break
 
 			else:
 				if not background and stdout_fd is not None:
@@ -130,34 +120,8 @@ class PipeLogger(AbstractPollTask):
 								fcntl.F_GETFL) ^ os.O_NONBLOCK)
 
 				if log_file is not None:
-					write_buf = buf
-					while True:
-						try:
-							if write_buf is not None:
-								log_file.write(write_buf)
-								write_buf = None
-							log_file.flush()
-						except EnvironmentError as e:
-							if e.errno != errno.EAGAIN:
-								raise
-							future = self.scheduler.create_future()
-							self.scheduler.add_writer(self._log_file.fileno(), future.set_result, None)
-							try:
-								yield future
-							finally:
-								self.scheduler.remove_writer(self._log_file.fileno())
-								future.done() or future.cancel()
-						else:
-							break
-
-	def _io_loop_done(self, future):
-		try:
-			future.result()
-		except asyncio.CancelledError:
-			self.cancel()
-			self._was_cancelled()
-		self.returncode = self.returncode or os.EX_OK
-		self._async_wait()
+					log_file.write(buf)
+					log_file.flush()
 
 	def _unregister(self):
 		if self.input_fd is not None:
@@ -169,16 +133,11 @@ class PipeLogger(AbstractPollTask):
 				self.input_fd.close()
 			self.input_fd = None
 
-		if self._io_loop_task is not None:
-			self._io_loop_task.done() or self._io_loop_task.cancel()
-			self._io_loop_task = None
-
 		if self.stdout_fd is not None:
 			os.close(self.stdout_fd)
 			self.stdout_fd = None
 
 		if self._log_file is not None:
-			self.scheduler.remove_writer(self._log_file.fileno())
 			self._log_file.close()
 			self._log_file = None
 


^ permalink raw reply related	[flat|nested] 6+ messages in thread

* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/
@ 2020-06-13  6:51 Zac Medico
  0 siblings, 0 replies; 6+ messages in thread
From: Zac Medico @ 2020-06-13  6:51 UTC (permalink / raw
  To: gentoo-commits

commit:     3e46825a047067a96ed997fe394f85e042e542a8
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Feb 24 08:06:11 2020 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sat Jun 13 06:30:01 2020 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=3e46825a

PipeLogger: non-blocking write to pipe (bug 709746)

Add support to write to a non-blocking pipe instead of a
log file. This is needed for the purposes of bug 709746,
where PipeLogger will write to a pipe that is drained
by anoher PipeLogger instance which is running in the same
process.

Bug: https://bugs.gentoo.org/709746
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/tests/process/test_PipeLogger.py | 54 +++++++++++++++++++++++
 lib/portage/util/_async/PipeLogger.py        | 66 ++++++++++++++++++++++------
 2 files changed, 107 insertions(+), 13 deletions(-)

diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py
new file mode 100644
index 000000000..2b9f10eeb
--- /dev/null
+++ b/lib/portage/tests/process/test_PipeLogger.py
@@ -0,0 +1,54 @@
+# Copyright 2020 Gentoo Authors
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage.tests import TestCase
+from portage.util._async.PipeLogger import PipeLogger
+from portage.util.futures import asyncio
+from portage.util.futures._asyncio.streams import _reader, _writer
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
+
+
+class PipeLoggerTestCase(TestCase):
+
+	@coroutine
+	def _testPipeLoggerToPipe(self, test_string, loop=None):
+		"""
+		Test PipeLogger writing to a pipe connected to a PipeReader.
+		This verifies that PipeLogger does not deadlock when writing
+		to a pipe that's drained by a PipeReader running in the same
+		process (requires non-blocking write).
+		"""
+
+		pr1, pw1 = os.pipe()
+		writer = asyncio.ensure_future(_writer(pw1, test_string.encode('ascii')), loop=loop)
+
+		pr, pw = os.pipe()
+
+		consumer = PipeLogger(background=True,
+			input_fd=pr1,
+			log_file_path=os.fdopen(pw, 'wb', 0),
+			scheduler=loop)
+		consumer.start()
+
+		# Before starting the reader, wait here for a moment, in order
+		# to exercise PipeLogger's handling of EAGAIN during write.
+		yield asyncio.wait([writer], timeout=0.01)
+
+		reader = _reader(pr, loop=loop)
+		yield writer
+		content = yield reader
+		yield consumer.async_wait()
+
+		self.assertEqual(consumer.returncode, os.EX_OK)
+
+		coroutine_return(content.decode('ascii', 'replace'))
+
+	def testPipeLogger(self):
+		loop = asyncio._wrap_loop()
+
+		for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**17, 2**17 + 1):
+			test_string = x * "a"
+			output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
+			self.assertEqual(test_string, output,
+				"x = %s, len(output) = %s" % (x, len(output)))

diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
index a4258f350..83669e05e 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -8,6 +8,9 @@ import sys
 
 import portage
 from portage import os, _encodings, _unicode_encode
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine
+from portage.util.futures.unix_events import _set_nonblocking
 from _emerge.AbstractPollTask import AbstractPollTask
 
 class PipeLogger(AbstractPollTask):
@@ -21,13 +24,15 @@ class PipeLogger(AbstractPollTask):
 	"""
 
 	__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
-		("_log_file", "_log_file_real")
+		("_io_loop_task", "_log_file", "_log_file_real")
 
 	def _start(self):
 
 		log_file_path = self.log_file_path
-		if log_file_path is not None:
-
+		if hasattr(log_file_path, 'write'):
+			self._log_file = log_file_path
+			_set_nonblocking(self._log_file.fileno())
+		elif log_file_path is not None:
 			self._log_file = open(_unicode_encode(log_file_path,
 				encoding=_encodings['fs'], errors='strict'), mode='ab')
 			if log_file_path.endswith('.gz'):
@@ -57,7 +62,8 @@ class PipeLogger(AbstractPollTask):
 				fcntl.fcntl(fd, fcntl.F_SETFD,
 					fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
 
-		self.scheduler.add_reader(fd, self._output_handler, fd)
+		self._io_loop_task = asyncio.ensure_future(self._io_loop(fd), loop=self.scheduler)
+		self._io_loop_task.add_done_callback(self._io_loop_done)
 		self._registered = True
 
 	def _cancel(self):
@@ -65,8 +71,8 @@ class PipeLogger(AbstractPollTask):
 		if self.returncode is None:
 			self.returncode = self._cancelled_returncode
 
-	def _output_handler(self, fd):
-
+	@coroutine
+	def _io_loop(self, fd):
 		background = self.background
 		stdout_fd = self.stdout_fd
 		log_file = self._log_file 
@@ -76,14 +82,19 @@ class PipeLogger(AbstractPollTask):
 
 			if buf is None:
 				# not a POLLIN event, EAGAIN, etc...
-				break
+				future = self.scheduler.create_future()
+				self.scheduler.add_reader(fd, future.set_result, None)
+				try:
+					yield future
+				finally:
+					if not self.scheduler.is_closed():
+						self.scheduler.remove_reader(fd)
+						future.done() or future.cancel()
+				continue
 
 			if not buf:
 				# EOF
-				self._unregister()
-				self.returncode = self.returncode or os.EX_OK
-				self._async_wait()
-				break
+				return
 
 			else:
 				if not background and stdout_fd is not None:
@@ -120,8 +131,32 @@ class PipeLogger(AbstractPollTask):
 								fcntl.F_GETFL) ^ os.O_NONBLOCK)
 
 				if log_file is not None:
-					log_file.write(buf)
-					log_file.flush()
+					write_buf = buf
+					while write_buf:
+						try:
+							# Use os.write, since the log_file.write method
+							# looses data when an EAGAIN occurs.
+							write_buf = write_buf[os.write(log_file.fileno(), write_buf):]
+						except EnvironmentError as e:
+							if e.errno != errno.EAGAIN:
+								raise
+							future = self.scheduler.create_future()
+							self.scheduler.add_writer(self._log_file.fileno(), future.set_result, None)
+							try:
+								yield future
+							finally:
+								if not self.scheduler.is_closed():
+									self.scheduler.remove_writer(self._log_file.fileno())
+									future.done() or future.cancel()
+
+	def _io_loop_done(self, future):
+		try:
+			future.result()
+		except asyncio.CancelledError:
+			self.cancel()
+			self._was_cancelled()
+		self.returncode = self.returncode or os.EX_OK
+		self._async_wait()
 
 	def _unregister(self):
 		if self.input_fd is not None:
@@ -133,11 +168,16 @@ class PipeLogger(AbstractPollTask):
 				self.input_fd.close()
 			self.input_fd = None
 
+		if self._io_loop_task is not None:
+			self._io_loop_task.done() or self._io_loop_task.cancel()
+			self._io_loop_task = None
+
 		if self.stdout_fd is not None:
 			os.close(self.stdout_fd)
 			self.stdout_fd = None
 
 		if self._log_file is not None:
+			self.scheduler.remove_writer(self._log_file.fileno())
 			self._log_file.close()
 			self._log_file = None
 


^ permalink raw reply related	[flat|nested] 6+ messages in thread

* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/
@ 2020-06-16  3:16 Zac Medico
  0 siblings, 0 replies; 6+ messages in thread
From: Zac Medico @ 2020-06-16  3:16 UTC (permalink / raw
  To: gentoo-commits

commit:     ca763549507d995e91a49753b13bcca8748fae6c
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Jun 16 03:14:36 2020 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue Jun 16 03:14:49 2020 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=ca763549

Revert "PipeLogger: non-blocking write to pipe (bug 709746)"

This reverts commit 3e46825a047067a96ed997fe394f85e042e542a8.
We've had reports of emerge hangs, so reverting this for now.

Bug: https://bugs.gentoo.org/709746
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/tests/process/test_PipeLogger.py | 54 --------------------
 lib/portage/util/_async/PipeLogger.py        | 73 +++++-----------------------
 2 files changed, 13 insertions(+), 114 deletions(-)

diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py
deleted file mode 100644
index 2b9f10eeb..000000000
--- a/lib/portage/tests/process/test_PipeLogger.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright 2020 Gentoo Authors
-# Distributed under the terms of the GNU General Public License v2
-
-from portage import os
-from portage.tests import TestCase
-from portage.util._async.PipeLogger import PipeLogger
-from portage.util.futures import asyncio
-from portage.util.futures._asyncio.streams import _reader, _writer
-from portage.util.futures.compat_coroutine import coroutine, coroutine_return
-
-
-class PipeLoggerTestCase(TestCase):
-
-	@coroutine
-	def _testPipeLoggerToPipe(self, test_string, loop=None):
-		"""
-		Test PipeLogger writing to a pipe connected to a PipeReader.
-		This verifies that PipeLogger does not deadlock when writing
-		to a pipe that's drained by a PipeReader running in the same
-		process (requires non-blocking write).
-		"""
-
-		pr1, pw1 = os.pipe()
-		writer = asyncio.ensure_future(_writer(pw1, test_string.encode('ascii')), loop=loop)
-
-		pr, pw = os.pipe()
-
-		consumer = PipeLogger(background=True,
-			input_fd=pr1,
-			log_file_path=os.fdopen(pw, 'wb', 0),
-			scheduler=loop)
-		consumer.start()
-
-		# Before starting the reader, wait here for a moment, in order
-		# to exercise PipeLogger's handling of EAGAIN during write.
-		yield asyncio.wait([writer], timeout=0.01)
-
-		reader = _reader(pr, loop=loop)
-		yield writer
-		content = yield reader
-		yield consumer.async_wait()
-
-		self.assertEqual(consumer.returncode, os.EX_OK)
-
-		coroutine_return(content.decode('ascii', 'replace'))
-
-	def testPipeLogger(self):
-		loop = asyncio._wrap_loop()
-
-		for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**17, 2**17 + 1):
-			test_string = x * "a"
-			output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
-			self.assertEqual(test_string, output,
-				"x = %s, len(output) = %s" % (x, len(output)))

diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
index 1776cc860..a4258f350 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -8,9 +8,6 @@ import sys
 
 import portage
 from portage import os, _encodings, _unicode_encode
-from portage.util.futures import asyncio
-from portage.util.futures.compat_coroutine import coroutine
-from portage.util.futures.unix_events import _set_nonblocking
 from _emerge.AbstractPollTask import AbstractPollTask
 
 class PipeLogger(AbstractPollTask):
@@ -24,15 +21,13 @@ class PipeLogger(AbstractPollTask):
 	"""
 
 	__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
-		("_io_loop_task", "_log_file", "_log_file_real")
+		("_log_file", "_log_file_real")
 
 	def _start(self):
 
 		log_file_path = self.log_file_path
-		if hasattr(log_file_path, 'write'):
-			self._log_file = log_file_path
-			_set_nonblocking(self._log_file.fileno())
-		elif log_file_path is not None:
+		if log_file_path is not None:
+
 			self._log_file = open(_unicode_encode(log_file_path,
 				encoding=_encodings['fs'], errors='strict'), mode='ab')
 			if log_file_path.endswith('.gz'):
@@ -62,8 +57,7 @@ class PipeLogger(AbstractPollTask):
 				fcntl.fcntl(fd, fcntl.F_SETFD,
 					fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
 
-		self._io_loop_task = asyncio.ensure_future(self._io_loop(fd), loop=self.scheduler)
-		self._io_loop_task.add_done_callback(self._io_loop_done)
+		self.scheduler.add_reader(fd, self._output_handler, fd)
 		self._registered = True
 
 	def _cancel(self):
@@ -71,8 +65,8 @@ class PipeLogger(AbstractPollTask):
 		if self.returncode is None:
 			self.returncode = self._cancelled_returncode
 
-	@coroutine
-	def _io_loop(self, fd):
+	def _output_handler(self, fd):
+
 		background = self.background
 		stdout_fd = self.stdout_fd
 		log_file = self._log_file 
@@ -82,19 +76,14 @@ class PipeLogger(AbstractPollTask):
 
 			if buf is None:
 				# not a POLLIN event, EAGAIN, etc...
-				future = self.scheduler.create_future()
-				self.scheduler.add_reader(fd, future.set_result, None)
-				try:
-					yield future
-				finally:
-					if not self.scheduler.is_closed():
-						self.scheduler.remove_reader(fd)
-						future.done() or future.cancel()
-				continue
+				break
 
 			if not buf:
 				# EOF
-				return
+				self._unregister()
+				self.returncode = self.returncode or os.EX_OK
+				self._async_wait()
+				break
 
 			else:
 				if not background and stdout_fd is not None:
@@ -131,39 +120,8 @@ class PipeLogger(AbstractPollTask):
 								fcntl.F_GETFL) ^ os.O_NONBLOCK)
 
 				if log_file is not None:
-					if isinstance(log_file, gzip.GzipFile):
-						# Use log_file.write since data written directly
-						# to the file descriptor bypasses compression.
-						log_file.write(buf)
-						log_file.flush()
-						continue
-
-					write_buf = buf
-					while write_buf:
-						try:
-							# Use os.write, since the log_file.write method
-							# looses data when an EAGAIN occurs.
-							write_buf = write_buf[os.write(log_file.fileno(), write_buf):]
-						except EnvironmentError as e:
-							if e.errno != errno.EAGAIN:
-								raise
-							future = self.scheduler.create_future()
-							self.scheduler.add_writer(self._log_file.fileno(), future.set_result, None)
-							try:
-								yield future
-							finally:
-								if not self.scheduler.is_closed():
-									self.scheduler.remove_writer(self._log_file.fileno())
-									future.done() or future.cancel()
-
-	def _io_loop_done(self, future):
-		try:
-			future.result()
-		except asyncio.CancelledError:
-			self.cancel()
-			self._was_cancelled()
-		self.returncode = self.returncode or os.EX_OK
-		self._async_wait()
+					log_file.write(buf)
+					log_file.flush()
 
 	def _unregister(self):
 		if self.input_fd is not None:
@@ -175,16 +133,11 @@ class PipeLogger(AbstractPollTask):
 				self.input_fd.close()
 			self.input_fd = None
 
-		if self._io_loop_task is not None:
-			self._io_loop_task.done() or self._io_loop_task.cancel()
-			self._io_loop_task = None
-
 		if self.stdout_fd is not None:
 			os.close(self.stdout_fd)
 			self.stdout_fd = None
 
 		if self._log_file is not None:
-			self.scheduler.remove_writer(self._log_file.fileno())
 			self._log_file.close()
 			self._log_file = None
 


^ permalink raw reply related	[flat|nested] 6+ messages in thread

* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/
@ 2020-06-23  2:38 Zac Medico
  0 siblings, 0 replies; 6+ messages in thread
From: Zac Medico @ 2020-06-23  2:38 UTC (permalink / raw
  To: gentoo-commits

commit:     72ac22e722549833c1ee7e7ad1b585db55f7dafc
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Fri Jun 19 03:04:52 2020 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue Jun 23 02:13:05 2020 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=72ac22e7

PipeLogger: non-blocking write to pipe (bug 709746)

Add support to write to a non-blocking pipe instead of a
log file. This is needed for the purposes of bug 709746,
where PipeLogger will write to a pipe that is drained
by anoher PipeLogger instance which is running in the same
process.

Bug: https://bugs.gentoo.org/709746
Reviewed-by: Brian Dolbec <dolsen <AT> gentoo.org>
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/tests/process/test_PipeLogger.py | 58 +++++++++++++++++++++
 lib/portage/util/_async/PipeLogger.py        | 75 +++++++++++++++++++++-------
 2 files changed, 116 insertions(+), 17 deletions(-)

diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py
new file mode 100644
index 000000000..2bd94cf39
--- /dev/null
+++ b/lib/portage/tests/process/test_PipeLogger.py
@@ -0,0 +1,58 @@
+# Copyright 2020 Gentoo Authors
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage.tests import TestCase
+from portage.util._async.PipeLogger import PipeLogger
+from portage.util.futures import asyncio
+from portage.util.futures._asyncio.streams import _reader, _writer
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
+from portage.util.futures.unix_events import _set_nonblocking
+
+
+class PipeLoggerTestCase(TestCase):
+
+	@coroutine
+	def _testPipeLoggerToPipe(self, test_string, loop=None):
+		"""
+		Test PipeLogger writing to a pipe connected to a PipeReader.
+		This verifies that PipeLogger does not deadlock when writing
+		to a pipe that's drained by a PipeReader running in the same
+		process (requires non-blocking write).
+		"""
+
+		input_fd, writer_pipe = os.pipe()
+		_set_nonblocking(writer_pipe)
+		writer_pipe = os.fdopen(writer_pipe, 'wb', 0)
+		writer = asyncio.ensure_future(_writer(writer_pipe, test_string.encode('ascii'), loop=loop), loop=loop)
+		writer.add_done_callback(lambda writer: writer_pipe.close())
+
+		pr, pw = os.pipe()
+
+		consumer = PipeLogger(background=True,
+			input_fd=input_fd,
+			log_file_path=os.fdopen(pw, 'wb', 0),
+			scheduler=loop)
+		consumer.start()
+
+		# Before starting the reader, wait here for a moment, in order
+		# to exercise PipeLogger's handling of EAGAIN during write.
+		yield asyncio.wait([writer], timeout=0.01)
+
+		reader = _reader(pr, loop=loop)
+		yield writer
+		content = yield reader
+		yield consumer.async_wait()
+
+		self.assertEqual(consumer.returncode, os.EX_OK)
+
+		coroutine_return(content.decode('ascii', 'replace'))
+
+	def testPipeLogger(self):
+		loop = asyncio._wrap_loop()
+
+		for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**17, 2**17 + 1):
+			test_string = x * "a"
+			output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
+			self.assertEqual(test_string, output,
+				"x = %s, len(output) = %s" % (x, len(output)))

diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
index a4258f350..4271c8ee2 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -1,4 +1,4 @@
-# Copyright 2008-2018 Gentoo Foundation
+# Copyright 2008-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 import fcntl
@@ -8,6 +8,10 @@ import sys
 
 import portage
 from portage import os, _encodings, _unicode_encode
+from portage.util.futures import asyncio
+from portage.util.futures._asyncio.streams import _writer
+from portage.util.futures.compat_coroutine import coroutine
+from portage.util.futures.unix_events import _set_nonblocking
 from _emerge.AbstractPollTask import AbstractPollTask
 
 class PipeLogger(AbstractPollTask):
@@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask):
 	"""
 
 	__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
-		("_log_file", "_log_file_real")
+		("_io_loop_task", "_log_file", "_log_file_nb", "_log_file_real")
 
 	def _start(self):
 
 		log_file_path = self.log_file_path
-		if log_file_path is not None:
-
+		if hasattr(log_file_path, 'write'):
+			self._log_file_nb = True
+			self._log_file = log_file_path
+			_set_nonblocking(self._log_file.fileno())
+		elif log_file_path is not None:
 			self._log_file = open(_unicode_encode(log_file_path,
 				encoding=_encodings['fs'], errors='strict'), mode='ab')
 			if log_file_path.endswith('.gz'):
@@ -40,9 +47,9 @@ class PipeLogger(AbstractPollTask):
 				mode=0o660)
 
 		if isinstance(self.input_fd, int):
-			fd = self.input_fd
-		else:
-			fd = self.input_fd.fileno()
+			self.input_fd = os.fdopen(self.input_fd, 'rb', 0)
+
+		fd = self.input_fd.fileno()
 
 		fcntl.fcntl(fd, fcntl.F_SETFL,
 			fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
@@ -57,7 +64,8 @@ class PipeLogger(AbstractPollTask):
 				fcntl.fcntl(fd, fcntl.F_SETFD,
 					fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC)
 
-		self.scheduler.add_reader(fd, self._output_handler, fd)
+		self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd), loop=self.scheduler)
+		self._io_loop_task.add_done_callback(self._io_loop_done)
 		self._registered = True
 
 	def _cancel(self):
@@ -65,25 +73,36 @@ class PipeLogger(AbstractPollTask):
 		if self.returncode is None:
 			self.returncode = self._cancelled_returncode
 
-	def _output_handler(self, fd):
-
+	@coroutine
+	def _io_loop(self, input_file):
 		background = self.background
 		stdout_fd = self.stdout_fd
 		log_file = self._log_file 
+		fd = input_file.fileno()
 
 		while True:
 			buf = self._read_buf(fd)
 
 			if buf is None:
 				# not a POLLIN event, EAGAIN, etc...
-				break
+				future = self.scheduler.create_future()
+				self.scheduler.add_reader(fd, future.set_result, None)
+				try:
+					yield future
+				finally:
+					# The loop and input file may have been closed.
+					if not self.scheduler.is_closed():
+						future.done() or future.cancel()
+						# Do not call remove_reader in cases where fd has
+						# been closed and then re-allocated to a concurrent
+						# coroutine as in bug 716636.
+						if not input_file.closed:
+							self.scheduler.remove_reader(fd)
+				continue
 
 			if not buf:
 				# EOF
-				self._unregister()
-				self.returncode = self.returncode or os.EX_OK
-				self._async_wait()
-				break
+				return
 
 			else:
 				if not background and stdout_fd is not None:
@@ -120,8 +139,25 @@ class PipeLogger(AbstractPollTask):
 								fcntl.F_GETFL) ^ os.O_NONBLOCK)
 
 				if log_file is not None:
-					log_file.write(buf)
-					log_file.flush()
+					if self._log_file_nb:
+						# Use the _writer function which uses os.write, since the
+						# log_file.write method looses data when an EAGAIN occurs.
+						yield _writer(log_file, buf, loop=self.scheduler)
+					else:
+						# For gzip.GzipFile instances, the above _writer function
+						# will not work because data written directly to the file
+						# descriptor bypasses compression.
+						log_file.write(buf)
+						log_file.flush()
+
+	def _io_loop_done(self, future):
+		try:
+			future.result()
+		except asyncio.CancelledError:
+			self.cancel()
+			self._was_cancelled()
+		self.returncode = self.returncode or os.EX_OK
+		self._async_wait()
 
 	def _unregister(self):
 		if self.input_fd is not None:
@@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask):
 				self.input_fd.close()
 			self.input_fd = None
 
+		if self._io_loop_task is not None:
+			self._io_loop_task.done() or self._io_loop_task.cancel()
+			self._io_loop_task = None
+
 		if self.stdout_fd is not None:
 			os.close(self.stdout_fd)
 			self.stdout_fd = None
 
 		if self._log_file is not None:
+			self.scheduler.remove_writer(self._log_file.fileno())
 			self._log_file.close()
 			self._log_file = None
 


^ permalink raw reply related	[flat|nested] 6+ messages in thread

* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/
@ 2023-10-22  4:34 Zac Medico
  0 siblings, 0 replies; 6+ messages in thread
From: Zac Medico @ 2023-10-22  4:34 UTC (permalink / raw
  To: gentoo-commits

commit:     6abc969109754ab086db2bac5be1029de1a015c3
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Fri Oct 20 04:11:48 2023 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Oct 22 04:17:48 2023 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=6abc9691

ForkProcess: Implement fd_pipes via send_handle

This new fd_pipes implementation is only enabled
when the multiprocessing start method is not fork,
ensuring backward compatibility with existing
ForkProcess callers that rely on the fork start
method.

Note that the new fd_pipes implementation uses a
thread via run_in_executor, and threads are not
recommended for mixing with the fork start method
due to cpython issue 84559.

Bug: https://bugs.gentoo.org/915896
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/tests/process/test_ForkProcess.py |   7 ++
 lib/portage/util/_async/ForkProcess.py        | 142 +++++++++++++++++++++-----
 2 files changed, 124 insertions(+), 25 deletions(-)

diff --git a/lib/portage/tests/process/test_ForkProcess.py b/lib/portage/tests/process/test_ForkProcess.py
index c07c60e9c6..bc0b836f11 100644
--- a/lib/portage/tests/process/test_ForkProcess.py
+++ b/lib/portage/tests/process/test_ForkProcess.py
@@ -4,6 +4,7 @@
 import functools
 import multiprocessing
 import tempfile
+from unittest.mock import patch
 
 from portage import os
 from portage.tests import TestCase
@@ -37,3 +38,9 @@ class ForkProcessTestCase(TestCase):
 
             with open(logfile.name, "rb") as output:
                 self.assertEqual(output.read(), test_string.encode("utf-8"))
+
+    def test_spawn_logfile_no_send_handle(self):
+        with patch(
+            "portage.util._async.ForkProcess.ForkProcess._HAVE_SEND_HANDLE", new=False
+        ):
+            self.test_spawn_logfile()

diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index 09e40a2d3e..6d216a5c43 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -10,6 +10,7 @@ import sys
 
 import portage
 from portage import os
+from portage.cache.mappings import slot_dict_class
 from portage.util.futures import asyncio
 from _emerge.SpawnProcess import SpawnProcess
 
@@ -26,29 +27,36 @@ class ForkProcess(SpawnProcess):
         "_proc_join_task",
     )
 
+    _file_names = ("connection", "slave_fd")
+    _files_dict = slot_dict_class(_file_names, prefix="")
+
     # Number of seconds between poll attempts for process exit status
     # (after the sentinel has become ready).
     _proc_join_interval = 0.1
 
-    def _start(self):
-        if self.fd_pipes or self.logfile:
-            if self.fd_pipes:
-                if multiprocessing.get_start_method() != "fork":
-                    raise NotImplementedError(
-                        'fd_pipes only supported with multiprocessing start method "fork"'
-                    )
-                super()._start()
-                return
+    _HAVE_SEND_HANDLE = getattr(multiprocessing.reduction, "HAVE_SEND_HANDLE", False)
 
-            if self.logfile:
-                if multiprocessing.get_start_method() == "fork":
-                    # Use superclass pty support.
-                    super()._start()
-                    return
+    def _start(self):
+        if multiprocessing.get_start_method() == "fork":
+            # Backward compatibility mode.
+            super()._start()
+            return
+
+        # This mode supports multiprocessing start methods
+        # other than fork. Note that the fd_pipes implementation
+        # uses a thread via run_in_executor, and threads are not
+        # recommended for mixing with the fork start method due
+        # to cpython issue 84559.
+        if self.fd_pipes and not self._HAVE_SEND_HANDLE:
+            raise NotImplementedError(
+                'fd_pipes only supported with HAVE_SEND_HANDLE or multiprocessing start method "fork"'
+            )
 
-                # Log via multiprocessing.Pipe if necessary.
-                pr, pw = multiprocessing.Pipe(duplex=False)
-                self._child_connection = pw
+        if self.fd_pipes or self.logfile:
+            # Log via multiprocessing.Pipe if necessary.
+            connection, self._child_connection = multiprocessing.Pipe(
+                duplex=self._HAVE_SEND_HANDLE
+            )
 
         retval = self._spawn(self.args, fd_pipes=self.fd_pipes)
 
@@ -59,11 +67,71 @@ class ForkProcess(SpawnProcess):
             self._async_waitpid()
         else:
             self._child_connection.close()
+            self.fd_pipes = self.fd_pipes or {}
             stdout_fd = None
             if not self.background:
-                stdout_fd = os.dup(sys.__stdout__.fileno())
+                self.fd_pipes.setdefault(0, portage._get_stdin().fileno())
+                self.fd_pipes.setdefault(1, sys.__stdout__.fileno())
+                self.fd_pipes.setdefault(2, sys.__stderr__.fileno())
+                stdout_fd = os.dup(self.fd_pipes[1])
+
+            if self._HAVE_SEND_HANDLE:
+                master_fd, slave_fd = self._pipe(self.fd_pipes)
+                self.fd_pipes[1] = slave_fd
+                self.fd_pipes[2] = slave_fd
+                self._files = self._files_dict(connection=connection, slave_fd=slave_fd)
+            else:
+                master_fd = connection
+
+            self._start_main_task(
+                master_fd, log_file_path=self.logfile, stdout_fd=stdout_fd
+            )
 
-            self._start_main_task(pr, log_file_path=self.logfile, stdout_fd=stdout_fd)
+    @property
+    def _fd_pipes_send_handle(self):
+        """Returns True if we have a connection to implement fd_pipes via send_handle."""
+        return bool(
+            self._HAVE_SEND_HANDLE
+            and self._files
+            and getattr(self._files, "connection", False)
+        )
+
+    def _send_fd_pipes(self):
+        """
+        Communicate with _bootstrap to send fd_pipes via send_handle.
+        This performs blocking IO, intended for invocation via run_in_executor.
+        """
+        fd_list = list(set(self.fd_pipes.values()))
+        self._files.connection.send(
+            (self.fd_pipes, fd_list),
+        )
+        for fd in fd_list:
+            multiprocessing.reduction.send_handle(
+                self._files.connection,
+                fd,
+                self.pid,
+            )
+
+    async def _main(self, build_logger, pipe_logger, loop=None):
+        try:
+            if self._fd_pipes_send_handle:
+                await self.scheduler.run_in_executor(
+                    None,
+                    self._send_fd_pipes,
+                )
+        except asyncio.CancelledError:
+            self._main_cancel(build_logger, pipe_logger)
+            raise
+        finally:
+            if self._files:
+                if hasattr(self._files, "connection"):
+                    self._files.connection.close()
+                    del self._files.connection
+                if hasattr(self._files, "slave_fd"):
+                    os.close(self._files.slave_fd)
+                    del self._files.slave_fd
+
+        await super()._main(build_logger, pipe_logger, loop=loop)
 
     def _spawn(self, args, fd_pipes=None, **kwargs):
         """
@@ -102,9 +170,21 @@ class ForkProcess(SpawnProcess):
                     stdin_dup, fcntl.F_SETFD, fcntl.fcntl(stdin_fd, fcntl.F_GETFD)
                 )
                 fd_pipes[0] = stdin_dup
+
+            if self._fd_pipes_send_handle:
+                # Handle fd_pipes in _main instead.
+                fd_pipes = None
+
             self._proc = multiprocessing.Process(
                 target=self._bootstrap,
-                args=(self._child_connection, fd_pipes, target, args, kwargs),
+                args=(
+                    self._child_connection,
+                    self._HAVE_SEND_HANDLE,
+                    fd_pipes,
+                    target,
+                    args,
+                    kwargs,
+                ),
             )
             self._proc.start()
         finally:
@@ -186,7 +266,7 @@ class ForkProcess(SpawnProcess):
             self._proc_join_task = None
 
     @staticmethod
-    def _bootstrap(child_connection, fd_pipes, target, args, kwargs):
+    def _bootstrap(child_connection, have_send_handle, fd_pipes, target, args, kwargs):
         # Use default signal handlers in order to avoid problems
         # killing subprocesses as reported in bug #353239.
         signal.signal(signal.SIGINT, signal.SIG_DFL)
@@ -205,10 +285,22 @@ class ForkProcess(SpawnProcess):
         portage.locks._close_fds()
 
         if child_connection is not None:
-            fd_pipes = fd_pipes or {}
-            fd_pipes[sys.stdout.fileno()] = child_connection.fileno()
-            fd_pipes[sys.stderr.fileno()] = child_connection.fileno()
-            fd_pipes[child_connection.fileno()] = child_connection.fileno()
+            if have_send_handle:
+                fd_pipes, fd_list = child_connection.recv()
+                fd_pipes_map = {}
+                for fd in fd_list:
+                    fd_pipes_map[fd] = multiprocessing.reduction.recv_handle(
+                        child_connection
+                    )
+                child_connection.close()
+                for k, v in list(fd_pipes.items()):
+                    fd_pipes[k] = fd_pipes_map[v]
+
+            else:
+                fd_pipes = fd_pipes or {}
+                fd_pipes[sys.stdout.fileno()] = child_connection.fileno()
+                fd_pipes[sys.stderr.fileno()] = child_connection.fileno()
+                fd_pipes[child_connection.fileno()] = child_connection.fileno()
 
         if fd_pipes:
             # We don't exec, so use close_fds=False


^ permalink raw reply related	[flat|nested] 6+ messages in thread

end of thread, other threads:[~2023-10-22  4:34 UTC | newest]

Thread overview: 6+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2023-10-22  4:34 [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/, lib/portage/tests/process/ Zac Medico
  -- strict thread matches above, loose matches on Subject: below --
2020-06-23  2:38 Zac Medico
2020-06-16  3:16 Zac Medico
2020-06-13  6:51 Zac Medico
2020-04-08  5:56 Zac Medico
2020-02-24 10:51 Zac Medico

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox