* [gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/
@ 2018-04-13 16:48 Zac Medico
0 siblings, 0 replies; 2+ messages in thread
From: Zac Medico @ 2018-04-13 16:48 UTC (permalink / raw)
To: gentoo-commits
commit: d31db4dfb58fcd95f2590dfaed19bce4ef31bbd2
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Apr 12 03:56:25 2018 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Apr 13 07:10:10 2018 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=d31db4df
Implement _PortageEventLoop.subprocess_exec (bug 649588)
In python versions that support asyncio, this allows API consumers
to use the asyncio.create_subprocess_exec() function with portage's
internal event loop. Currently, subprocess.PIPE is not implemented
because that would require an implementation of asyncio's private
asyncio.unix_events._UnixReadPipeTransport class. However, it's
possible to use pipes created with os.pipe() for stdin, stdout,
and stderr, as demonstrated in the included unit tests.
Bug: https://bugs.gentoo.org/649588
.../util/futures/asyncio/test_subprocess_exec.py | 163 +++++++++++++++++++++
pym/portage/util/futures/unix_events.py | 98 +++++++++++++
2 files changed, 261 insertions(+)
diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
new file mode 100644
index 000000000..d30f48c43
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -0,0 +1,163 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import os
+
+from portage.process import find_binary
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.executor.fork import ForkExecutor
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+from _emerge.PipeReader import PipeReader
+
+
+def reader(input_file, loop=None):
+ """
+ Asynchronously read a binary input file.
+
+ @param input_file: binary input file
+ @type input_file: file
+ @param loop: event loop
+ @type loop: EventLoop
+ @return: bytes
+ @rtype: asyncio.Future (or compatible)
+ """
+ loop = loop or asyncio.get_event_loop()
+ loop = getattr(loop, '_asyncio_wrapper', loop)
+ future = loop.create_future()
+ _Reader(future, input_file, loop)
+ return future
+
+
+class _Reader(object):
+ def __init__(self, future, input_file, loop):
+ self._future = future
+ self._pipe_reader = PipeReader(
+ input_files={'input_file':input_file}, scheduler=loop._loop)
+
+ self._future.add_done_callback(self._cancel_callback)
+ self._pipe_reader.addExitListener(self._eof)
+ self._pipe_reader.start()
+
+ def _cancel_callback(self, future):
+ if future.cancelled():
+ self._cancel()
+
+ def _eof(self, pipe_reader):
+ self._pipe_reader = None
+ self._future.set_result(pipe_reader.getvalue())
+
+ def _cancel(self):
+ if self._pipe_reader is not None and self._pipe_reader.poll() is None:
+ self._pipe_reader.removeExitListener(self._eof)
+ self._pipe_reader.cancel()
+ self._pipe_reader = None
+
+
+class SubprocessExecTestCase(TestCase):
+ def _run_test(self, test):
+ initial_policy = asyncio.get_event_loop_policy()
+ if not isinstance(initial_policy, DefaultEventLoopPolicy):
+ asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+ try:
+ test(asyncio.get_event_loop())
+ finally:
+ asyncio.set_event_loop_policy(initial_policy)
+
+ def testEcho(self):
+ if not hasattr(asyncio, 'create_subprocess_exec'):
+ self.skipTest('create_subprocess_exec not implemented for python2')
+
+ args_tuple = (b'hello', b'world')
+ echo_binary = find_binary("echo")
+ self.assertNotEqual(echo_binary, None)
+ echo_binary = echo_binary.encode()
+
+ # Use os.pipe(), since this loop does not implement the
+ # ReadTransport necessary for subprocess.PIPE support.
+ stdout_pr, stdout_pw = os.pipe()
+ stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
+ stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
+ files = [stdout_pr, stdout_pw]
+
+ def test(loop):
+ output = None
+ try:
+ with open(os.devnull, 'rb', 0) as devnull:
+ proc = loop.run_until_complete(
+ asyncio.create_subprocess_exec(
+ echo_binary, *args_tuple,
+ stdin=devnull, stdout=stdout_pw, stderr=stdout_pw))
+
+ # This belongs exclusively to the subprocess now.
+ stdout_pw.close()
+
+ output = asyncio.ensure_future(
+ reader(stdout_pr, loop=loop), loop=loop)
+
+ self.assertEqual(
+ loop.run_until_complete(proc.wait()), os.EX_OK)
+ self.assertEqual(
+ tuple(loop.run_until_complete(output).split()), args_tuple)
+ finally:
+ if output is not None and not output.done():
+ output.cancel()
+ for f in files:
+ f.close()
+
+ self._run_test(test)
+
+ def testCat(self):
+ if not hasattr(asyncio, 'create_subprocess_exec'):
+ self.skipTest('create_subprocess_exec not implemented for python2')
+
+ stdin_data = b'hello world'
+ cat_binary = find_binary("cat")
+ self.assertNotEqual(cat_binary, None)
+ cat_binary = cat_binary.encode()
+
+ # Use os.pipe(), since this loop does not implement the
+ # ReadTransport necessary for subprocess.PIPE support.
+ stdout_pr, stdout_pw = os.pipe()
+ stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
+ stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
+
+ stdin_pr, stdin_pw = os.pipe()
+ stdin_pr = os.fdopen(stdin_pr, 'rb', 0)
+ stdin_pw = os.fdopen(stdin_pw, 'wb', 0)
+
+ files = [stdout_pr, stdout_pw, stdin_pr, stdin_pw]
+
+ def test(loop):
+ output = None
+ try:
+ proc = loop.run_until_complete(
+ asyncio.create_subprocess_exec(
+ cat_binary,
+ stdin=stdin_pr, stdout=stdout_pw, stderr=stdout_pw))
+
+ # These belong exclusively to the subprocess now.
+ stdout_pw.close()
+ stdin_pr.close()
+
+ output = asyncio.ensure_future(
+ reader(stdout_pr, loop=loop), loop=loop)
+
+ with ForkExecutor(loop=loop) as executor:
+ writer = asyncio.ensure_future(loop.run_in_executor(
+ executor, stdin_pw.write, stdin_data), loop=loop)
+
+ # This belongs exclusively to the writer now.
+ stdin_pw.close()
+ loop.run_until_complete(writer)
+
+ self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
+ self.assertEqual(loop.run_until_complete(output), stdin_data)
+ finally:
+ if output is not None and not output.done():
+ output.cancel()
+ for f in files:
+ f.close()
+
+ self._run_test(test)
diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
index 5434cd942..1abc420e1 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -7,11 +7,15 @@ __all__ = (
)
try:
+ from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport
from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
except ImportError:
_AbstractChildWatcher = object
+ _BaseSubprocessTransport = object
+import functools
import os
+import subprocess
from portage.util._eventloop.global_event_loop import (
global_event_loop as _global_event_loop,
@@ -77,6 +81,100 @@ class _PortageEventLoop(events.AbstractEventLoop):
"""
return asyncio.Task(coro, loop=self)
+ def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
+ """
+ Run subprocesses asynchronously using the subprocess module.
+
+ @type protocol_factory: callable
+ @param protocol_factory: must instantiate a subclass of the
+ asyncio.SubprocessProtocol class
+ @type program: str or bytes
+ @param program: the program to execute
+ @type args: str or bytes
+ @param args: program's arguments
+ @type kwargs: varies
+ @param kwargs: subprocess.Popen parameters
+ @rtype: asyncio.Future
+ @return: Returns a pair of (transport, protocol), where transport
+ is an instance of BaseSubprocessTransport
+ """
+
+ # python2.7 does not allow arguments with defaults after *args
+ stdin = kwargs.pop('stdin', subprocess.PIPE)
+ stdout = kwargs.pop('stdout', subprocess.PIPE)
+ stderr = kwargs.pop('stderr', subprocess.PIPE)
+
+ if subprocess.PIPE in (stdin, stdout, stderr):
+ # Requires connect_read/write_pipe implementation, for example
+ # see asyncio.unix_events._UnixReadPipeTransport.
+ raise NotImplementedError()
+
+ universal_newlines = kwargs.pop('universal_newlines', False)
+ shell = kwargs.pop('shell', False)
+ bufsize = kwargs.pop('bufsize', 0)
+
+ if universal_newlines:
+ raise ValueError("universal_newlines must be False")
+ if shell:
+ raise ValueError("shell must be False")
+ if bufsize != 0:
+ raise ValueError("bufsize must be 0")
+ popen_args = (program,) + args
+ for arg in popen_args:
+ if not isinstance(arg, (str, bytes)):
+ raise TypeError("program arguments must be "
+ "a bytes or text string, not %s"
+ % type(arg).__name__)
+ result = self.create_future()
+ self._make_subprocess_transport(
+ result, protocol_factory(), popen_args, False, stdin, stdout, stderr,
+ bufsize, **kwargs)
+ return result
+
+ def _make_subprocess_transport(self, result, protocol, args, shell,
+ stdin, stdout, stderr, bufsize, extra=None, **kwargs):
+ waiter = self.create_future()
+ transp = _UnixSubprocessTransport(self,
+ protocol, args, shell, stdin, stdout, stderr, bufsize,
+ waiter=waiter, extra=extra,
+ **kwargs)
+
+ self._loop._asyncio_child_watcher.add_child_handler(
+ transp.get_pid(), self._child_watcher_callback, transp)
+
+ waiter.add_done_callback(functools.partial(
+ self._subprocess_transport_callback, transp, protocol, result))
+
+ def _subprocess_transport_callback(self, transp, protocol, result, waiter):
+ if waiter.exception() is None:
+ result.set_result((transp, protocol))
+ else:
+ transp.close()
+ wait_transp = asyncio.ensure_future(transp._wait(), loop=self)
+ wait_transp.add_done_callback(
+ functools.partial(self._subprocess_transport_failure,
+ result, waiter.exception()))
+
+ def _child_watcher_callback(self, pid, returncode, transp):
+ self.call_soon_threadsafe(transp._process_exited, returncode)
+
+ def _subprocess_transport_failure(self, result, exception, wait_transp):
+ result.set_exception(wait_transp.exception() or exception)
+
+
+class _UnixSubprocessTransport(_BaseSubprocessTransport):
+ """
+ This is identical to the standard library's private
+ asyncio.unix_events._UnixSubprocessTransport class, except that
+ subprocess.PIPE is not implemented for stdin, since that would
+ require connect_write_pipe support in the event loop. For example,
+ see the asyncio.unix_events._UnixWritePipeTransport class.
+ """
+ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
+ self._proc = subprocess.Popen(
+ args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
+ universal_newlines=False, bufsize=bufsize, **kwargs)
+
class AbstractChildWatcher(_AbstractChildWatcher):
def add_child_handler(self, pid, callback, *args):
^ permalink raw reply related [flat|nested] 2+ messages in thread
* [gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/
@ 2018-04-16 0:56 Zac Medico
0 siblings, 0 replies; 2+ messages in thread
From: Zac Medico @ 2018-04-16 0:56 UTC (permalink / raw)
To: gentoo-commits
commit: 9a7b0a006e65f8683716d60574e4f19f8ffd603d
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat Apr 14 21:29:29 2018 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Apr 16 00:04:26 2018 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=9a7b0a00
Implement AbstractEventLoop.connect_read_pipe (bug 649588)
In python versions that support asyncio, this allows API consumers
to use subprocess.PIPE for asyncio.create_subprocess_exec() stdout
and stderr parameters.
Bug: https://bugs.gentoo.org/649588
.../util/futures/asyncio/test_subprocess_exec.py | 30 ++++
pym/portage/util/futures/unix_events.py | 157 ++++++++++++++++++++-
2 files changed, 184 insertions(+), 3 deletions(-)
diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
index d30f48c43..94984fc93 100644
--- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -2,6 +2,7 @@
# Distributed under the terms of the GNU General Public License v2
import os
+import subprocess
from portage.process import find_binary
from portage.tests import TestCase
@@ -161,3 +162,32 @@ class SubprocessExecTestCase(TestCase):
f.close()
self._run_test(test)
+
+ def testReadTransport(self):
+ """
+ Test asyncio.create_subprocess_exec(stdout=subprocess.PIPE) which
+ requires an AbstractEventLoop.connect_read_pipe implementation
+ (and a ReadTransport implementation for it to return).
+ """
+ if not hasattr(asyncio, 'create_subprocess_exec'):
+ self.skipTest('create_subprocess_exec not implemented for python2')
+
+ args_tuple = (b'hello', b'world')
+ echo_binary = find_binary("echo")
+ self.assertNotEqual(echo_binary, None)
+ echo_binary = echo_binary.encode()
+
+ def test(loop):
+ with open(os.devnull, 'rb', 0) as devnull:
+ proc = loop.run_until_complete(
+ asyncio.create_subprocess_exec(
+ echo_binary, *args_tuple,
+ stdin=devnull,
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
+
+ self.assertEqual(
+ tuple(loop.run_until_complete(proc.stdout.read()).split()),
+ args_tuple)
+ self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
+
+ self._run_test(test)
diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
index d788c2bea..9d84ab6aa 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -9,12 +9,18 @@ __all__ = (
try:
from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport
from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
+ from asyncio.transports import ReadTransport as _ReadTransport
except ImportError:
_AbstractChildWatcher = object
_BaseSubprocessTransport = object
+ _ReadTransport = object
+import errno
+import fcntl
import functools
+import logging
import os
+import stat
import subprocess
from portage.util._eventloop.global_event_loop import (
@@ -82,6 +88,35 @@ class _PortageEventLoop(events.AbstractEventLoop):
"""
return asyncio.Task(coro, loop=self)
+ def connect_read_pipe(self, protocol_factory, pipe):
+ """
+ Register read pipe in event loop. Set the pipe to non-blocking mode.
+
+ @type protocol_factory: callable
+ @param protocol_factory: must instantiate object with Protocol interface
+ @type pipe: file
+ @param pipe: a pipe to read from
+ @rtype: asyncio.Future
+ @return: Return pair (transport, protocol), where transport supports the
+ ReadTransport interface.
+ """
+ protocol = protocol_factory()
+ result = self.create_future()
+ waiter = self.create_future()
+ transport = self._make_read_pipe_transport(pipe, protocol, waiter=waiter)
+
+ def waiter_callback(waiter):
+ try:
+ waiter.result()
+ except Exception as e:
+ transport.close()
+ result.set_exception(e)
+ else:
+ result.set_result((transport, protocol))
+
+ waiter.add_done_callback(waiter_callback)
+ return result
+
def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
"""
Run subprocesses asynchronously using the subprocess module.
@@ -105,9 +140,9 @@ class _PortageEventLoop(events.AbstractEventLoop):
stdout = kwargs.pop('stdout', subprocess.PIPE)
stderr = kwargs.pop('stderr', subprocess.PIPE)
- if subprocess.PIPE in (stdin, stdout, stderr):
- # Requires connect_read/write_pipe implementation, for example
- # see asyncio.unix_events._UnixReadPipeTransport.
+ if stdin == subprocess.PIPE:
+ # Requires connect_write_pipe implementation, for example
+ # see asyncio.unix_events._UnixWritePipeTransport.
raise NotImplementedError()
universal_newlines = kwargs.pop('universal_newlines', False)
@@ -132,6 +167,10 @@ class _PortageEventLoop(events.AbstractEventLoop):
bufsize, **kwargs)
return result
+ def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
+ extra=None):
+ return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
+
def _make_subprocess_transport(self, result, protocol, args, shell,
stdin, stdout, stderr, bufsize, extra=None, **kwargs):
waiter = self.create_future()
@@ -163,6 +202,118 @@ class _PortageEventLoop(events.AbstractEventLoop):
result.set_exception(wait_transp.exception() or exception)
+if hasattr(os, 'set_blocking'):
+ def _set_nonblocking(fd):
+ os.set_blocking(fd, False)
+else:
+ def _set_nonblocking(fd):
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+ flags = flags | os.O_NONBLOCK
+ fcntl.fcntl(fd, fcntl.F_SETFL, flags)
+
+
+class _UnixReadPipeTransport(_ReadTransport):
+ """
+ This is identical to the standard library's private
+ asyncio.unix_events._UnixReadPipeTransport class, except that it
+ only calls public AbstractEventLoop methods.
+ """
+
+ max_size = 256 * 1024 # max bytes we read in one event loop iteration
+
+ def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
+ super().__init__(extra)
+ self._extra['pipe'] = pipe
+ self._loop = loop
+ self._pipe = pipe
+ self._fileno = pipe.fileno()
+ self._protocol = protocol
+ self._closing = False
+
+ mode = os.fstat(self._fileno).st_mode
+ if not (stat.S_ISFIFO(mode) or
+ stat.S_ISSOCK(mode) or
+ stat.S_ISCHR(mode)):
+ self._pipe = None
+ self._fileno = None
+ self._protocol = None
+ raise ValueError("Pipe transport is for pipes/sockets only.")
+
+ _set_nonblocking(self._fileno)
+
+ self._loop.call_soon(self._protocol.connection_made, self)
+ # only start reading when connection_made() has been called
+ self._loop.call_soon(self._loop.add_reader,
+ self._fileno, self._read_ready)
+ if waiter is not None:
+ # only wake up the waiter when connection_made() has been called
+ self._loop.call_soon(
+ lambda: None if waiter.cancelled() else waiter.set_result(None))
+
+ def _read_ready(self):
+ try:
+ data = os.read(self._fileno, self.max_size)
+ except (BlockingIOError, InterruptedError):
+ pass
+ except OSError as exc:
+ self._fatal_error(exc, 'Fatal read error on pipe transport')
+ else:
+ if data:
+ self._protocol.data_received(data)
+ else:
+ self._closing = True
+ self._loop.remove_reader(self._fileno)
+ self._loop.call_soon(self._protocol.eof_received)
+ self._loop.call_soon(self._call_connection_lost, None)
+
+ def pause_reading(self):
+ self._loop.remove_reader(self._fileno)
+
+ def resume_reading(self):
+ self._loop.add_reader(self._fileno, self._read_ready)
+
+ def set_protocol(self, protocol):
+ self._protocol = protocol
+
+ def get_protocol(self):
+ return self._protocol
+
+ def is_closing(self):
+ return self._closing
+
+ def close(self):
+ if not self._closing:
+ self._close(None)
+
+ def _fatal_error(self, exc, message='Fatal error on pipe transport'):
+ # should be called by exception handler only
+ if (isinstance(exc, OSError) and exc.errno == errno.EIO):
+ if self._loop.get_debug():
+ logging.debug("%r: %s", self, message, exc_info=True)
+ else:
+ self._loop.call_exception_handler({
+ 'message': message,
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
+ self._close(exc)
+
+ def _close(self, exc):
+ self._closing = True
+ self._loop.remove_reader(self._fileno)
+ self._loop.call_soon(self._call_connection_lost, exc)
+
+ def _call_connection_lost(self, exc):
+ try:
+ self._protocol.connection_lost(exc)
+ finally:
+ self._pipe.close()
+ self._pipe = None
+ self._protocol = None
+ self._loop = None
+
+
class _UnixSubprocessTransport(_BaseSubprocessTransport):
"""
This is identical to the standard library's private
^ permalink raw reply related [flat|nested] 2+ messages in thread
end of thread, other threads:[~2018-04-16 0:56 UTC | newest]
Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-04-16 0:56 [gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/ Zac Medico
-- strict thread matches above, loose matches on Subject: below --
2018-04-13 16:48 Zac Medico
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.