public inbox for gentoo-portage-dev@lists.gentoo.org
 help / color / mirror / Atom feed
Search results ordered by [date|relevance]  view[summary|nested|Atom feed]
thread overview below | download: 
* [gentoo-portage-dev] [PATCH] Add minimal asyncio.AbstractEventLoop implementation (bug 649588)
@ 2018-04-09  3:34 99% Zac Medico
  0 siblings, 0 replies; 1+ results
From: Zac Medico @ 2018-04-09  3:34 UTC (permalink / raw)
  To: gentoo-portage-dev; +Cc: Zac Medico

This provides minimal interoperability with existing asyncio code,
by adding a portage.util.futures.unix_events.DefaultEventLoopPolicy
class that makes asyncio use portage's internal event loop when an
instance is passed into asyncio.set_event_loop_policy(). The
get_event_loop() method of this policy returns an instance of a
_PortageEventLoop class that wraps portage's internal event loop and
implements asyncio's AbstractEventLoop interface.

The portage.util.futures.asyncio module refers to the real
asyncio module when available, and otherwise falls back to a
minimal implementation that works with python2.7. The included
EventLoopInForkTestCase demonstrates usage, and works with all
supported versions of python, including python2.7.

Bug: https://bugs.gentoo.org/649588
---
 pym/portage/tests/util/futures/asyncio/__init__.py |   0
 pym/portage/tests/util/futures/asyncio/__test__.py |   0
 .../futures/asyncio/test_event_loop_in_fork.py     |  62 +++++++
 pym/portage/util/_eventloop/EventLoop.py           |  11 +-
 pym/portage/util/futures/__init__.py               |   9 +
 pym/portage/util/futures/_asyncio.py               | 116 +++++++++++++
 pym/portage/util/futures/events.py                 | 191 +++++++++++++++++++++
 pym/portage/util/futures/futures.py                |   7 +-
 pym/portage/util/futures/unix_events.py            |  91 ++++++++++
 9 files changed, 479 insertions(+), 8 deletions(-)
 create mode 100644 pym/portage/tests/util/futures/asyncio/__init__.py
 create mode 100644 pym/portage/tests/util/futures/asyncio/__test__.py
 create mode 100644 pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
 create mode 100644 pym/portage/util/futures/_asyncio.py
 create mode 100644 pym/portage/util/futures/events.py
 create mode 100644 pym/portage/util/futures/unix_events.py

diff --git a/pym/portage/tests/util/futures/asyncio/__init__.py b/pym/portage/tests/util/futures/asyncio/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/pym/portage/tests/util/futures/asyncio/__test__.py b/pym/portage/tests/util/futures/asyncio/__test__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
new file mode 100644
index 000000000..1ef46229b
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
@@ -0,0 +1,62 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import multiprocessing
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+
+
+def fork_main(parent_conn, child_conn):
+	parent_conn.close()
+	loop = asyncio.get_event_loop()
+	# This fails with python's default event loop policy,
+	# see https://bugs.python.org/issue22087.
+	loop.run_until_complete(asyncio.sleep(0.1))
+
+
+def async_main(loop=None):
+	loop = loop or asyncio.get_event_loop()
+	future = loop.create_future()
+
+	# Since python2.7 does not support Process.sentinel, use Pipe to
+	# monitor for process exit.
+	parent_conn, child_conn = multiprocessing.Pipe()
+
+	def eof_callback():
+		loop.remove_reader(parent_conn.fileno())
+		parent_conn.close()
+		future.set_result(None)
+
+	loop.add_reader(parent_conn.fileno(), eof_callback)
+	proc = multiprocessing.Process(target=fork_main, args=(parent_conn, child_conn))
+	proc.start()
+	child_conn.close()
+
+	return future
+
+
+class EventLoopInForkTestCase(TestCase):
+	"""
+	The default asyncio event loop policy does not support loops
+	running in forks, see https://bugs.python.org/issue22087.
+	Portage's DefaultEventLoopPolicy supports forks.
+	"""
+
+	def testEventLoopInForkTestCase(self):
+		initial_policy = asyncio.get_event_loop_policy()
+		if not isinstance(initial_policy, DefaultEventLoopPolicy):
+			asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+		try:
+			loop = asyncio.get_event_loop()
+			exit_future = loop.create_future()
+			def trigger_exit(*args):
+				exit_future.set_result(True)
+
+			def start_async_main():
+				async_main(loop=loop).add_done_callback(trigger_exit)
+
+			loop.call_soon(start_async_main)
+			loop.run_until_complete(exit_future)
+		finally:
+			asyncio.set_event_loop_policy(initial_policy)
diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
index 72eb407fc..d53a76ba1 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -23,8 +23,9 @@ except ImportError:
 
 import portage
 portage.proxy.lazyimport.lazyimport(globals(),
-	'portage.util.futures.futures:_EventLoopFuture',
+	'portage.util.futures.futures:Future',
 	'portage.util.futures.executor.fork:ForkExecutor',
+	'portage.util.futures.unix_events:_PortageEventLoop',
 )
 
 from portage import OrderedDict
@@ -188,15 +189,13 @@ class EventLoop(object):
 		self._sigchld_write = None
 		self._sigchld_src_id = None
 		self._pid = os.getpid()
+		self._asyncio_wrapper = _PortageEventLoop(loop=self)
 
 	def create_future(self):
 		"""
-		Create a Future object attached to the loop. This returns
-		an instance of _EventLoopFuture, because EventLoop is currently
-		missing some of the asyncio.AbstractEventLoop methods that
-		asyncio.Future requires.
+		Create a Future object attached to the loop.
 		"""
-		return _EventLoopFuture(loop=self)
+		return Future(loop=self._asyncio_wrapper)
 
 	def _new_source_id(self):
 		"""
diff --git a/pym/portage/util/futures/__init__.py b/pym/portage/util/futures/__init__.py
index e69de29bb..789080c85 100644
--- a/pym/portage/util/futures/__init__.py
+++ b/pym/portage/util/futures/__init__.py
@@ -0,0 +1,9 @@
+
+__all__ = (
+	'asyncio',
+)
+
+try:
+	import asyncio
+except ImportError:
+	from portage.util.futures import _asyncio as asyncio
diff --git a/pym/portage/util/futures/_asyncio.py b/pym/portage/util/futures/_asyncio.py
new file mode 100644
index 000000000..6874e133f
--- /dev/null
+++ b/pym/portage/util/futures/_asyncio.py
@@ -0,0 +1,116 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+__all__ = (
+	'ensure_future',
+	'get_event_loop',
+	'get_event_loop_policy',
+	'set_event_loop_policy',
+	'sleep',
+	'Task',
+)
+
+import functools
+
+try:
+	import threading
+except ImportError:
+	import dummy_threading as threading
+
+import portage
+portage.proxy.lazyimport.lazyimport(globals(),
+	'portage.util.futures.unix_events:DefaultEventLoopPolicy',
+)
+from portage.util.futures.futures import Future
+
+_lock = threading.Lock()
+_policy = None
+
+
+def get_event_loop_policy():
+	"""
+	Get the current event loop policy.
+
+	@rtype: asyncio.AbstractEventLoopPolicy (or compatible)
+	@return: the current event loop policy
+	"""
+	global _lock, _policy
+	with _lock:
+		if _policy is None:
+			_policy = DefaultEventLoopPolicy()
+		return _policy
+
+
+def set_event_loop_policy(policy):
+	"""
+	Set the current event loop policy. If policy is None, the default
+	policy is restored.
+
+	@type policy: asyncio.AbstractEventLoopPolicy or None
+	@param policy: new event loop policy
+	"""
+	global _lock, _policy
+	with _lock:
+		_policy = policy or DefaultEventLoopPolicy()
+
+
+def get_event_loop():
+	"""
+	Equivalent to calling get_event_loop_policy().get_event_loop().
+
+	@rtype: asyncio.AbstractEventLoop (or compatible)
+	@return: the event loop for the current context
+	"""
+	return get_event_loop_policy().get_event_loop()
+
+
+class Task(Future):
+	"""
+	Schedule the execution of a coroutine: wrap it in a future. A task
+	is a subclass of Future.
+	"""
+	def __init__(self, coro, loop=None):
+		raise NotImplementedError
+
+
+def ensure_future(coro_or_future, loop=None):
+	"""
+	Wrap a coroutine or an awaitable in a future.
+
+	If the argument is a Future, it is returned directly.
+
+	@type coro_or_future: coroutine or Future
+	@param coro_or_future: coroutine or future to wrap
+	@type loop: asyncio.AbstractEventLoop (or compatible)
+	@param loop: event loop
+	@rtype: asyncio.Future (or compatible)
+	@return: an instance of Future
+	"""
+	if isinstance(coro_or_future, Future):
+		return coro_or_future
+	raise NotImplementedError
+
+
+def sleep(delay, result=None, loop=None):
+	"""
+	Create a future that completes after a given time (in seconds). If
+	result is provided, it is produced to the caller when the future
+	completes.
+
+	@type delay: int or float
+	@param delay: delay seconds
+	@type result: object
+	@param result: result of the future
+	@type loop: asyncio.AbstractEventLoop (or compatible)
+	@param loop: event loop
+	@rtype: asyncio.Future (or compatible)
+	@return: an instance of Future
+	"""
+	loop = loop or get_event_loop()
+	future = loop.create_future()
+	handle = loop.call_later(delay, functools.partial(future.set_result, result))
+	def cancel_callback(future):
+		if future.cancelled():
+			handle.cancel()
+	future.add_done_callback(cancel_callback)
+	return future
diff --git a/pym/portage/util/futures/events.py b/pym/portage/util/futures/events.py
new file mode 100644
index 000000000..b772bc242
--- /dev/null
+++ b/pym/portage/util/futures/events.py
@@ -0,0 +1,191 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+__all__ = (
+	'AbstractEventLoopPolicy',
+	'AbstractEventLoop',
+)
+
+import socket
+import subprocess
+
+try:
+	from asyncio.events import (
+		AbstractEventLoop as _AbstractEventLoop,
+		AbstractEventLoopPolicy as _AbstractEventLoopPolicy,
+	)
+except ImportError:
+	_AbstractEventLoop = object
+	_AbstractEventLoopPolicy = object
+
+
+class AbstractEventLoopPolicy(_AbstractEventLoopPolicy):
+    """Abstract policy for accessing the event loop."""
+
+    def get_event_loop(self):
+        raise NotImplementedError
+
+    def set_event_loop(self, loop):
+        raise NotImplementedError
+
+    def new_event_loop(self):
+        raise NotImplementedError
+
+    def get_child_watcher(self):
+        raise NotImplementedError
+
+    def set_child_watcher(self, watcher):
+        raise NotImplementedError
+
+
+class AbstractEventLoop(_AbstractEventLoop):
+	"""Abstract event loop."""
+
+	def run_forever(self):
+		raise NotImplementedError
+
+	def run_until_complete(self, future):
+		raise NotImplementedError
+
+	def stop(self):
+		raise NotImplementedError
+
+	def is_running(self):
+		raise NotImplementedError
+
+	def is_closed(self):
+		raise NotImplementedError
+
+	def close(self):
+		raise NotImplementedError
+
+	def shutdown_asyncgens(self):
+		raise NotImplementedError
+
+	def _timer_handle_cancelled(self, handle):
+		raise NotImplementedError
+
+	def call_soon(self, callback, *args):
+		return self.call_later(0, callback, *args)
+
+	def call_later(self, delay, callback, *args):
+		raise NotImplementedError
+
+	def call_at(self, when, callback, *args):
+		raise NotImplementedError
+
+	def time(self):
+		raise NotImplementedError
+
+	def create_future(self):
+		raise NotImplementedError
+
+	def create_task(self, coro):
+		raise NotImplementedError
+
+	def call_soon_threadsafe(self, callback, *args):
+		raise NotImplementedError
+
+	def run_in_executor(self, executor, func, *args):
+		raise NotImplementedError
+
+	def set_default_executor(self, executor):
+		raise NotImplementedError
+
+	def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0):
+		raise NotImplementedError
+
+	def getnameinfo(self, sockaddr, flags=0):
+		raise NotImplementedError
+
+	def create_connection(self, protocol_factory, host=None, port=None,
+						  ssl=None, family=0, proto=0, flags=0, sock=None,
+						  local_addr=None, server_hostname=None):
+		raise NotImplementedError
+
+	def create_server(self, protocol_factory, host=None, port=None,
+					  family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
+					  sock=None, backlog=100, ssl=None, reuse_address=None,
+					  reuse_port=None):
+		raise NotImplementedError
+
+	def create_unix_connection(self, protocol_factory, path,
+							   ssl=None, sock=None,
+							   server_hostname=None):
+		raise NotImplementedError
+
+	def create_unix_server(self, protocol_factory, path,
+						   sock=None, backlog=100, ssl=None):
+		raise NotImplementedError
+
+	def create_datagram_endpoint(self, protocol_factory,
+								 local_addr=None, remote_addr=None,
+								 family=0, proto=0, flags=0,
+								 reuse_address=None, reuse_port=None,
+								 allow_broadcast=None, sock=None):
+		raise NotImplementedError
+
+	def connect_read_pipe(self, protocol_factory, pipe):
+		raise NotImplementedError
+
+	def connect_write_pipe(self, protocol_factory, pipe):
+		raise NotImplementedError
+
+	def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE,
+						 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+						 **kwargs):
+		raise NotImplementedError
+
+	def subprocess_exec(self, protocol_factory, *args, **kwargs):
+		for k in ('stdin', 'stdout', 'stderr'):
+			kwargs.setdefault(k, subprocess.PIPE)
+		raise NotImplementedError
+
+	def add_writer(self, fd, callback, *args):
+		raise NotImplementedError
+
+	def remove_writer(self, fd):
+		raise NotImplementedError
+
+	def sock_recv(self, sock, nbytes):
+		raise NotImplementedError
+
+	def sock_sendall(self, sock, data):
+		raise NotImplementedError
+
+	def sock_connect(self, sock, address):
+		raise NotImplementedError
+
+	def sock_accept(self, sock):
+		raise NotImplementedError
+
+	def add_signal_handler(self, sig, callback, *args):
+		raise NotImplementedError
+
+	def remove_signal_handler(self, sig):
+		raise NotImplementedError
+
+	def set_task_factory(self, factory):
+		raise NotImplementedError
+
+	def get_task_factory(self):
+		raise NotImplementedError
+
+	def get_exception_handler(self):
+		raise NotImplementedError
+
+	def set_exception_handler(self, handler):
+		raise NotImplementedError
+
+	def default_exception_handler(self, context):
+		raise NotImplementedError
+
+	def call_exception_handler(self, context):
+		raise NotImplementedError
+
+	def get_debug(self):
+		raise NotImplementedError
+
+	def set_debug(self, enabled):
+		raise NotImplementedError
+
diff --git a/pym/portage/util/futures/futures.py b/pym/portage/util/futures/futures.py
index cd56a27eb..dc4e0a7d7 100644
--- a/pym/portage/util/futures/futures.py
+++ b/pym/portage/util/futures/futures.py
@@ -41,7 +41,10 @@ except ImportError:
 
 	Future = None
 
-from portage.util._eventloop.global_event_loop import global_event_loop
+import portage
+portage.proxy.lazyimport.lazyimport(globals(),
+	'portage.util._eventloop.global_event_loop:global_event_loop@_global_event_loop',
+)
 
 _PENDING = 'PENDING'
 _CANCELLED = 'CANCELLED'
@@ -69,7 +72,7 @@ class _EventLoopFuture(object):
 		the default event loop.
 		"""
 		if loop is None:
-			self._loop = global_event_loop()
+			self._loop = _global_event_loop()._asyncio_wrapper
 		else:
 			self._loop = loop
 		self._callbacks = []
diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
new file mode 100644
index 000000000..ed4c6e519
--- /dev/null
+++ b/pym/portage/util/futures/unix_events.py
@@ -0,0 +1,91 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+__all__ = (
+	'DefaultEventLoopPolicy',
+)
+
+from portage.util._eventloop.global_event_loop import (
+	global_event_loop as _global_event_loop,
+)
+from portage.util.futures import (
+	asyncio,
+	events,
+)
+from portage.util.futures.futures import Future
+
+
+class _PortageEventLoop(events.AbstractEventLoop):
+	"""
+	Implementation of asyncio.AbstractEventLoop which wraps portage's
+	internal event loop.
+	"""
+
+	def __init__(self, loop):
+		"""
+		@type loop: EventLoop
+		@param loop: an instance of portage's internal event loop
+		"""
+		self._loop = loop
+		self.call_soon = loop.call_soon
+		self.call_soon_threadsafe = loop.call_soon_threadsafe
+		self.call_later = loop.call_later
+		self.call_at = loop.call_at
+		self.is_closed = loop.is_closed
+		self.close = loop.close
+		self.create_future = loop.create_future
+		self.add_reader = loop.add_reader
+		self.remove_reader = loop.remove_reader
+		self.add_writer = loop.add_writer
+		self.remove_writer = loop.remove_writer
+		self.run_in_executor = loop.run_in_executor
+		self.time = loop.time
+		self.set_debug = loop.set_debug
+		self.get_debug = loop.get_debug
+
+	def run_until_complete(self, future):
+		"""
+		Run the event loop until a Future is done.
+
+		@type future: asyncio.Future
+		@param future: a Future to wait for
+		@rtype: object
+		@return: the Future's result
+		@raise: the Future's exception
+		"""
+		return self._loop.run_until_complete(
+			asyncio.ensure_future(future, loop=self))
+
+	def create_task(self, coro):
+		"""
+		Schedule a coroutine object.
+
+		@type coro: coroutine
+		@param coro: a coroutine to schedule
+		@rtype: asyncio.Task
+		@return: a task object
+		"""
+		return asyncio.Task(coro, loop=self)
+
+
+class _PortageEventLoopPolicy(events.AbstractEventLoopPolicy):
+	"""
+	Implementation of asyncio.AbstractEventLoopPolicy based on portage's
+	internal event loop. This supports running event loops in forks,
+	which is not supported by the default asyncio event loop policy,
+	see https://bugs.python.org/issue22087.
+	"""
+	def get_event_loop(self):
+		"""
+		Get the event loop for the current context.
+
+		Returns an event loop object implementing the AbstractEventLoop
+		interface.
+
+		@rtype: asyncio.AbstractEventLoop (or compatible)
+		@return: the event loop for the current context
+		"""
+		return _global_event_loop()._asyncio_wrapper
+
+
+DefaultEventLoopPolicy = _PortageEventLoopPolicy
-- 
2.13.6



^ permalink raw reply related	[relevance 99%]

Results 1-1 of 1 | reverse | options above
-- pct% links below jump to the message on this page, permalinks otherwise --
2018-04-09  3:34 99% [gentoo-portage-dev] [PATCH] Add minimal asyncio.AbstractEventLoop implementation (bug 649588) Zac Medico

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox