public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
* [gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/
@ 2018-04-13 16:48 Zac Medico
  0 siblings, 0 replies; 2+ messages in thread
From: Zac Medico @ 2018-04-13 16:48 UTC (permalink / raw
  To: gentoo-commits

commit:     d31db4dfb58fcd95f2590dfaed19bce4ef31bbd2
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Apr 12 03:56:25 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Apr 13 07:10:10 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=d31db4df

Implement _PortageEventLoop.subprocess_exec (bug 649588)

In python versions that support asyncio, this allows API consumers
to use the asyncio.create_subprocess_exec() function with portage's
internal event loop. Currently, subprocess.PIPE is not implemented
because that would require an implementation of asyncio's private
asyncio.unix_events._UnixReadPipeTransport class. However, it's
possible to use pipes created with os.pipe() for stdin, stdout,
and stderr, as demonstrated in the included unit tests.

Bug: https://bugs.gentoo.org/649588

 .../util/futures/asyncio/test_subprocess_exec.py   | 163 +++++++++++++++++++++
 pym/portage/util/futures/unix_events.py            |  98 +++++++++++++
 2 files changed, 261 insertions(+)

diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
new file mode 100644
index 000000000..d30f48c43
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -0,0 +1,163 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import os
+
+from portage.process import find_binary
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.executor.fork import ForkExecutor
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+from _emerge.PipeReader import PipeReader
+
+
+def reader(input_file, loop=None):
+	"""
+	Asynchronously read a binary input file.
+
+	@param input_file: binary input file
+	@type input_file: file
+	@param loop: event loop
+	@type loop: EventLoop
+	@return: bytes
+	@rtype: asyncio.Future (or compatible)
+	"""
+	loop = loop or asyncio.get_event_loop()
+	loop = getattr(loop, '_asyncio_wrapper', loop)
+	future = loop.create_future()
+	_Reader(future, input_file, loop)
+	return future
+
+
+class _Reader(object):
+	def __init__(self, future, input_file, loop):
+		self._future = future
+		self._pipe_reader = PipeReader(
+			input_files={'input_file':input_file}, scheduler=loop._loop)
+
+		self._future.add_done_callback(self._cancel_callback)
+		self._pipe_reader.addExitListener(self._eof)
+		self._pipe_reader.start()
+
+	def _cancel_callback(self, future):
+		if future.cancelled():
+			self._cancel()
+
+	def _eof(self, pipe_reader):
+		self._pipe_reader = None
+		self._future.set_result(pipe_reader.getvalue())
+
+	def _cancel(self):
+		if self._pipe_reader is not None and self._pipe_reader.poll() is None:
+			self._pipe_reader.removeExitListener(self._eof)
+			self._pipe_reader.cancel()
+			self._pipe_reader = None
+
+
+class SubprocessExecTestCase(TestCase):
+	def _run_test(self, test):
+		initial_policy = asyncio.get_event_loop_policy()
+		if not isinstance(initial_policy, DefaultEventLoopPolicy):
+			asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+		try:
+			test(asyncio.get_event_loop())
+		finally:
+			asyncio.set_event_loop_policy(initial_policy)
+
+	def testEcho(self):
+		if not hasattr(asyncio, 'create_subprocess_exec'):
+			self.skipTest('create_subprocess_exec not implemented for python2')
+
+		args_tuple = (b'hello', b'world')
+		echo_binary = find_binary("echo")
+		self.assertNotEqual(echo_binary, None)
+		echo_binary = echo_binary.encode()
+
+		# Use os.pipe(), since this loop does not implement the
+		# ReadTransport necessary for subprocess.PIPE support.
+		stdout_pr, stdout_pw = os.pipe()
+		stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
+		stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
+		files = [stdout_pr, stdout_pw]
+
+		def test(loop):
+			output = None
+			try:
+				with open(os.devnull, 'rb', 0) as devnull:
+					proc = loop.run_until_complete(
+						asyncio.create_subprocess_exec(
+						echo_binary, *args_tuple,
+						stdin=devnull, stdout=stdout_pw, stderr=stdout_pw))
+
+				# This belongs exclusively to the subprocess now.
+				stdout_pw.close()
+
+				output = asyncio.ensure_future(
+					reader(stdout_pr, loop=loop), loop=loop)
+
+				self.assertEqual(
+					loop.run_until_complete(proc.wait()), os.EX_OK)
+				self.assertEqual(
+					tuple(loop.run_until_complete(output).split()), args_tuple)
+			finally:
+				if output is not None and not output.done():
+					output.cancel()
+				for f in files:
+					f.close()
+
+		self._run_test(test)
+
+	def testCat(self):
+		if not hasattr(asyncio, 'create_subprocess_exec'):
+			self.skipTest('create_subprocess_exec not implemented for python2')
+
+		stdin_data = b'hello world'
+		cat_binary = find_binary("cat")
+		self.assertNotEqual(cat_binary, None)
+		cat_binary = cat_binary.encode()
+
+		# Use os.pipe(), since this loop does not implement the
+		# ReadTransport necessary for subprocess.PIPE support.
+		stdout_pr, stdout_pw = os.pipe()
+		stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
+		stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
+
+		stdin_pr, stdin_pw = os.pipe()
+		stdin_pr = os.fdopen(stdin_pr, 'rb', 0)
+		stdin_pw = os.fdopen(stdin_pw, 'wb', 0)
+
+		files = [stdout_pr, stdout_pw, stdin_pr, stdin_pw]
+
+		def test(loop):
+			output = None
+			try:
+				proc = loop.run_until_complete(
+					asyncio.create_subprocess_exec(
+					cat_binary,
+					stdin=stdin_pr, stdout=stdout_pw, stderr=stdout_pw))
+
+				# These belong exclusively to the subprocess now.
+				stdout_pw.close()
+				stdin_pr.close()
+
+				output = asyncio.ensure_future(
+					reader(stdout_pr, loop=loop), loop=loop)
+
+				with ForkExecutor(loop=loop) as executor:
+					writer = asyncio.ensure_future(loop.run_in_executor(
+						executor, stdin_pw.write, stdin_data), loop=loop)
+
+					# This belongs exclusively to the writer now.
+					stdin_pw.close()
+					loop.run_until_complete(writer)
+
+				self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
+				self.assertEqual(loop.run_until_complete(output), stdin_data)
+			finally:
+				if output is not None and not output.done():
+					output.cancel()
+				for f in files:
+					f.close()
+
+		self._run_test(test)

diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
index 5434cd942..1abc420e1 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -7,11 +7,15 @@ __all__ = (
 )
 
 try:
+	from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport
 	from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
 except ImportError:
 	_AbstractChildWatcher = object
+	_BaseSubprocessTransport = object
 
+import functools
 import os
+import subprocess
 
 from portage.util._eventloop.global_event_loop import (
 	global_event_loop as _global_event_loop,
@@ -77,6 +81,100 @@ class _PortageEventLoop(events.AbstractEventLoop):
 		"""
 		return asyncio.Task(coro, loop=self)
 
+	def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
+		"""
+		Run subprocesses asynchronously using the subprocess module.
+
+		@type protocol_factory: callable
+		@param protocol_factory: must instantiate a subclass of the
+			asyncio.SubprocessProtocol class
+		@type program: str or bytes
+		@param program: the program to execute
+		@type args: str or bytes
+		@param args: program's arguments
+		@type kwargs: varies
+		@param kwargs: subprocess.Popen parameters
+		@rtype: asyncio.Future
+		@return: Returns a pair of (transport, protocol), where transport
+			is an instance of BaseSubprocessTransport
+		"""
+
+		# python2.7 does not allow arguments with defaults after *args
+		stdin = kwargs.pop('stdin', subprocess.PIPE)
+		stdout = kwargs.pop('stdout', subprocess.PIPE)
+		stderr = kwargs.pop('stderr', subprocess.PIPE)
+
+		if subprocess.PIPE in (stdin, stdout, stderr):
+			# Requires connect_read/write_pipe implementation, for example
+			# see asyncio.unix_events._UnixReadPipeTransport.
+			raise NotImplementedError()
+
+		universal_newlines = kwargs.pop('universal_newlines', False)
+		shell = kwargs.pop('shell', False)
+		bufsize = kwargs.pop('bufsize', 0)
+
+		if universal_newlines:
+			raise ValueError("universal_newlines must be False")
+		if shell:
+			raise ValueError("shell must be False")
+		if bufsize != 0:
+			raise ValueError("bufsize must be 0")
+		popen_args = (program,) + args
+		for arg in popen_args:
+			if not isinstance(arg, (str, bytes)):
+				raise TypeError("program arguments must be "
+								"a bytes or text string, not %s"
+								% type(arg).__name__)
+		result = self.create_future()
+		self._make_subprocess_transport(
+			result, protocol_factory(), popen_args, False, stdin, stdout, stderr,
+			bufsize, **kwargs)
+		return result
+
+	def _make_subprocess_transport(self, result, protocol, args, shell,
+		stdin, stdout, stderr, bufsize, extra=None, **kwargs):
+		waiter = self.create_future()
+		transp = _UnixSubprocessTransport(self,
+			protocol, args, shell, stdin, stdout, stderr, bufsize,
+			waiter=waiter, extra=extra,
+			**kwargs)
+
+		self._loop._asyncio_child_watcher.add_child_handler(
+			transp.get_pid(), self._child_watcher_callback, transp)
+
+		waiter.add_done_callback(functools.partial(
+			self._subprocess_transport_callback, transp, protocol, result))
+
+	def _subprocess_transport_callback(self, transp, protocol, result, waiter):
+		if waiter.exception() is None:
+			result.set_result((transp, protocol))
+		else:
+			transp.close()
+			wait_transp = asyncio.ensure_future(transp._wait(), loop=self)
+			wait_transp.add_done_callback(
+				functools.partial(self._subprocess_transport_failure,
+				result, waiter.exception()))
+
+	def _child_watcher_callback(self, pid, returncode, transp):
+		self.call_soon_threadsafe(transp._process_exited, returncode)
+
+	def _subprocess_transport_failure(self, result, exception, wait_transp):
+		result.set_exception(wait_transp.exception() or exception)
+
+
+class _UnixSubprocessTransport(_BaseSubprocessTransport):
+	"""
+	This is identical to the standard library's private
+	asyncio.unix_events._UnixSubprocessTransport class, except that
+	subprocess.PIPE is not implemented for stdin, since that would
+	require connect_write_pipe support in the event loop. For example,
+	see the asyncio.unix_events._UnixWritePipeTransport class.
+	"""
+	def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
+		self._proc = subprocess.Popen(
+			args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
+			universal_newlines=False, bufsize=bufsize, **kwargs)
+
 
 class AbstractChildWatcher(_AbstractChildWatcher):
 	def add_child_handler(self, pid, callback, *args):


^ permalink raw reply related	[flat|nested] 2+ messages in thread

* [gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/
@ 2018-04-16  0:56 Zac Medico
  0 siblings, 0 replies; 2+ messages in thread
From: Zac Medico @ 2018-04-16  0:56 UTC (permalink / raw
  To: gentoo-commits

commit:     9a7b0a006e65f8683716d60574e4f19f8ffd603d
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat Apr 14 21:29:29 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Apr 16 00:04:26 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=9a7b0a00

Implement AbstractEventLoop.connect_read_pipe (bug 649588)

In python versions that support asyncio, this allows API consumers
to use subprocess.PIPE for asyncio.create_subprocess_exec() stdout
and stderr parameters.

Bug: https://bugs.gentoo.org/649588

 .../util/futures/asyncio/test_subprocess_exec.py   |  30 ++++
 pym/portage/util/futures/unix_events.py            | 157 ++++++++++++++++++++-
 2 files changed, 184 insertions(+), 3 deletions(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
index d30f48c43..94984fc93 100644
--- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -2,6 +2,7 @@
 # Distributed under the terms of the GNU General Public License v2
 
 import os
+import subprocess
 
 from portage.process import find_binary
 from portage.tests import TestCase
@@ -161,3 +162,32 @@ class SubprocessExecTestCase(TestCase):
 					f.close()
 
 		self._run_test(test)
+
+	def testReadTransport(self):
+		"""
+		Test asyncio.create_subprocess_exec(stdout=subprocess.PIPE) which
+		requires an AbstractEventLoop.connect_read_pipe implementation
+		(and a ReadTransport implementation for it to return).
+		"""
+		if not hasattr(asyncio, 'create_subprocess_exec'):
+			self.skipTest('create_subprocess_exec not implemented for python2')
+
+		args_tuple = (b'hello', b'world')
+		echo_binary = find_binary("echo")
+		self.assertNotEqual(echo_binary, None)
+		echo_binary = echo_binary.encode()
+
+		def test(loop):
+			with open(os.devnull, 'rb', 0) as devnull:
+				proc = loop.run_until_complete(
+					asyncio.create_subprocess_exec(
+					echo_binary, *args_tuple,
+					stdin=devnull,
+					stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
+
+			self.assertEqual(
+				tuple(loop.run_until_complete(proc.stdout.read()).split()),
+				args_tuple)
+			self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
+
+		self._run_test(test)

diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
index d788c2bea..9d84ab6aa 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -9,12 +9,18 @@ __all__ = (
 try:
 	from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport
 	from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
+	from asyncio.transports import ReadTransport as _ReadTransport
 except ImportError:
 	_AbstractChildWatcher = object
 	_BaseSubprocessTransport = object
+	_ReadTransport = object
 
+import errno
+import fcntl
 import functools
+import logging
 import os
+import stat
 import subprocess
 
 from portage.util._eventloop.global_event_loop import (
@@ -82,6 +88,35 @@ class _PortageEventLoop(events.AbstractEventLoop):
 		"""
 		return asyncio.Task(coro, loop=self)
 
+	def connect_read_pipe(self, protocol_factory, pipe):
+		"""
+		Register read pipe in event loop. Set the pipe to non-blocking mode.
+
+		@type protocol_factory: callable
+		@param protocol_factory: must instantiate object with Protocol interface
+		@type pipe: file
+		@param pipe: a pipe to read from
+		@rtype: asyncio.Future
+		@return: Return pair (transport, protocol), where transport supports the
+			ReadTransport interface.
+		"""
+		protocol = protocol_factory()
+		result = self.create_future()
+		waiter = self.create_future()
+		transport = self._make_read_pipe_transport(pipe, protocol, waiter=waiter)
+
+		def waiter_callback(waiter):
+			try:
+				waiter.result()
+			except Exception as e:
+				transport.close()
+				result.set_exception(e)
+			else:
+				result.set_result((transport, protocol))
+
+		waiter.add_done_callback(waiter_callback)
+		return result
+
 	def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
 		"""
 		Run subprocesses asynchronously using the subprocess module.
@@ -105,9 +140,9 @@ class _PortageEventLoop(events.AbstractEventLoop):
 		stdout = kwargs.pop('stdout', subprocess.PIPE)
 		stderr = kwargs.pop('stderr', subprocess.PIPE)
 
-		if subprocess.PIPE in (stdin, stdout, stderr):
-			# Requires connect_read/write_pipe implementation, for example
-			# see asyncio.unix_events._UnixReadPipeTransport.
+		if stdin == subprocess.PIPE:
+			# Requires connect_write_pipe implementation, for example
+			# see asyncio.unix_events._UnixWritePipeTransport.
 			raise NotImplementedError()
 
 		universal_newlines = kwargs.pop('universal_newlines', False)
@@ -132,6 +167,10 @@ class _PortageEventLoop(events.AbstractEventLoop):
 			bufsize, **kwargs)
 		return result
 
+	def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
+								  extra=None):
+		return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
+
 	def _make_subprocess_transport(self, result, protocol, args, shell,
 		stdin, stdout, stderr, bufsize, extra=None, **kwargs):
 		waiter = self.create_future()
@@ -163,6 +202,118 @@ class _PortageEventLoop(events.AbstractEventLoop):
 		result.set_exception(wait_transp.exception() or exception)
 
 
+if hasattr(os, 'set_blocking'):
+	def _set_nonblocking(fd):
+		os.set_blocking(fd, False)
+else:
+	def _set_nonblocking(fd):
+		flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+		flags = flags | os.O_NONBLOCK
+		fcntl.fcntl(fd, fcntl.F_SETFL, flags)
+
+
+class _UnixReadPipeTransport(_ReadTransport):
+	"""
+	This is identical to the standard library's private
+	asyncio.unix_events._UnixReadPipeTransport class, except that it
+	only calls public AbstractEventLoop methods.
+	"""
+
+	max_size = 256 * 1024  # max bytes we read in one event loop iteration
+
+	def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
+		super().__init__(extra)
+		self._extra['pipe'] = pipe
+		self._loop = loop
+		self._pipe = pipe
+		self._fileno = pipe.fileno()
+		self._protocol = protocol
+		self._closing = False
+
+		mode = os.fstat(self._fileno).st_mode
+		if not (stat.S_ISFIFO(mode) or
+				stat.S_ISSOCK(mode) or
+				stat.S_ISCHR(mode)):
+			self._pipe = None
+			self._fileno = None
+			self._protocol = None
+			raise ValueError("Pipe transport is for pipes/sockets only.")
+
+		_set_nonblocking(self._fileno)
+
+		self._loop.call_soon(self._protocol.connection_made, self)
+		# only start reading when connection_made() has been called
+		self._loop.call_soon(self._loop.add_reader,
+							 self._fileno, self._read_ready)
+		if waiter is not None:
+			# only wake up the waiter when connection_made() has been called
+			self._loop.call_soon(
+				lambda: None if waiter.cancelled() else waiter.set_result(None))
+
+	def _read_ready(self):
+		try:
+			data = os.read(self._fileno, self.max_size)
+		except (BlockingIOError, InterruptedError):
+			pass
+		except OSError as exc:
+			self._fatal_error(exc, 'Fatal read error on pipe transport')
+		else:
+			if data:
+				self._protocol.data_received(data)
+			else:
+				self._closing = True
+				self._loop.remove_reader(self._fileno)
+				self._loop.call_soon(self._protocol.eof_received)
+				self._loop.call_soon(self._call_connection_lost, None)
+
+	def pause_reading(self):
+		self._loop.remove_reader(self._fileno)
+
+	def resume_reading(self):
+		self._loop.add_reader(self._fileno, self._read_ready)
+
+	def set_protocol(self, protocol):
+		self._protocol = protocol
+
+	def get_protocol(self):
+		return self._protocol
+
+	def is_closing(self):
+		return self._closing
+
+	def close(self):
+		if not self._closing:
+			self._close(None)
+
+	def _fatal_error(self, exc, message='Fatal error on pipe transport'):
+		# should be called by exception handler only
+		if (isinstance(exc, OSError) and exc.errno == errno.EIO):
+			if self._loop.get_debug():
+				logging.debug("%r: %s", self, message, exc_info=True)
+		else:
+			self._loop.call_exception_handler({
+				'message': message,
+				'exception': exc,
+				'transport': self,
+				'protocol': self._protocol,
+			})
+		self._close(exc)
+
+	def _close(self, exc):
+		self._closing = True
+		self._loop.remove_reader(self._fileno)
+		self._loop.call_soon(self._call_connection_lost, exc)
+
+	def _call_connection_lost(self, exc):
+		try:
+			self._protocol.connection_lost(exc)
+		finally:
+			self._pipe.close()
+			self._pipe = None
+			self._protocol = None
+			self._loop = None
+
+
 class _UnixSubprocessTransport(_BaseSubprocessTransport):
 	"""
 	This is identical to the standard library's private


^ permalink raw reply related	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2018-04-16  0:56 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2018-04-16  0:56 [gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/ Zac Medico
  -- strict thread matches above, loose matches on Subject: below --
2018-04-13 16:48 Zac Medico

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox