* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_eventloop/
@ 2019-04-16 2:07 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2019-04-16 2:07 UTC (permalink / raw
To: gentoo-commits
commit: ee1f6ff69887dfa02d5c3a3ab3d61163fa504d15
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Apr 16 01:33:29 2019 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue Apr 16 01:40:29 2019 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=ee1f6ff6
AsyncioEventLoop: enable SIGINT in exception handler (bug 672540)
Before the exception handler invokes the pdb shell, enable SIGINT so
that the user can exit with Control-C (otherwise SIGKILL is needed).
Bug: https://bugs.gentoo.org/672540
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_eventloop/asyncio_event_loop.py | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py
index ea0e03b23..a11a10205 100644
--- a/lib/portage/util/_eventloop/asyncio_event_loop.py
+++ b/lib/portage/util/_eventloop/asyncio_event_loop.py
@@ -73,6 +73,11 @@ class AsyncioEventLoop(_AbstractEventLoop):
# aid in diagnosis of the problem. If there's no tty, then
# exit immediately.
if all(s.isatty() for s in (sys.stdout, sys.stderr, sys.stdin)):
+ # Restore default SIGINT handler, since emerge's Scheduler
+ # has a SIGINT handler which delays exit until after
+ # cleanup, and cleanup cannot occur here since the event
+ # loop is suspended (see bug 672540).
+ signal.signal(signal.SIGINT, signal.SIG_DFL)
pdb.set_trace()
else:
# Normally emerge will wait for all coroutines to complete
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_eventloop/
@ 2020-03-02 5:03 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2020-03-02 5:03 UTC (permalink / raw
To: gentoo-commits
commit: 30150206fb0b3e013ef5b163b8d2f24c70a9d977
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Mar 2 04:56:49 2020 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Mar 2 05:01:06 2020 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=30150206
AsyncioEventLoop: always die with SIGTERM in exception handler (bug 705910)
Remove call to pdb.set_trace() in exception handler, since it's
not very useful, and always die with a SIGTERM for unexpected
exceptions here.
Bug: https://bugs.gentoo.org/705910
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_eventloop/asyncio_event_loop.py | 31 +++++++----------------
1 file changed, 9 insertions(+), 22 deletions(-)
diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py
index a11a10205..ce7e06923 100644
--- a/lib/portage/util/_eventloop/asyncio_event_loop.py
+++ b/lib/portage/util/_eventloop/asyncio_event_loop.py
@@ -1,10 +1,8 @@
-# Copyright 2018 Gentoo Foundation
+# Copyright 2018-2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import os
-import pdb
import signal
-import sys
try:
import asyncio as _real_asyncio
@@ -69,25 +67,14 @@ class AsyncioEventLoop(_AbstractEventLoop):
"""
loop.default_exception_handler(context)
if 'exception' in context:
- # If we have a tty then start the debugger, since in might
- # aid in diagnosis of the problem. If there's no tty, then
- # exit immediately.
- if all(s.isatty() for s in (sys.stdout, sys.stderr, sys.stdin)):
- # Restore default SIGINT handler, since emerge's Scheduler
- # has a SIGINT handler which delays exit until after
- # cleanup, and cleanup cannot occur here since the event
- # loop is suspended (see bug 672540).
- signal.signal(signal.SIGINT, signal.SIG_DFL)
- pdb.set_trace()
- else:
- # Normally emerge will wait for all coroutines to complete
- # after SIGTERM has been received. However, an unhandled
- # exception will prevent the interrupted coroutine from
- # completing, therefore use the default SIGTERM handler
- # in order to ensure that emerge exits immediately (though
- # uncleanly).
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- os.kill(os.getpid(), signal.SIGTERM)
+ # Normally emerge will wait for all coroutines to complete
+ # after SIGTERM has been received. However, an unhandled
+ # exception will prevent the interrupted coroutine from
+ # completing, therefore use the default SIGTERM handler
+ # in order to ensure that emerge exits immediately (though
+ # uncleanly).
+ signal.signal(signal.SIGTERM, signal.SIG_DFL)
+ os.kill(os.getpid(), signal.SIGTERM)
def _create_future(self):
"""
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_eventloop/
@ 2021-01-11 7:36 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2021-01-11 7:36 UTC (permalink / raw
To: gentoo-commits
commit: eeeb70bf50994c08ebcddac94474105f0635360c
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Jan 11 06:46:49 2021 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Jan 11 07:28:28 2021 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=eeeb70bf
AsyncioEventLoop: wrap child watcher for thread safety (bug 764905)
Use a child watcher wrapper to deliver the callbacks via the
call_soon_threadsafe method, since documentation for the asycio
AbstractChildWatcher class says that callbacks must be thread
safe.
Bug: https://bugs.gentoo.org/764905
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_eventloop/asyncio_event_loop.py | 30 ++++++++++++++++++++++-
1 file changed, 29 insertions(+), 1 deletion(-)
diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py
index 4d7047ae8..b77728088 100644
--- a/lib/portage/util/_eventloop/asyncio_event_loop.py
+++ b/lib/portage/util/_eventloop/asyncio_event_loop.py
@@ -6,6 +6,7 @@ import signal
import asyncio as _real_asyncio
from asyncio.events import AbstractEventLoop as _AbstractEventLoop
+from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
import portage
@@ -47,6 +48,7 @@ class AsyncioEventLoop(_AbstractEventLoop):
self.set_debug = loop.set_debug
self.get_debug = loop.get_debug
self._wakeup_fd = -1
+ self._child_watcher = None
if portage._internal_caller:
loop.set_exception_handler(self._internal_caller_exception_handler)
@@ -87,7 +89,9 @@ class AsyncioEventLoop(_AbstractEventLoop):
@rtype: asyncio.AbstractChildWatcher
@return: the internal event loop's AbstractChildWatcher interface
"""
- return _real_asyncio.get_child_watcher()
+ if self._child_watcher is None:
+ self._child_watcher = _ChildWatcherThreadSafetyWrapper(self, _real_asyncio.get_child_watcher())
+ return self._child_watcher
@property
def _asyncio_wrapper(self):
@@ -126,3 +130,27 @@ class AsyncioEventLoop(_AbstractEventLoop):
except ValueError:
# This is intended to fail when not called in the main thread.
pass
+
+
+class _ChildWatcherThreadSafetyWrapper(_AbstractChildWatcher):
+ def __init__(self, loop, real_watcher):
+ self._loop = loop
+ self._real_watcher = real_watcher
+
+ def close(self):
+ pass
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, a, b, c):
+ pass
+
+ def _child_exit(self, pid, status, callback, *args):
+ self._loop.call_soon_threadsafe(callback, pid, status, *args)
+
+ def add_child_handler(self, pid, callback, *args):
+ self._real_watcher.add_child_handler(pid, self._child_exit, callback, *args)
+
+ def remove_child_handler(self, pid):
+ return self._real_watcher.remove_child_handler(pid)
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_eventloop/
@ 2021-03-06 11:04 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2021-03-06 11:04 UTC (permalink / raw
To: gentoo-commits
commit: 05e9b76523569f17c3cbf7465758656c3beb0be8
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat Mar 6 10:39:31 2021 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sat Mar 6 10:41:06 2021 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=05e9b765
Remove unused EventLoop class
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_eventloop/EventLoop.py | 1153 ------------------------------
1 file changed, 1153 deletions(-)
diff --git a/lib/portage/util/_eventloop/EventLoop.py b/lib/portage/util/_eventloop/EventLoop.py
deleted file mode 100644
index ff2b73255..000000000
--- a/lib/portage/util/_eventloop/EventLoop.py
+++ /dev/null
@@ -1,1153 +0,0 @@
-# Copyright 1999-2020 Gentoo Authors
-# Distributed under the terms of the GNU General Public License v2
-
-import asyncio as _real_asyncio
-import collections
-import errno
-import functools
-import logging
-import os
-import select
-import signal
-import time
-import traceback
-
-try:
- import fcntl
-except ImportError:
- # http://bugs.jython.org/issue1074
- fcntl = None
-
-try:
- import threading
-except ImportError:
- import dummy_threading as threading
-
-import portage
-portage.proxy.lazyimport.lazyimport(globals(),
- 'portage.util.futures:asyncio',
- 'portage.util.futures.executor.fork:ForkExecutor',
- 'portage.util.futures.unix_events:_PortageEventLoop,_PortageChildWatcher',
-)
-
-from portage.util import writemsg_level
-from ..SlotObject import SlotObject
-from .PollConstants import PollConstants
-from .PollSelectAdapter import PollSelectAdapter
-
-class EventLoop:
- """
- An event loop, intended to be compatible with the GLib event loop.
- Call the iteration method in order to execute one iteration of the
- loop. The idle_add and timeout_add methods serve as thread-safe
- means to interact with the loop's thread.
- """
-
- supports_multiprocessing = True
-
- # TODO: Find out why SIGCHLD signals aren't delivered during poll
- # calls, forcing us to wakeup in order to receive them.
- _sigchld_interval = 250
-
- class _child_callback_class(SlotObject):
- __slots__ = ("callback", "data", "pid", "source_id")
-
- class _idle_callback_class(SlotObject):
- __slots__ = ("_args", "_callback", "_cancelled")
-
- class _io_handler_class(SlotObject):
- __slots__ = ("args", "callback", "f", "source_id")
-
- class _timeout_handler_class(SlotObject):
- __slots__ = ("args", "function", "calling", "interval", "source_id",
- "timestamp")
-
- class _handle:
- """
- A callback wrapper object, compatible with asyncio.Handle.
- """
- __slots__ = ("_callback_id", "_loop")
-
- def __init__(self, callback_id, loop):
- self._callback_id = callback_id
- self._loop = loop
-
- def cancel(self):
- """
- Cancel the call. If the callback is already canceled or executed,
- this method has no effect.
- """
- self._loop.source_remove(self._callback_id)
-
- class _call_soon_callback:
- """
- Wraps a call_soon callback, and always returns False, since these
- callbacks are only supposed to run once.
- """
- __slots__ = ("_args", "_callback")
-
- def __init__(self, callback, args):
- self._callback = callback
- self._args = args
-
- def __call__(self):
- self._callback(*self._args)
- return False
-
- class _selector_callback:
- """
- Wraps an callback, and always returns True, for callbacks that
- are supposed to run repeatedly.
- """
- __slots__ = ("_args", "_callbacks")
-
- def __init__(self, callbacks):
- self._callbacks = callbacks
-
- def __call__(self, fd, event):
- for callback, mask in self._callbacks:
- if event & mask:
- callback()
- return True
-
- def __init__(self, main=True):
- """
- @param main: If True then this is a singleton instance for use
- in the main thread, otherwise it is a local instance which
- can safely be use in a non-main thread (default is True, so
- that global_event_loop does not need constructor arguments)
- @type main: bool
- """
- self._use_signal = main and fcntl is not None
- self._debug = bool(os.environ.get('PYTHONASYNCIODEBUG'))
- self._thread_rlock = threading.RLock()
- self._thread_condition = threading.Condition(self._thread_rlock)
- self._poll_event_queue = []
- self._poll_event_handlers = {}
- self._poll_event_handler_ids = {}
- # Number of current calls to self.iteration(). A number greater
- # than 1 indicates recursion, which is not supported by asyncio's
- # default event loop.
- self._iteration_depth = 0
- # Increment id for each new handler.
- self._event_handler_id = 0
- # New call_soon callbacks must have an opportunity to
- # execute before it's safe to wait on self._thread_condition
- # without a timeout, since delaying its execution indefinitely
- # could lead to a deadlock. The following attribute stores the
- # event handler id of the most recently added call_soon callback.
- # If this attribute has changed since the last time that the
- # call_soon callbacks have been called, then it's not safe to
- # wait on self._thread_condition without a timeout.
- self._call_soon_id = None
- # Use deque, with thread-safe append, in order to emulate the FIFO
- # queue behavior of the AbstractEventLoop.call_soon method.
- self._idle_callbacks = collections.deque()
- self._idle_callbacks_remaining = 0
- self._timeout_handlers = {}
- self._timeout_interval = None
- self._default_executor = None
-
- self._poll_obj = None
- try:
- select.epoll
- except AttributeError:
- pass
- else:
- try:
- epoll_obj = select.epoll()
- except IOError:
- # This happens with Linux 2.4 kernels:
- # IOError: [Errno 38] Function not implemented
- pass
- else:
- self._poll_obj = _epoll_adapter(epoll_obj)
- self.IO_ERR = select.EPOLLERR
- self.IO_HUP = select.EPOLLHUP
- self.IO_IN = select.EPOLLIN
- self.IO_NVAL = 0
- self.IO_OUT = select.EPOLLOUT
- self.IO_PRI = select.EPOLLPRI
-
- if self._poll_obj is None:
- self._poll_obj = create_poll_instance()
- self.IO_ERR = PollConstants.POLLERR
- self.IO_HUP = PollConstants.POLLHUP
- self.IO_IN = PollConstants.POLLIN
- self.IO_NVAL = PollConstants.POLLNVAL
- self.IO_OUT = PollConstants.POLLOUT
- self.IO_PRI = PollConstants.POLLPRI
-
- # These trigger both reader and writer callbacks.
- EVENT_SHARED = self.IO_HUP | self.IO_ERR | self.IO_NVAL
-
- self._EVENT_READ = self.IO_IN | EVENT_SHARED
- self._EVENT_WRITE = self.IO_OUT | EVENT_SHARED
-
- self._child_handlers = {}
- self._sigchld_read = None
- self._sigchld_write = None
- self._sigchld_src_id = None
- self._pid = portage.getpid()
- self._asyncio_wrapper = _PortageEventLoop(loop=self)
- self._asyncio_child_watcher = _PortageChildWatcher(self)
-
- def create_future(self):
- """
- Create a Future object attached to the loop.
- """
- return asyncio.Future(loop=self._asyncio_wrapper)
-
- def _new_source_id(self):
- """
- Generate a new source id. This method is thread-safe.
- """
- with self._thread_rlock:
- self._event_handler_id += 1
- return self._event_handler_id
-
- def _poll(self, timeout=None):
- """
- All poll() calls pass through here. The poll events
- are added directly to self._poll_event_queue.
- In order to avoid endless blocking, this raises
- StopIteration if timeout is None and there are
- no file descriptors to poll.
- """
-
- if timeout is None and \
- not self._poll_event_handlers:
- raise StopIteration(
- "timeout is None and there are no poll() event handlers")
-
- while True:
- try:
- self._poll_event_queue.extend(self._poll_obj.poll(timeout))
- break
- except (IOError, select.error) as e:
- # Silently handle EINTR, which is normal when we have
- # received a signal such as SIGINT (epoll objects may
- # raise IOError rather than select.error, at least in
- # Python 3.2).
- if not (e.args and e.args[0] == errno.EINTR):
- writemsg_level("\n!!! select error: %s\n" % (e,),
- level=logging.ERROR, noiselevel=-1)
- del e
-
- # This typically means that we've received a SIGINT, so
- # raise StopIteration in order to break out of our current
- # iteration and respond appropriately to the signal as soon
- # as possible.
- raise StopIteration("interrupted")
-
- def iteration(self, *args):
- """
- Like glib.MainContext.iteration(), runs a single iteration. In order
- to avoid blocking forever when may_block is True (the default),
- callers must be careful to ensure that at least one of the following
- conditions is met:
- 1) An event source or timeout is registered which is guaranteed
- to trigger at least on event (a call to an idle function
- only counts as an event if it returns a False value which
- causes it to stop being called)
- 2) Another thread is guaranteed to call one of the thread-safe
- methods which notify iteration to stop waiting (such as
- idle_add or timeout_add).
- These rules ensure that iteration is able to block until an event
- arrives, without doing any busy waiting that would waste CPU time.
- @type may_block: bool
- @param may_block: if True the call may block waiting for an event
- (default is True).
- @rtype: bool
- @return: True if events were dispatched.
- """
- self._iteration_depth += 1
- try:
- return self._iteration(*args)
- finally:
- self._iteration_depth -= 1
-
- def _iteration(self, *args):
- may_block = True
-
- if args:
- if len(args) > 1:
- raise TypeError(
- "expected at most 1 argument (%s given)" % len(args))
- may_block = args[0]
-
- event_queue = self._poll_event_queue
- event_handlers = self._poll_event_handlers
- events_handled = 0
- timeouts_checked = False
-
- if not event_handlers:
- with self._thread_condition:
- prev_call_soon_id = self._call_soon_id
- if self._run_timeouts():
- events_handled += 1
- timeouts_checked = True
-
- call_soon = prev_call_soon_id is not self._call_soon_id
- if self._call_soon_id is not None and self._call_soon_id._cancelled:
- # Allow garbage collection of cancelled callback.
- self._call_soon_id = None
-
- if (not call_soon and not event_handlers
- and not events_handled and may_block):
- # Block so that we don't waste cpu time by looping too
- # quickly. This makes EventLoop useful for code that needs
- # to wait for timeout callbacks regardless of whether or
- # not any IO handlers are currently registered.
- timeout = self._get_poll_timeout()
- if timeout is None:
- wait_timeout = None
- else:
- wait_timeout = timeout / 1000
- # NOTE: In order to avoid a possible infinite wait when
- # wait_timeout is None, the previous _run_timeouts()
- # call must have returned False *with* _thread_condition
- # acquired. Otherwise, we would risk going to sleep after
- # our only notify event has already passed.
- self._thread_condition.wait(wait_timeout)
- if self._run_timeouts():
- events_handled += 1
- timeouts_checked = True
-
- # If any timeouts have executed, then return immediately,
- # in order to minimize latency in termination of iteration
- # loops that they may control.
- if events_handled or not event_handlers:
- return bool(events_handled)
-
- if not event_queue:
-
- if may_block:
- timeout = self._get_poll_timeout()
-
- # Avoid blocking for IO if there are any timeout
- # or idle callbacks available to process.
- if timeout != 0 and not timeouts_checked:
- if self._run_timeouts():
- events_handled += 1
- timeouts_checked = True
- if events_handled:
- # Minimize latency for loops controlled
- # by timeout or idle callback events.
- timeout = 0
- else:
- timeout = 0
-
- try:
- self._poll(timeout=timeout)
- except StopIteration:
- # This can be triggered by EINTR which is caused by signals.
- pass
-
- # NOTE: IO event handlers may be re-entrant, in case something
- # like AbstractPollTask._wait_loop() needs to be called inside
- # a handler for some reason.
- while event_queue:
- events_handled += 1
- f, event = event_queue.pop()
- try:
- x = event_handlers[f]
- except KeyError:
- # This is known to be triggered by the epoll
- # implementation in qemu-user-1.2.2, and appears
- # to be harmless (see bug #451326).
- continue
- if not x.callback(f, event, *x.args):
- self.source_remove(x.source_id)
-
- if not timeouts_checked:
- if self._run_timeouts():
- events_handled += 1
- timeouts_checked = True
-
- return bool(events_handled)
-
- def _get_poll_timeout(self):
-
- with self._thread_rlock:
- if self._child_handlers:
- if self._timeout_interval is None:
- timeout = self._sigchld_interval
- else:
- timeout = min(self._sigchld_interval,
- self._timeout_interval)
- else:
- timeout = self._timeout_interval
-
- return timeout
-
- def child_watch_add(self, pid, callback, data=None):
- """
- Like glib.child_watch_add(), sets callback to be called with the
- user data specified by data when the child indicated by pid exits.
- The signature for the callback is:
-
- def callback(pid, condition, user_data)
-
- where pid is is the child process id, condition is the status
- information about the child process and user_data is data.
-
- @type int
- @param pid: process id of a child process to watch
- @type callback: callable
- @param callback: a function to call
- @type data: object
- @param data: the optional data to pass to function
- @rtype: int
- @return: an integer ID
- """
- source_id = self._new_source_id()
- self._child_handlers[source_id] = self._child_callback_class(
- callback=callback, data=data, pid=pid, source_id=source_id)
-
- if self._use_signal:
- if self._sigchld_read is None:
- self._sigchld_read, self._sigchld_write = os.pipe()
-
- fcntl.fcntl(self._sigchld_read, fcntl.F_SETFL,
- fcntl.fcntl(self._sigchld_read,
- fcntl.F_GETFL) | os.O_NONBLOCK)
-
- # The IO watch is dynamically registered and unregistered as
- # needed, since we don't want to consider it as a valid source
- # of events when there are no child listeners. It's important
- # to distinguish when there are no valid sources of IO events,
- # in order to avoid an endless poll call if there's no timeout.
- if self._sigchld_src_id is None:
- self._sigchld_src_id = self.io_add_watch(
- self._sigchld_read, self.IO_IN, self._sigchld_io_cb)
- signal.signal(signal.SIGCHLD, self._sigchld_sig_cb)
-
- # poll soon, in case the SIGCHLD has already arrived
- self.call_soon(self._poll_child_processes)
- return source_id
-
- def _sigchld_sig_cb(self, signum, frame):
- # If this signal handler was not installed by the
- # current process then the signal doesn't belong to
- # this EventLoop instance.
- if portage.getpid() == self._pid:
- os.write(self._sigchld_write, b'\0')
-
- def _sigchld_io_cb(self, fd, events):
- try:
- while True:
- os.read(self._sigchld_read, 4096)
- except OSError:
- # read until EAGAIN
- pass
- self._poll_child_processes()
- return True
-
- def _poll_child_processes(self):
- if not self._child_handlers:
- return False
-
- calls = 0
-
- for x in list(self._child_handlers.values()):
- if x.source_id not in self._child_handlers:
- # it's already been called via re-entrance
- continue
- try:
- wait_retval = os.waitpid(x.pid, os.WNOHANG)
- except OSError as e:
- if e.errno != errno.ECHILD:
- raise
- del e
- self.source_remove(x.source_id)
- else:
- # With waitpid and WNOHANG, only check the
- # first element of the tuple since the second
- # element may vary (bug #337465).
- if wait_retval[0] != 0:
- calls += 1
- self.source_remove(x.source_id)
- x.callback(x.pid, wait_retval[1], x.data)
-
- return bool(calls)
-
- def idle_add(self, callback, *args):
- """
- Like glib.idle_add(), if callback returns False it is
- automatically removed from the list of event sources and will
- not be called again. This method is thread-safe.
-
- The idle_add method is deprecated. Use the call_soon and
- call_soon_threadsafe methods instead.
-
- @type callback: callable
- @param callback: a function to call
- @return: a handle which can be used to cancel the callback
- via the source_remove method
- @rtype: object
- """
- with self._thread_condition:
- source_id = self._idle_add(callback, *args)
- self._thread_condition.notify()
- return source_id
-
- def _idle_add(self, callback, *args):
- """Like idle_add(), but without thread safety."""
- # Hold self._thread_condition when assigning self._call_soon_id,
- # since it might be modified via a thread-safe method.
- with self._thread_condition:
- handle = self._call_soon_id = self._idle_callback_class(
- _args=args, _callback=callback)
- # This deque append is thread-safe, but it does *not* notify the
- # loop's thread, so the caller must notify if appropriate.
- self._idle_callbacks.append(handle)
- return handle
-
- def _run_idle_callbacks(self):
- # assumes caller has acquired self._thread_rlock
- if not self._idle_callbacks:
- return False
- state_change = 0
- reschedule = []
- # Use remaining count to avoid calling any newly scheduled callbacks,
- # since self._idle_callbacks can be modified during the exection of
- # these callbacks. The remaining count can be reset by recursive
- # calls to this method. Recursion must remain supported until all
- # consumers of AsynchronousLock.unlock() have been migrated to the
- # async_unlock() method, see bug 614108.
- self._idle_callbacks_remaining = len(self._idle_callbacks)
-
- while self._idle_callbacks_remaining:
- self._idle_callbacks_remaining -= 1
- try:
- x = self._idle_callbacks.popleft() # thread-safe
- except IndexError:
- break
- if x._cancelled:
- # it got cancelled while executing another callback
- continue
- if x._callback(*x._args):
- # Reschedule, but not until after it's called, since
- # we don't want it to call itself in a recursive call
- # to this method.
- self._idle_callbacks.append(x)
- else:
- x._cancelled = True
- state_change += 1
-
- return bool(state_change)
-
- def timeout_add(self, interval, function, *args):
- """
- Like glib.timeout_add(), interval argument is the number of
- milliseconds between calls to your function, and your function
- should return False to stop being called, or True to continue
- being called. Any additional positional arguments given here
- are passed to your function when it's called. This method is
- thread-safe.
- """
- with self._thread_condition:
- source_id = self._new_source_id()
- self._timeout_handlers[source_id] = \
- self._timeout_handler_class(
- interval=interval, function=function, args=args,
- source_id=source_id, timestamp=self.time())
- if self._timeout_interval is None or \
- self._timeout_interval > interval:
- self._timeout_interval = interval
- self._thread_condition.notify()
- return source_id
-
- def _run_timeouts(self):
-
- calls = 0
- if not self._use_signal:
- if self._poll_child_processes():
- calls += 1
-
- with self._thread_rlock:
-
- if self._run_idle_callbacks():
- calls += 1
-
- if not self._timeout_handlers:
- return bool(calls)
-
- ready_timeouts = []
- current_time = self.time()
- for x in self._timeout_handlers.values():
- elapsed_seconds = current_time - x.timestamp
- # elapsed_seconds < 0 means the system clock has been adjusted
- if elapsed_seconds < 0 or \
- (x.interval - 1000 * elapsed_seconds) <= 0:
- ready_timeouts.append(x)
-
- # Iterate of our local list, since self._timeout_handlers can be
- # modified during the exection of these callbacks.
- for x in ready_timeouts:
- if x.source_id not in self._timeout_handlers:
- # it got cancelled while executing another timeout
- continue
- if x.calling:
- # don't call it recursively
- continue
- calls += 1
- x.calling = True
- try:
- x.timestamp = self.time()
- if not x.function(*x.args):
- self.source_remove(x.source_id)
- finally:
- x.calling = False
-
- return bool(calls)
-
- def add_reader(self, fd, callback, *args):
- """
- Start watching the file descriptor for read availability and then
- call the callback with specified arguments.
-
- Use functools.partial to pass keywords to the callback.
- """
- handler = self._poll_event_handlers.get(fd)
- callbacks = [(functools.partial(callback, *args), self._EVENT_READ)]
- selector_mask = self._EVENT_READ
- if handler is not None:
- if not isinstance(handler.callback, self._selector_callback):
- raise AssertionError("add_reader called with fd "
- "registered directly via io_add_watch")
- for item in handler.callback._callbacks:
- callback, mask = item
- if mask != self._EVENT_READ:
- selector_mask |= mask
- callbacks.append(item)
- self.source_remove(handler.source_id)
- self.io_add_watch(fd, selector_mask, self._selector_callback(callbacks))
-
- def remove_reader(self, fd):
- """
- Stop watching the file descriptor for read availability.
- """
- handler = self._poll_event_handlers.get(fd)
- if handler is not None:
- if not isinstance(handler.callback, self._selector_callback):
- raise AssertionError("remove_reader called with fd "
- "registered directly via io_add_watch")
- callbacks = []
- selector_mask = 0
- removed = False
- for item in handler.callback._callbacks:
- callback, mask = item
- if mask == self._EVENT_READ:
- removed = True
- else:
- selector_mask |= mask
- callbacks.append(item)
- self.source_remove(handler.source_id)
- if callbacks:
- self.io_add_watch(fd, selector_mask,
- self._selector_callback(callbacks))
- return removed
- return False
-
- def add_writer(self, fd, callback, *args):
- """
- Start watching the file descriptor for write availability and then
- call the callback with specified arguments.
-
- Use functools.partial to pass keywords to the callback.
- """
- handler = self._poll_event_handlers.get(fd)
- callbacks = [(functools.partial(callback, *args), self._EVENT_WRITE)]
- selector_mask = self._EVENT_WRITE
- if handler is not None:
- if not isinstance(handler.callback, self._selector_callback):
- raise AssertionError("add_reader called with fd "
- "registered directly via io_add_watch")
- for item in handler.callback._callbacks:
- callback, mask = item
- if mask != self._EVENT_WRITE:
- selector_mask |= mask
- callbacks.append(item)
- self.source_remove(handler.source_id)
- self.io_add_watch(fd, selector_mask, self._selector_callback(callbacks))
-
- def remove_writer(self, fd):
- """
- Stop watching the file descriptor for write availability.
- """
- handler = self._poll_event_handlers.get(fd)
- if handler is not None:
- if not isinstance(handler.callback, self._selector_callback):
- raise AssertionError("remove_reader called with fd "
- "registered directly via io_add_watch")
- callbacks = []
- selector_mask = 0
- removed = False
- for item in handler.callback._callbacks:
- callback, mask = item
- if mask == self._EVENT_WRITE:
- removed = True
- else:
- selector_mask |= mask
- callbacks.append(item)
- self.source_remove(handler.source_id)
- if callbacks:
- self.io_add_watch(fd, selector_mask,
- self._selector_callback(callbacks))
- return removed
- return False
-
- def io_add_watch(self, f, condition, callback, *args):
- """
- Like glib.io_add_watch(), your function should return False to
- stop being called, or True to continue being called. Any
- additional positional arguments given here are passed to your
- function when it's called.
-
- @type f: int or object with fileno() method
- @param f: a file descriptor to monitor
- @type condition: int
- @param condition: a condition mask
- @type callback: callable
- @param callback: a function to call
- @rtype: int
- @return: an integer ID of the event source
- """
- if f in self._poll_event_handlers:
- raise AssertionError("fd %d is already registered" % f)
- source_id = self._new_source_id()
- self._poll_event_handler_ids[source_id] = f
- self._poll_event_handlers[f] = self._io_handler_class(
- args=args, callback=callback, f=f, source_id=source_id)
- self._poll_obj.register(f, condition)
- return source_id
-
- def source_remove(self, reg_id):
- """
- Like glib.source_remove(), this returns True if the given reg_id
- is found and removed, and False if the reg_id is invalid or has
- already been removed.
- """
- if isinstance(reg_id, self._idle_callback_class):
- if not reg_id._cancelled:
- reg_id._cancelled = True
- return True
- return False
-
- x = self._child_handlers.pop(reg_id, None)
- if x is not None:
- if not self._child_handlers and self._use_signal:
- signal.signal(signal.SIGCHLD, signal.SIG_DFL)
- self.source_remove(self._sigchld_src_id)
- self._sigchld_src_id = None
- return True
-
- with self._thread_rlock:
- timeout_handler = self._timeout_handlers.pop(reg_id, None)
- if timeout_handler is not None:
- if timeout_handler.interval == self._timeout_interval:
- if self._timeout_handlers:
- self._timeout_interval = min(x.interval
- for x in self._timeout_handlers.values())
- else:
- self._timeout_interval = None
- return True
-
- f = self._poll_event_handler_ids.pop(reg_id, None)
- if f is None:
- return False
- self._poll_obj.unregister(f)
- if self._poll_event_queue:
- # Discard any unhandled events that belong to this file,
- # in order to prevent these events from being erroneously
- # delivered to a future handler that is using a reallocated
- # file descriptor of the same numeric value (causing
- # extremely confusing bugs).
- remaining_events = []
- discarded_events = False
- for event in self._poll_event_queue:
- if event[0] == f:
- discarded_events = True
- else:
- remaining_events.append(event)
-
- if discarded_events:
- self._poll_event_queue[:] = remaining_events
-
- del self._poll_event_handlers[f]
- return True
-
- def run_until_complete(self, future):
- """
- Run until the Future is done.
-
- @type future: asyncio.Future
- @param future: a Future to wait for
- @rtype: object
- @return: the Future's result
- @raise: the Future's exception
- """
- future = asyncio.ensure_future(future, loop=self._asyncio_wrapper)
-
- # Since done callbacks are executed via call_soon, it's desirable
- # to continue iterating until those callbacks have executed, which
- # is easily achieved by registering a done callback and waiting for
- # it to execute.
- waiter = self.create_future()
- future.add_done_callback(waiter.set_result)
- while not waiter.done():
- self.iteration()
-
- return future.result()
-
- def call_soon(self, callback, *args, **kwargs):
- """
- Arrange for a callback to be called as soon as possible. The callback
- is called after call_soon() returns, when control returns to the event
- loop.
-
- This operates as a FIFO queue, callbacks are called in the order in
- which they are registered. Each callback will be called exactly once.
-
- Any positional arguments after the callback will be passed to the
- callback when it is called.
-
- The context argument currently does nothing, but exists for minimal
- interoperability with Future instances that require it for PEP 567.
-
- An object compatible with asyncio.Handle is returned, which can
- be used to cancel the callback.
-
- @type callback: callable
- @param callback: a function to call
- @type context: contextvars.Context
- @param context: An optional keyword-only context argument allows
- specifying a custom contextvars.Context for the callback to run
- in. The current context is used when no context is provided.
- @return: a handle which can be used to cancel the callback
- @rtype: asyncio.Handle (or compatible)
- """
- try:
- unexpected = next(key for key in kwargs if key != 'context')
- except StopIteration:
- pass
- else:
- raise TypeError("call_soon() got an unexpected keyword argument '%s'" % unexpected)
- return self._handle(self._idle_add(
- self._call_soon_callback(callback, args)), self)
-
- def call_soon_threadsafe(self, callback, *args, **kwargs):
- """Like call_soon(), but thread safe."""
- try:
- unexpected = next(key for key in kwargs if key != 'context')
- except StopIteration:
- pass
- else:
- raise TypeError("call_soon_threadsafe() got an unexpected keyword argument '%s'" % unexpected)
- # idle_add provides thread safety
- return self._handle(self.idle_add(
- self._call_soon_callback(callback, args)), self)
-
- def time(self):
- """Return the time according to the event loop's clock.
-
- This is a float expressed in seconds since an epoch, but the
- epoch, precision, accuracy and drift are unspecified and may
- differ per event loop.
- """
- return time.monotonic()
-
- def call_later(self, delay, callback, *args, **kwargs):
- """
- Arrange for the callback to be called after the given delay seconds
- (either an int or float).
-
- An instance of asyncio.Handle is returned, which can be used to cancel
- the callback.
-
- callback will be called exactly once per call to call_later(). If two
- callbacks are scheduled for exactly the same time, it is undefined
- which will be called first.
-
- The optional positional args will be passed to the callback when
- it is called. If you want the callback to be called with some named
- arguments, use a closure or functools.partial().
-
- The context argument currently does nothing, but exists for minimal
- interoperability with Future instances that require it for PEP 567.
-
- Use functools.partial to pass keywords to the callback.
-
- @type delay: int or float
- @param delay: delay seconds
- @type callback: callable
- @param callback: a function to call
- @type context: contextvars.Context
- @param context: An optional keyword-only context argument allows
- specifying a custom contextvars.Context for the callback to run
- in. The current context is used when no context is provided.
- @return: a handle which can be used to cancel the callback
- @rtype: asyncio.Handle (or compatible)
- """
- try:
- unexpected = next(key for key in kwargs if key != 'context')
- except StopIteration:
- pass
- else:
- raise TypeError("call_later() got an unexpected keyword argument '%s'" % unexpected)
- return self._handle(self.timeout_add(
- delay * 1000, self._call_soon_callback(callback, args)), self)
-
- def call_at(self, when, callback, *args, **kwargs):
- """
- Arrange for the callback to be called at the given absolute
- timestamp when (an int or float), using the same time reference as
- AbstractEventLoop.time().
-
- This method's behavior is the same as call_later().
-
- An instance of asyncio.Handle is returned, which can be used to
- cancel the callback.
-
- Use functools.partial to pass keywords to the callback.
-
- @type when: int or float
- @param when: absolute timestamp when to call callback
- @type callback: callable
- @param callback: a function to call
- @type context: contextvars.Context
- @param context: An optional keyword-only context argument allows
- specifying a custom contextvars.Context for the callback to run
- in. The current context is used when no context is provided.
- @return: a handle which can be used to cancel the callback
- @rtype: asyncio.Handle (or compatible)
- """
- try:
- unexpected = next(key for key in kwargs if key != 'context')
- except StopIteration:
- pass
- else:
- raise TypeError("call_at() got an unexpected keyword argument '%s'" % unexpected)
- delta = when - self.time()
- return self.call_later(delta if delta > 0 else 0, callback, *args)
-
- def run_in_executor(self, executor, func, *args):
- """
- Arrange for a func to be called in the specified executor.
-
- The executor argument should be an Executor instance. The default
- executor is used if executor is None.
-
- Use functools.partial to pass keywords to the *func*.
-
- @param executor: executor
- @type executor: concurrent.futures.Executor or None
- @param func: a function to call
- @type func: callable
- @return: a Future
- @rtype: asyncio.Future (or compatible)
- """
- if executor is None:
- executor = self._default_executor
- if executor is None:
- executor = ForkExecutor(loop=self)
- self._default_executor = executor
- future = executor.submit(func, *args)
- future = _real_asyncio.wrap_future(future,
- loop=self._asyncio_wrapper)
- return future
-
- def is_running(self):
- """Return whether the event loop is currently running."""
- return self._iteration_depth > 0
-
- def is_closed(self):
- """Returns True if the event loop was closed."""
- return self._poll_obj is None
-
- def close(self):
- """Close the event loop.
-
- This clears the queues and shuts down the executor,
- and waits for it to finish.
- """
- executor = self._default_executor
- if executor is not None:
- self._default_executor = None
- executor.shutdown(wait=True)
-
- if self._poll_obj is not None:
- close = getattr(self._poll_obj, 'close', None)
- if close is not None:
- close()
- self._poll_obj = None
-
- def default_exception_handler(self, context):
- """
- Default exception handler.
-
- This is called when an exception occurs and no exception
- handler is set, and can be called by a custom exception
- handler that wants to defer to the default behavior.
-
- The context parameter has the same meaning as in
- `call_exception_handler()`.
-
- @param context: exception context
- @type context: dict
- """
- message = context.get('message')
- if not message:
- message = 'Unhandled exception in event loop'
-
- exception = context.get('exception')
- if exception is not None:
- exc_info = (type(exception), exception, exception.__traceback__)
- else:
- exc_info = False
-
- log_lines = [message]
- for key in sorted(context):
- if key in {'message', 'exception'}:
- continue
- value = context[key]
- if key == 'source_traceback':
- tb = ''.join(traceback.format_list(value))
- value = 'Object created at (most recent call last):\n'
- value += tb.rstrip()
- elif key == 'handle_traceback':
- tb = ''.join(traceback.format_list(value))
- value = 'Handle created at (most recent call last):\n'
- value += tb.rstrip()
- else:
- value = repr(value)
- log_lines.append('{}: {}'.format(key, value))
-
- logging.error('\n'.join(log_lines), exc_info=exc_info)
- os.kill(portage.getpid(), signal.SIGTERM)
-
- def call_exception_handler(self, context):
- """
- Call the current event loop's exception handler.
-
- The context argument is a dict containing the following keys:
-
- - 'message': Error message;
- - 'exception' (optional): Exception object;
- - 'future' (optional): Future instance;
- - 'handle' (optional): Handle instance;
- - 'protocol' (optional): Protocol instance;
- - 'transport' (optional): Transport instance;
- - 'socket' (optional): Socket instance;
- - 'asyncgen' (optional): Asynchronous generator that caused
- the exception.
-
- New keys may be introduced in the future.
-
- @param context: exception context
- @type context: dict
- """
- self.default_exception_handler(context)
-
- def get_debug(self):
- """
- Get the debug mode (bool) of the event loop.
-
- The default value is True if the environment variable
- PYTHONASYNCIODEBUG is set to a non-empty string, False otherwise.
- """
- return self._debug
-
- def set_debug(self, enabled):
- """Set the debug mode of the event loop."""
- self._debug = enabled
-
-
-_can_poll_device = None
-
-def can_poll_device():
- """
- Test if it's possible to use poll() on a device such as a pty. This
- is known to fail on Darwin.
- @rtype: bool
- @return: True if poll() on a device succeeds, False otherwise.
- """
-
- global _can_poll_device
- if _can_poll_device is not None:
- return _can_poll_device
-
- if not hasattr(select, "poll"):
- _can_poll_device = False
- return _can_poll_device
-
- try:
- dev_null = open('/dev/null', 'rb')
- except IOError:
- _can_poll_device = False
- return _can_poll_device
-
- p = select.poll()
- try:
- p.register(dev_null.fileno(), PollConstants.POLLIN)
- except TypeError:
- # Jython: Object 'org.python.core.io.FileIO@f8f175' is not watchable
- _can_poll_device = False
- return _can_poll_device
-
- invalid_request = False
- for f, event in p.poll():
- if event & PollConstants.POLLNVAL:
- invalid_request = True
- break
- dev_null.close()
-
- _can_poll_device = not invalid_request
- return _can_poll_device
-
-def create_poll_instance():
- """
- Create an instance of select.poll, or an instance of
- PollSelectAdapter there is no poll() implementation or
- it is broken somehow.
- """
- if can_poll_device():
- return select.poll()
- return PollSelectAdapter()
-
-class _epoll_adapter:
- """
- Wraps a select.epoll instance in order to make it compatible
- with select.poll instances. This is necessary since epoll instances
- interpret timeout arguments differently. Note that the file descriptor
- that is associated with an epoll instance will close automatically when
- it is garbage collected, so it's not necessary to close it explicitly.
- """
- __slots__ = ('_epoll_obj', 'close')
-
- def __init__(self, epoll_obj):
- self._epoll_obj = epoll_obj
- self.close = epoll_obj.close
-
- def register(self, fd, *args):
- self._epoll_obj.register(fd, *args)
-
- def unregister(self, fd):
- self._epoll_obj.unregister(fd)
-
- def poll(self, *args):
- if len(args) > 1:
- raise TypeError(
- "poll expected at most 2 arguments, got " + \
- repr(1 + len(args)))
- timeout = -1
- if args:
- timeout = args[0]
- if timeout is None or timeout < 0:
- timeout = -1
- elif timeout != 0:
- timeout = timeout / 1000
-
- return self._epoll_obj.poll(timeout)
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_eventloop/
@ 2021-03-07 2:52 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2021-03-07 2:52 UTC (permalink / raw
To: gentoo-commits
commit: 4cf221840bb980305b48a329c693dd1a310d5248
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Mar 7 02:48:57 2021 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Mar 7 02:49:34 2021 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=4cf22184
Remove unused PollSelectAdapter and PollConstants classes
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_eventloop/PollConstants.py | 17 ------
lib/portage/util/_eventloop/PollSelectAdapter.py | 74 ------------------------
2 files changed, 91 deletions(-)
diff --git a/lib/portage/util/_eventloop/PollConstants.py b/lib/portage/util/_eventloop/PollConstants.py
deleted file mode 100644
index c5700d108..000000000
--- a/lib/portage/util/_eventloop/PollConstants.py
+++ /dev/null
@@ -1,17 +0,0 @@
-# Copyright 1999-2009 Gentoo Foundation
-# Distributed under the terms of the GNU General Public License v2
-
-import select
-class PollConstants:
-
- """
- Provides POLL* constants that are equivalent to those from the
- select module, for use by PollSelectAdapter.
- """
-
- names = ("POLLIN", "POLLPRI", "POLLOUT", "POLLERR", "POLLHUP", "POLLNVAL")
- v = 1
- for k in names:
- locals()[k] = getattr(select, k, v)
- v *= 2
- del k, v
diff --git a/lib/portage/util/_eventloop/PollSelectAdapter.py b/lib/portage/util/_eventloop/PollSelectAdapter.py
deleted file mode 100644
index c4637a352..000000000
--- a/lib/portage/util/_eventloop/PollSelectAdapter.py
+++ /dev/null
@@ -1,74 +0,0 @@
-# Copyright 1999-2020 Gentoo Authors
-# Distributed under the terms of the GNU General Public License v2
-
-import select
-
-from .PollConstants import PollConstants
-
-class PollSelectAdapter:
-
- """
- Use select to emulate a poll object, for
- systems that don't support poll().
- """
-
- def __init__(self):
- self._registered = {}
- self._select_args = [[], [], []]
-
- def register(self, fd, *args):
- """
- Only POLLIN is currently supported!
- """
- if len(args) > 1:
- raise TypeError(
- "register expected at most 2 arguments, got " + \
- repr(1 + len(args)))
-
- eventmask = PollConstants.POLLIN | \
- PollConstants.POLLPRI | PollConstants.POLLOUT
- if args:
- eventmask = args[0]
-
- self._registered[fd] = eventmask
- self._select_args = None
-
- def unregister(self, fd):
- self._select_args = None
- del self._registered[fd]
-
- def poll(self, *args):
- if len(args) > 1:
- raise TypeError(
- "poll expected at most 2 arguments, got " + \
- repr(1 + len(args)))
-
- timeout = None
- if args:
- timeout = args[0]
-
- select_args = self._select_args
- if select_args is None:
- select_args = [list(self._registered), [], []]
-
- if timeout is not None:
- select_args = select_args[:]
- # Translate poll() timeout args to select() timeout args:
- #
- # | units | value(s) for indefinite block
- # ---------|--------------|------------------------------
- # poll | milliseconds | omitted, negative, or None
- # ---------|--------------|------------------------------
- # select | seconds | omitted
- # ---------|--------------|------------------------------
-
- if timeout is not None and timeout < 0:
- timeout = None
- if timeout is not None:
- select_args.append(timeout / 1000)
-
- select_events = select.select(*select_args)
- poll_events = []
- for fd in select_events[0]:
- poll_events.append((fd, PollConstants.POLLIN))
- return poll_events
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_eventloop/
@ 2021-03-07 9:29 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2021-03-07 9:29 UTC (permalink / raw
To: gentoo-commits
commit: e0b6fbdf6543c19c63bbca007278d48003b807ad
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Mar 7 09:28:08 2021 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Mar 7 09:28:37 2021 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=e0b6fbdf
AsyncioEventLoop: remove obsolete supports_multiprocessing attribute
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_eventloop/asyncio_event_loop.py | 7 +------
1 file changed, 1 insertion(+), 6 deletions(-)
diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py
index b77728088..6dfac3569 100644
--- a/lib/portage/util/_eventloop/asyncio_event_loop.py
+++ b/lib/portage/util/_eventloop/asyncio_event_loop.py
@@ -1,4 +1,4 @@
-# Copyright 2018-2020 Gentoo Authors
+# Copyright 2018-2021 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import os
@@ -17,11 +17,6 @@ class AsyncioEventLoop(_AbstractEventLoop):
event loop and is minimally compatible with _PortageEventLoop.
"""
- # Use portage's internal event loop in subprocesses, as a workaround
- # for https://bugs.python.org/issue22087, and also
- # https://bugs.python.org/issue29703 which affects pypy3-5.10.1.
- supports_multiprocessing = False
-
def __init__(self, loop=None):
loop = loop or _real_asyncio.get_event_loop()
self._loop = loop
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_eventloop/
@ 2023-09-26 5:36 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2023-09-26 5:36 UTC (permalink / raw
To: gentoo-commits
commit: 34e4d5655360d135dbf2cd3a073d07d96e23cf74
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Sep 26 05:09:53 2023 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue Sep 26 05:14:21 2023 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=34e4d565
Remove deprecated AbstractChildWatcher usage
Fixes this DeprecationWarning triggered by
lib/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py:
lib/portage/util/_eventloop/asyncio_event_loop.py:138: DeprecationWarning: 'AbstractChildWatcher' is deprecated as of Python 3.12 and will be removed in Python 3.14.
class _ChildWatcherThreadSafetyWrapper(_AbstractChildWatcher):
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_eventloop/asyncio_event_loop.py | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py
index 88933af9d2..37b5b2706c 100644
--- a/lib/portage/util/_eventloop/asyncio_event_loop.py
+++ b/lib/portage/util/_eventloop/asyncio_event_loop.py
@@ -1,4 +1,4 @@
-# Copyright 2018-2021 Gentoo Authors
+# Copyright 2018-2023 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import os
@@ -6,7 +6,6 @@ import signal
import asyncio as _real_asyncio
from asyncio.events import AbstractEventLoop as _AbstractEventLoop
-from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
import portage
@@ -135,7 +134,7 @@ class AsyncioEventLoop(_AbstractEventLoop):
pass
-class _ChildWatcherThreadSafetyWrapper(_AbstractChildWatcher):
+class _ChildWatcherThreadSafetyWrapper:
def __init__(self, loop, real_watcher):
self._loop = loop
self._real_watcher = real_watcher
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_eventloop/
@ 2023-10-05 7:14 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2023-10-05 7:14 UTC (permalink / raw
To: gentoo-commits
commit: 690ac6e78c4099e83b84ef11c0b4064b077a8ef0
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Oct 5 06:27:27 2023 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Thu Oct 5 06:57:11 2023 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=690ac6e7
Eliminate deprecated asyncio.get_child_watcher() usage
Use PidfdChildWatcher if os.pidfd_open is available and works, and
otherwise use ThreadedChildWatcher which should work in any case.
The _ChildWatcherThreadSafetyWrapper class is not needed because
both PidfdChildWatcher and ThreadedChildWatcher use the running
event loop to invoke the child handler callback, and it is safe
to assume that the current AsyncioEventLoop instance is the
running loop when it is used to obtain a child watcher instance.
Bug: https://bugs.gentoo.org/914873
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_eventloop/asyncio_event_loop.py | 51 +++++++++++------------
1 file changed, 24 insertions(+), 27 deletions(-)
diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py
index 37b5b2706c..fe941b420a 100644
--- a/lib/portage/util/_eventloop/asyncio_event_loop.py
+++ b/lib/portage/util/_eventloop/asyncio_event_loop.py
@@ -6,6 +6,12 @@ import signal
import asyncio as _real_asyncio
from asyncio.events import AbstractEventLoop as _AbstractEventLoop
+from asyncio.unix_events import ThreadedChildWatcher
+
+try:
+ from asyncio.unix_events import PidfdChildWatcher
+except ImportError:
+ PidfdChildWatcher = None
import portage
@@ -90,9 +96,24 @@ class AsyncioEventLoop(_AbstractEventLoop):
@return: the internal event loop's AbstractChildWatcher interface
"""
if self._child_watcher is None:
- self._child_watcher = _ChildWatcherThreadSafetyWrapper(
- self, _real_asyncio.get_child_watcher()
- )
+ pidfd_works = False
+ if PidfdChildWatcher is not None and hasattr(os, "pidfd_open"):
+ try:
+ fd = os.pidfd_open(portage.getpid())
+ except Exception:
+ pass
+ else:
+ os.close(fd)
+ pidfd_works = True
+
+ if pidfd_works:
+ watcher = PidfdChildWatcher()
+ else:
+ watcher = ThreadedChildWatcher()
+
+ watcher.attach_loop(self._loop)
+ self._child_watcher = watcher
+
return self._child_watcher
@property
@@ -132,27 +153,3 @@ class AsyncioEventLoop(_AbstractEventLoop):
except ValueError:
# This is intended to fail when not called in the main thread.
pass
-
-
-class _ChildWatcherThreadSafetyWrapper:
- def __init__(self, loop, real_watcher):
- self._loop = loop
- self._real_watcher = real_watcher
-
- def close(self):
- pass
-
- def __enter__(self):
- return self
-
- def __exit__(self, a, b, c):
- pass
-
- def _child_exit(self, pid, status, callback, *args):
- self._loop.call_soon_threadsafe(callback, pid, status, *args)
-
- def add_child_handler(self, pid, callback, *args):
- self._real_watcher.add_child_handler(pid, self._child_exit, callback, *args)
-
- def remove_child_handler(self, pid):
- return self._real_watcher.remove_child_handler(pid)
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_eventloop/
@ 2023-10-05 9:21 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2023-10-05 9:21 UTC (permalink / raw
To: gentoo-commits
commit: c98240fba21b319e27fe595d70c671c9ddfb7fd0
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Oct 5 08:20:14 2023 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Thu Oct 5 08:21:47 2023 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=c98240fb
AsyncioEventLoop: Put back _ChildWatcherThreadSafetyWrapper
In case there are multiple loops running in different threads,
use _ChildWatcherThreadSafetyWrapper. This partially reverts
commit 690ac6e78c4099e83b84ef11c0b4064b077a8ef0.
Bug: https://bugs.gentoo.org/914873
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_eventloop/asyncio_event_loop.py | 30 ++++++++++++++++++++++-
1 file changed, 29 insertions(+), 1 deletion(-)
diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py
index fe941b420a..2b21e6dfaf 100644
--- a/lib/portage/util/_eventloop/asyncio_event_loop.py
+++ b/lib/portage/util/_eventloop/asyncio_event_loop.py
@@ -112,7 +112,7 @@ class AsyncioEventLoop(_AbstractEventLoop):
watcher = ThreadedChildWatcher()
watcher.attach_loop(self._loop)
- self._child_watcher = watcher
+ self._child_watcher = _ChildWatcherThreadSafetyWrapper(self, watcher)
return self._child_watcher
@@ -153,3 +153,31 @@ class AsyncioEventLoop(_AbstractEventLoop):
except ValueError:
# This is intended to fail when not called in the main thread.
pass
+
+
+class _ChildWatcherThreadSafetyWrapper:
+ """
+ This class provides safety if multiple loops are running in different threads.
+ """
+
+ def __init__(self, loop, real_watcher):
+ self._loop = loop
+ self._real_watcher = real_watcher
+
+ def close(self):
+ pass
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, a, b, c):
+ pass
+
+ def _child_exit(self, pid, status, callback, *args):
+ self._loop.call_soon_threadsafe(callback, pid, status, *args)
+
+ def add_child_handler(self, pid, callback, *args):
+ self._real_watcher.add_child_handler(pid, self._child_exit, callback, *args)
+
+ def remove_child_handler(self, pid):
+ return self._real_watcher.remove_child_handler(pid)
^ permalink raw reply related [flat|nested] 9+ messages in thread
end of thread, other threads:[~2023-10-05 9:21 UTC | newest]
Thread overview: 9+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2023-10-05 7:14 [gentoo-commits] proj/portage:master commit in: lib/portage/util/_eventloop/ Zac Medico
-- strict thread matches above, loose matches on Subject: below --
2023-10-05 9:21 Zac Medico
2023-09-26 5:36 Zac Medico
2021-03-07 9:29 Zac Medico
2021-03-07 2:52 Zac Medico
2021-03-06 11:04 Zac Medico
2021-01-11 7:36 Zac Medico
2020-03-02 5:03 Zac Medico
2019-04-16 2:07 Zac Medico
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox