public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
* [gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/asyncio/, pym/portage/util/_eventloop/
@ 2018-04-30  2:17 Zac Medico
  0 siblings, 0 replies; 2+ messages in thread
From: Zac Medico @ 2018-04-30  2:17 UTC (permalink / raw)
  To: gentoo-commits

commit:     c77afbc31fa687cc612a6f946b324bf4d74d8175
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Apr 30 01:49:18 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Apr 30 02:14:41 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=c77afbc3

EventLoop: call add_reader/writer callbacks after pipe is closed (bug 654382)

Callbacks registered via add_reader/writer methods need to be called
when the other end of a pipe is closed, which does not result in a
normal read or write event. Therefore, respond to other event types
as well, for compatibility with the asyncio event loop implementation.

The included unit tests demonstrate asyncio compatible behavior for
both reader and writer callbacks.

Bug: https://bugs.gentoo.org/654382

 .../tests/util/futures/asyncio/test_pipe_closed.py | 133 +++++++++++++++++++++
 pym/portage/util/_eventloop/EventLoop.py           |   7 +-
 2 files changed, 138 insertions(+), 2 deletions(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
new file mode 100644
index 000000000..1ecddab78
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
@@ -0,0 +1,133 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import errno
+import os
+import pty
+import shutil
+import socket
+import sys
+import tempfile
+
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.unix_events import (
+	DefaultEventLoopPolicy,
+	_set_nonblocking,
+)
+
+
+class _PipeClosedTestCase(object):
+
+	def test_pipe(self):
+		read_end, write_end = os.pipe()
+		self._do_test(read_end, write_end)
+
+	def test_pty_device(self):
+		try:
+			read_end, write_end = pty.openpty()
+		except EnvironmentError:
+			self.skipTest('pty not available')
+		self._do_test(read_end, write_end)
+
+	def test_domain_socket(self):
+		if sys.version_info >= (3, 2):
+			read_end, write_end = socket.socketpair()
+		else:
+			self.skipTest('socket detach not supported')
+		self._do_test(read_end.detach(), write_end.detach())
+
+	def test_named_pipe(self):
+		tempdir = tempfile.mkdtemp()
+		try:
+			fifo_path = os.path.join(tempdir, 'fifo')
+			os.mkfifo(fifo_path)
+			self._do_test(os.open(fifo_path, os.O_NONBLOCK|os.O_RDONLY),
+				os.open(fifo_path, os.O_NONBLOCK|os.O_WRONLY))
+		finally:
+			shutil.rmtree(tempdir)
+
+
+class ReaderPipeClosedTestCase(_PipeClosedTestCase, TestCase):
+	"""
+	Test that a reader callback is called after the other end of
+	the pipe has been closed.
+	"""
+	def _do_test(self, read_end, write_end):
+		initial_policy = asyncio.get_event_loop_policy()
+		if not isinstance(initial_policy, DefaultEventLoopPolicy):
+			asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+		loop = asyncio.get_event_loop()
+		read_end = os.fdopen(read_end, 'rb', 0)
+		write_end = os.fdopen(write_end, 'wb', 0)
+		try:
+			def reader_callback():
+				if not reader_callback.called.done():
+					reader_callback.called.set_result(None)
+
+			reader_callback.called = loop.create_future()
+			loop.add_reader(read_end.fileno(), reader_callback)
+
+			# Allow the loop to check for IO events, and assert
+			# that our future is still not done.
+			loop.run_until_complete(asyncio.sleep(0, loop=loop))
+			self.assertFalse(reader_callback.called.done())
+
+			# Demonstrate that the callback is called after the
+			# other end of the pipe has been closed.
+			write_end.close()
+			loop.run_until_complete(reader_callback.called)
+		finally:
+			loop.remove_reader(read_end.fileno())
+			write_end.close()
+			read_end.close()
+			asyncio.set_event_loop_policy(initial_policy)
+
+
+class WriterPipeClosedTestCase(_PipeClosedTestCase, TestCase):
+	"""
+	Test that a writer callback is called after the other end of
+	the pipe has been closed.
+	"""
+	def _do_test(self, read_end, write_end):
+		initial_policy = asyncio.get_event_loop_policy()
+		if not isinstance(initial_policy, DefaultEventLoopPolicy):
+			asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+		loop = asyncio.get_event_loop()
+		read_end = os.fdopen(read_end, 'rb', 0)
+		write_end = os.fdopen(write_end, 'wb', 0)
+		try:
+			def writer_callback():
+				if not writer_callback.called.done():
+					writer_callback.called.set_result(None)
+
+			writer_callback.called = loop.create_future()
+			_set_nonblocking(write_end.fileno())
+			loop.add_writer(write_end.fileno(), writer_callback)
+
+			# Fill up the pipe, so that no writer callbacks should be
+			# received until the state has changed.
+			while True:
+				try:
+					os.write(write_end.fileno(), 512 * b'0')
+				except EnvironmentError as e:
+					if e.errno != errno.EAGAIN:
+						raise
+					break
+
+			# Allow the loop to check for IO events, and assert
+			# that our future is still not done.
+			loop.run_until_complete(asyncio.sleep(0, loop=loop))
+			self.assertFalse(writer_callback.called.done())
+
+			# Demonstrate that the callback is called after the
+			# other end of the pipe has been closed.
+			read_end.close()
+			loop.run_until_complete(writer_callback.called)
+		finally:
+			loop.remove_writer(write_end.fileno())
+			write_end.close()
+			read_end.close()
+			asyncio.set_event_loop_policy(initial_policy)

diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
index 6a8b906ed..fc7380b03 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -192,8 +192,11 @@ class EventLoop(object):
 			self.IO_OUT = PollConstants.POLLOUT
 			self.IO_PRI = PollConstants.POLLPRI
 
-		self._EVENT_READ = self.IO_IN | self.IO_HUP
-		self._EVENT_WRITE = self.IO_OUT
+		# These trigger both reader and writer callbacks.
+		EVENT_SHARED = self.IO_HUP | self.IO_ERR | self.IO_NVAL
+
+		self._EVENT_READ = self.IO_IN | EVENT_SHARED
+		self._EVENT_WRITE = self.IO_OUT | EVENT_SHARED
 
 		self._child_handlers = {}
 		self._sigchld_read = None


^ permalink raw reply related	[flat|nested] 2+ messages in thread

* [gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/asyncio/, pym/portage/util/_eventloop/
@ 2018-05-25  2:03 Zac Medico
  0 siblings, 0 replies; 2+ messages in thread
From: Zac Medico @ 2018-05-25  2:03 UTC (permalink / raw)
  To: gentoo-commits

commit:     adee194534f0b3d9762efd1e8e8713c316b93f5a
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu May 24 22:36:29 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri May 25 02:01:27 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=adee1945

AsyncioEventLoop: suppress BlockingIOError warning (bug 655656)

Override AbstractEventLoop.run_until_complete() to prevent
BlockingIOError from occurring when the event loop is not running,
by using signal.set_wakeup_fd(-1) to temporarily disable the wakeup
fd. In order to avoid potential interference with API consumers,
only modify wakeup fd when portage._internal_caller is True.

Bug: https://bugs.gentoo.org/655656

 .../util/futures/asyncio/test_wakeup_fd_sigchld.py | 76 ++++++++++++++++++++++
 pym/portage/util/_eventloop/asyncio_event_loop.py  | 37 +++++++++--
 2 files changed, 106 insertions(+), 7 deletions(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_wakeup_fd_sigchld.py b/pym/portage/tests/util/futures/asyncio/test_wakeup_fd_sigchld.py
new file mode 100644
index 000000000..abc67c241
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_wakeup_fd_sigchld.py
@@ -0,0 +1,76 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import os
+import subprocess
+
+import portage
+from portage.const import PORTAGE_PYM_PATH
+from portage.tests import TestCase
+from portage.util._eventloop.global_event_loop import _asyncio_enabled
+
+
+class WakeupFdSigchldTestCase(TestCase):
+	def testWakeupFdSigchld(self):
+		"""
+		This is expected to trigger a bunch of messages like the following
+		unless the fix for bug 655656 works as intended:
+
+		Exception ignored when trying to write to the signal wakeup fd:
+		BlockingIOError: [Errno 11] Resource temporarily unavailable
+		"""
+		if not _asyncio_enabled:
+			self.skipTest('asyncio not enabled')
+
+		script = """
+import asyncio as _real_asyncio
+import os
+import signal
+import sys
+
+import portage
+
+# In order to avoid potential interference with API consumers, wakeup
+# fd handling is enabled only when portage._interal_caller is True.
+portage._internal_caller = True
+
+from portage.util.futures import asyncio
+
+loop = asyncio._wrap_loop()
+
+# Cause the loop to register a child watcher.
+proc = loop.run_until_complete(_real_asyncio.create_subprocess_exec('sleep', '0'))
+loop.run_until_complete(proc.wait())
+
+for i in range(8192):
+	os.kill(os.getpid(), signal.SIGCHLD)
+
+# Verify that the child watcher still works correctly
+# (this will hang if it doesn't).
+proc = loop.run_until_complete(_real_asyncio.create_subprocess_exec('sleep', '0'))
+loop.run_until_complete(proc.wait())
+loop.close()
+sys.stdout.write('success')
+sys.exit(os.EX_OK)
+"""
+
+		pythonpath = os.environ.get('PYTHONPATH', '').strip().split(':')
+		if not pythonpath or pythonpath[0] != PORTAGE_PYM_PATH:
+			pythonpath = [PORTAGE_PYM_PATH] + pythonpath
+		pythonpath = ':'.join(filter(None, pythonpath))
+
+		proc = subprocess.Popen(
+			[portage._python_interpreter, '-c', script],
+			stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+			env=dict(os.environ, PYTHONPATH=pythonpath))
+
+		out, err = proc.communicate()
+		try:
+			self.assertEqual(out[:100], b'success')
+		except Exception:
+			portage.writemsg(''.join('{}\n'.format(line)
+				for line in out.decode(errors='replace').splitlines()[:50]),
+				noiselevel=-1)
+			raise
+
+		self.assertEqual(proc.wait(), os.EX_OK)

diff --git a/pym/portage/util/_eventloop/asyncio_event_loop.py b/pym/portage/util/_eventloop/asyncio_event_loop.py
index bf5937de8..65b354544 100644
--- a/pym/portage/util/_eventloop/asyncio_event_loop.py
+++ b/pym/portage/util/_eventloop/asyncio_event_loop.py
@@ -1,6 +1,7 @@
 # Copyright 2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
+import os
 import signal
 
 try:
@@ -11,6 +12,8 @@ except ImportError:
 	_real_asyncio = None
 	_AbstractEventLoop = object
 
+import portage
+
 
 class AsyncioEventLoop(_AbstractEventLoop):
 	"""
@@ -26,13 +29,15 @@ class AsyncioEventLoop(_AbstractEventLoop):
 	def __init__(self, loop=None):
 		loop = loop or _real_asyncio.get_event_loop()
 		self._loop = loop
-		self.run_until_complete = loop.run_until_complete
+		self.run_until_complete = (self._run_until_complete
+			if portage._internal_caller else loop.run_until_complete)
 		self.call_soon = loop.call_soon
 		self.call_soon_threadsafe = loop.call_soon_threadsafe
 		self.call_later = loop.call_later
 		self.call_at = loop.call_at
 		self.is_running = loop.is_running
 		self.is_closed = loop.is_closed
+		self.close = loop.close
 		self.create_future = (loop.create_future
 			if hasattr(loop, 'create_future') else self._create_future)
 		self.create_task = loop.create_task
@@ -46,6 +51,7 @@ class AsyncioEventLoop(_AbstractEventLoop):
 		self.call_exception_handler = loop.call_exception_handler
 		self.set_debug = loop.set_debug
 		self.get_debug = loop.get_debug
+		self._wakeup_fd = -1
 
 	def _create_future(self):
 		"""
@@ -77,9 +83,26 @@ class AsyncioEventLoop(_AbstractEventLoop):
 		"""
 		return self
 
-	def close(self):
-		# Suppress spurious error messages like the following for bug 655656:
-		#   Exception ignored when trying to write to the signal wakeup fd:
-		#   BlockingIOError: [Errno 11] Resource temporarily unavailable
-		self._loop.remove_signal_handler(signal.SIGCHLD)
-		self._loop.close()
+	def _run_until_complete(self, future):
+		"""
+		An implementation of AbstractEventLoop.run_until_complete that suppresses
+		spurious error messages like the following reported in bug 655656:
+
+		    Exception ignored when trying to write to the signal wakeup fd:
+		    BlockingIOError: [Errno 11] Resource temporarily unavailable
+
+		In order to avoid potential interference with API consumers, this
+		implementation is only used when portage._internal_caller is True.
+		"""
+		if self._wakeup_fd != -1:
+			signal.set_wakeup_fd(self._wakeup_fd)
+			self._wakeup_fd = -1
+			# Account for any signals that may have arrived between
+			# set_wakeup_fd calls.
+			os.kill(os.getpid(), signal.SIGCHLD)
+		try:
+			return self._loop.run_until_complete(future)
+		finally:
+			self._wakeup_fd = signal.set_wakeup_fd(-1)
+			if self._wakeup_fd != -1:
+				signal.set_wakeup_fd(-1)


^ permalink raw reply related	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2018-05-25  2:03 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-04-30  2:17 [gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/asyncio/, pym/portage/util/_eventloop/ Zac Medico
  -- strict thread matches above, loose matches on Subject: below --
2018-05-25  2:03 Zac Medico

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox