From: "Zac Medico" <zmedico@gentoo.org>
To: gentoo-commits@lists.gentoo.org
Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/
Date: Fri, 13 Apr 2018 16:48:19 +0000 (UTC) [thread overview]
Message-ID: <1523603410.d31db4dfb58fcd95f2590dfaed19bce4ef31bbd2.zmedico@gentoo> (raw)
commit: d31db4dfb58fcd95f2590dfaed19bce4ef31bbd2
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Apr 12 03:56:25 2018 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Apr 13 07:10:10 2018 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=d31db4df
Implement _PortageEventLoop.subprocess_exec (bug 649588)
In python versions that support asyncio, this allows API consumers
to use the asyncio.create_subprocess_exec() function with portage's
internal event loop. Currently, subprocess.PIPE is not implemented
because that would require an implementation of asyncio's private
asyncio.unix_events._UnixReadPipeTransport class. However, it's
possible to use pipes created with os.pipe() for stdin, stdout,
and stderr, as demonstrated in the included unit tests.
Bug: https://bugs.gentoo.org/649588
.../util/futures/asyncio/test_subprocess_exec.py | 163 +++++++++++++++++++++
pym/portage/util/futures/unix_events.py | 98 +++++++++++++
2 files changed, 261 insertions(+)
diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
new file mode 100644
index 000000000..d30f48c43
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -0,0 +1,163 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import os
+
+from portage.process import find_binary
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.executor.fork import ForkExecutor
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+from _emerge.PipeReader import PipeReader
+
+
+def reader(input_file, loop=None):
+ """
+ Asynchronously read a binary input file.
+
+ @param input_file: binary input file
+ @type input_file: file
+ @param loop: event loop
+ @type loop: EventLoop
+ @return: bytes
+ @rtype: asyncio.Future (or compatible)
+ """
+ loop = loop or asyncio.get_event_loop()
+ loop = getattr(loop, '_asyncio_wrapper', loop)
+ future = loop.create_future()
+ _Reader(future, input_file, loop)
+ return future
+
+
+class _Reader(object):
+ def __init__(self, future, input_file, loop):
+ self._future = future
+ self._pipe_reader = PipeReader(
+ input_files={'input_file':input_file}, scheduler=loop._loop)
+
+ self._future.add_done_callback(self._cancel_callback)
+ self._pipe_reader.addExitListener(self._eof)
+ self._pipe_reader.start()
+
+ def _cancel_callback(self, future):
+ if future.cancelled():
+ self._cancel()
+
+ def _eof(self, pipe_reader):
+ self._pipe_reader = None
+ self._future.set_result(pipe_reader.getvalue())
+
+ def _cancel(self):
+ if self._pipe_reader is not None and self._pipe_reader.poll() is None:
+ self._pipe_reader.removeExitListener(self._eof)
+ self._pipe_reader.cancel()
+ self._pipe_reader = None
+
+
+class SubprocessExecTestCase(TestCase):
+ def _run_test(self, test):
+ initial_policy = asyncio.get_event_loop_policy()
+ if not isinstance(initial_policy, DefaultEventLoopPolicy):
+ asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+ try:
+ test(asyncio.get_event_loop())
+ finally:
+ asyncio.set_event_loop_policy(initial_policy)
+
+ def testEcho(self):
+ if not hasattr(asyncio, 'create_subprocess_exec'):
+ self.skipTest('create_subprocess_exec not implemented for python2')
+
+ args_tuple = (b'hello', b'world')
+ echo_binary = find_binary("echo")
+ self.assertNotEqual(echo_binary, None)
+ echo_binary = echo_binary.encode()
+
+ # Use os.pipe(), since this loop does not implement the
+ # ReadTransport necessary for subprocess.PIPE support.
+ stdout_pr, stdout_pw = os.pipe()
+ stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
+ stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
+ files = [stdout_pr, stdout_pw]
+
+ def test(loop):
+ output = None
+ try:
+ with open(os.devnull, 'rb', 0) as devnull:
+ proc = loop.run_until_complete(
+ asyncio.create_subprocess_exec(
+ echo_binary, *args_tuple,
+ stdin=devnull, stdout=stdout_pw, stderr=stdout_pw))
+
+ # This belongs exclusively to the subprocess now.
+ stdout_pw.close()
+
+ output = asyncio.ensure_future(
+ reader(stdout_pr, loop=loop), loop=loop)
+
+ self.assertEqual(
+ loop.run_until_complete(proc.wait()), os.EX_OK)
+ self.assertEqual(
+ tuple(loop.run_until_complete(output).split()), args_tuple)
+ finally:
+ if output is not None and not output.done():
+ output.cancel()
+ for f in files:
+ f.close()
+
+ self._run_test(test)
+
+ def testCat(self):
+ if not hasattr(asyncio, 'create_subprocess_exec'):
+ self.skipTest('create_subprocess_exec not implemented for python2')
+
+ stdin_data = b'hello world'
+ cat_binary = find_binary("cat")
+ self.assertNotEqual(cat_binary, None)
+ cat_binary = cat_binary.encode()
+
+ # Use os.pipe(), since this loop does not implement the
+ # ReadTransport necessary for subprocess.PIPE support.
+ stdout_pr, stdout_pw = os.pipe()
+ stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
+ stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
+
+ stdin_pr, stdin_pw = os.pipe()
+ stdin_pr = os.fdopen(stdin_pr, 'rb', 0)
+ stdin_pw = os.fdopen(stdin_pw, 'wb', 0)
+
+ files = [stdout_pr, stdout_pw, stdin_pr, stdin_pw]
+
+ def test(loop):
+ output = None
+ try:
+ proc = loop.run_until_complete(
+ asyncio.create_subprocess_exec(
+ cat_binary,
+ stdin=stdin_pr, stdout=stdout_pw, stderr=stdout_pw))
+
+ # These belong exclusively to the subprocess now.
+ stdout_pw.close()
+ stdin_pr.close()
+
+ output = asyncio.ensure_future(
+ reader(stdout_pr, loop=loop), loop=loop)
+
+ with ForkExecutor(loop=loop) as executor:
+ writer = asyncio.ensure_future(loop.run_in_executor(
+ executor, stdin_pw.write, stdin_data), loop=loop)
+
+ # This belongs exclusively to the writer now.
+ stdin_pw.close()
+ loop.run_until_complete(writer)
+
+ self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
+ self.assertEqual(loop.run_until_complete(output), stdin_data)
+ finally:
+ if output is not None and not output.done():
+ output.cancel()
+ for f in files:
+ f.close()
+
+ self._run_test(test)
diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
index 5434cd942..1abc420e1 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -7,11 +7,15 @@ __all__ = (
)
try:
+ from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport
from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
except ImportError:
_AbstractChildWatcher = object
+ _BaseSubprocessTransport = object
+import functools
import os
+import subprocess
from portage.util._eventloop.global_event_loop import (
global_event_loop as _global_event_loop,
@@ -77,6 +81,100 @@ class _PortageEventLoop(events.AbstractEventLoop):
"""
return asyncio.Task(coro, loop=self)
+ def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
+ """
+ Run subprocesses asynchronously using the subprocess module.
+
+ @type protocol_factory: callable
+ @param protocol_factory: must instantiate a subclass of the
+ asyncio.SubprocessProtocol class
+ @type program: str or bytes
+ @param program: the program to execute
+ @type args: str or bytes
+ @param args: program's arguments
+ @type kwargs: varies
+ @param kwargs: subprocess.Popen parameters
+ @rtype: asyncio.Future
+ @return: Returns a pair of (transport, protocol), where transport
+ is an instance of BaseSubprocessTransport
+ """
+
+ # python2.7 does not allow arguments with defaults after *args
+ stdin = kwargs.pop('stdin', subprocess.PIPE)
+ stdout = kwargs.pop('stdout', subprocess.PIPE)
+ stderr = kwargs.pop('stderr', subprocess.PIPE)
+
+ if subprocess.PIPE in (stdin, stdout, stderr):
+ # Requires connect_read/write_pipe implementation, for example
+ # see asyncio.unix_events._UnixReadPipeTransport.
+ raise NotImplementedError()
+
+ universal_newlines = kwargs.pop('universal_newlines', False)
+ shell = kwargs.pop('shell', False)
+ bufsize = kwargs.pop('bufsize', 0)
+
+ if universal_newlines:
+ raise ValueError("universal_newlines must be False")
+ if shell:
+ raise ValueError("shell must be False")
+ if bufsize != 0:
+ raise ValueError("bufsize must be 0")
+ popen_args = (program,) + args
+ for arg in popen_args:
+ if not isinstance(arg, (str, bytes)):
+ raise TypeError("program arguments must be "
+ "a bytes or text string, not %s"
+ % type(arg).__name__)
+ result = self.create_future()
+ self._make_subprocess_transport(
+ result, protocol_factory(), popen_args, False, stdin, stdout, stderr,
+ bufsize, **kwargs)
+ return result
+
+ def _make_subprocess_transport(self, result, protocol, args, shell,
+ stdin, stdout, stderr, bufsize, extra=None, **kwargs):
+ waiter = self.create_future()
+ transp = _UnixSubprocessTransport(self,
+ protocol, args, shell, stdin, stdout, stderr, bufsize,
+ waiter=waiter, extra=extra,
+ **kwargs)
+
+ self._loop._asyncio_child_watcher.add_child_handler(
+ transp.get_pid(), self._child_watcher_callback, transp)
+
+ waiter.add_done_callback(functools.partial(
+ self._subprocess_transport_callback, transp, protocol, result))
+
+ def _subprocess_transport_callback(self, transp, protocol, result, waiter):
+ if waiter.exception() is None:
+ result.set_result((transp, protocol))
+ else:
+ transp.close()
+ wait_transp = asyncio.ensure_future(transp._wait(), loop=self)
+ wait_transp.add_done_callback(
+ functools.partial(self._subprocess_transport_failure,
+ result, waiter.exception()))
+
+ def _child_watcher_callback(self, pid, returncode, transp):
+ self.call_soon_threadsafe(transp._process_exited, returncode)
+
+ def _subprocess_transport_failure(self, result, exception, wait_transp):
+ result.set_exception(wait_transp.exception() or exception)
+
+
+class _UnixSubprocessTransport(_BaseSubprocessTransport):
+ """
+ This is identical to the standard library's private
+ asyncio.unix_events._UnixSubprocessTransport class, except that
+ subprocess.PIPE is not implemented for stdin, since that would
+ require connect_write_pipe support in the event loop. For example,
+ see the asyncio.unix_events._UnixWritePipeTransport class.
+ """
+ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
+ self._proc = subprocess.Popen(
+ args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
+ universal_newlines=False, bufsize=bufsize, **kwargs)
+
class AbstractChildWatcher(_AbstractChildWatcher):
def add_child_handler(self, pid, callback, *args):
next reply other threads:[~2018-04-13 16:48 UTC|newest]
Thread overview: 2+ messages / expand[flat|nested] mbox.gz Atom feed top
2018-04-13 16:48 Zac Medico [this message]
-- strict thread matches above, loose matches on Subject: below --
2018-04-16 0:56 [gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/ Zac Medico
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=1523603410.d31db4dfb58fcd95f2590dfaed19bce4ef31bbd2.zmedico@gentoo \
--to=zmedico@gentoo.org \
--cc=gentoo-commits@lists.gentoo.org \
--cc=gentoo-dev@lists.gentoo.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox