* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2020-02-29 7:51 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2020-02-29 7:51 UTC (permalink / raw
To: gentoo-commits
commit: 7e7e22d8129b505787c1006501659262081c4aec
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat Feb 29 07:32:21 2020 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sat Feb 29 07:47:55 2020 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=7e7e22d8
FileDigester: eliminate _pipe_logger_exit with _async_waitpid
Eliminate the _pipe_logger_exit method by overriding the
_async_waitpid method.
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/FileDigester.py | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/lib/portage/util/_async/FileDigester.py b/lib/portage/util/_async/FileDigester.py
index 164dbdc23..d156a2015 100644
--- a/lib/portage/util/_async/FileDigester.py
+++ b/lib/portage/util/_async/FileDigester.py
@@ -57,10 +57,11 @@ class FileDigester(ForkProcess):
self.digests = digests
- def _pipe_logger_exit(self, pipe_logger):
+ def _async_waitpid(self):
# Ignore this event, since we want to ensure that we
# exit only after _digest_pipe_reader has reached EOF.
- self._pipe_logger = None
+ if self._digest_pipe_reader is None:
+ ForkProcess._async_waitpid(self)
def _digest_pipe_reader_exit(self, pipe_reader):
self._parse_digests(pipe_reader.getvalue())
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2020-02-29 7:51 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2020-02-29 7:51 UTC (permalink / raw
To: gentoo-commits
commit: d343bd2eebbb771cb6e43d4e3add85614db6b2a6
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat Feb 29 07:32:21 2020 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sat Feb 29 07:47:39 2020 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=d343bd2e
AsyncFunction: eliminate _pipe_logger_exit with _async_waitpid
Eliminate the _pipe_logger_exit method by overriding the
_async_waitpid method.
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/AsyncFunction.py | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/lib/portage/util/_async/AsyncFunction.py b/lib/portage/util/_async/AsyncFunction.py
index 9cfeeeada..bd7e84ea3 100644
--- a/lib/portage/util/_async/AsyncFunction.py
+++ b/lib/portage/util/_async/AsyncFunction.py
@@ -49,10 +49,11 @@ class AsyncFunction(ForkProcess):
return os.EX_OK
- def _pipe_logger_exit(self, pipe_logger):
+ def _async_waitpid(self):
# Ignore this event, since we want to ensure that we exit
# only after _async_func_reader_exit has reached EOF.
- self._pipe_logger = None
+ if self._async_func_reader is None:
+ ForkProcess._async_waitpid(self)
def _async_func_reader_exit(self, pipe_reader):
try:
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2020-03-01 1:22 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2020-03-01 1:22 UTC (permalink / raw
To: gentoo-commits
commit: 27a6ee3d09ff79b6addb2696a10fcc3f46e5ca91
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Mar 1 01:14:32 2020 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Mar 1 01:15:56 2020 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=27a6ee3d
AsyncScheduler: cancel task after _task_coroutine CancelledError
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/AsyncScheduler.py | 1 +
1 file changed, 1 insertion(+)
diff --git a/lib/portage/util/_async/AsyncScheduler.py b/lib/portage/util/_async/AsyncScheduler.py
index b9070061a..c31bda5f2 100644
--- a/lib/portage/util/_async/AsyncScheduler.py
+++ b/lib/portage/util/_async/AsyncScheduler.py
@@ -86,6 +86,7 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
try:
future.result()
except asyncio.CancelledError:
+ task.cancel()
self.cancel()
self._task_exit(task)
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2020-04-08 5:56 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2020-04-08 5:56 UTC (permalink / raw
To: gentoo-commits
commit: 4122a1e12884288e1d578a0cc8cfb400f00b8199
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Wed Apr 8 04:51:23 2020 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Apr 8 05:29:46 2020 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=4122a1e1
Revert "AsyncScheduler: cancel task after _task_coroutine CancelledError"
This reverts commit 27a6ee3d09ff79b6addb2696a10fcc3f46e5ca91.
Bug: https://bugs.gentoo.org/716636
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/AsyncScheduler.py | 1 -
1 file changed, 1 deletion(-)
diff --git a/lib/portage/util/_async/AsyncScheduler.py b/lib/portage/util/_async/AsyncScheduler.py
index c31bda5f2..b9070061a 100644
--- a/lib/portage/util/_async/AsyncScheduler.py
+++ b/lib/portage/util/_async/AsyncScheduler.py
@@ -86,7 +86,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
try:
future.result()
except asyncio.CancelledError:
- task.cancel()
self.cancel()
self._task_exit(task)
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2020-06-14 0:02 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2020-06-14 0:02 UTC (permalink / raw
To: gentoo-commits
commit: 54cdf7e6cf9ec2030283ab23d659e10562267c6b
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat Jun 13 23:59:43 2020 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Jun 14 00:00:16 2020 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=54cdf7e6
PipeLogger: fix FEATURES=compress-build-logs (bug 709746)
For FEATURES=compress-build-logs, use log_file.write since data written
directly to the file descriptor bypasses compression.
Fixes: 3e46825a0470 ("PipeLogger: non-blocking write to pipe (bug 709746)")
Bug: https://bugs.gentoo.org/709746
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/PipeLogger.py | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
index 83669e05e..1776cc860 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -131,6 +131,13 @@ class PipeLogger(AbstractPollTask):
fcntl.F_GETFL) ^ os.O_NONBLOCK)
if log_file is not None:
+ if isinstance(log_file, gzip.GzipFile):
+ # Use log_file.write since data written directly
+ # to the file descriptor bypasses compression.
+ log_file.write(buf)
+ log_file.flush()
+ continue
+
write_buf = buf
while write_buf:
try:
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2020-06-24 4:36 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2020-06-24 4:36 UTC (permalink / raw
To: gentoo-commits
commit: 58cb50d4c9d318d164ed991cd4b60066c8646205
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Wed Jun 24 04:29:46 2020 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Jun 24 04:32:26 2020 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=58cb50d4
PipeLogger._unregister: sanity check for closed file or loop
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/PipeLogger.py | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
index 4271c8ee2..cc746bf52 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -162,15 +162,15 @@ class PipeLogger(AbstractPollTask):
def _unregister(self):
if self.input_fd is not None:
if isinstance(self.input_fd, int):
- self.scheduler.remove_reader(self.input_fd)
os.close(self.input_fd)
- else:
+ elif not self.input_fd.closed:
self.scheduler.remove_reader(self.input_fd.fileno())
self.input_fd.close()
self.input_fd = None
if self._io_loop_task is not None:
- self._io_loop_task.done() or self._io_loop_task.cancel()
+ if not self.scheduler.is_closed():
+ self._io_loop_task.done() or self._io_loop_task.cancel()
self._io_loop_task = None
if self.stdout_fd is not None:
@@ -178,8 +178,9 @@ class PipeLogger(AbstractPollTask):
self.stdout_fd = None
if self._log_file is not None:
- self.scheduler.remove_writer(self._log_file.fileno())
- self._log_file.close()
+ if not self._log_file.closed:
+ self.scheduler.remove_writer(self._log_file.fileno())
+ self._log_file.close()
self._log_file = None
if self._log_file_real is not None:
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2020-08-03 23:28 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2020-08-03 23:28 UTC (permalink / raw
To: gentoo-commits
commit: 34d5463dde34bbc5de9f818eb5214eea23d19b9e
Author: Aaron Bauman <bman <AT> gentoo <DOT> org>
AuthorDate: Mon Aug 3 22:43:03 2020 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Aug 3 23:28:00 2020 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=34d5463d
lib/portage/util/_async/PipeLogger.py: fix whitespace
Signed-off-by: Aaron Bauman <bman <AT> gentoo.org>
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/PipeLogger.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
index 060483f0b..2bbdd3ddb 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -66,7 +66,7 @@ class PipeLogger(AbstractPollTask):
def _io_loop(self, input_file):
background = self.background
stdout_fd = self.stdout_fd
- log_file = self._log_file
+ log_file = self._log_file
fd = input_file.fileno()
while True:
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2020-08-09 0:46 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2020-08-09 0:46 UTC (permalink / raw
To: gentoo-commits
commit: 7b5bbf0c24eeb3fb64c4797905595401b8aa2731
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Wed Jul 29 04:30:06 2020 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Aug 9 00:23:47 2020 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=7b5bbf0c
BuildLogger: fix _cancel to cleanup PipeLogger quickly
Cleanup PipeLogger as quickly as possible, in order to prevent
access to unclosed logs.
Bug: https://bugs.gentoo.org/711174
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/BuildLogger.py | 23 +++++++++++++++++------
1 file changed, 17 insertions(+), 6 deletions(-)
diff --git a/lib/portage/util/_async/BuildLogger.py b/lib/portage/util/_async/BuildLogger.py
index 20362cf6b..f25f70d5b 100644
--- a/lib/portage/util/_async/BuildLogger.py
+++ b/lib/portage/util/_async/BuildLogger.py
@@ -1,6 +1,7 @@
# Copyright 2020 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
+import functools
import subprocess
from _emerge.AsynchronousTask import AsynchronousTask
@@ -23,7 +24,7 @@ class BuildLogger(AsynchronousTask):
subprocess stdout and stderr streams).
"""
- __slots__ = ('env', 'log_path', 'log_filter_file', '_main_task', '_stdin')
+ __slots__ = ('env', 'log_path', 'log_filter_file', '_main_task', '_main_task_cancel', '_stdin')
@property
def stdin(self):
@@ -76,6 +77,7 @@ class BuildLogger(AsynchronousTask):
log_file_path=self.log_path)
pipe_logger.start()
+ self._main_task_cancel = functools.partial(self._main_cancel, filter_proc, pipe_logger)
self._main_task = asyncio.ensure_future(self._main(filter_proc, pipe_logger), loop=self.scheduler)
self._main_task.add_done_callback(self._main_exit)
@@ -87,19 +89,28 @@ class BuildLogger(AsynchronousTask):
if filter_proc is not None and filter_proc.poll() is None:
yield filter_proc.async_wait()
except asyncio.CancelledError:
- if pipe_logger.poll() is None:
- pipe_logger.cancel()
- if filter_proc is not None and filter_proc.poll() is None:
- filter_proc.cancel()
+ self._main_cancel(filter_proc, pipe_logger)
raise
+ def _main_cancel(self, filter_proc, pipe_logger):
+ if pipe_logger.poll() is None:
+ pipe_logger.cancel()
+ if filter_proc is not None and filter_proc.poll() is None:
+ filter_proc.cancel()
+
def _cancel(self):
if self._main_task is not None:
- self._main_task.done() or self._main_task.cancel()
+ if not self._main_task.done():
+ if self._main_task_cancel is not None:
+ self._main_task_cancel()
+ self._main_task_cancel = None
+ self._main_task.cancel()
if self._stdin is not None and not self._stdin.closed:
self._stdin.close()
def _main_exit(self, main_task):
+ self._main_task = None
+ self._main_task_cancel = None
try:
main_task.result()
except asyncio.CancelledError:
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2021-02-15 4:38 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2021-02-15 4:38 UTC (permalink / raw
To: gentoo-commits
commit: becd2d4f82ab212fd752848d673a72722af209ff
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Feb 15 04:28:40 2021 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Feb 15 04:35:56 2021 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=becd2d4f
PopenProcess: use call_soon for _async_waipid in _start
Use call_soon to delay the add_child_handler call that
_async_waipid will trigger, so that it will occur after
the event loop is running.
Bug: https://bugs.gentoo.org/770712
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/PopenProcess.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/lib/portage/util/_async/PopenProcess.py b/lib/portage/util/_async/PopenProcess.py
index c1931327a..7f4e17ea2 100644
--- a/lib/portage/util/_async/PopenProcess.py
+++ b/lib/portage/util/_async/PopenProcess.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2018 Gentoo Foundation
+# Copyright 2012-2021 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
from _emerge.SubProcess import SubProcess
@@ -13,7 +13,7 @@ class PopenProcess(SubProcess):
self._registered = True
if self.pipe_reader is None:
- self._async_waitpid()
+ self.scheduler.call_soon(self._async_waitpid)
else:
try:
self.pipe_reader.scheduler = self.scheduler
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2021-03-07 15:17 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2021-03-07 15:17 UTC (permalink / raw
To: gentoo-commits
commit: 03b4af7b288589d0fa475af2a2c9f66022203649
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Mar 7 14:43:59 2021 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Mar 7 14:44:16 2021 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=03b4af7b
BuildLogger: Use async and await syntax
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/BuildLogger.py | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git a/lib/portage/util/_async/BuildLogger.py b/lib/portage/util/_async/BuildLogger.py
index 5a9c076b6..896e4d7b5 100644
--- a/lib/portage/util/_async/BuildLogger.py
+++ b/lib/portage/util/_async/BuildLogger.py
@@ -1,4 +1,4 @@
-# Copyright 2020 Gentoo Authors
+# Copyright 2020-2021 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import functools
@@ -11,7 +11,6 @@ from portage.util import shlex_split
from portage.util._async.PipeLogger import PipeLogger
from portage.util._async.PopenProcess import PopenProcess
from portage.util.futures import asyncio
-from portage.util.futures.compat_coroutine import coroutine
class BuildLogger(AsynchronousTask):
"""
@@ -78,16 +77,15 @@ class BuildLogger(AsynchronousTask):
pipe_logger.start()
self._main_task_cancel = functools.partial(self._main_cancel, filter_proc, pipe_logger)
- self._main_task = asyncio.ensure_future(self._main(filter_proc, pipe_logger, loop=self.scheduler), loop=self.scheduler)
+ self._main_task = asyncio.ensure_future(self._main(filter_proc, pipe_logger), loop=self.scheduler)
self._main_task.add_done_callback(self._main_exit)
- @coroutine
- def _main(self, filter_proc, pipe_logger, loop=None):
+ async def _main(self, filter_proc, pipe_logger):
try:
if pipe_logger.poll() is None:
- yield pipe_logger.async_wait()
+ await pipe_logger.async_wait()
if filter_proc is not None and filter_proc.poll() is None:
- yield filter_proc.async_wait()
+ await filter_proc.async_wait()
except asyncio.CancelledError:
self._main_cancel(filter_proc, pipe_logger)
raise
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2021-03-07 15:17 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2021-03-07 15:17 UTC (permalink / raw
To: gentoo-commits
commit: ce2247b4de0342d7802d4744904bb82870cac1ac
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Mar 7 14:35:57 2021 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Mar 7 14:37:54 2021 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=ce2247b4
PipeLogger: Use async and await syntax
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/PipeLogger.py | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
index e8203268c..b7c03043f 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -1,4 +1,4 @@
-# Copyright 2008-2020 Gentoo Authors
+# Copyright 2008-2021 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import fcntl
@@ -9,7 +9,6 @@ import portage
from portage import os, _encodings, _unicode_encode
from portage.util.futures import asyncio
from portage.util.futures._asyncio.streams import _writer
-from portage.util.futures.compat_coroutine import coroutine
from portage.util.futures.unix_events import _set_nonblocking
from _emerge.AbstractPollTask import AbstractPollTask
@@ -53,7 +52,7 @@ class PipeLogger(AbstractPollTask):
fcntl.fcntl(fd, fcntl.F_SETFL,
fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
- self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd, loop=self.scheduler), loop=self.scheduler)
+ self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd), loop=self.scheduler)
self._io_loop_task.add_done_callback(self._io_loop_done)
self._registered = True
@@ -62,8 +61,7 @@ class PipeLogger(AbstractPollTask):
if self.returncode is None:
self.returncode = self._cancelled_returncode
- @coroutine
- def _io_loop(self, input_file, loop=None):
+ async def _io_loop(self, input_file):
background = self.background
stdout_fd = self.stdout_fd
log_file = self._log_file
@@ -77,7 +75,7 @@ class PipeLogger(AbstractPollTask):
future = self.scheduler.create_future()
self.scheduler.add_reader(fd, future.set_result, None)
try:
- yield future
+ await future
finally:
# The loop and input file may have been closed.
if not self.scheduler.is_closed():
@@ -130,7 +128,7 @@ class PipeLogger(AbstractPollTask):
if self._log_file_nb:
# Use the _writer function which uses os.write, since the
# log_file.write method looses data when an EAGAIN occurs.
- yield _writer(log_file, buf, loop=self.scheduler)
+ await _writer(log_file, buf)
else:
# For gzip.GzipFile instances, the above _writer function
# will not work because data written directly to the file
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2021-05-02 0:00 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2021-05-02 0:00 UTC (permalink / raw
To: gentoo-commits
commit: bfe550f69ac4e0126412bc83f7e6f6f0d1c6dc1f
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat May 1 23:38:50 2021 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sat May 1 23:49:07 2021 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=bfe550f6
PipeLogger: handle FileNotFoundError when cancelled during _start
Bug: https://bugs.gentoo.org/787542
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/PipeLogger.py | 35 +++++++++++++++++++++++++----------
1 file changed, 25 insertions(+), 10 deletions(-)
diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
index b7c03043f..a335ce96f 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -33,16 +33,31 @@ class PipeLogger(AbstractPollTask):
self._log_file = log_file_path
_set_nonblocking(self._log_file.fileno())
elif log_file_path is not None:
- self._log_file = open(_unicode_encode(log_file_path,
- encoding=_encodings['fs'], errors='strict'), mode='ab')
- if log_file_path.endswith('.gz'):
- self._log_file_real = self._log_file
- self._log_file = gzip.GzipFile(filename='', mode='ab',
- fileobj=self._log_file)
-
- portage.util.apply_secpass_permissions(log_file_path,
- uid=portage.portage_uid, gid=portage.portage_gid,
- mode=0o660)
+ try:
+ self._log_file = open(
+ _unicode_encode(
+ log_file_path, encoding=_encodings["fs"], errors="strict"
+ ),
+ mode="ab",
+ )
+
+ if log_file_path.endswith(".gz"):
+ self._log_file_real = self._log_file
+ self._log_file = gzip.GzipFile(
+ filename="", mode="ab", fileobj=self._log_file
+ )
+
+ portage.util.apply_secpass_permissions(
+ log_file_path,
+ uid=portage.portage_uid,
+ gid=portage.portage_gid,
+ mode=0o660,
+ )
+ except FileNotFoundError:
+ if self._was_cancelled():
+ self._async_wait()
+ return
+ raise
if isinstance(self.input_fd, int):
self.input_fd = os.fdopen(self.input_fd, 'rb', 0)
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2021-09-21 5:51 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2021-09-21 5:51 UTC (permalink / raw
To: gentoo-commits
commit: 3f2ba95a960ed66dde1a0e4cea99f5d54b018d0f
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Sep 21 04:49:23 2021 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue Sep 21 04:49:45 2021 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=3f2ba95a
ForkProcess: convert compat coroutine to async
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/ForkProcess.py | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index 674336935..e70238705 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2020 Gentoo Authors
+# Copyright 2012-2021 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import fcntl
@@ -10,7 +10,6 @@ import sys
import portage
from portage import os
from portage.util.futures import asyncio
-from portage.util.futures.compat_coroutine import coroutine
from _emerge.SpawnProcess import SpawnProcess
@@ -73,15 +72,14 @@ class ForkProcess(SpawnProcess):
if self._proc_join_task is None:
super(ForkProcess, self)._async_waitpid()
- @coroutine
- def _proc_join(self, proc, loop=None):
+ async def _proc_join(self, proc, loop=None):
sentinel_reader = self.scheduler.create_future()
self.scheduler.add_reader(
proc.sentinel,
lambda: sentinel_reader.done() or sentinel_reader.set_result(None),
)
try:
- yield sentinel_reader
+ await sentinel_reader
finally:
# If multiprocessing.Process supports the close method, then
# access to proc.sentinel will raise ValueError if the
@@ -101,7 +99,7 @@ class ForkProcess(SpawnProcess):
proc.join(0)
if proc.exitcode is not None:
break
- yield asyncio.sleep(self._proc_join_interval, loop=loop)
+ await asyncio.sleep(self._proc_join_interval, loop=loop)
def _proc_join_done(self, proc, future):
future.cancelled() or future.result()
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2023-10-03 14:48 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2023-10-03 14:48 UTC (permalink / raw
To: gentoo-commits
commit: 04184a03b74669fdb48403cb8002de6395cf8684
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Oct 3 07:33:29 2023 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue Oct 3 14:48:01 2023 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=04184a03
AsyncFunction: Migrate to ForkProcess target parameter
Bug: https://bugs.gentoo.org/915099
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/AsyncFunction.py | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/lib/portage/util/_async/AsyncFunction.py b/lib/portage/util/_async/AsyncFunction.py
index f27b255a55..e13daaebb0 100644
--- a/lib/portage/util/_async/AsyncFunction.py
+++ b/lib/portage/util/_async/AsyncFunction.py
@@ -1,6 +1,7 @@
# Copyright 2015-2023 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
+import functools
import pickle
import traceback
@@ -19,26 +20,27 @@ class AsyncFunction(ForkProcess):
__slots__ = (
"result",
"_async_func_reader",
- "_async_func_reader_pw",
)
def _start(self):
pr, pw = os.pipe()
self.fd_pipes = {} if self.fd_pipes is None else self.fd_pipes
self.fd_pipes[pw] = pw
- self._async_func_reader_pw = pw
self._async_func_reader = PipeReader(
input_files={"input": pr}, scheduler=self.scheduler
)
self._async_func_reader.addExitListener(self._async_func_reader_exit)
self._async_func_reader.start()
+ # args and kwargs are passed as additional args by ForkProcess._bootstrap.
+ self.target = functools.partial(self._target_wrapper, pw, self.target)
ForkProcess._start(self)
os.close(pw)
- def _run(self):
+ @staticmethod
+ def _target_wrapper(pw, target, *args, **kwargs):
try:
- result = self.target(*(self.args or []), **(self.kwargs or {}))
- os.write(self._async_func_reader_pw, pickle.dumps(result))
+ result = target(*args, **kwargs)
+ os.write(pw, pickle.dumps(result))
except Exception:
traceback.print_exc()
return 1
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2023-10-03 14:48 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2023-10-03 14:48 UTC (permalink / raw
To: gentoo-commits
commit: 72760ab948047cf1f5b9e1c1fcf9d2c4934afc8c
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Oct 3 06:24:21 2023 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue Oct 3 14:47:37 2023 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=72760ab9
ForkProcess: Add target constructor parameter
If the _run method is not implemented, then use a new
"target" constructor parameter like multiprocessing.Process.
Support "args" and "kwargs" constructor parameters as well.
If the _run method is implemented, then behave in a backward
compatible manner, and ignore the "args" and "kwargs" constructor
parameters which are used by the AsyncFunction subclass.
Bug: https://bugs.gentoo.org/915099
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/AsyncFunction.py | 7 +------
lib/portage/util/_async/ForkProcess.py | 35 +++++++++++++++++++++++++++-----
2 files changed, 31 insertions(+), 11 deletions(-)
diff --git a/lib/portage/util/_async/AsyncFunction.py b/lib/portage/util/_async/AsyncFunction.py
index 8c13b3f5ba..f27b255a55 100644
--- a/lib/portage/util/_async/AsyncFunction.py
+++ b/lib/portage/util/_async/AsyncFunction.py
@@ -1,4 +1,4 @@
-# Copyright 2015-2020 Gentoo Authors
+# Copyright 2015-2023 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import pickle
@@ -16,13 +16,8 @@ class AsyncFunction(ForkProcess):
"result" attribute after the forked process has exited.
"""
- # NOTE: This class overrides the meaning of the SpawnProcess 'args'
- # attribute, and uses it to hold the positional arguments for the
- # 'target' function.
__slots__ = (
- "kwargs",
"result",
- "target",
"_async_func_reader",
"_async_func_reader_pw",
)
diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index 22a0e0cd85..3deaf18fd0 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -1,4 +1,4 @@
-# Copyright 2012-2021 Gentoo Authors
+# Copyright 2012-2023 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import fcntl
@@ -14,7 +14,15 @@ from _emerge.SpawnProcess import SpawnProcess
class ForkProcess(SpawnProcess):
- __slots__ = ("_proc", "_proc_join_task")
+ # NOTE: This class overrides the meaning of the SpawnProcess 'args'
+ # attribute, and uses it to hold the positional arguments for the
+ # 'target' function.
+ __slots__ = (
+ "kwargs",
+ "target",
+ "_proc",
+ "_proc_join_task",
+ )
# Number of seconds between poll attempts for process exit status
# (after the sentinel has become ready).
@@ -27,6 +35,18 @@ class ForkProcess(SpawnProcess):
any pre-fork and post-fork interpreter housekeeping that it provides,
promoting a healthy state for the forked interpreter.
"""
+
+ if self.__class__._run is ForkProcess._run:
+ # target replaces the deprecated self._run method
+ target = self.target
+ args = self.args
+ kwargs = self.kwargs
+ else:
+ # _run implementation triggers backward-compatibility mode
+ target = self._run
+ args = None
+ kwargs = None
+
# Since multiprocessing.Process closes sys.__stdin__, create a
# temporary duplicate of fd_pipes[0] so that sys.__stdin__ can
# be restored in the subprocess, in case this is needed for
@@ -41,7 +61,8 @@ class ForkProcess(SpawnProcess):
)
fd_pipes[0] = stdin_dup
self._proc = multiprocessing.Process(
- target=self._bootstrap, args=(fd_pipes,)
+ target=self._bootstrap,
+ args=(fd_pipes, target, args, kwargs),
)
self._proc.start()
finally:
@@ -122,7 +143,8 @@ class ForkProcess(SpawnProcess):
self._proc_join_task.cancel()
self._proc_join_task = None
- def _bootstrap(self, fd_pipes):
+ @staticmethod
+ def _bootstrap(fd_pipes, target, args, kwargs):
# Use default signal handlers in order to avoid problems
# killing subprocesses as reported in bug #353239.
signal.signal(signal.SIGINT, signal.SIG_DFL)
@@ -159,7 +181,10 @@ class ForkProcess(SpawnProcess):
)
sys.__stdin__ = sys.stdin
- sys.exit(self._run())
+ sys.exit(target(*(args or []), **(kwargs or {})))
def _run(self):
+ """
+ Deprecated and replaced with the "target" constructor parameter.
+ """
raise NotImplementedError(self)
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2023-10-04 3:25 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2023-10-04 3:25 UTC (permalink / raw
To: gentoo-commits
commit: 3dfe5e326eaaca877bc7a45ec84d6b39fc8d137a
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Wed Oct 4 02:56:10 2023 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Oct 4 03:07:12 2023 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=3dfe5e32
FileDigester: Migrate to AsyncFunction
Bug: https://bugs.gentoo.org/915099
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/FileDigester.py | 71 +++++----------------------------
1 file changed, 11 insertions(+), 60 deletions(-)
diff --git a/lib/portage/util/_async/FileDigester.py b/lib/portage/util/_async/FileDigester.py
index ce334ee95a..6491423ae4 100644
--- a/lib/portage/util/_async/FileDigester.py
+++ b/lib/portage/util/_async/FileDigester.py
@@ -1,13 +1,13 @@
-# Copyright 2013 Gentoo Foundation
+# Copyright 2013-2023 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
-from portage import os
+import functools
+
from portage.checksum import perform_multiple_checksums
-from portage.util._async.ForkProcess import ForkProcess
-from _emerge.PipeReader import PipeReader
+from portage.util._async.AsyncFunction import AsyncFunction
-class FileDigester(ForkProcess):
+class FileDigester(AsyncFunction):
"""
Asynchronously generate file digests. Pass in file_path and
hash_names, and after successful execution, the digests
@@ -17,64 +17,15 @@ class FileDigester(ForkProcess):
__slots__ = (
"file_path",
- "digests",
"hash_names",
- "_digest_pipe_reader",
- "_digest_pw",
)
def _start(self):
- pr, pw = os.pipe()
- self.fd_pipes = {}
- self.fd_pipes[pw] = pw
- self._digest_pw = pw
- self._digest_pipe_reader = PipeReader(
- input_files={"input": pr}, scheduler=self.scheduler
+ self.target = functools.partial(
+ perform_multiple_checksums, self.file_path, hashes=self.hash_names
)
- self._digest_pipe_reader.addExitListener(self._digest_pipe_reader_exit)
- self._digest_pipe_reader.start()
- ForkProcess._start(self)
- os.close(pw)
-
- def _run(self):
- digests = perform_multiple_checksums(self.file_path, hashes=self.hash_names)
-
- buf = "".join("%s=%s\n" % item for item in digests.items()).encode("utf_8")
-
- while buf:
- buf = buf[os.write(self._digest_pw, buf) :]
-
- return os.EX_OK
-
- def _parse_digests(self, data):
- digests = {}
- for line in data.decode("utf_8").splitlines():
- parts = line.split("=", 1)
- if len(parts) == 2:
- digests[parts[0]] = parts[1]
-
- self.digests = digests
-
- def _async_waitpid(self):
- # Ignore this event, since we want to ensure that we
- # exit only after _digest_pipe_reader has reached EOF.
- if self._digest_pipe_reader is None:
- ForkProcess._async_waitpid(self)
-
- def _digest_pipe_reader_exit(self, pipe_reader):
- self._parse_digests(pipe_reader.getvalue())
- self._digest_pipe_reader = None
- if self.pid is None:
- self._unregister()
- self._async_wait()
- else:
- self._async_waitpid()
-
- def _unregister(self):
- ForkProcess._unregister(self)
+ super()._start()
- pipe_reader = self._digest_pipe_reader
- if pipe_reader is not None:
- self._digest_pipe_reader = None
- pipe_reader.removeExitListener(self._digest_pipe_reader_exit)
- pipe_reader.cancel()
+ @property
+ def digests(self):
+ return self.result
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2023-10-04 4:01 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2023-10-04 4:01 UTC (permalink / raw
To: gentoo-commits
commit: 73eb44fce683a8cbfca195f01783b94eea6e7eca
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Wed Oct 4 03:42:41 2023 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Oct 4 03:43:18 2023 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=73eb44fc
FileCopier: multiprocessing spawn compat
Bug: https://bugs.gentoo.org/915099
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/FileCopier.py | 18 +++++++++++-------
1 file changed, 11 insertions(+), 7 deletions(-)
diff --git a/lib/portage/util/_async/FileCopier.py b/lib/portage/util/_async/FileCopier.py
index da0e85ee4e..d53ff08591 100644
--- a/lib/portage/util/_async/FileCopier.py
+++ b/lib/portage/util/_async/FileCopier.py
@@ -1,4 +1,4 @@
-# Copyright 2013-2019 Gentoo Authors
+# Copyright 2013-2023 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import os as _os
@@ -20,16 +20,20 @@ class FileCopier(AsyncTaskFuture):
def _start(self):
self.future = asyncio.ensure_future(
- self.scheduler.run_in_executor(ForkExecutor(loop=self.scheduler), self._run)
+ self.scheduler.run_in_executor(
+ ForkExecutor(loop=self.scheduler),
+ self._target,
+ self.src_path,
+ self.dest_path,
+ )
)
super()._start()
- def _run(self):
- src_path = _unicode_encode(
- self.src_path, encoding=_encodings["fs"], errors="strict"
- )
+ @staticmethod
+ def _target(src_path, dest_path):
+ src_path = _unicode_encode(src_path, encoding=_encodings["fs"], errors="strict")
dest_path = _unicode_encode(
- self.dest_path, encoding=_encodings["fs"], errors="strict"
+ dest_path, encoding=_encodings["fs"], errors="strict"
)
copyfile(src_path, dest_path)
apply_stat_permissions(dest_path, _os.stat(src_path))
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2023-10-05 6:28 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2023-10-05 6:28 UTC (permalink / raw
To: gentoo-commits
commit: ed458fa634c37c13cadd436b38498678f4ee103d
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Oct 5 05:54:40 2023 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Thu Oct 5 05:59:39 2023 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=ed458fa6
ForkProcess: Warn if _run method is used
Bug: https://bugs.gentoo.org/915099
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/ForkProcess.py | 6 ++++++
1 file changed, 6 insertions(+)
diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index 3deaf18fd0..1d2d220ed4 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -4,6 +4,7 @@
import fcntl
import functools
import multiprocessing
+import warnings
import signal
import sys
@@ -46,6 +47,11 @@ class ForkProcess(SpawnProcess):
target = self._run
args = None
kwargs = None
+ warnings.warn(
+ 'portage.util._async.ForkProcess.ForkProcess._run is deprecated in favor of the "target" parameter',
+ UserWarning,
+ stacklevel=2,
+ )
# Since multiprocessing.Process closes sys.__stdin__, create a
# temporary duplicate of fd_pipes[0] so that sys.__stdin__ can
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2023-12-06 16:25 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2023-12-06 16:25 UTC (permalink / raw
To: gentoo-commits
commit: df212738bbb209356472911cda79902f0e25918e
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Dec 5 04:23:56 2023 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Dec 6 16:22:56 2023 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=df212738
BuildLogger: Close self._stdin after fork
In order to ensure that we can observe EOF on the read end of the pipe,
close self._stdin after fork. Since portage.locks._close_fds() already
does something similar for _lock_manager instances which have a close()
method, it will also work with these _file_close_wrapper objects.
The portage.locks._close_fds() function calls close after fork, in
the ForkProcess._bootstrap method. For more general fork coverage,
we could move the _close_fds() call to the _ForkWatcher.hook method
in portage/__init__.py, but I've reserved that for a later change
since _close_fds() has been working fine for us where we call it now.
Bug: https://bugs.gentoo.org/919072
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/BuildLogger.py | 34 +++++++++++++++++++++++++++++++---
1 file changed, 31 insertions(+), 3 deletions(-)
diff --git a/lib/portage/util/_async/BuildLogger.py b/lib/portage/util/_async/BuildLogger.py
index 502b3390e5..9f8a21ab2b 100644
--- a/lib/portage/util/_async/BuildLogger.py
+++ b/lib/portage/util/_async/BuildLogger.py
@@ -1,4 +1,4 @@
-# Copyright 2020-2021 Gentoo Authors
+# Copyright 2020-2023 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import functools
@@ -6,13 +6,41 @@ import subprocess
from _emerge.AsynchronousTask import AsynchronousTask
+import portage
from portage import os
+from portage.proxy.objectproxy import ObjectProxy
from portage.util import shlex_split
from portage.util._async.PipeLogger import PipeLogger
from portage.util._async.PopenProcess import PopenProcess
from portage.util.futures import asyncio
+class _file_close_wrapper(ObjectProxy):
+ """
+ Prevent fd inheritance via fork, ensuring that we can observe
+ EOF on the read end of the pipe (bug 919072).
+ """
+
+ __slots__ = ("_file",)
+
+ def __init__(self, file):
+ ObjectProxy.__init__(self)
+ object.__setattr__(self, "_file", file)
+ portage.locks._open_fds[file.fileno()] = self
+
+ def _get_target(self):
+ return object.__getattribute__(self, "_file")
+
+ def close(self):
+ file = object.__getattribute__(self, "_file")
+ if not file.closed:
+ # This must only be called if the file is open,
+ # which ensures that file.fileno() does not
+ # collide with an open lock file descriptor.
+ del portage.locks._open_fds[file.fileno()]
+ file.close()
+
+
class BuildLogger(AsynchronousTask):
"""
Write to a log file, with compression support provided by PipeLogger.
@@ -67,7 +95,7 @@ class BuildLogger(AsynchronousTask):
os.close(log_input)
os.close(filter_output)
else:
- self._stdin = os.fdopen(stdin, "wb", 0)
+ self._stdin = _file_close_wrapper(os.fdopen(stdin, "wb", 0))
os.close(filter_input)
os.close(filter_output)
@@ -76,7 +104,7 @@ class BuildLogger(AsynchronousTask):
# that is missing or broken somehow, create a pipe that
# logs directly to pipe_logger.
log_input, stdin = os.pipe()
- self._stdin = os.fdopen(stdin, "wb", 0)
+ self._stdin = _file_close_wrapper(os.fdopen(stdin, "wb", 0))
# Set background=True so that pipe_logger does not log to stdout.
pipe_logger = PipeLogger(
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2024-02-03 22:54 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2024-02-03 22:54 UTC (permalink / raw
To: gentoo-commits
commit: 9f96ce5105e7bd2580ae9acc34d6ebad914dae47
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat Feb 3 19:36:05 2024 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sat Feb 3 19:55:19 2024 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=9f96ce51
ForkProcess: Use duplicate fd_pipes in _send_fd_pipes thread
In order to allow callers to manage the lifecycle of fd_pipes
file descriptors, create duplicates for _send_fd_pipes to
close when it has finished sending them.
This fixes bug 916601 in a nice way, allowing commit
3b1234ba69a31709cd5aec1ae070901e3a28bb7c to be reverted.
Bug: https://bugs.gentoo.org/916601
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/ForkProcess.py | 55 +++++++++++++++++++++++++++++-----
1 file changed, 48 insertions(+), 7 deletions(-)
diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index 711bd2a7ba..3acbe34fc6 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -24,6 +24,8 @@ class ForkProcess(SpawnProcess):
"kwargs",
"target",
"_child_connection",
+ # Duplicate file descriptors for use by _send_fd_pipes background thread.
+ "_fd_pipes",
)
_file_names = ("connection", "slave_fd")
@@ -53,7 +55,13 @@ class ForkProcess(SpawnProcess):
duplex=self._HAVE_SEND_HANDLE
)
- self._proc = self._spawn(self.args, fd_pipes=self.fd_pipes)
+ # Handle fd_pipes in _main instead, since file descriptors are
+ # not inherited with the multiprocessing "spawn" start method.
+ # Pass fd_pipes=None to spawn here so that it doesn't leave
+ # a closed stdin duplicate in fd_pipes (that would trigger
+ # "Bad file descriptor" error if we tried to send it via
+ # send_handle).
+ self._proc = self._spawn(self.args, fd_pipes=None)
self._registered = True
@@ -74,6 +82,25 @@ class ForkProcess(SpawnProcess):
self.fd_pipes[1] = slave_fd
self.fd_pipes[2] = slave_fd
self._files = self._files_dict(connection=connection, slave_fd=slave_fd)
+
+ # Create duplicate file descriptors in self._fd_pipes
+ # so that the caller is free to manage the lifecycle
+ # of the original fd_pipes.
+ self._fd_pipes = {}
+ fd_map = {}
+ for dest, src in list(self.fd_pipes.items()):
+ if src not in fd_map:
+ src_new = fd_map[src] = os.dup(src)
+ old_fdflags = fcntl.fcntl(src, fcntl.F_GETFD)
+ fcntl.fcntl(src_new, fcntl.F_SETFD, old_fdflags)
+ os.set_inheritable(
+ src_new, not bool(old_fdflags & fcntl.FD_CLOEXEC)
+ )
+ self._fd_pipes[dest] = fd_map[src]
+
+ asyncio.ensure_future(
+ self._proc.wait(), self.scheduler
+ ).add_done_callback(self._close_fd_pipes)
else:
master_fd = connection
@@ -81,6 +108,19 @@ class ForkProcess(SpawnProcess):
master_fd, log_file_path=self.logfile, stdout_fd=stdout_fd
)
+ def _close_fd_pipes(self, future):
+ """
+ Cleanup self._fd_pipes if needed, since _send_fd_pipes could
+ have been cancelled.
+ """
+ # future.result() raises asyncio.CancelledError if
+ # future.cancelled(), but that should not happen.
+ future.result()
+ if self._fd_pipes is not None:
+ for fd in set(self._fd_pipes.values()):
+ os.close(fd)
+ self._fd_pipes = None
+
@property
def _fd_pipes_send_handle(self):
"""Returns True if we have a connection to implement fd_pipes via send_handle."""
@@ -95,9 +135,9 @@ class ForkProcess(SpawnProcess):
Communicate with _bootstrap to send fd_pipes via send_handle.
This performs blocking IO, intended for invocation via run_in_executor.
"""
- fd_list = list(set(self.fd_pipes.values()))
+ fd_list = list(set(self._fd_pipes.values()))
self._files.connection.send(
- (self.fd_pipes, fd_list),
+ (self._fd_pipes, fd_list),
)
for fd in fd_list:
multiprocessing.reduction.send_handle(
@@ -106,6 +146,11 @@ class ForkProcess(SpawnProcess):
self.pid,
)
+ # self._fd_pipes contains duplicates that must be closed.
+ for fd in fd_list:
+ os.close(fd)
+ self._fd_pipes = None
+
async def _main(self, build_logger, pipe_logger, loop=None):
try:
if self._fd_pipes_send_handle:
@@ -167,10 +212,6 @@ class ForkProcess(SpawnProcess):
)
fd_pipes[0] = stdin_dup
- if self._fd_pipes_send_handle:
- # Handle fd_pipes in _main instead.
- fd_pipes = None
-
proc = multiprocessing.Process(
target=self._bootstrap,
args=(
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2024-02-07 2:35 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2024-02-07 2:35 UTC (permalink / raw
To: gentoo-commits
commit: 7d7ef237f3dddcf450fedab5aabfd57d1fb3406d
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Feb 6 01:35:25 2024 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Feb 7 00:36:56 2024 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=7d7ef237
ForkProcess: Handle BrokenPipeError in _send_fd_pipes
Convert _send_fd_pipes BrokenPipeError to asyncio.CancelledError,
in order to gracefully handle a concurrently terminated child
process as in testAsynchronousLockWaitCancel. Even if the child
terminated abnormally, then there is no harm in suppressing the
exception here, since the child error should have gone to stderr.
Bug: https://bugs.gentoo.org/923852
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/ForkProcess.py | 25 +++++++++++++++++--------
1 file changed, 17 insertions(+), 8 deletions(-)
diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index cb240d0712..ebcbd94107 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -153,15 +153,24 @@ class ForkProcess(SpawnProcess):
This performs blocking IO, intended for invocation via run_in_executor.
"""
fd_list = list(set(self._fd_pipes.values()))
- self._files.connection.send(
- (self._fd_pipes, fd_list),
- )
- for fd in fd_list:
- multiprocessing.reduction.send_handle(
- self._files.connection,
- fd,
- self.pid,
+ try:
+ self._files.connection.send(
+ (self._fd_pipes, fd_list),
)
+ for fd in fd_list:
+ multiprocessing.reduction.send_handle(
+ self._files.connection,
+ fd,
+ self.pid,
+ )
+ except BrokenPipeError as e:
+ # This case is triggered by testAsynchronousLockWaitCancel
+ # when the test case terminates the child process while
+ # this thread is still sending the fd_pipes (bug 923852).
+ # Even if the child terminated abnormally, then there is
+ # no harm in suppressing the exception here, since the
+ # child error should have gone to stderr.
+ raise asyncio.CancelledError from e
# self._fd_pipes contains duplicates that must be closed.
for fd in fd_list:
^ permalink raw reply related [flat|nested] 22+ messages in thread
* [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/
@ 2024-02-14 15:45 Zac Medico
0 siblings, 0 replies; 22+ messages in thread
From: Zac Medico @ 2024-02-14 15:45 UTC (permalink / raw
To: gentoo-commits
commit: 038ad1029ea574b106906380e47479db1041bee2
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Wed Feb 14 05:55:31 2024 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Feb 14 06:04:22 2024 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=038ad102
BuildLogger: Fix portage.locks._open_fds memory leak
The _file_close_wrapper __getattribute__ method needs to
be overridden to expose its close method, otherwise the
underlying file's close method is called and the closed
file object remains as a memory leak in the global
portage.locks._open_fds dict. For reference, see similar
classes like portage.util.atomic_ofstream which overrides
methods in the same way.
Bug: https://bugs.gentoo.org/919072
Fixes: df212738bbb2 ("BuildLogger: Close self._stdin after fork")
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
lib/portage/util/_async/BuildLogger.py | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
diff --git a/lib/portage/util/_async/BuildLogger.py b/lib/portage/util/_async/BuildLogger.py
index 9f8a21ab2b..0cfc90a942 100644
--- a/lib/portage/util/_async/BuildLogger.py
+++ b/lib/portage/util/_async/BuildLogger.py
@@ -1,4 +1,4 @@
-# Copyright 2020-2023 Gentoo Authors
+# Copyright 2020-2024 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2
import functools
@@ -31,6 +31,11 @@ class _file_close_wrapper(ObjectProxy):
def _get_target(self):
return object.__getattribute__(self, "_file")
+ def __getattribute__(self, attr):
+ if attr == "close":
+ return object.__getattribute__(self, attr)
+ return getattr(object.__getattribute__(self, "_file"), attr)
+
def close(self):
file = object.__getattribute__(self, "_file")
if not file.closed:
^ permalink raw reply related [flat|nested] 22+ messages in thread
end of thread, other threads:[~2024-02-14 15:46 UTC | newest]
Thread overview: 22+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2020-06-14 0:02 [gentoo-commits] proj/portage:master commit in: lib/portage/util/_async/ Zac Medico
-- strict thread matches above, loose matches on Subject: below --
2024-02-14 15:45 Zac Medico
2024-02-07 2:35 Zac Medico
2024-02-03 22:54 Zac Medico
2023-12-06 16:25 Zac Medico
2023-10-05 6:28 Zac Medico
2023-10-04 4:01 Zac Medico
2023-10-04 3:25 Zac Medico
2023-10-03 14:48 Zac Medico
2023-10-03 14:48 Zac Medico
2021-09-21 5:51 Zac Medico
2021-05-02 0:00 Zac Medico
2021-03-07 15:17 Zac Medico
2021-03-07 15:17 Zac Medico
2021-02-15 4:38 Zac Medico
2020-08-09 0:46 Zac Medico
2020-08-03 23:28 Zac Medico
2020-06-24 4:36 Zac Medico
2020-04-08 5:56 Zac Medico
2020-03-01 1:22 Zac Medico
2020-02-29 7:51 Zac Medico
2020-02-29 7:51 Zac Medico
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox