From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from lists.gentoo.org (pigeon.gentoo.org [208.92.234.80]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by finch.gentoo.org (Postfix) with ESMTPS id 4A2E91382C5 for ; Sun, 7 Mar 2021 07:13:24 +0000 (UTC) Received: from pigeon.gentoo.org (localhost [127.0.0.1]) by pigeon.gentoo.org (Postfix) with SMTP id 89BF1E0841; Sun, 7 Mar 2021 07:13:23 +0000 (UTC) Received: from smtp.gentoo.org (woodpecker.gentoo.org [IPv6:2001:470:ea4a:1:5054:ff:fec7:86e4]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by pigeon.gentoo.org (Postfix) with ESMTPS id 61173E0841 for ; Sun, 7 Mar 2021 07:13:23 +0000 (UTC) Received: from oystercatcher.gentoo.org (unknown [IPv6:2a01:4f8:202:4333:225:90ff:fed9:fc84]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by smtp.gentoo.org (Postfix) with ESMTPS id BF8A63409ED for ; Sun, 7 Mar 2021 07:13:20 +0000 (UTC) Received: from localhost.localdomain (localhost [IPv6:::1]) by oystercatcher.gentoo.org (Postfix) with ESMTP id 455064A8 for ; Sun, 7 Mar 2021 07:13:19 +0000 (UTC) From: "Zac Medico" To: gentoo-commits@lists.gentoo.org Content-Transfer-Encoding: 8bit Content-type: text/plain; charset=UTF-8 Reply-To: gentoo-dev@lists.gentoo.org, "Zac Medico" Message-ID: <1615100504.ed685eb659fccf6e4031d12fa8a59c3829ef1155.zmedico@gentoo> Subject: [gentoo-commits] proj/portage:master commit in: lib/portage/util/futures/ X-VCS-Repository: proj/portage X-VCS-Files: lib/portage/util/futures/transports.py lib/portage/util/futures/unix_events.py X-VCS-Directories: lib/portage/util/futures/ X-VCS-Committer: zmedico X-VCS-Committer-Name: Zac Medico X-VCS-Revision: ed685eb659fccf6e4031d12fa8a59c3829ef1155 X-VCS-Branch: master Date: Sun, 7 Mar 2021 07:13:19 +0000 (UTC) Precedence: bulk List-Post: List-Help: List-Unsubscribe: List-Subscribe: List-Id: Gentoo Linux mail X-BeenThere: gentoo-commits@lists.gentoo.org X-Auto-Response-Suppress: DR, RN, NRN, OOF, AutoReply X-Archives-Salt: cc15d5d4-0240-458c-a7b6-33c9743e1f48 X-Archives-Hash: a4c69711de9d4dadecc8450f0d934a1e commit: ed685eb659fccf6e4031d12fa8a59c3829ef1155 Author: Zac Medico gentoo org> AuthorDate: Sun Mar 7 06:56:36 2021 +0000 Commit: Zac Medico gentoo org> CommitDate: Sun Mar 7 07:01:44 2021 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=ed685eb6 Removed unused portage.util.futures.transports Signed-off-by: Zac Medico gentoo.org> lib/portage/util/futures/transports.py | 87 ------ lib/portage/util/futures/unix_events.py | 492 +------------------------------- 2 files changed, 2 insertions(+), 577 deletions(-) diff --git a/lib/portage/util/futures/transports.py b/lib/portage/util/futures/transports.py deleted file mode 100644 index 016ecbef8..000000000 --- a/lib/portage/util/futures/transports.py +++ /dev/null @@ -1,87 +0,0 @@ -# Copyright 2018 Gentoo Foundation -# Distributed under the terms of the GNU General Public License v2 - -from asyncio.transports import Transport as _Transport - - -class _FlowControlMixin(_Transport): - """ - This is identical to the standard library's private - asyncio.transports._FlowControlMixin class. - - All the logic for (write) flow control in a mix-in base class. - - The subclass must implement get_write_buffer_size(). It must call - _maybe_pause_protocol() whenever the write buffer size increases, - and _maybe_resume_protocol() whenever it decreases. It may also - override set_write_buffer_limits() (e.g. to specify different - defaults). - - The subclass constructor must call super().__init__(extra). This - will call set_write_buffer_limits(). - - The user may call set_write_buffer_limits() and - get_write_buffer_size(), and their protocol's pause_writing() and - resume_writing() may be called. - """ - - def __init__(self, extra=None, loop=None): - super().__init__(extra) - assert loop is not None - self._loop = loop - self._protocol_paused = False - self._set_write_buffer_limits() - - def _maybe_pause_protocol(self): - size = self.get_write_buffer_size() - if size <= self._high_water: - return - if not self._protocol_paused: - self._protocol_paused = True - try: - self._protocol.pause_writing() - except Exception as exc: - self._loop.call_exception_handler({ - 'message': 'protocol.pause_writing() failed', - 'exception': exc, - 'transport': self, - 'protocol': self._protocol, - }) - - def _maybe_resume_protocol(self): - if (self._protocol_paused and - self.get_write_buffer_size() <= self._low_water): - self._protocol_paused = False - try: - self._protocol.resume_writing() - except Exception as exc: - self._loop.call_exception_handler({ - 'message': 'protocol.resume_writing() failed', - 'exception': exc, - 'transport': self, - 'protocol': self._protocol, - }) - - def get_write_buffer_limits(self): - return (self._low_water, self._high_water) - - def _set_write_buffer_limits(self, high=None, low=None): - if high is None: - if low is None: - high = 64*1024 - else: - high = 4*low - if low is None: - low = high // 4 - if not high >= low >= 0: - raise ValueError('high (%r) must be >= low (%r) must be >= 0' % - (high, low)) - self._high_water = high - self._low_water = low - - def set_write_buffer_limits(self, high=None, low=None): - self._set_write_buffer_limits(high=high, low=low) - self._maybe_pause_protocol() - - def get_write_buffer_size(self): - raise NotImplementedError diff --git a/lib/portage/util/futures/unix_events.py b/lib/portage/util/futures/unix_events.py index 16a9e12b7..9d5445943 100644 --- a/lib/portage/util/futures/unix_events.py +++ b/lib/portage/util/futures/unix_events.py @@ -1,4 +1,4 @@ -# Copyright 2018 Gentoo Foundation +# Copyright 2018-2021 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 __all__ = ( @@ -7,32 +7,15 @@ __all__ = ( ) import asyncio as _real_asyncio -from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher -from asyncio.transports import ( - ReadTransport as _ReadTransport, - WriteTransport as _WriteTransport, -) -import errno import fcntl -import functools -import logging import os -import socket -import stat -import subprocess -import sys from portage.util._eventloop.global_event_loop import ( global_event_loop as _global_event_loop, ) -from portage.util.futures import ( - asyncio, - events, -) - -from portage.util.futures.transports import _FlowControlMixin +from portage.util.futures import events class _PortageEventLoop(events.AbstractEventLoop): @@ -89,158 +72,6 @@ class _PortageEventLoop(events.AbstractEventLoop): """ return self - def create_task(self, coro): - """ - Schedule a coroutine object. - - @type coro: coroutine - @param coro: a coroutine to schedule - @rtype: asyncio.Task - @return: a task object - """ - return asyncio.Task(coro, loop=self) - - def connect_read_pipe(self, protocol_factory, pipe): - """ - Register read pipe in event loop. Set the pipe to non-blocking mode. - - @type protocol_factory: callable - @param protocol_factory: must instantiate object with Protocol interface - @type pipe: file - @param pipe: a pipe to read from - @rtype: asyncio.Future - @return: Return pair (transport, protocol), where transport supports the - ReadTransport interface. - """ - protocol = protocol_factory() - result = self.create_future() - waiter = self.create_future() - transport = self._make_read_pipe_transport(pipe, protocol, waiter=waiter) - - def waiter_callback(waiter): - try: - waiter.result() - except Exception as e: - transport.close() - result.set_exception(e) - else: - result.set_result((transport, protocol)) - - waiter.add_done_callback(waiter_callback) - return result - - def connect_write_pipe(self, protocol_factory, pipe): - """ - Register write pipe in event loop. Set the pipe to non-blocking mode. - - @type protocol_factory: callable - @param protocol_factory: must instantiate object with Protocol interface - @type pipe: file - @param pipe: a pipe to write to - @rtype: asyncio.Future - @return: Return pair (transport, protocol), where transport supports the - WriteTransport interface. - """ - protocol = protocol_factory() - result = self.create_future() - waiter = self.create_future() - transport = self._make_write_pipe_transport(pipe, protocol, waiter) - - def waiter_callback(waiter): - try: - waiter.result() - except Exception as e: - transport.close() - result.set_exception(e) - else: - result.set_result((transport, protocol)) - - waiter.add_done_callback(waiter_callback) - return result - - def subprocess_exec(self, protocol_factory, program, *args, **kwargs): - """ - Run subprocesses asynchronously using the subprocess module. - - @type protocol_factory: callable - @param protocol_factory: must instantiate a subclass of the - asyncio.SubprocessProtocol class - @type program: str or bytes - @param program: the program to execute - @type args: str or bytes - @param args: program's arguments - @type kwargs: varies - @param kwargs: subprocess.Popen parameters - @rtype: asyncio.Future - @return: Returns a pair of (transport, protocol), where transport - is an instance of BaseSubprocessTransport - """ - - # python2.7 does not allow arguments with defaults after *args - stdin = kwargs.pop('stdin', subprocess.PIPE) - stdout = kwargs.pop('stdout', subprocess.PIPE) - stderr = kwargs.pop('stderr', subprocess.PIPE) - - universal_newlines = kwargs.pop('universal_newlines', False) - shell = kwargs.pop('shell', False) - bufsize = kwargs.pop('bufsize', 0) - - if universal_newlines: - raise ValueError("universal_newlines must be False") - if shell: - raise ValueError("shell must be False") - if bufsize != 0: - raise ValueError("bufsize must be 0") - popen_args = (program,) + args - for arg in popen_args: - if not isinstance(arg, (str, bytes)): - raise TypeError("program arguments must be " - "a bytes or text string, not %s" - % type(arg).__name__) - result = self.create_future() - self._make_subprocess_transport( - result, protocol_factory(), popen_args, False, stdin, stdout, stderr, - bufsize, **kwargs) - return result - - def _make_read_pipe_transport(self, pipe, protocol, waiter=None, - extra=None): - return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra) - - def _make_write_pipe_transport(self, pipe, protocol, waiter=None, - extra=None): - return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra) - - def _make_subprocess_transport(self, result, protocol, args, shell, - stdin, stdout, stderr, bufsize, extra=None, **kwargs): - waiter = self.create_future() - transp = _UnixSubprocessTransport(self, - protocol, args, shell, stdin, stdout, stderr, bufsize, - waiter=waiter, extra=extra, - **kwargs) - - self._loop._asyncio_child_watcher.add_child_handler( - transp.get_pid(), self._child_watcher_callback, transp) - - waiter.add_done_callback(functools.partial( - self._subprocess_transport_callback, transp, protocol, result)) - - def _subprocess_transport_callback(self, transp, protocol, result, waiter): - if waiter.exception() is None: - result.set_result((transp, protocol)) - else: - transp.close() - wait_transp = asyncio.ensure_future(transp._wait(), loop=self) - wait_transp.add_done_callback( - functools.partial(self._subprocess_transport_failure, - result, waiter.exception())) - - def _child_watcher_callback(self, pid, returncode, transp): - self.call_soon_threadsafe(transp._process_exited, returncode) - - def _subprocess_transport_failure(self, result, exception, wait_transp): - result.set_exception(wait_transp.exception() or exception) - if hasattr(os, 'set_blocking'): def _set_nonblocking(fd): @@ -252,325 +83,6 @@ else: fcntl.fcntl(fd, fcntl.F_SETFL, flags) -class _UnixReadPipeTransport(_ReadTransport): - """ - This is identical to the standard library's private - asyncio.unix_events._UnixReadPipeTransport class, except that it - only calls public AbstractEventLoop methods. - """ - - max_size = 256 * 1024 # max bytes we read in one event loop iteration - - def __init__(self, loop, pipe, protocol, waiter=None, extra=None): - super().__init__(extra) - self._extra['pipe'] = pipe - self._loop = loop - self._pipe = pipe - self._fileno = pipe.fileno() - self._protocol = protocol - self._closing = False - - mode = os.fstat(self._fileno).st_mode - if not (stat.S_ISFIFO(mode) or - stat.S_ISSOCK(mode) or - stat.S_ISCHR(mode)): - self._pipe = None - self._fileno = None - self._protocol = None - raise ValueError("Pipe transport is for pipes/sockets only.") - - _set_nonblocking(self._fileno) - - self._loop.call_soon(self._protocol.connection_made, self) - # only start reading when connection_made() has been called - self._loop.call_soon(self._loop.add_reader, - self._fileno, self._read_ready) - if waiter is not None: - # only wake up the waiter when connection_made() has been called - self._loop.call_soon( - lambda: None if waiter.cancelled() else waiter.set_result(None)) - - def _read_ready(self): - try: - data = os.read(self._fileno, self.max_size) - except (BlockingIOError, InterruptedError): - pass - except OSError as exc: - self._fatal_error(exc, 'Fatal read error on pipe transport') - else: - if data: - self._protocol.data_received(data) - else: - self._closing = True - self._loop.remove_reader(self._fileno) - self._loop.call_soon(self._protocol.eof_received) - self._loop.call_soon(self._call_connection_lost, None) - - def pause_reading(self): - self._loop.remove_reader(self._fileno) - - def resume_reading(self): - self._loop.add_reader(self._fileno, self._read_ready) - - def set_protocol(self, protocol): - self._protocol = protocol - - def get_protocol(self): - return self._protocol - - def is_closing(self): - return self._closing - - def close(self): - if not self._closing: - self._close(None) - - def _fatal_error(self, exc, message='Fatal error on pipe transport'): - # should be called by exception handler only - if (isinstance(exc, OSError) and exc.errno == errno.EIO): - if self._loop.get_debug(): - logging.debug("%r: %s", self, message, exc_info=True) - else: - self._loop.call_exception_handler({ - 'message': message, - 'exception': exc, - 'transport': self, - 'protocol': self._protocol, - }) - self._close(exc) - - def _close(self, exc): - self._closing = True - self._loop.remove_reader(self._fileno) - self._loop.call_soon(self._call_connection_lost, exc) - - def _call_connection_lost(self, exc): - try: - self._protocol.connection_lost(exc) - finally: - self._pipe.close() - self._pipe = None - self._protocol = None - self._loop = None - - -class _UnixWritePipeTransport(_FlowControlMixin, _WriteTransport): - """ - This is identical to the standard library's private - asyncio.unix_events._UnixWritePipeTransport class, except that it - only calls public AbstractEventLoop methods. - """ - - def __init__(self, loop, pipe, protocol, waiter=None, extra=None): - super().__init__(extra, loop) - self._extra['pipe'] = pipe - self._pipe = pipe - self._fileno = pipe.fileno() - self._protocol = protocol - self._buffer = bytearray() - self._conn_lost = 0 - self._closing = False # Set when close() or write_eof() called. - - mode = os.fstat(self._fileno).st_mode - is_char = stat.S_ISCHR(mode) - is_fifo = stat.S_ISFIFO(mode) - is_socket = stat.S_ISSOCK(mode) - if not (is_char or is_fifo or is_socket): - self._pipe = None - self._fileno = None - self._protocol = None - raise ValueError("Pipe transport is only for " - "pipes, sockets and character devices") - - _set_nonblocking(self._fileno) - self._loop.call_soon(self._protocol.connection_made, self) - - # On AIX, the reader trick (to be notified when the read end of the - # socket is closed) only works for sockets. On other platforms it - # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) - if is_socket or (is_fifo and not sys.platform.startswith("aix")): - # only start reading when connection_made() has been called - self._loop.call_soon(self._loop.add_reader, - self._fileno, self._read_ready) - - if waiter is not None: - # only wake up the waiter when connection_made() has been called - self._loop.call_soon( - lambda: None if waiter.cancelled() else waiter.set_result(None)) - - def get_write_buffer_size(self): - return len(self._buffer) - - def _read_ready(self): - # Pipe was closed by peer. - if self._loop.get_debug(): - logging.info("%r was closed by peer", self) - if self._buffer: - self._close(BrokenPipeError()) - else: - self._close() - - def write(self, data): - assert isinstance(data, (bytes, bytearray, memoryview)), repr(data) - if isinstance(data, bytearray): - data = memoryview(data) - if not data: - return - - if self._conn_lost or self._closing: - self._conn_lost += 1 - return - - if not self._buffer: - # Attempt to send it right away first. - try: - n = os.write(self._fileno, data) - except (BlockingIOError, InterruptedError): - n = 0 - except Exception as exc: - self._conn_lost += 1 - self._fatal_error(exc, 'Fatal write error on pipe transport') - return - if n == len(data): - return - if n > 0: - data = memoryview(data)[n:] - self._loop.add_writer(self._fileno, self._write_ready) - - self._buffer += data - self._maybe_pause_protocol() - - def _write_ready(self): - assert self._buffer, 'Data should not be empty' - - try: - n = os.write(self._fileno, self._buffer) - except (BlockingIOError, InterruptedError): - pass - except Exception as exc: - self._buffer.clear() - self._conn_lost += 1 - # Remove writer here, _fatal_error() doesn't it - # because _buffer is empty. - self._loop.remove_writer(self._fileno) - self._fatal_error(exc, 'Fatal write error on pipe transport') - else: - if n == len(self._buffer): - self._buffer.clear() - self._loop.remove_writer(self._fileno) - self._maybe_resume_protocol() # May append to buffer. - if self._closing: - self._loop.remove_reader(self._fileno) - self._call_connection_lost(None) - return - if n > 0: - del self._buffer[:n] - - def can_write_eof(self): - return True - - def write_eof(self): - if self._closing: - return - assert self._pipe - self._closing = True - if not self._buffer: - self._loop.remove_reader(self._fileno) - self._loop.call_soon(self._call_connection_lost, None) - - def set_protocol(self, protocol): - self._protocol = protocol - - def get_protocol(self): - return self._protocol - - def is_closing(self): - return self._closing - - def close(self): - if self._pipe is not None and not self._closing: - # write_eof is all what we needed to close the write pipe - self.write_eof() - - def abort(self): - self._close(None) - - def _fatal_error(self, exc, message='Fatal error on pipe transport'): - # should be called by exception handler only - if isinstance(exc, - (BrokenPipeError, ConnectionResetError, ConnectionAbortedError)): - if self._loop.get_debug(): - logging.debug("%r: %s", self, message, exc_info=True) - else: - self._loop.call_exception_handler({ - 'message': message, - 'exception': exc, - 'transport': self, - 'protocol': self._protocol, - }) - self._close(exc) - - def _close(self, exc=None): - self._closing = True - if self._buffer: - self._loop.remove_writer(self._fileno) - self._buffer.clear() - self._loop.remove_reader(self._fileno) - self._loop.call_soon(self._call_connection_lost, exc) - - def _call_connection_lost(self, exc): - try: - self._protocol.connection_lost(exc) - finally: - self._pipe.close() - self._pipe = None - self._protocol = None - self._loop = None - - -if hasattr(os, 'set_inheritable'): - # Python 3.4 and newer - _set_inheritable = os.set_inheritable -else: - def _set_inheritable(fd, inheritable): - cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1) - - old = fcntl.fcntl(fd, fcntl.F_GETFD) - if not inheritable: - fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag) - else: - fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag) - - -class _UnixSubprocessTransport(_BaseSubprocessTransport): - """ - This is identical to the standard library's private - asyncio.unix_events._UnixSubprocessTransport class, except that it - only calls public AbstractEventLoop methods. - """ - def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): - stdin_w = None - if stdin == subprocess.PIPE: - # Use a socket pair for stdin, since not all platforms - # support selecting read events on the write end of a - # socket (which we use in order to detect closing of the - # other end). Notably this is needed on AIX, and works - # just fine on other platforms. - stdin, stdin_w = socket.socketpair() - - # Mark the write end of the stdin pipe as non-inheritable, - # needed by close_fds=False on Python 3.3 and older - # (Python 3.4 implements the PEP 446, socketpair returns - # non-inheritable sockets) - _set_inheritable(stdin_w.fileno(), False) - self._proc = subprocess.Popen( - args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, - universal_newlines=False, bufsize=bufsize, **kwargs) - if stdin_w is not None: - stdin.close() - self._proc.stdin = os.fdopen(stdin_w.detach(), 'wb', bufsize) - - class AbstractChildWatcher(_AbstractChildWatcher): def add_child_handler(self, pid, callback, *args): raise NotImplementedError()