public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
From: "Zac Medico" <zmedico@gentoo.org>
To: gentoo-commits@lists.gentoo.org
Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/
Date: Fri, 13 Apr 2018 16:48:19 +0000 (UTC)	[thread overview]
Message-ID: <1523603410.d31db4dfb58fcd95f2590dfaed19bce4ef31bbd2.zmedico@gentoo> (raw)

commit:     d31db4dfb58fcd95f2590dfaed19bce4ef31bbd2
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Apr 12 03:56:25 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Apr 13 07:10:10 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=d31db4df

Implement _PortageEventLoop.subprocess_exec (bug 649588)

In Python versions that support asyncio, this allows API consumers
to use the asyncio.create_subprocess_exec() function with portage's
internal event loop. Currently, subprocess.PIPE is not implemented
for stdin, stdout, or stderr, because that would require the event
loop to implement connect_read_pipe/connect_write_pipe (for example,
see asyncio's private asyncio.unix_events._UnixReadPipeTransport
class). However, it's possible to use pipes created with os.pipe()
for stdin, stdout, and stderr, as demonstrated in the included unit
tests.

Bug: https://bugs.gentoo.org/649588

 .../util/futures/asyncio/test_subprocess_exec.py   | 163 +++++++++++++++++++++
 pym/portage/util/futures/unix_events.py            |  98 +++++++++++++
 2 files changed, 261 insertions(+)

diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
new file mode 100644
index 000000000..d30f48c43
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -0,0 +1,163 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import os
+
+from portage.process import find_binary
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.executor.fork import ForkExecutor
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+from _emerge.PipeReader import PipeReader
+
+
+def reader(input_file, loop=None):
+	"""
+	Asynchronously read a binary input file.
+
+	@param input_file: binary input file
+	@type input_file: file
+	@param loop: event loop
+	@type loop: EventLoop
+	@return: bytes
+	@rtype: asyncio.Future (or compatible)
+	"""
+	loop = loop or asyncio.get_event_loop()
+	loop = getattr(loop, '_asyncio_wrapper', loop)
+	future = loop.create_future()
+	_Reader(future, input_file, loop)
+	return future
+
+
+class _Reader(object):
+	def __init__(self, future, input_file, loop):
+		self._future = future
+		self._pipe_reader = PipeReader(
+			input_files={'input_file':input_file}, scheduler=loop._loop)
+
+		self._future.add_done_callback(self._cancel_callback)
+		self._pipe_reader.addExitListener(self._eof)
+		self._pipe_reader.start()
+
+	def _cancel_callback(self, future):
+		if future.cancelled():
+			self._cancel()
+
+	def _eof(self, pipe_reader):
+		self._pipe_reader = None
+		self._future.set_result(pipe_reader.getvalue())
+
+	def _cancel(self):
+		if self._pipe_reader is not None and self._pipe_reader.poll() is None:
+			self._pipe_reader.removeExitListener(self._eof)
+			self._pipe_reader.cancel()
+			self._pipe_reader = None
+
+
+class SubprocessExecTestCase(TestCase):
+	def _run_test(self, test):
+		initial_policy = asyncio.get_event_loop_policy()
+		if not isinstance(initial_policy, DefaultEventLoopPolicy):
+			asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+		try:
+			test(asyncio.get_event_loop())
+		finally:
+			asyncio.set_event_loop_policy(initial_policy)
+
+	def testEcho(self):
+		if not hasattr(asyncio, 'create_subprocess_exec'):
+			self.skipTest('create_subprocess_exec not implemented for python2')
+
+		args_tuple = (b'hello', b'world')
+		echo_binary = find_binary("echo")
+		self.assertNotEqual(echo_binary, None)
+		echo_binary = echo_binary.encode()
+
+		# Use os.pipe(), since this loop does not implement the
+		# ReadTransport necessary for subprocess.PIPE support.
+		stdout_pr, stdout_pw = os.pipe()
+		stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
+		stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
+		files = [stdout_pr, stdout_pw]
+
+		def test(loop):
+			output = None
+			try:
+				with open(os.devnull, 'rb', 0) as devnull:
+					proc = loop.run_until_complete(
+						asyncio.create_subprocess_exec(
+						echo_binary, *args_tuple,
+						stdin=devnull, stdout=stdout_pw, stderr=stdout_pw))
+
+				# This belongs exclusively to the subprocess now.
+				stdout_pw.close()
+
+				output = asyncio.ensure_future(
+					reader(stdout_pr, loop=loop), loop=loop)
+
+				self.assertEqual(
+					loop.run_until_complete(proc.wait()), os.EX_OK)
+				self.assertEqual(
+					tuple(loop.run_until_complete(output).split()), args_tuple)
+			finally:
+				if output is not None and not output.done():
+					output.cancel()
+				for f in files:
+					f.close()
+
+		self._run_test(test)
+
+	def testCat(self):
+		if not hasattr(asyncio, 'create_subprocess_exec'):
+			self.skipTest('create_subprocess_exec not implemented for python2')
+
+		stdin_data = b'hello world'
+		cat_binary = find_binary("cat")
+		self.assertNotEqual(cat_binary, None)
+		cat_binary = cat_binary.encode()
+
+		# Use os.pipe(), since this loop does not implement the
+		# ReadTransport necessary for subprocess.PIPE support.
+		stdout_pr, stdout_pw = os.pipe()
+		stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
+		stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
+
+		stdin_pr, stdin_pw = os.pipe()
+		stdin_pr = os.fdopen(stdin_pr, 'rb', 0)
+		stdin_pw = os.fdopen(stdin_pw, 'wb', 0)
+
+		files = [stdout_pr, stdout_pw, stdin_pr, stdin_pw]
+
+		def test(loop):
+			output = None
+			try:
+				proc = loop.run_until_complete(
+					asyncio.create_subprocess_exec(
+					cat_binary,
+					stdin=stdin_pr, stdout=stdout_pw, stderr=stdout_pw))
+
+				# These belong exclusively to the subprocess now.
+				stdout_pw.close()
+				stdin_pr.close()
+
+				output = asyncio.ensure_future(
+					reader(stdout_pr, loop=loop), loop=loop)
+
+				with ForkExecutor(loop=loop) as executor:
+					writer = asyncio.ensure_future(loop.run_in_executor(
+						executor, stdin_pw.write, stdin_data), loop=loop)
+
+					# This belongs exclusively to the writer now.
+					stdin_pw.close()
+					loop.run_until_complete(writer)
+
+				self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
+				self.assertEqual(loop.run_until_complete(output), stdin_data)
+			finally:
+				if output is not None and not output.done():
+					output.cancel()
+				for f in files:
+					f.close()
+
+		self._run_test(test)

diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
index 5434cd942..1abc420e1 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -7,11 +7,15 @@ __all__ = (
 )
 
 try:
+	from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport
 	from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
 except ImportError:
 	_AbstractChildWatcher = object
+	_BaseSubprocessTransport = object
 
+import functools
 import os
+import subprocess
 
 from portage.util._eventloop.global_event_loop import (
 	global_event_loop as _global_event_loop,
@@ -77,6 +81,100 @@ class _PortageEventLoop(events.AbstractEventLoop):
 		"""
 		return asyncio.Task(coro, loop=self)
 
+	def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
+		"""
+		Run subprocesses asynchronously using the subprocess module.
+
+		@type protocol_factory: callable
+		@param protocol_factory: must instantiate a subclass of the
+			asyncio.SubprocessProtocol class
+		@type program: str or bytes
+		@param program: the program to execute
+		@type args: str or bytes
+		@param args: program's arguments
+		@type kwargs: varies
+		@param kwargs: subprocess.Popen parameters
+		@rtype: asyncio.Future
+		@return: Returns a pair of (transport, protocol), where transport
+			is an instance of BaseSubprocessTransport
+		"""
+
+		# python2.7 does not allow arguments with defaults after *args
+		stdin = kwargs.pop('stdin', subprocess.PIPE)
+		stdout = kwargs.pop('stdout', subprocess.PIPE)
+		stderr = kwargs.pop('stderr', subprocess.PIPE)
+
+		if subprocess.PIPE in (stdin, stdout, stderr):
+			# Requires connect_read/write_pipe implementation, for example
+			# see asyncio.unix_events._UnixReadPipeTransport.
+			raise NotImplementedError()
+
+		universal_newlines = kwargs.pop('universal_newlines', False)
+		shell = kwargs.pop('shell', False)
+		bufsize = kwargs.pop('bufsize', 0)
+
+		if universal_newlines:
+			raise ValueError("universal_newlines must be False")
+		if shell:
+			raise ValueError("shell must be False")
+		if bufsize != 0:
+			raise ValueError("bufsize must be 0")
+		popen_args = (program,) + args
+		for arg in popen_args:
+			if not isinstance(arg, (str, bytes)):
+				raise TypeError("program arguments must be "
+								"a bytes or text string, not %s"
+								% type(arg).__name__)
+		result = self.create_future()
+		self._make_subprocess_transport(
+			result, protocol_factory(), popen_args, False, stdin, stdout, stderr,
+			bufsize, **kwargs)
+		return result
+
+	def _make_subprocess_transport(self, result, protocol, args, shell,
+		stdin, stdout, stderr, bufsize, extra=None, **kwargs):
+		waiter = self.create_future()
+		transp = _UnixSubprocessTransport(self,
+			protocol, args, shell, stdin, stdout, stderr, bufsize,
+			waiter=waiter, extra=extra,
+			**kwargs)
+
+		self._loop._asyncio_child_watcher.add_child_handler(
+			transp.get_pid(), self._child_watcher_callback, transp)
+
+		waiter.add_done_callback(functools.partial(
+			self._subprocess_transport_callback, transp, protocol, result))
+
+	def _subprocess_transport_callback(self, transp, protocol, result, waiter):
+		if waiter.exception() is None:
+			result.set_result((transp, protocol))
+		else:
+			transp.close()
+			wait_transp = asyncio.ensure_future(transp._wait(), loop=self)
+			wait_transp.add_done_callback(
+				functools.partial(self._subprocess_transport_failure,
+				result, waiter.exception()))
+
+	def _child_watcher_callback(self, pid, returncode, transp):
+		self.call_soon_threadsafe(transp._process_exited, returncode)
+
+	def _subprocess_transport_failure(self, result, exception, wait_transp):
+		result.set_exception(wait_transp.exception() or exception)
+
+
+class _UnixSubprocessTransport(_BaseSubprocessTransport):
+	"""
+	This is identical to the standard library's private
+	asyncio.unix_events._UnixSubprocessTransport class, except that
+	subprocess.PIPE is not implemented for stdin, since that would
+	require connect_write_pipe support in the event loop. For example,
+	see the asyncio.unix_events._UnixWritePipeTransport class.
+	"""
+	def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
+		self._proc = subprocess.Popen(
+			args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
+			universal_newlines=False, bufsize=bufsize, **kwargs)
+
 
 class AbstractChildWatcher(_AbstractChildWatcher):
 	def add_child_handler(self, pid, callback, *args):


             reply	other threads:[~2018-04-13 16:48 UTC|newest]

Thread overview: 2+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2018-04-13 16:48 Zac Medico [this message]
  -- strict thread matches above, loose matches on Subject: below --
2018-04-16  0:56 [gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/ Zac Medico

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1523603410.d31db4dfb58fcd95f2590dfaed19bce4ef31bbd2.zmedico@gentoo \
    --to=zmedico@gentoo.org \
    --cc=gentoo-commits@lists.gentoo.org \
    --cc=gentoo-dev@lists.gentoo.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox