From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from lists.gentoo.org (pigeon.gentoo.org [208.92.234.80]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256) (No client certificate requested) by finch.gentoo.org (Postfix) with ESMTPS id E27D315815E for ; Sat, 3 Feb 2024 02:03:34 +0000 (UTC) Received: from pigeon.gentoo.org (localhost [127.0.0.1]) by pigeon.gentoo.org (Postfix) with SMTP id 079CAE29D8; Sat, 3 Feb 2024 02:03:34 +0000 (UTC) Received: from smtp.gentoo.org (smtp.gentoo.org [IPv6:2001:470:ea4a:1:5054:ff:fec7:86e4]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (4096 bits) server-digest SHA256) (No client certificate requested) by pigeon.gentoo.org (Postfix) with ESMTPS id D5ADEE29D8 for ; Sat, 3 Feb 2024 02:03:33 +0000 (UTC) Received: from oystercatcher.gentoo.org (oystercatcher.gentoo.org [148.251.78.52]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (4096 bits) server-digest SHA256) (No client certificate requested) by smtp.gentoo.org (Postfix) with ESMTPS id F0C1534309D for ; Sat, 3 Feb 2024 02:03:31 +0000 (UTC) Received: from localhost.localdomain (localhost [IPv6:::1]) by oystercatcher.gentoo.org (Postfix) with ESMTP id 3D217358 for ; Sat, 3 Feb 2024 02:03:30 +0000 (UTC) From: "Zac Medico" To: gentoo-commits@lists.gentoo.org Content-Transfer-Encoding: 8bit Content-type: text/plain; charset=UTF-8 Reply-To: gentoo-dev@lists.gentoo.org, "Zac Medico" Message-ID: <1706889678.055c66ec9482064aaaf51bfb6b01e260ea27808e.zmedico@gentoo> Subject: [gentoo-commits] proj/portage:master commit in: lib/portage/, lib/portage/package/ebuild/, lib/_emerge/, ... X-VCS-Repository: proj/portage X-VCS-Files: lib/_emerge/EbuildMetadataPhase.py lib/_emerge/SpawnProcess.py lib/_emerge/SubProcess.py lib/portage/package/ebuild/doebuild.py lib/portage/process.py lib/portage/util/_async/ForkProcess.py lib/portage/util/_async/PopenProcess.py X-VCS-Directories: lib/_emerge/ lib/portage/package/ebuild/ lib/portage/ lib/portage/util/_async/ X-VCS-Committer: zmedico X-VCS-Committer-Name: Zac Medico X-VCS-Revision: 055c66ec9482064aaaf51bfb6b01e260ea27808e X-VCS-Branch: master Date: Sat, 3 Feb 2024 02:03:30 +0000 (UTC) Precedence: bulk List-Post: List-Help: List-Unsubscribe: List-Subscribe: List-Id: Gentoo Linux mail X-BeenThere: gentoo-commits@lists.gentoo.org X-Auto-Response-Suppress: DR, RN, NRN, OOF, AutoReply X-Archives-Salt: a5f1a436-f1b2-44e6-a014-8a49dc090d13 X-Archives-Hash: 9a93421e5312e8986c8afc7f1391f31e commit: 055c66ec9482064aaaf51bfb6b01e260ea27808e Author: Zac Medico gentoo org> AuthorDate: Fri Feb 2 16:01:18 2024 +0000 Commit: Zac Medico gentoo org> CommitDate: Fri Feb 2 16:01:18 2024 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=055c66ec SpawnProcess: Use spawn returnproc parameter Migrate SpawnProcess to use the spawn returnproc parameter, and make adaptations to descendent classes as needed. Introduce a portage.process.MultiprocessingProcess class for ForkProcess to wrap multiprocessing.Process instances, needed because ForkProcess inherits from SpawnProcess. Use portage.process.Process to wrap the pid in EbuildMetadataPhase, so that returnproc support in the doebuild function can be reserved for a later commit. Bug: https://bugs.gentoo.org/916566 Signed-off-by: Zac Medico gentoo.org> lib/_emerge/EbuildMetadataPhase.py | 4 +- lib/_emerge/SpawnProcess.py | 16 ++--- lib/_emerge/SubProcess.py | 25 +++---- lib/portage/package/ebuild/doebuild.py | 4 +- lib/portage/process.py | 120 ++++++++++++++++++++++++++++---- lib/portage/util/_async/ForkProcess.py | 87 +++-------------------- lib/portage/util/_async/PopenProcess.py | 5 +- 7 files changed, 143 insertions(+), 118 deletions(-) diff --git a/lib/_emerge/EbuildMetadataPhase.py b/lib/_emerge/EbuildMetadataPhase.py index 8905a058fc..a7c9650d74 100644 --- a/lib/_emerge/EbuildMetadataPhase.py +++ b/lib/_emerge/EbuildMetadataPhase.py @@ -1,4 +1,4 @@ -# Copyright 1999-2020 Gentoo Authors +# Copyright 1999-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 from _emerge.SubProcess import SubProcess @@ -137,7 +137,7 @@ class EbuildMetadataPhase(SubProcess): self._async_wait() return - self.pid = retval[0] + self._proc = portage.process.Process(retval[0]) def _output_handler(self): while True: diff --git a/lib/_emerge/SpawnProcess.py b/lib/_emerge/SpawnProcess.py index 40740df9aa..7f4a23892b 100644 --- a/lib/_emerge/SpawnProcess.py +++ b/lib/_emerge/SpawnProcess.py @@ -1,4 +1,4 @@ -# Copyright 2008-2023 Gentoo Authors +# Copyright 2008-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import functools @@ -123,24 +123,16 @@ class SpawnProcess(SubProcess): kwargs[k] = v kwargs["fd_pipes"] = fd_pipes - kwargs["returnpid"] = True + kwargs["returnproc"] = True kwargs.pop("logfile", None) - retval = self._spawn(self.args, **kwargs) + self._proc = self._spawn(self.args, **kwargs) if slave_fd is not None: os.close(slave_fd) if null_input is not None: os.close(null_input) - if isinstance(retval, int): - # spawn failed - self.returncode = retval - self._async_wait() - return - - self.pid = retval[0] - if not fd_pipes: self._registered = True self._async_waitpid() @@ -232,7 +224,7 @@ class SpawnProcess(SubProcess): got_pty, master_fd, slave_fd = _create_pty_or_pipe(copy_term_size=stdout_pipe) return (master_fd, slave_fd) - def _spawn(self, args, **kwargs): + def _spawn(self, args: list[str], **kwargs) -> portage.process.Process: spawn_func = portage.process.spawn if self._selinux_type is not None: diff --git a/lib/_emerge/SubProcess.py b/lib/_emerge/SubProcess.py index b734591d11..029bbc3f44 100644 --- a/lib/_emerge/SubProcess.py +++ b/lib/_emerge/SubProcess.py @@ -1,4 +1,4 @@ -# Copyright 1999-2020 Gentoo Authors +# Copyright 1999-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import logging @@ -12,12 +12,16 @@ import errno class SubProcess(AbstractPollTask): - __slots__ = ("pid",) + ("_dummy_pipe_fd", "_files", "_waitpid_id") + __slots__ = ("_dummy_pipe_fd", "_files", "_proc", "_waitpid_id") # This is how much time we allow for waitpid to succeed after # we've sent a kill signal to our subprocess. _cancel_timeout = 1 # seconds + @property + def pid(self): + return self._proc.pid + def _poll(self): # Simply rely on _async_waitpid_cb to set the returncode. return self.returncode @@ -58,15 +62,11 @@ class SubProcess(AbstractPollTask): if self.returncode is not None: self._async_wait() elif self._waitpid_id is None: - self._waitpid_id = self.pid - self.scheduler._asyncio_child_watcher.add_child_handler( - self.pid, self._async_waitpid_cb - ) - - def _async_waitpid_cb(self, pid, returncode): - if pid != self.pid: - raise AssertionError(f"expected pid {self.pid}, got {pid}") - self.returncode = returncode + self._waitpid_id = asyncio.ensure_future(self._proc.wait(), self.scheduler) + self._waitpid_id.add_done_callback(self._async_waitpid_cb) + + def _async_waitpid_cb(self, future): + self.returncode = future.result() self._async_wait() def _orphan_process_warn(self): @@ -80,7 +80,8 @@ class SubProcess(AbstractPollTask): self._registered = False if self._waitpid_id is not None: - self.scheduler._asyncio_child_watcher.remove_child_handler(self._waitpid_id) + if not self._waitpid_id.done(): + self._waitpid_id.cancel() self._waitpid_id = None if self._files is not None: diff --git a/lib/portage/package/ebuild/doebuild.py b/lib/portage/package/ebuild/doebuild.py index ed604415da..e10b884e08 100644 --- a/lib/portage/package/ebuild/doebuild.py +++ b/lib/portage/package/ebuild/doebuild.py @@ -1,4 +1,4 @@ -# Copyright 2010-2023 Gentoo Authors +# Copyright 2010-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 __all__ = ["doebuild", "doebuild_environment", "spawn", "spawnebuild"] @@ -2094,7 +2094,7 @@ def spawn( mysettings.configdict["env"]["LOGNAME"] = logname try: - if keywords.get("returnpid"): + if keywords.get("returnpid") or keywords.get("returnproc"): return spawn_func(mystring, env=mysettings.environ(), **keywords) proc = EbuildSpawnProcess( diff --git a/lib/portage/process.py b/lib/portage/process.py index 6ec52efc4a..01426179d7 100644 --- a/lib/portage/process.py +++ b/lib/portage/process.py @@ -29,6 +29,7 @@ import portage portage.proxy.lazyimport.lazyimport( globals(), "portage.util._eventloop.global_event_loop:global_event_loop", + "portage.util.futures:asyncio", "portage.util:dump_traceback,writemsg,writemsg_level", ) @@ -279,7 +280,21 @@ def calc_env_stats(env) -> EnvStats: env_too_large_warnings = 0 -class Process: +class AbstractProcess: + def send_signal(self, sig): + """Send a signal to the process.""" + if self.returncode is not None: + # Skip signalling a process that we know has already died. + return + + try: + os.kill(self.pid, sig) + except ProcessLookupError: + # Suppress the race condition error; bpo-40550. + pass + + +class Process(AbstractProcess): """ An object that wraps OS processes created by spawn. In the future, spawn will return objects of a different type @@ -289,7 +304,7 @@ class Process: the process lifecycle and need to persist until it exits. """ - def __init__(self, pid): + def __init__(self, pid: int): self.pid = pid self.returncode = None self._exit_waiters = [] @@ -323,25 +338,106 @@ class Process: waiter.set_result(returncode) self._exit_waiters = None - def send_signal(self, sig): - """Send a signal to the process.""" + def terminate(self): + """Terminate the process with SIGTERM""" + self.send_signal(signal.SIGTERM) + + def kill(self): + """Kill the process with SIGKILL""" + self.send_signal(signal.SIGKILL) + + +class MultiprocessingProcess(AbstractProcess): + """ + An object that wraps OS processes created by multiprocessing.Process. + """ + + # Number of seconds between poll attempts for process exit status + # (after the sentinel has become ready). + _proc_join_interval = 0.1 + + def __init__(self, proc: multiprocessing.Process): + self._proc = proc + self.pid = proc.pid + self.returncode = None + self._exit_waiters = [] + + def __repr__(self): + return f"<{self.__class__.__name__} {self.pid}>" + + async def wait(self): + """ + Wait for the child process to terminate. + + Set and return the returncode attribute. + """ if self.returncode is not None: - # Skip signalling a process that we know has already died. - return + return self.returncode + loop = global_event_loop() + if not self._exit_waiters: + asyncio.ensure_future(self._proc_join(), loop=loop).add_done_callback( + self._proc_join_done + ) + waiter = loop.create_future() + self._exit_waiters.append(waiter) + return await waiter + + async def _proc_join(self): + loop = global_event_loop() + sentinel_reader = loop.create_future() + proc = self._proc + loop.add_reader( + proc.sentinel, + lambda: sentinel_reader.done() or sentinel_reader.set_result(None), + ) try: - os.kill(self.pid, sig) - except ProcessLookupError: - # Suppress the race condition error; bpo-40550. - pass + await sentinel_reader + finally: + # If multiprocessing.Process supports the close method, then + # access to proc.sentinel will raise ValueError if the + # sentinel has been closed. In this case it's not safe to call + # remove_reader, since the file descriptor may have been closed + # and then reallocated to a concurrent coroutine. When the + # close method is not supported, proc.sentinel remains open + # until proc's finalizer is called. + try: + loop.remove_reader(proc.sentinel) + except ValueError: + pass + + # Now that proc.sentinel is ready, poll until process exit + # status has become available. + while True: + proc.join(0) + if proc.exitcode is not None: + break + await asyncio.sleep(self._proc_join_interval, loop=loop) + + def _proc_join_done(self, future): + # The join task should never be cancelled, so let it raise + # asyncio.CancelledError here if that somehow happens. + future.result() + + self.returncode = self._proc.exitcode + if hasattr(self._proc, "close"): + self._proc.close() + self._proc = None + + for waiter in self._exit_waiters: + if not waiter.cancelled(): + waiter.set_result(self.returncode) + self._exit_waiters = None def terminate(self): """Terminate the process with SIGTERM""" - self.send_signal(signal.SIGTERM) + if self._proc is not None: + self._proc.terminate() def kill(self): """Kill the process with SIGKILL""" - self.send_signal(signal.SIGKILL) + if self._proc is not None: + self._proc.kill() def spawn( diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py index 780545be0e..711bd2a7ba 100644 --- a/lib/portage/util/_async/ForkProcess.py +++ b/lib/portage/util/_async/ForkProcess.py @@ -1,13 +1,14 @@ -# Copyright 2012-2023 Gentoo Authors +# Copyright 2012-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import fcntl -import functools import multiprocessing import warnings import signal import sys +from typing import Optional + import portage from portage import os from portage.cache.mappings import slot_dict_class @@ -23,17 +24,11 @@ class ForkProcess(SpawnProcess): "kwargs", "target", "_child_connection", - "_proc", - "_proc_join_task", ) _file_names = ("connection", "slave_fd") _files_dict = slot_dict_class(_file_names, prefix="") - # Number of seconds between poll attempts for process exit status - # (after the sentinel has become ready). - _proc_join_interval = 0.1 - _HAVE_SEND_HANDLE = getattr(multiprocessing.reduction, "HAVE_SEND_HANDLE", False) def _start(self): @@ -58,9 +53,8 @@ class ForkProcess(SpawnProcess): duplex=self._HAVE_SEND_HANDLE ) - retval = self._spawn(self.args, fd_pipes=self.fd_pipes) + self._proc = self._spawn(self.args, fd_pipes=self.fd_pipes) - self.pid = retval[0] self._registered = True if self._child_connection is None: @@ -133,7 +127,9 @@ class ForkProcess(SpawnProcess): await super()._main(build_logger, pipe_logger, loop=loop) - def _spawn(self, args, fd_pipes=None, **kwargs): + def _spawn( + self, args: list[str], fd_pipes: Optional[dict[int, int]] = None, **kwargs + ) -> portage.process.MultiprocessingProcess: """ Override SpawnProcess._spawn to fork a subprocess that calls self._run(). This uses multiprocessing.Process in order to leverage @@ -175,7 +171,7 @@ class ForkProcess(SpawnProcess): # Handle fd_pipes in _main instead. fd_pipes = None - self._proc = multiprocessing.Process( + proc = multiprocessing.Process( target=self._bootstrap, args=( self._child_connection, @@ -186,19 +182,12 @@ class ForkProcess(SpawnProcess): kwargs, ), ) - self._proc.start() + proc.start() finally: if stdin_dup is not None: os.close(stdin_dup) - self._proc_join_task = asyncio.ensure_future( - self._proc_join(self._proc, loop=self.scheduler), loop=self.scheduler - ) - self._proc_join_task.add_done_callback( - functools.partial(self._proc_join_done, self._proc) - ) - - return [self._proc.pid] + return portage.process.MultiprocessingProcess(proc) def _cancel(self): if self._proc is None: @@ -206,64 +195,10 @@ class ForkProcess(SpawnProcess): else: self._proc.terminate() - def _async_wait(self): - if self._proc_join_task is None: - super()._async_wait() - - def _async_waitpid(self): - if self._proc_join_task is None: - super()._async_waitpid() - - async def _proc_join(self, proc, loop=None): - sentinel_reader = self.scheduler.create_future() - self.scheduler.add_reader( - proc.sentinel, - lambda: sentinel_reader.done() or sentinel_reader.set_result(None), - ) - try: - await sentinel_reader - finally: - # If multiprocessing.Process supports the close method, then - # access to proc.sentinel will raise ValueError if the - # sentinel has been closed. In this case it's not safe to call - # remove_reader, since the file descriptor may have been closed - # and then reallocated to a concurrent coroutine. When the - # close method is not supported, proc.sentinel remains open - # until proc's finalizer is called. - try: - self.scheduler.remove_reader(proc.sentinel) - except ValueError: - pass - - # Now that proc.sentinel is ready, poll until process exit - # status has become available. - while True: - proc.join(0) - if proc.exitcode is not None: - break - await asyncio.sleep(self._proc_join_interval, loop=loop) - - def _proc_join_done(self, proc, future): - future.cancelled() or future.result() - self._was_cancelled() - if self.returncode is None: - self.returncode = proc.exitcode - - self._proc = None - if hasattr(proc, "close"): - proc.close() - self._proc_join_task = None - self._async_wait() - def _unregister(self): super()._unregister() if self._proc is not None: - if self._proc.is_alive(): - self._proc.terminate() - self._proc = None - if self._proc_join_task is not None: - self._proc_join_task.cancel() - self._proc_join_task = None + self._proc.terminate() @staticmethod def _bootstrap(child_connection, have_send_handle, fd_pipes, target, args, kwargs): diff --git a/lib/portage/util/_async/PopenProcess.py b/lib/portage/util/_async/PopenProcess.py index c9bca1c524..a0e532e278 100644 --- a/lib/portage/util/_async/PopenProcess.py +++ b/lib/portage/util/_async/PopenProcess.py @@ -1,6 +1,7 @@ -# Copyright 2012-2021 Gentoo Authors +# Copyright 2012-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +import portage from _emerge.SubProcess import SubProcess @@ -11,7 +12,7 @@ class PopenProcess(SubProcess): ) def _start(self): - self.pid = self.proc.pid + self._proc = portage.process.Process(self.proc.pid) self._registered = True if self.pipe_reader is None: