public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
* [gentoo-commits] proj/portage:master commit in: lib/portage/tests/util/, lib/portage/util/_eventloop/, ...
@ 2024-08-13 21:03 Zac Medico
  0 siblings, 0 replies; only message in thread
From: Zac Medico @ 2024-08-13 21:03 UTC (permalink / raw)
  To: gentoo-commits

commit:     cb0c09d8cecbcc086786e3e2c7cdd8ffc023a48a
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Aug 11 07:50:49 2024 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Aug 11 07:50:49 2024 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=cb0c09d8

Support coroutine exitfuncs for non-main loops

Since an API consumer can cause loops to be instantiated
for non-main threads, support coroutine exitfuncs for each
loop. The included Socks5ServerAtExitThreadedTestCase calls
get_socks5_proxy from a non-main thread, and demonstrates
that coroutine exitfuncs for the resulting non-main loop
will reliably stop the socks5 proxy via atexit hook.

The _thread_weakrefs_atexit function will now make a
temporary adjustment to _thread_weakrefs.loops so that a
loop is associated with the current thread when it is
closing. Also, the _get_running_loop function will now
store weak references to all _AsyncioEventLoop instances
it creates, since each has a _coroutine_exithandlers
attribute that can be modified by atexit_register calls.

Bug: https://bugs.gentoo.org/937740
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/process.py                            | 11 ++++--
 lib/portage/tests/util/test_socks5.py             | 38 +++++++++++++++------
 lib/portage/util/_eventloop/asyncio_event_loop.py | 15 ++-------
 lib/portage/util/futures/_asyncio/__init__.py     | 41 ++++++++++++++++++++---
 4 files changed, 76 insertions(+), 29 deletions(-)

diff --git a/lib/portage/process.py b/lib/portage/process.py
index 23e2507b53..38adebda66 100644
--- a/lib/portage/process.py
+++ b/lib/portage/process.py
@@ -194,7 +194,6 @@ def spawn_fakeroot(mycommand, fakeroot_state=None, opt_name=None, **keywords):
 
 
 _exithandlers = []
-_coroutine_exithandlers = []
 
 
 def atexit_register(func, *args, **kargs):
@@ -205,7 +204,9 @@ def atexit_register(func, *args, **kargs):
     # The internal asyncio wrapper module would trigger a circular import
     # if used here.
     if _asyncio.iscoroutinefunction(func):
-        _coroutine_exithandlers.append((func, args, kargs))
+        # Add this coroutine function to the exit handlers for the loop
+        # which is associated with the current thread.
+        global_event_loop()._coroutine_exithandlers.append((func, args, kargs))
     else:
         _exithandlers.append((func, args, kargs))
 
@@ -238,13 +239,17 @@ async def run_coroutine_exitfuncs():
     """
     This is the same as run_exitfuncs but it uses asyncio.iscoroutinefunction
     to check which functions to run. It is called by the AsyncioEventLoop
-    _close_main method just before the loop is closed.
+    _close method just before the loop is closed.
 
     If the loop is explicitly closed before exit, then that will cause
     run_coroutine_exitfuncs to run before run_exitfuncs. Otherwise, a
     run_exitfuncs hook will close it, causing run_coroutine_exitfuncs to be
     called via run_exitfuncs.
     """
+    # The _thread_weakrefs_atexit function makes an adjustment to ensure
+    # that global_event_loop() returns the correct loop when it is closing,
+    # regardless of which thread the loop was initially associated with.
+    _coroutine_exithandlers = global_event_loop()._coroutine_exithandlers
     tasks = []
     while _coroutine_exithandlers:
         func, targs, kargs = _coroutine_exithandlers.pop()

diff --git a/lib/portage/tests/util/test_socks5.py b/lib/portage/tests/util/test_socks5.py
index 35f919d970..078e3b1a23 100644
--- a/lib/portage/tests/util/test_socks5.py
+++ b/lib/portage/tests/util/test_socks5.py
@@ -194,17 +194,17 @@ class Socks5ServerTestCase(TestCase):
         asyncio.run(self._test_socks5_proxy())
 
     async def _test_socks5_proxy(self):
-        loop = asyncio.get_running_loop()
+        loop = global_event_loop()
 
         host = "127.0.0.1"
         content = b"Hello World!"
         path = "/index.html"
         proxy = None
         tempdir = tempfile.mkdtemp()
-        previous_exithandlers = portage.process._coroutine_exithandlers
+        previous_exithandlers = loop._coroutine_exithandlers
 
         try:
-            portage.process._coroutine_exithandlers = []
+            loop._coroutine_exithandlers = []
             with AsyncHTTPServer(host, {path: content}, loop) as server:
                 settings = {
                     "PORTAGE_TMPDIR": tempdir,
@@ -227,11 +227,11 @@ class Socks5ServerTestCase(TestCase):
         finally:
             try:
                 # Also run_coroutine_exitfuncs to test atexit hook cleanup.
-                self.assertNotEqual(portage.process._coroutine_exithandlers, [])
+                self.assertNotEqual(loop._coroutine_exithandlers, [])
                 await portage.process.run_coroutine_exitfuncs()
-                self.assertEqual(portage.process._coroutine_exithandlers, [])
+                self.assertEqual(loop._coroutine_exithandlers, [])
             finally:
-                portage.process._coroutine_exithandlers = previous_exithandlers
+                loop._coroutine_exithandlers = previous_exithandlers
                 shutil.rmtree(tempdir)
 
 
@@ -284,6 +284,8 @@ class Socks5ServerAtExitTestCase(TestCase):
     so this test uses python -c to ensure that atexit hooks will work.
     """
 
+    _threaded = False
+
     def testSocks5ServerAtExit(self):
         tempdir = tempfile.mkdtemp()
         try:
@@ -295,24 +297,36 @@ class Socks5ServerAtExitTestCase(TestCase):
                     "-c",
                     """
 import sys
+import threading
 
 from portage.const import PORTAGE_BIN_PATH
 from portage.util import socks5
 from portage.util._eventloop.global_event_loop import global_event_loop
 
 tempdir = sys.argv[0]
-loop = global_event_loop()
+threaded = bool(sys.argv[1])
 
 settings = {
     "PORTAGE_TMPDIR": tempdir,
     "PORTAGE_BIN_PATH": PORTAGE_BIN_PATH,
 }
 
-socks5.get_socks5_proxy(settings)
-loop.run_until_complete(socks5.proxy.ready())
-print(socks5.proxy._proc.pid, flush=True)
+def main():
+    loop = global_event_loop()
+    socks5.get_socks5_proxy(settings)
+    loop.run_until_complete(socks5.proxy.ready())
+    print(socks5.proxy._proc.pid, flush=True)
+
+if __name__ == "__main__":
+    if threaded:
+        t = threading.Thread(target=main)
+        t.start()
+        t.join()
+    else:
+        main()
 """,
                     tempdir,
+                    str(self._threaded),
                 ],
                 env=env,
             )
@@ -323,3 +337,7 @@ print(socks5.proxy._proc.pid, flush=True)
                 os.kill(pid, 0)
         finally:
             shutil.rmtree(tempdir)
+
+
+class Socks5ServerAtExitThreadedTestCase(Socks5ServerAtExitTestCase):
+    _threaded = True

diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py
index 821cc7f102..c69e5c2f01 100644
--- a/lib/portage/util/_eventloop/asyncio_event_loop.py
+++ b/lib/portage/util/_eventloop/asyncio_event_loop.py
@@ -3,7 +3,6 @@
 
 import os
 import signal
-import threading
 
 import asyncio as _real_asyncio
 from asyncio.events import AbstractEventLoop as _AbstractEventLoop
@@ -54,8 +53,7 @@ class AsyncioEventLoop(_AbstractEventLoop):
         self._child_watcher = None
         # Used to drop recursive calls to _close.
         self._closing = False
-        # Initialized in _run_until_complete.
-        self._is_main = None
+        self._coroutine_exithandlers = []
 
         if portage._internal_caller:
             loop.set_exception_handler(self._internal_caller_exception_handler)
@@ -68,15 +66,11 @@ class AsyncioEventLoop(_AbstractEventLoop):
         """
         if not (self._closing or self.is_closed()):
             self._closing = True
-            if self._is_main:
-                self.run_until_complete(self._close_main())
+            if self._coroutine_exithandlers:
+                self.run_until_complete(portage.process.run_coroutine_exitfuncs())
             self._loop.close()
             self._closing = False
 
-    async def _close_main(self):
-        await portage.process.run_coroutine_exitfuncs()
-        portage.process.run_exitfuncs()
-
     @staticmethod
     def _internal_caller_exception_handler(loop, context):
         """
@@ -157,9 +151,6 @@ class AsyncioEventLoop(_AbstractEventLoop):
         In order to avoid potential interference with API consumers, this
         implementation is only used when portage._internal_caller is True.
         """
-        if self._is_main is None:
-            self._is_main = threading.current_thread() is threading.main_thread()
-
         if not portage._internal_caller:
             return self._loop.run_until_complete(future)
 

diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py
index 8942bcb67e..8805e35756 100644
--- a/lib/portage/util/futures/_asyncio/__init__.py
+++ b/lib/portage/util/futures/_asyncio/__init__.py
@@ -339,15 +339,30 @@ def _get_running_loop():
                 elif _loop is None:
                     return loop if loop.is_running() else None
 
-    # If _loop it not None here it means it was probably a temporary
-    # loop created by asyncio.run, so we don't try to cache it, and
-    # just return a temporary wrapper.
-    return None if _loop is None else _AsyncioEventLoop(loop=_loop)
+        if _loop is None:
+            return None
+
+        # If _loop it not None here it means it was probably a temporary
+        # loop created by asyncio.run. Still keep a weak reference in case
+        # we need to lookup this _AsyncioEventLoop instance later to add
+        # _coroutine_exithandlers in the atexit_register function.
+        if _thread_weakrefs.pid != portage.getpid():
+            _thread_weakrefs.pid = portage.getpid()
+            _thread_weakrefs.mainloop = None
+            _thread_weakrefs.loops = weakref.WeakValueDictionary()
+
+        loop = _thread_weakrefs.loops[threading.get_ident()] = _AsyncioEventLoop(
+            loop=_loop
+        )
+
+        return loop
 
 
 def _thread_weakrefs_atexit():
     while True:
         loop = None
+        thread_key = None
+        restore_loop = None
         with _thread_weakrefs.lock:
             if _thread_weakrefs.pid != portage.getpid():
                 return
@@ -356,11 +371,29 @@ def _thread_weakrefs_atexit():
                 thread_key, loop = _thread_weakrefs.loops.popitem()
             except KeyError:
                 return
+            else:
+                # Temporarily associate it as the loop for the current thread so
+                # that it can be looked up during run_coroutine_exitfuncs calls.
+                # Also create a reference to a different loop if one is associated
+                # with this thread so we can restore it later.
+                try:
+                    restore_loop = _thread_weakrefs.loops[threading.get_ident()]
+                except KeyError:
+                    pass
+                _thread_weakrefs.loops[threading.get_ident()] = loop
 
         # Release the lock while closing the loop, since it may call
         # run_coroutine_exitfuncs interally.
         if loop is not None:
             loop.close()
+            with _thread_weakrefs.lock:
+                try:
+                    if _thread_weakrefs.loops[threading.get_ident()] is loop:
+                        del _thread_weakrefs.loops[threading.get_ident()]
+                except KeyError:
+                    pass
+                if restore_loop is not None:
+                    _thread_weakrefs.loops[threading.get_ident()] = restore_loop
 
 
 _thread_weakrefs = types.SimpleNamespace(


^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2024-08-13 21:03 UTC | newest]

Thread overview: (only message) (download: mbox.gz follow: Atom feed)
-- links below jump to the message on this page --
2024-08-13 21:03 [gentoo-commits] proj/portage:master commit in: lib/portage/tests/util/, lib/portage/util/_eventloop/, Zac Medico

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox