* [gentoo-portage-dev] [PATCH] Add minimal asyncio.AbstractEventLoop implementation (bug 649588)
@ 2018-04-09 3:34 99% Zac Medico
0 siblings, 0 replies; 1+ results
From: Zac Medico @ 2018-04-09 3:34 UTC (permalink / raw
To: gentoo-portage-dev; +Cc: Zac Medico
This provides minimal interoperability with existing asyncio code,
by adding a portage.util.futures.unix_events.DefaultEventLoopPolicy
class that makes asyncio use portage's internal event loop when an
instance is passed into asyncio.set_event_loop_policy(). The
get_event_loop() method of this policy returns an instance of a
_PortageEventLoop class that wraps portage's internal event loop and
implements asyncio's AbstractEventLoop interface.
The portage.util.futures.asyncio module refers to the real
asyncio module when available, and otherwise falls back to a
minimal implementation that works with python2.7. The included
EventLoopInForkTestCase demonstrates usage, and works with all
supported versions of python, include python2.7.
Bug: https://bugs.gentoo.org/649588
---
pym/portage/tests/util/futures/asyncio/__init__.py | 0
pym/portage/tests/util/futures/asyncio/__test__.py | 0
.../futures/asyncio/test_event_loop_in_fork.py | 62 +++++++
pym/portage/util/_eventloop/EventLoop.py | 11 +-
pym/portage/util/futures/__init__.py | 9 +
pym/portage/util/futures/_asyncio.py | 116 +++++++++++++
pym/portage/util/futures/events.py | 191 +++++++++++++++++++++
pym/portage/util/futures/futures.py | 7 +-
pym/portage/util/futures/unix_events.py | 91 ++++++++++
9 files changed, 479 insertions(+), 8 deletions(-)
create mode 100644 pym/portage/tests/util/futures/asyncio/__init__.py
create mode 100644 pym/portage/tests/util/futures/asyncio/__test__.py
create mode 100644 pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
create mode 100644 pym/portage/util/futures/_asyncio.py
create mode 100644 pym/portage/util/futures/events.py
create mode 100644 pym/portage/util/futures/unix_events.py
diff --git a/pym/portage/tests/util/futures/asyncio/__init__.py b/pym/portage/tests/util/futures/asyncio/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/pym/portage/tests/util/futures/asyncio/__test__.py b/pym/portage/tests/util/futures/asyncio/__test__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
new file mode 100644
index 000000000..1ef46229b
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
@@ -0,0 +1,62 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import multiprocessing
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+
+
+def fork_main(parent_conn, child_conn):
+ parent_conn.close()
+ loop = asyncio.get_event_loop()
+ # This fails with python's default event loop policy,
+ # see https://bugs.python.org/issue22087.
+ loop.run_until_complete(asyncio.sleep(0.1))
+
+
+def async_main(loop=None):
+ loop = loop or asyncio.get_event_loop()
+ future = loop.create_future()
+
+ # Since python2.7 does not support Process.sentinel, use Pipe to
+ # monitor for process exit.
+ parent_conn, child_conn = multiprocessing.Pipe()
+
+ def eof_callback():
+ loop.remove_reader(parent_conn.fileno())
+ parent_conn.close()
+ future.set_result(None)
+
+ loop.add_reader(parent_conn.fileno(), eof_callback)
+ proc = multiprocessing.Process(target=fork_main, args=(parent_conn, child_conn))
+ proc.start()
+ child_conn.close()
+
+ return future
+
+
+class EventLoopInForkTestCase(TestCase):
+ """
+ The default asyncio event loop policy does not support loops
+ running in forks, see https://bugs.python.org/issue22087.
+ Portage's DefaultEventLoopPolicy supports forks.
+ """
+
+ def testEventLoopInForkTestCase(self):
+ initial_policy = asyncio.get_event_loop_policy()
+ if not isinstance(initial_policy, DefaultEventLoopPolicy):
+ asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+ try:
+ loop = asyncio.get_event_loop()
+ exit_future = loop.create_future()
+ def trigger_exit(*args):
+ exit_future.set_result(True)
+
+ def start_async_main():
+ async_main(loop=loop).add_done_callback(trigger_exit)
+
+ loop.call_soon(start_async_main)
+ loop.run_until_complete(exit_future)
+ finally:
+ asyncio.set_event_loop_policy(initial_policy)
diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
index 72eb407fc..d53a76ba1 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -23,8 +23,9 @@ except ImportError:
import portage
portage.proxy.lazyimport.lazyimport(globals(),
- 'portage.util.futures.futures:_EventLoopFuture',
+ 'portage.util.futures.futures:Future',
'portage.util.futures.executor.fork:ForkExecutor',
+ 'portage.util.futures.unix_events:_PortageEventLoop',
)
from portage import OrderedDict
@@ -188,15 +189,13 @@ class EventLoop(object):
self._sigchld_write = None
self._sigchld_src_id = None
self._pid = os.getpid()
+ self._asyncio_wrapper = _PortageEventLoop(loop=self)
def create_future(self):
"""
- Create a Future object attached to the loop. This returns
- an instance of _EventLoopFuture, because EventLoop is currently
- missing some of the asyncio.AbstractEventLoop methods that
- asyncio.Future requires.
+ Create a Future object attached to the loop.
"""
- return _EventLoopFuture(loop=self)
+ return Future(loop=self._asyncio_wrapper)
def _new_source_id(self):
"""
diff --git a/pym/portage/util/futures/__init__.py b/pym/portage/util/futures/__init__.py
index e69de29bb..789080c85 100644
--- a/pym/portage/util/futures/__init__.py
+++ b/pym/portage/util/futures/__init__.py
@@ -0,0 +1,9 @@
+
+__all__ = (
+ 'asyncio',
+)
+
+try:
+ import asyncio
+except ImportError:
+ from portage.util.futures import _asyncio as asyncio
diff --git a/pym/portage/util/futures/_asyncio.py b/pym/portage/util/futures/_asyncio.py
new file mode 100644
index 000000000..6874e133f
--- /dev/null
+++ b/pym/portage/util/futures/_asyncio.py
@@ -0,0 +1,116 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+__all__ = (
+ 'ensure_future',
+ 'get_event_loop',
+ 'get_event_loop_policy',
+ 'set_event_loop_policy',
+ 'sleep',
+ 'Task',
+)
+
+import functools
+
+try:
+ import threading
+except ImportError:
+ import dummy_threading as threading
+
+import portage
+portage.proxy.lazyimport.lazyimport(globals(),
+ 'portage.util.futures.unix_events:DefaultEventLoopPolicy',
+)
+from portage.util.futures.futures import Future
+
+_lock = threading.Lock()
+_policy = None
+
+
+def get_event_loop_policy():
+ """
+ Get the current event loop policy.
+
+ @rtype: asyncio.AbstractEventLoopPolicy (or compatible)
+ @return: the current event loop policy
+ """
+ global _lock, _policy
+ with _lock:
+ if _policy is None:
+ _policy = DefaultEventLoopPolicy()
+ return _policy
+
+
+def set_event_loop_policy(policy):
+ """
+ Set the current event loop policy. If policy is None, the default
+ policy is restored.
+
+ @type policy: asyncio.AbstractEventLoopPolicy or None
+ @param policy: new event loop policy
+ """
+ global _lock, _policy
+ with _lock:
+ _policy = policy or DefaultEventLoopPolicy()
+
+
+def get_event_loop():
+ """
+ Equivalent to calling get_event_loop_policy().get_event_loop().
+
+ @rtype: asyncio.AbstractEventLoop (or compatible)
+ @return: the event loop for the current context
+ """
+ return get_event_loop_policy().get_event_loop()
+
+
+class Task(Future):
+ """
+ Schedule the execution of a coroutine: wrap it in a future. A task
+ is a subclass of Future.
+ """
+ def __init__(self, coro, loop=None):
+ raise NotImplementedError
+
+
+def ensure_future(coro_or_future, loop=None):
+ """
+ Wrap a coroutine or an awaitable in a future.
+
+ If the argument is a Future, it is returned directly.
+
+ @type coro_or_future: coroutine or Future
+ @param coro_or_future: coroutine or future to wrap
+ @type loop: asyncio.AbstractEventLoop (or compatible)
+ @param loop: event loop
+ @rtype: asyncio.Future (or compatible)
+ @return: an instance of Future
+ """
+ if isinstance(coro_or_future, Future):
+ return coro_or_future
+ raise NotImplementedError
+
+
+def sleep(delay, result=None, loop=None):
+ """
+ Create a future that completes after a given time (in seconds). If
+ result is provided, it is produced to the caller when the future
+ completes.
+
+ @type delay: int or float
+ @param delay: delay seconds
+ @type result: object
+ @param result: result of the future
+ @type loop: asyncio.AbstractEventLoop (or compatible)
+ @param loop: event loop
+ @rtype: asyncio.Future (or compatible)
+ @return: an instance of Future
+ """
+ loop = loop or get_event_loop()
+ future = loop.create_future()
+ handle = loop.call_later(delay, functools.partial(future.set_result, result))
+ def cancel_callback(future):
+ if future.cancelled():
+ handle.cancel()
+ future.add_done_callback(cancel_callback)
+ return future
diff --git a/pym/portage/util/futures/events.py b/pym/portage/util/futures/events.py
new file mode 100644
index 000000000..b772bc242
--- /dev/null
+++ b/pym/portage/util/futures/events.py
@@ -0,0 +1,191 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+__all__ = (
+ 'AbstractEventLoopPolicy',
+ 'AbstractEventLoop',
+)
+
+import socket
+import subprocess
+
+try:
+ from asyncio.events import (
+ AbstractEventLoop as _AbstractEventLoop,
+ AbstractEventLoopPolicy as _AbstractEventLoopPolicy,
+ )
+except ImportError:
+ _AbstractEventLoop = object
+ _AbstractEventLoopPolicy = object
+
+
+class AbstractEventLoopPolicy(_AbstractEventLoopPolicy):
+ """Abstract policy for accessing the event loop."""
+
+ def get_event_loop(self):
+ raise NotImplementedError
+
+ def set_event_loop(self, loop):
+ raise NotImplementedError
+
+ def new_event_loop(self):
+ raise NotImplementedError
+
+ def get_child_watcher(self):
+ raise NotImplementedError
+
+ def set_child_watcher(self, watcher):
+ raise NotImplementedError
+
+
+class AbstractEventLoop(_AbstractEventLoop):
+ """Abstract event loop."""
+
+ def run_forever(self):
+ raise NotImplementedError
+
+ def run_until_complete(self, future):
+ raise NotImplementedError
+
+ def stop(self):
+ raise NotImplementedError
+
+ def is_running(self):
+ raise NotImplementedError
+
+ def is_closed(self):
+ raise NotImplementedError
+
+ def close(self):
+ raise NotImplementedError
+
+ def shutdown_asyncgens(self):
+ raise NotImplementedError
+
+ def _timer_handle_cancelled(self, handle):
+ raise NotImplementedError
+
+ def call_soon(self, callback, *args):
+ return self.call_later(0, callback, *args)
+
+ def call_later(self, delay, callback, *args):
+ raise NotImplementedError
+
+ def call_at(self, when, callback, *args):
+ raise NotImplementedError
+
+ def time(self):
+ raise NotImplementedError
+
+ def create_future(self):
+ raise NotImplementedError
+
+ def create_task(self, coro):
+ raise NotImplementedError
+
+ def call_soon_threadsafe(self, callback, *args):
+ raise NotImplementedError
+
+ def run_in_executor(self, executor, func, *args):
+ raise NotImplementedError
+
+ def set_default_executor(self, executor):
+ raise NotImplementedError
+
+ def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0):
+ raise NotImplementedError
+
+ def getnameinfo(self, sockaddr, flags=0):
+ raise NotImplementedError
+
+ def create_connection(self, protocol_factory, host=None, port=None,
+ ssl=None, family=0, proto=0, flags=0, sock=None,
+ local_addr=None, server_hostname=None):
+ raise NotImplementedError
+
+ def create_server(self, protocol_factory, host=None, port=None,
+ family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
+ sock=None, backlog=100, ssl=None, reuse_address=None,
+ reuse_port=None):
+ raise NotImplementedError
+
+ def create_unix_connection(self, protocol_factory, path,
+ ssl=None, sock=None,
+ server_hostname=None):
+ raise NotImplementedError
+
+ def create_unix_server(self, protocol_factory, path,
+ sock=None, backlog=100, ssl=None):
+ raise NotImplementedError
+
+ def create_datagram_endpoint(self, protocol_factory,
+ local_addr=None, remote_addr=None,
+ family=0, proto=0, flags=0,
+ reuse_address=None, reuse_port=None,
+ allow_broadcast=None, sock=None):
+ raise NotImplementedError
+
+ def connect_read_pipe(self, protocol_factory, pipe):
+ raise NotImplementedError
+
+ def connect_write_pipe(self, protocol_factory, pipe):
+ raise NotImplementedError
+
+ def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ **kwargs):
+ raise NotImplementedError
+
+ def subprocess_exec(self, protocol_factory, *args, **kwargs):
+ for k in ('stdin', 'stdout', 'stderr'):
+ kwargs.setdefault(k, subprocess.PIPE)
+ raise NotImplementedError
+
+ def add_writer(self, fd, callback, *args):
+ raise NotImplementedError
+
+ def remove_writer(self, fd):
+ raise NotImplementedError
+
+ def sock_recv(self, sock, nbytes):
+ raise NotImplementedError
+
+ def sock_sendall(self, sock, data):
+ raise NotImplementedError
+
+ def sock_connect(self, sock, address):
+ raise NotImplementedError
+
+ def sock_accept(self, sock):
+ raise NotImplementedError
+
+ def add_signal_handler(self, sig, callback, *args):
+ raise NotImplementedError
+
+ def remove_signal_handler(self, sig):
+ raise NotImplementedError
+
+ def set_task_factory(self, factory):
+ raise NotImplementedError
+
+ def get_task_factory(self):
+ raise NotImplementedError
+
+ def get_exception_handler(self):
+ raise NotImplementedError
+
+ def set_exception_handler(self, handler):
+ raise NotImplementedError
+
+ def default_exception_handler(self, context):
+ raise NotImplementedError
+
+ def call_exception_handler(self, context):
+ raise NotImplementedError
+
+ def get_debug(self):
+ raise NotImplementedError
+
+ def set_debug(self, enabled):
+ raise NotImplementedError
+
diff --git a/pym/portage/util/futures/futures.py b/pym/portage/util/futures/futures.py
index cd56a27eb..dc4e0a7d7 100644
--- a/pym/portage/util/futures/futures.py
+++ b/pym/portage/util/futures/futures.py
@@ -41,7 +41,10 @@ except ImportError:
Future = None
-from portage.util._eventloop.global_event_loop import global_event_loop
+import portage
+portage.proxy.lazyimport.lazyimport(globals(),
+ 'portage.util._eventloop.global_event_loop:global_event_loop@_global_event_loop',
+)
_PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
@@ -69,7 +72,7 @@ class _EventLoopFuture(object):
the default event loop.
"""
if loop is None:
- self._loop = global_event_loop()
+ self._loop = _global_event_loop()._asyncio_wrapper
else:
self._loop = loop
self._callbacks = []
diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
new file mode 100644
index 000000000..ed4c6e519
--- /dev/null
+++ b/pym/portage/util/futures/unix_events.py
@@ -0,0 +1,91 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+__all__ = (
+ 'DefaultEventLoopPolicy',
+)
+
+from portage.util._eventloop.global_event_loop import (
+ global_event_loop as _global_event_loop,
+)
+from portage.util.futures import (
+ asyncio,
+ events,
+)
+from portage.util.futures.futures import Future
+
+
+class _PortageEventLoop(events.AbstractEventLoop):
+ """
+ Implementation of asyncio.AbstractEventLoop which wraps portage's
+ internal event loop.
+ """
+
+ def __init__(self, loop):
+ """
+ @type loop: EventLoop
+ @param loop: an instance of portage's internal event loop
+ """
+ self._loop = loop
+ self.call_soon = loop.call_soon
+ self.call_soon_threadsafe = loop.call_soon_threadsafe
+ self.call_later = loop.call_later
+ self.call_at = loop.call_at
+ self.is_closed = loop.is_closed
+ self.close = loop.close
+ self.create_future = loop.create_future
+ self.add_reader = loop.add_reader
+ self.remove_reader = loop.remove_reader
+ self.add_writer = loop.add_writer
+ self.remove_writer = loop.remove_writer
+ self.run_in_executor = loop.run_in_executor
+ self.time = loop.time
+ self.set_debug = loop.set_debug
+ self.get_debug = loop.get_debug
+
+ def run_until_complete(self, future):
+ """
+ Run the event loop until a Future is done.
+
+ @type future: asyncio.Future
+ @param future: a Future to wait for
+ @rtype: object
+ @return: the Future's result
+ @raise: the Future's exception
+ """
+ return self._loop.run_until_complete(
+ asyncio.ensure_future(future, loop=self))
+
+ def create_task(self, coro):
+ """
+ Schedule a coroutine object.
+
+ @type coro: coroutine
+ @param coro: a coroutine to schedule
+ @rtype: asyncio.Task
+ @return: a task object
+ """
+ return asyncio.Task(coro, loop=self)
+
+
+class _PortageEventLoopPolicy(events.AbstractEventLoopPolicy):
+ """
+ Implementation of asyncio.AbstractEventLoopPolicy based on portage's
+ internal event loop. This supports running event loops in forks,
+ which is not supported by the default asyncio event loop policy,
+ see https://bugs.python.org/issue22087.
+ """
+ def get_event_loop(self):
+ """
+ Get the event loop for the current context.
+
+ Returns an event loop object implementing the AbstractEventLoop
+ interface.
+
+ @rtype: asyncio.AbstractEventLoop (or compatible)
+ @return: the current event loop policy
+ """
+ return _global_event_loop()._asyncio_wrapper
+
+
+DefaultEventLoopPolicy = _PortageEventLoopPolicy
--
2.13.6
^ permalink raw reply related [relevance 99%]
Results 1-1 of 1 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2018-04-09 3:34 99% [gentoo-portage-dev] [PATCH] Add minimal asyncio.AbstractEventLoop implementation (bug 649588) Zac Medico
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox