public inbox for gentoo-portage-dev@lists.gentoo.org
 help / color / mirror / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download: 
* [gentoo-portage-dev] [PATCH] Implement AbstractEventLoop.connect_write_pipe (bug 649588)
@ 2018-04-16  1:09 99% Zac Medico
  0 siblings, 0 replies; 1+ results
From: Zac Medico @ 2018-04-16  1:09 UTC (permalink / raw
  To: gentoo-portage-dev; +Cc: Zac Medico

In python versions that support asyncio, this allows API consumers
to use subprocess.PIPE for asyncio.create_subprocess_exec() stdin
parameters.

Bug: https://bugs.gentoo.org/649588
---
 .../util/futures/asyncio/test_subprocess_exec.py   |  34 +++
 pym/portage/util/futures/transports.py             |  90 +++++++
 pym/portage/util/futures/unix_events.py            | 259 ++++++++++++++++++++-
 3 files changed, 372 insertions(+), 11 deletions(-)
 create mode 100644 pym/portage/util/futures/transports.py

diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
index 94984fc93..8c8c395ca 100644
--- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -191,3 +191,37 @@ class SubprocessExecTestCase(TestCase):
 			self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
 
 		self._run_test(test)
+
+	def testWriteTransport(self):
+		"""
+		Test asyncio.create_subprocess_exec(stdin=subprocess.PIPE) which
+		requires an AbstractEventLoop.connect_write_pipe implementation
+		(and a WriteTransport implementation for it to return).
+		"""
+		if not hasattr(asyncio, 'create_subprocess_exec'):
+			self.skipTest('create_subprocess_exec not implemented for python2')
+
+		stdin_data = b'hello world'
+		cat_binary = find_binary("cat")
+		self.assertNotEqual(cat_binary, None)
+		cat_binary = cat_binary.encode()
+
+		def test(loop):
+			proc = loop.run_until_complete(
+				asyncio.create_subprocess_exec(
+				cat_binary,
+				stdin=subprocess.PIPE,
+				stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
+
+			# This buffers data when necessary to avoid blocking.
+			proc.stdin.write(stdin_data)
+			# Any buffered data is written asynchronously after the
+			# close method is called.
+			proc.stdin.close()
+
+			self.assertEqual(
+				loop.run_until_complete(proc.stdout.read()),
+				stdin_data)
+			self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
+
+		self._run_test(test)
diff --git a/pym/portage/util/futures/transports.py b/pym/portage/util/futures/transports.py
new file mode 100644
index 000000000..60ea93073
--- /dev/null
+++ b/pym/portage/util/futures/transports.py
@@ -0,0 +1,90 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+try:
+	from asyncio.transports import Transport as _Transport
+except ImportError:
+	_Transport = object
+
+
+class _FlowControlMixin(_Transport):
+	"""
+	This is identical to the standard library's private
+	asyncio.transports._FlowControlMixin class.
+
+	All the logic for (write) flow control in a mix-in base class.
+
+	The subclass must implement get_write_buffer_size().  It must call
+	_maybe_pause_protocol() whenever the write buffer size increases,
+	and _maybe_resume_protocol() whenever it decreases.  It may also
+	override set_write_buffer_limits() (e.g. to specify different
+	defaults).
+
+	The subclass constructor must call super().__init__(extra).  This
+	will call set_write_buffer_limits().
+
+	The user may call set_write_buffer_limits() and
+	get_write_buffer_size(), and their protocol's pause_writing() and
+	resume_writing() may be called.
+	"""
+
+	def __init__(self, extra=None, loop=None):
+		super().__init__(extra)
+		assert loop is not None
+		self._loop = loop
+		self._protocol_paused = False
+		self._set_write_buffer_limits()
+
+	def _maybe_pause_protocol(self):
+		size = self.get_write_buffer_size()
+		if size <= self._high_water:
+			return
+		if not self._protocol_paused:
+			self._protocol_paused = True
+			try:
+				self._protocol.pause_writing()
+			except Exception as exc:
+				self._loop.call_exception_handler({
+					'message': 'protocol.pause_writing() failed',
+					'exception': exc,
+					'transport': self,
+					'protocol': self._protocol,
+				})
+
+	def _maybe_resume_protocol(self):
+		if (self._protocol_paused and
+			self.get_write_buffer_size() <= self._low_water):
+			self._protocol_paused = False
+			try:
+				self._protocol.resume_writing()
+			except Exception as exc:
+				self._loop.call_exception_handler({
+					'message': 'protocol.resume_writing() failed',
+					'exception': exc,
+					'transport': self,
+					'protocol': self._protocol,
+				})
+
+	def get_write_buffer_limits(self):
+		return (self._low_water, self._high_water)
+
+	def _set_write_buffer_limits(self, high=None, low=None):
+		if high is None:
+			if low is None:
+				high = 64*1024
+			else:
+				high = 4*low
+		if low is None:
+			low = high // 4
+		if not high >= low >= 0:
+			raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
+							 (high, low))
+		self._high_water = high
+		self._low_water = low
+
+	def set_write_buffer_limits(self, high=None, low=None):
+		self._set_write_buffer_limits(high=high, low=low)
+		self._maybe_pause_protocol()
+
+	def get_write_buffer_size(self):
+		raise NotImplementedError
diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
index 9d84ab6aa..a1d7cac80 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -9,19 +9,25 @@ __all__ = (
 try:
 	from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport
 	from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
-	from asyncio.transports import ReadTransport as _ReadTransport
+	from asyncio.transports import (
+		ReadTransport as _ReadTransport,
+		WriteTransport as _WriteTransport,
+	)
 except ImportError:
 	_AbstractChildWatcher = object
 	_BaseSubprocessTransport = object
 	_ReadTransport = object
+	_WriteTransport = object
 
 import errno
 import fcntl
 import functools
 import logging
 import os
+import socket
 import stat
 import subprocess
+import sys
 
 from portage.util._eventloop.global_event_loop import (
 	global_event_loop as _global_event_loop,
@@ -30,7 +36,7 @@ from portage.util.futures import (
 	asyncio,
 	events,
 )
-from portage.util.futures.futures import Future
+from portage.util.futures.transports import _FlowControlMixin
 
 
 class _PortageEventLoop(events.AbstractEventLoop):
@@ -117,6 +123,35 @@ class _PortageEventLoop(events.AbstractEventLoop):
 		waiter.add_done_callback(waiter_callback)
 		return result
 
+	def connect_write_pipe(self, protocol_factory, pipe):
+		"""
+		Register write pipe in event loop. Set the pipe to non-blocking mode.
+
+		@type protocol_factory: callable
+		@param protocol_factory: must instantiate object with Protocol interface
+		@type pipe: file
+		@param pipe: a pipe to write to
+		@rtype: asyncio.Future
+		@return: Return pair (transport, protocol), where transport supports the
+			WriteTransport interface.
+		"""
+		protocol = protocol_factory()
+		result = self.create_future()
+		waiter = self.create_future()
+		transport = self._make_write_pipe_transport(pipe, protocol, waiter)
+
+		def waiter_callback(waiter):
+			try:
+				waiter.result()
+			except Exception as e:
+				transport.close()
+				result.set_exception(e)
+			else:
+				result.set_result((transport, protocol))
+
+		waiter.add_done_callback(waiter_callback)
+		return result
+
 	def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
 		"""
 		Run subprocesses asynchronously using the subprocess module.
@@ -140,11 +175,6 @@ class _PortageEventLoop(events.AbstractEventLoop):
 		stdout = kwargs.pop('stdout', subprocess.PIPE)
 		stderr = kwargs.pop('stderr', subprocess.PIPE)
 
-		if stdin == subprocess.PIPE:
-			# Requires connect_write_pipe implementation, for example
-			# see asyncio.unix_events._UnixWritePipeTransport.
-			raise NotImplementedError()
-
 		universal_newlines = kwargs.pop('universal_newlines', False)
 		shell = kwargs.pop('shell', False)
 		bufsize = kwargs.pop('bufsize', 0)
@@ -171,6 +201,10 @@ class _PortageEventLoop(events.AbstractEventLoop):
 								  extra=None):
 		return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
 
+	def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
+								   extra=None):
+		return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
+
 	def _make_subprocess_transport(self, result, protocol, args, shell,
 		stdin, stdout, stderr, bufsize, extra=None, **kwargs):
 		waiter = self.create_future()
@@ -314,18 +348,221 @@ class _UnixReadPipeTransport(_ReadTransport):
 			self._loop = None
 
 
+class _UnixWritePipeTransport(_FlowControlMixin, _WriteTransport):
+	"""
+	This is identical to the standard library's private
+	asyncio.unix_events._UnixWritePipeTransport class, except that it
+	only calls public AbstractEventLoop methods.
+	"""
+
+	def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
+		super().__init__(extra, loop)
+		self._extra['pipe'] = pipe
+		self._pipe = pipe
+		self._fileno = pipe.fileno()
+		self._protocol = protocol
+		self._buffer = bytearray()
+		self._conn_lost = 0
+		self._closing = False  # Set when close() or write_eof() called.
+
+		mode = os.fstat(self._fileno).st_mode
+		is_char = stat.S_ISCHR(mode)
+		is_fifo = stat.S_ISFIFO(mode)
+		is_socket = stat.S_ISSOCK(mode)
+		if not (is_char or is_fifo or is_socket):
+			self._pipe = None
+			self._fileno = None
+			self._protocol = None
+			raise ValueError("Pipe transport is only for "
+							 "pipes, sockets and character devices")
+
+		_set_nonblocking(self._fileno)
+		self._loop.call_soon(self._protocol.connection_made, self)
+
+		# On AIX, the reader trick (to be notified when the read end of the
+		# socket is closed) only works for sockets. On other platforms it
+		# works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
+		if is_socket or (is_fifo and not sys.platform.startswith("aix")):
+			# only start reading when connection_made() has been called
+			self._loop.call_soon(self._loop.add_reader,
+								 self._fileno, self._read_ready)
+
+		if waiter is not None:
+			# only wake up the waiter when connection_made() has been called
+			self._loop.call_soon(
+				lambda: None if waiter.cancelled() else waiter.set_result(None))
+
+	def get_write_buffer_size(self):
+		return len(self._buffer)
+
+	def _read_ready(self):
+		# Pipe was closed by peer.
+		if self._loop.get_debug():
+			logging.info("%r was closed by peer", self)
+		if self._buffer:
+			self._close(BrokenPipeError())
+		else:
+			self._close()
+
+	def write(self, data):
+		assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
+		if isinstance(data, bytearray):
+			data = memoryview(data)
+		if not data:
+			return
+
+		if self._conn_lost or self._closing:
+			self._conn_lost += 1
+			return
+
+		if not self._buffer:
+			# Attempt to send it right away first.
+			try:
+				n = os.write(self._fileno, data)
+			except (BlockingIOError, InterruptedError):
+				n = 0
+			except Exception as exc:
+				self._conn_lost += 1
+				self._fatal_error(exc, 'Fatal write error on pipe transport')
+				return
+			if n == len(data):
+				return
+			elif n > 0:
+				data = memoryview(data)[n:]
+			self._loop.add_writer(self._fileno, self._write_ready)
+
+		self._buffer += data
+		self._maybe_pause_protocol()
+
+	def _write_ready(self):
+		assert self._buffer, 'Data should not be empty'
+
+		try:
+			n = os.write(self._fileno, self._buffer)
+		except (BlockingIOError, InterruptedError):
+			pass
+		except Exception as exc:
+			self._buffer.clear()
+			self._conn_lost += 1
+			# Remove writer here, _fatal_error() doesn't it
+			# because _buffer is empty.
+			self._loop.remove_writer(self._fileno)
+			self._fatal_error(exc, 'Fatal write error on pipe transport')
+		else:
+			if n == len(self._buffer):
+				self._buffer.clear()
+				self._loop.remove_writer(self._fileno)
+				self._maybe_resume_protocol()  # May append to buffer.
+				if self._closing:
+					self._loop.remove_reader(self._fileno)
+					self._call_connection_lost(None)
+				return
+			elif n > 0:
+				del self._buffer[:n]
+
+	def can_write_eof(self):
+		return True
+
+	def write_eof(self):
+		if self._closing:
+			return
+		assert self._pipe
+		self._closing = True
+		if not self._buffer:
+			self._loop.remove_reader(self._fileno)
+			self._loop.call_soon(self._call_connection_lost, None)
+
+	def set_protocol(self, protocol):
+		self._protocol = protocol
+
+	def get_protocol(self):
+		return self._protocol
+
+	def is_closing(self):
+		return self._closing
+
+	def close(self):
+		if self._pipe is not None and not self._closing:
+			# write_eof is all what we needed to close the write pipe
+			self.write_eof()
+
+	def abort(self):
+		self._close(None)
+
+	def _fatal_error(self, exc, message='Fatal error on pipe transport'):
+		# should be called by exception handler only
+		if isinstance(exc,
+			(BrokenPipeError, ConnectionResetError, ConnectionAbortedError)):
+			if self._loop.get_debug():
+				logging.debug("%r: %s", self, message, exc_info=True)
+		else:
+			self._loop.call_exception_handler({
+				'message': message,
+				'exception': exc,
+				'transport': self,
+				'protocol': self._protocol,
+			})
+		self._close(exc)
+
+	def _close(self, exc=None):
+		self._closing = True
+		if self._buffer:
+			self._loop.remove_writer(self._fileno)
+		self._buffer.clear()
+		self._loop.remove_reader(self._fileno)
+		self._loop.call_soon(self._call_connection_lost, exc)
+
+	def _call_connection_lost(self, exc):
+		try:
+			self._protocol.connection_lost(exc)
+		finally:
+			self._pipe.close()
+			self._pipe = None
+			self._protocol = None
+			self._loop = None
+
+
+if hasattr(os, 'set_inheritable'):
+	# Python 3.4 and newer
+	_set_inheritable = os.set_inheritable
+else:
+	def _set_inheritable(fd, inheritable):
+		cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
+
+		old = fcntl.fcntl(fd, fcntl.F_GETFD)
+		if not inheritable:
+			fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
+		else:
+			fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
+
+
 class _UnixSubprocessTransport(_BaseSubprocessTransport):
 	"""
 	This is identical to the standard library's private
-	asyncio.unix_events._UnixSubprocessTransport class, except that
-	subprocess.PIPE is not implemented for stdin, since that would
-	require connect_write_pipe support in the event loop. For example,
-	see the asyncio.unix_events._UnixWritePipeTransport class.
+	asyncio.unix_events._UnixSubprocessTransport class, except that it
+	only calls public AbstractEventLoop methods.
 	"""
 	def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
+		stdin_w = None
+		if stdin == subprocess.PIPE:
+			# Use a socket pair for stdin, since not all platforms
+			# support selecting read events on the write end of a
+			# socket (which we use in order to detect closing of the
+			# other end).  Notably this is needed on AIX, and works
+			# just fine on other platforms.
+			stdin, stdin_w = socket.socketpair()
+
+			# Mark the write end of the stdin pipe as non-inheritable,
+			# needed by close_fds=False on Python 3.3 and older
+			# (Python 3.4 implements the PEP 446, socketpair returns
+			# non-inheritable sockets)
+			_set_inheritable(stdin_w.fileno(), False)
 		self._proc = subprocess.Popen(
 			args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
 			universal_newlines=False, bufsize=bufsize, **kwargs)
+		if stdin_w is not None:
+			stdin.close()
+			self._proc.stdin = os.fdopen(stdin_w.detach(), 'wb', bufsize)
 
 
 class AbstractChildWatcher(_AbstractChildWatcher):
-- 
2.13.6



^ permalink raw reply related	[relevance 99%]

Results 1-1 of 1 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2018-04-16  1:09 99% [gentoo-portage-dev] [PATCH] Implement AbstractEventLoop.connect_write_pipe (bug 649588) Zac Medico

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox