* [gentoo-commits] proj/portage:master commit in: lib/portage/tests/process/, lib/portage/util/futures/_asyncio/, ...
@ 2020-08-19 4:47 Zac Medico
From: Zac Medico @ 2020-08-19 4:47 UTC
To: gentoo-commits
commit: dc7919541712d846574e6b7d672a3bed0ca7ef1a
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Aug 18 06:31:54 2020 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Aug 19 04:01:46 2020 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=dc791954
coroutine: use explicit loop parameter (bug 737698)
In order to support local event loops within API functions
like doebuild, use an explicit loop parameter when calling a
coroutine. Internal code will now raise an AssertionError if
the loop parameter is omitted for a coroutine, but API
consumers may omit it.
Bug: https://bugs.gentoo.org/737698
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>
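[Editorial note: for readers unfamiliar with portage's compat coroutines, here is a minimal sketch of the calling convention this commit establishes. It is illustrative only; the example_task name and body are hypothetical and not part of the commit.]

    from portage.util.futures import asyncio
    from portage.util.futures.compat_coroutine import coroutine, coroutine_return

    @coroutine
    def example_task(loop=None):
        # Forward the explicit loop to nested coroutines and asyncio helpers.
        yield asyncio.sleep(0, loop=loop)
        coroutine_return('done')

    # Internal portage callers must now pass loop explicitly; omitting it
    # raises AssertionError. External API consumers may still omit it, in
    # which case _wrap_loop(None) falls back to the global event loop.
    loop = asyncio._wrap_loop()
    result = loop.run_until_complete(example_task(loop=loop))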
lib/_emerge/Binpkg.py | 8 ++--
lib/_emerge/EbuildPhase.py | 16 ++++----
lib/_emerge/Scheduler.py | 4 +-
lib/_emerge/SequentialTaskQueue.py | 4 +-
lib/_emerge/SpawnProcess.py | 5 ++-
lib/portage/dbapi/bintree.py | 12 +++---
lib/portage/dbapi/vartree.py | 8 ++--
.../repository/storage/hardlink_quarantine.py | 26 ++++++-------
lib/portage/repository/storage/hardlink_rcu.py | 34 ++++++++--------
lib/portage/repository/storage/inplace.py | 10 ++---
lib/portage/repository/storage/interface.py | 10 ++---
lib/portage/sync/syncbase.py | 2 +-
lib/portage/tests/dbapi/test_auxdb.py | 9 +++--
lib/portage/tests/emerge/test_simple.py | 6 +--
lib/portage/tests/process/test_AsyncFunction.py | 4 +-
lib/portage/tests/process/test_PipeLogger.py | 2 +-
.../util/futures/asyncio/test_child_watcher.py | 4 +-
.../tests/util/futures/test_compat_coroutine.py | 45 ++++++++++++----------
lib/portage/tests/util/test_socks5.py | 2 +-
lib/portage/util/_async/BuildLogger.py | 4 +-
lib/portage/util/_async/ForkProcess.py | 6 +--
lib/portage/util/_async/PipeLogger.py | 4 +-
lib/portage/util/_async/SchedulerInterface.py | 4 +-
lib/portage/util/futures/_asyncio/process.py | 16 ++++----
lib/portage/util/futures/_sync_decorator.py | 3 +-
lib/portage/util/futures/compat_coroutine.py | 7 +++-
lib/portage/util/socks5.py | 4 +-
repoman/lib/repoman/modules/scan/depend/profile.py | 4 +-
28 files changed, 138 insertions(+), 125 deletions(-)
diff --git a/lib/_emerge/Binpkg.py b/lib/_emerge/Binpkg.py
index b5a69f8e7..9d2909d42 100644
--- a/lib/_emerge/Binpkg.py
+++ b/lib/_emerge/Binpkg.py
@@ -250,11 +250,11 @@ class Binpkg(CompositeTask):
return
self._start_task(
- AsyncTaskFuture(future=self._unpack_metadata()),
+ AsyncTaskFuture(future=self._unpack_metadata(loop=self.scheduler)),
self._unpack_metadata_exit)
@coroutine
- def _unpack_metadata(self):
+ def _unpack_metadata(self, loop=None):
dir_path = self.settings['PORTAGE_BUILDDIR']
@@ -271,7 +271,7 @@ class Binpkg(CompositeTask):
portage.prepare_build_dirs(self.settings["ROOT"], self.settings, 1)
self._writemsg_level(">>> Extracting info\n")
- yield self._bintree.dbapi.unpack_metadata(self.settings, infloc)
+ yield self._bintree.dbapi.unpack_metadata(self.settings, infloc, loop=self.scheduler)
check_missing_metadata = ("CATEGORY", "PF")
for k, v in zip(check_missing_metadata,
self._bintree.dbapi.aux_get(self.pkg.cpv, check_missing_metadata)):
@@ -333,7 +333,7 @@ class Binpkg(CompositeTask):
self._start_task(
AsyncTaskFuture(future=self._bintree.dbapi.unpack_contents(
self.settings,
- self._image_dir)),
+ self._image_dir, loop=self.scheduler)),
self._unpack_contents_exit)
def _unpack_contents_exit(self, unpack_contents):
diff --git a/lib/_emerge/EbuildPhase.py b/lib/_emerge/EbuildPhase.py
index 4bc2749bd..e4c0428a6 100644
--- a/lib/_emerge/EbuildPhase.py
+++ b/lib/_emerge/EbuildPhase.py
@@ -70,11 +70,11 @@ class EbuildPhase(CompositeTask):
_locked_phases = ("setup", "preinst", "postinst", "prerm", "postrm")
def _start(self):
- future = asyncio.ensure_future(self._async_start(), loop=self.scheduler)
+ future = asyncio.ensure_future(self._async_start(loop=self.scheduler), loop=self.scheduler)
self._start_task(AsyncTaskFuture(future=future), self._async_start_exit)
@coroutine
- def _async_start(self):
+ def _async_start(self, loop=None):
need_builddir = self.phase not in EbuildProcess._phases_without_builddir
@@ -132,7 +132,7 @@ class EbuildPhase(CompositeTask):
# Force background=True for this header since it's intended
# for the log and it doesn't necessarily need to be visible
# elsewhere.
- yield self._elog('einfo', msg, background=True)
+ yield self._elog('einfo', msg, background=True, loop=self.scheduler)
if self.phase == 'package':
if 'PORTAGE_BINPKG_TMPFILE' not in self.settings:
@@ -403,7 +403,7 @@ class EbuildPhase(CompositeTask):
self.wait()
@coroutine
- def _elog(self, elog_funcname, lines, background=None):
+ def _elog(self, elog_funcname, lines, background=None, loop=None):
if background is None:
background = self.background
out = io.StringIO()
@@ -435,7 +435,7 @@ class EbuildPhase(CompositeTask):
log_file = build_logger.stdin
yield self.scheduler.async_output(msg, log_file=log_file,
- background=background)
+ background=background, loop=self.scheduler)
if build_logger is not None:
build_logger.stdin.close()
@@ -487,7 +487,7 @@ class _PostPhaseCommands(CompositeTask):
if 'qa-unresolved-soname-deps' in self.settings.features:
# This operates on REQUIRES metadata generated by the above function call.
- future = self._soname_deps_qa()
+ future = asyncio.ensure_future(self._soname_deps_qa(loop=self.scheduler), loop=self.scheduler)
# If an unexpected exception occurs, then this will raise it.
future.add_done_callback(lambda future: future.cancelled() or future.result())
self._start_task(AsyncTaskFuture(future=future), self._default_final_exit)
@@ -497,7 +497,7 @@ class _PostPhaseCommands(CompositeTask):
self._default_final_exit(task)
@coroutine
- def _soname_deps_qa(self):
+ def _soname_deps_qa(self, loop=None):
vardb = QueryCommand.get_db()[self.settings['EROOT']]['vartree'].dbapi
@@ -512,4 +512,4 @@ class _PostPhaseCommands(CompositeTask):
qa_msg.extend("\t%s: %s" % (filename, " ".join(sorted(soname_deps)))
for filename, soname_deps in unresolved)
qa_msg.append("")
- yield self.elog("eqawarn", qa_msg)
+ yield self.elog("eqawarn", qa_msg, loop=self.scheduler)
diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py
index 2427d953c..a69421288 100644
--- a/lib/_emerge/Scheduler.py
+++ b/lib/_emerge/Scheduler.py
@@ -871,7 +871,7 @@ class Scheduler(PollScheduler):
infloc = os.path.join(build_dir_path, "build-info")
ensure_dirs(infloc)
self._sched_iface.run_until_complete(
- bintree.dbapi.unpack_metadata(settings, infloc))
+ bintree.dbapi.unpack_metadata(settings, infloc, loop=self._sched_iface))
ebuild_path = os.path.join(infloc, x.pf + ".ebuild")
settings.configdict["pkg"]["EMERGE_FROM"] = "binary"
settings.configdict["pkg"]["MERGE_TYPE"] = "binary"
@@ -1621,7 +1621,7 @@ class Scheduler(PollScheduler):
if (self._task_queues.merge and (self._schedule_merge_wakeup_task is None
or self._schedule_merge_wakeup_task.done())):
self._schedule_merge_wakeup_task = asyncio.ensure_future(
- self._task_queues.merge.wait(), loop=self._event_loop)
+ self._task_queues.merge.wait(loop=self._event_loop), loop=self._event_loop)
self._schedule_merge_wakeup_task.add_done_callback(
self._schedule_merge_wakeup)
diff --git a/lib/_emerge/SequentialTaskQueue.py b/lib/_emerge/SequentialTaskQueue.py
index 40590b76c..02fe19912 100644
--- a/lib/_emerge/SequentialTaskQueue.py
+++ b/lib/_emerge/SequentialTaskQueue.py
@@ -69,7 +69,7 @@ class SequentialTaskQueue(SlotObject):
task.cancel()
@coroutine
- def wait(self):
+ def wait(self, loop=None):
"""
Wait for the queue to become empty. This method is a coroutine.
"""
@@ -77,7 +77,7 @@ class SequentialTaskQueue(SlotObject):
task = next(iter(self.running_tasks), None)
if task is None:
# Wait for self.running_tasks to populate.
- yield asyncio.sleep(0)
+ yield asyncio.sleep(0, loop=loop)
else:
yield task.async_wait()
diff --git a/lib/_emerge/SpawnProcess.py b/lib/_emerge/SpawnProcess.py
index cea16df27..c43d60d3f 100644
--- a/lib/_emerge/SpawnProcess.py
+++ b/lib/_emerge/SpawnProcess.py
@@ -140,11 +140,12 @@ class SpawnProcess(SubProcess):
self._registered = True
self._main_task_cancel = functools.partial(self._main_cancel, build_logger, pipe_logger)
- self._main_task = asyncio.ensure_future(self._main(build_logger, pipe_logger), loop=self.scheduler)
+ self._main_task = asyncio.ensure_future(
+ self._main(build_logger, pipe_logger, loop=self.scheduler), loop=self.scheduler)
self._main_task.add_done_callback(self._main_exit)
@coroutine
- def _main(self, build_logger, pipe_logger):
+ def _main(self, build_logger, pipe_logger, loop=None):
try:
if pipe_logger.poll() is None:
yield pipe_logger.async_wait()
diff --git a/lib/portage/dbapi/bintree.py b/lib/portage/dbapi/bintree.py
index 59c265688..620865a79 100644
--- a/lib/portage/dbapi/bintree.py
+++ b/lib/portage/dbapi/bintree.py
@@ -217,7 +217,7 @@ class bindbapi(fakedbapi):
@coroutine
- def unpack_metadata(self, pkg, dest_dir):
+ def unpack_metadata(self, pkg, dest_dir, loop=None):
"""
Unpack package metadata to a directory. This method is a coroutine.
@@ -226,7 +226,7 @@ class bindbapi(fakedbapi):
@param dest_dir: destination directory
@type dest_dir: str
"""
- loop = asyncio._wrap_loop()
+ loop = asyncio._wrap_loop(loop)
if isinstance(pkg, _pkg_str):
cpv = pkg
else:
@@ -234,14 +234,14 @@ class bindbapi(fakedbapi):
key = self._instance_key(cpv)
add_pkg = self.bintree._additional_pkgs.get(key)
if add_pkg is not None:
- yield add_pkg._db.unpack_metadata(pkg, dest_dir)
+ yield add_pkg._db.unpack_metadata(pkg, dest_dir, loop=loop)
else:
tbz2_file = self.bintree.getname(cpv)
yield loop.run_in_executor(ForkExecutor(loop=loop),
portage.xpak.tbz2(tbz2_file).unpackinfo, dest_dir)
@coroutine
- def unpack_contents(self, pkg, dest_dir):
+ def unpack_contents(self, pkg, dest_dir, loop=None):
"""
Unpack package contents to a directory. This method is a coroutine.
@@ -250,7 +250,7 @@ class bindbapi(fakedbapi):
@param dest_dir: destination directory
@type dest_dir: str
"""
- loop = asyncio._wrap_loop()
+ loop = asyncio._wrap_loop(loop)
if isinstance(pkg, _pkg_str):
settings = self.settings
cpv = pkg
@@ -280,7 +280,7 @@ class bindbapi(fakedbapi):
add_pkg = self.bintree._additional_pkgs.get(instance_key)
if add_pkg is None:
raise portage.exception.PackageNotFound(cpv)
- yield add_pkg._db.unpack_contents(pkg, dest_dir)
+ yield add_pkg._db.unpack_contents(pkg, dest_dir, loop=loop)
def cp_list(self, *pargs, **kwargs):
if not self.bintree.populated:
diff --git a/lib/portage/dbapi/vartree.py b/lib/portage/dbapi/vartree.py
index 3eee025ad..1547d2f6d 100644
--- a/lib/portage/dbapi/vartree.py
+++ b/lib/portage/dbapi/vartree.py
@@ -931,7 +931,7 @@ class vardbapi(dbapi):
self._bump_mtime(cpv)
@coroutine
- def unpack_metadata(self, pkg, dest_dir):
+ def unpack_metadata(self, pkg, dest_dir, loop=None):
"""
Unpack package metadata to a directory. This method is a coroutine.
@@ -940,7 +940,7 @@ class vardbapi(dbapi):
@param dest_dir: destination directory
@type dest_dir: str
"""
- loop = asyncio._wrap_loop()
+ loop = asyncio._wrap_loop(loop)
if not isinstance(pkg, portage.config):
cpv = pkg
else:
@@ -956,7 +956,7 @@ class vardbapi(dbapi):
@coroutine
def unpack_contents(self, pkg, dest_dir,
- include_config=None, include_unmodified_config=None):
+ include_config=None, include_unmodified_config=None, loop=None):
"""
Unpack package contents to a directory. This method is a coroutine.
@@ -982,7 +982,7 @@ class vardbapi(dbapi):
by QUICKPKG_DEFAULT_OPTS).
@type include_unmodified_config: bool
"""
- loop = asyncio._wrap_loop()
+ loop = asyncio._wrap_loop(loop)
if not isinstance(pkg, portage.config):
settings = self.settings
cpv = pkg
diff --git a/lib/portage/repository/storage/hardlink_quarantine.py b/lib/portage/repository/storage/hardlink_quarantine.py
index 3594cb1c9..165ab8324 100644
--- a/lib/portage/repository/storage/hardlink_quarantine.py
+++ b/lib/portage/repository/storage/hardlink_quarantine.py
@@ -39,59 +39,59 @@ class HardlinkQuarantineRepoStorage(RepoStorageInterface):
self._current_update = None
@coroutine
- def _check_call(self, cmd):
+ def _check_call(self, cmd, loop=None):
"""
Run cmd and raise RepoStorageException on failure.
@param cmd: command to execute
@type cmd: list
"""
- p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(), **self._spawn_kwargs)
+ p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(loop), **self._spawn_kwargs)
p.start()
if (yield p.async_wait()) != os.EX_OK:
raise RepoStorageException('command exited with status {}: {}'.\
format(p.returncode, ' '.join(cmd)))
@coroutine
- def init_update(self):
+ def init_update(self, loop=None):
update_location = os.path.join(self._user_location, '.tmp-unverified-download-quarantine')
- yield self._check_call(['rm', '-rf', update_location])
+ yield self._check_call(['rm', '-rf', update_location], loop=loop)
# Use rsync --link-dest to hardlink files into self._update_location,
# since cp -l is not portable.
yield self._check_call(['rsync', '-a', '--link-dest', self._user_location,
'--exclude=/distfiles', '--exclude=/local', '--exclude=/lost+found', '--exclude=/packages',
'--exclude', '/{}'.format(os.path.basename(update_location)),
- self._user_location + '/', update_location + '/'])
+ self._user_location + '/', update_location + '/'], loop=loop)
self._update_location = update_location
coroutine_return(self._update_location)
@property
- def current_update(self):
+ def current_update(self, loop=None):
if self._update_location is None:
raise RepoStorageException('current update does not exist')
return self._update_location
@coroutine
- def commit_update(self):
+ def commit_update(self, loop=None):
update_location = self.current_update
self._update_location = None
yield self._check_call(['rsync', '-a', '--delete',
'--exclude=/distfiles', '--exclude=/local', '--exclude=/lost+found', '--exclude=/packages',
'--exclude', '/{}'.format(os.path.basename(update_location)),
- update_location + '/', self._user_location + '/'])
+ update_location + '/', self._user_location + '/'], loop=loop)
- yield self._check_call(['rm', '-rf', update_location])
+ yield self._check_call(['rm', '-rf', update_location], loop=loop)
@coroutine
- def abort_update(self):
+ def abort_update(self, loop=None):
if self._update_location is not None:
update_location = self._update_location
self._update_location = None
- yield self._check_call(['rm', '-rf', update_location])
+ yield self._check_call(['rm', '-rf', update_location], loop=loop)
@coroutine
- def garbage_collection(self):
- yield self.abort_update()
+ def garbage_collection(self, loop=None):
+ yield self.abort_update(loop=loop)
diff --git a/lib/portage/repository/storage/hardlink_rcu.py b/lib/portage/repository/storage/hardlink_rcu.py
index bb2c8496b..68081494c 100644
--- a/lib/portage/repository/storage/hardlink_rcu.py
+++ b/lib/portage/repository/storage/hardlink_rcu.py
@@ -105,7 +105,7 @@ class HardlinkRcuRepoStorage(RepoStorageInterface):
self._snapshots_dir = os.path.join(self._storage_location, 'snapshots')
@coroutine
- def _check_call(self, cmd, privileged=False):
+ def _check_call(self, cmd, privileged=False, loop=None):
"""
Run cmd and raise RepoStorageException on failure.
@@ -118,16 +118,16 @@ class HardlinkRcuRepoStorage(RepoStorageInterface):
kwargs = dict(fd_pipes=self._spawn_kwargs.get('fd_pipes'))
else:
kwargs = self._spawn_kwargs
- p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(), **kwargs)
+ p = SpawnProcess(args=cmd, scheduler=asyncio._wrap_loop(loop), **kwargs)
p.start()
if (yield p.async_wait()) != os.EX_OK:
raise RepoStorageException('command exited with status {}: {}'.\
format(p.returncode, ' '.join(cmd)))
@coroutine
- def init_update(self):
+ def init_update(self, loop=None):
update_location = os.path.join(self._storage_location, 'update')
- yield self._check_call(['rm', '-rf', update_location])
+ yield self._check_call(['rm', '-rf', update_location], loop=loop)
# This assumes normal umask permissions if it doesn't exist yet.
portage.util.ensure_dirs(self._storage_location)
@@ -139,18 +139,18 @@ class HardlinkRcuRepoStorage(RepoStorageInterface):
# Use rsync --link-dest to hardlink files into update_location,
# since cp -l is not portable.
yield self._check_call(['rsync', '-a', '--link-dest', self._latest_canonical,
- self._latest_canonical + '/', update_location + '/'])
+ self._latest_canonical + '/', update_location + '/'], loop=loop)
elif not os.path.islink(self._user_location):
- yield self._migrate(update_location)
- update_location = (yield self.init_update())
+ yield self._migrate(update_location, loop=loop)
+ update_location = (yield self.init_update(loop=loop))
self._update_location = update_location
coroutine_return(self._update_location)
@coroutine
- def _migrate(self, update_location):
+ def _migrate(self, update_location, loop=None):
"""
When repo.user_location is a normal directory, migrate it to
storage so that it can be replaced with a symlink. After migration,
@@ -164,26 +164,26 @@ class HardlinkRcuRepoStorage(RepoStorageInterface):
os.stat(self._user_location))
# It's probably on a different device, so copy it.
yield self._check_call(['rsync', '-a',
- self._user_location + '/', update_location + '/'])
+ self._user_location + '/', update_location + '/'], loop=loop)
# Remove the old copy so that the symlink can be created. Run with
# maximum privileges, since removal requires write access to
# the parent directory.
- yield self._check_call(['rm', '-rf', user_location], privileged=True)
+ yield self._check_call(['rm', '-rf', user_location], privileged=True, loop=loop)
self._update_location = update_location
# Make this copy the latest snapshot
- yield self.commit_update()
+ yield self.commit_update(loop=loop)
@property
- def current_update(self):
+ def current_update(self, loop=None):
if self._update_location is None:
raise RepoStorageException('current update does not exist')
return self._update_location
@coroutine
- def commit_update(self):
+ def commit_update(self, loop=None):
update_location = self.current_update
self._update_location = None
try:
@@ -235,14 +235,14 @@ class HardlinkRcuRepoStorage(RepoStorageInterface):
yield None
@coroutine
- def abort_update(self):
+ def abort_update(self, loop=None):
if self._update_location is not None:
update_location = self._update_location
self._update_location = None
- yield self._check_call(['rm', '-rf', update_location])
+ yield self._check_call(['rm', '-rf', update_location], loop=loop)
@coroutine
- def garbage_collection(self):
+ def garbage_collection(self, loop=None):
snap_ttl = datetime.timedelta(days=self._ttl_days)
snapshots = sorted(int(name) for name in os.listdir(self._snapshots_dir))
# always preserve the latest snapshot
@@ -259,4 +259,4 @@ class HardlinkRcuRepoStorage(RepoStorageInterface):
snap_timestamp = datetime.datetime.utcfromtimestamp(st.st_mtime)
if (datetime.datetime.utcnow() - snap_timestamp) < snap_ttl:
continue
- yield self._check_call(['rm', '-rf', snap_path])
+ yield self._check_call(['rm', '-rf', snap_path], loop=loop)
diff --git a/lib/portage/repository/storage/inplace.py b/lib/portage/repository/storage/inplace.py
index f1117ad03..3dbcbd7ad 100644
--- a/lib/portage/repository/storage/inplace.py
+++ b/lib/portage/repository/storage/inplace.py
@@ -19,31 +19,31 @@ class InplaceRepoStorage(RepoStorageInterface):
self._update_location = None
@coroutine
- def init_update(self):
+ def init_update(self, loop=None):
self._update_location = self._user_location
coroutine_return(self._update_location)
yield None
@property
- def current_update(self):
+ def current_update(self, loop=None):
if self._update_location is None:
raise RepoStorageException('current update does not exist')
return self._update_location
@coroutine
- def commit_update(self):
+ def commit_update(self, loop=None):
self.current_update
self._update_location = None
coroutine_return()
yield None
@coroutine
- def abort_update(self):
+ def abort_update(self, loop=None):
self._update_location = None
coroutine_return()
yield None
@coroutine
- def garbage_collection(self):
+ def garbage_collection(self, loop=None):
coroutine_return()
yield None
diff --git a/lib/portage/repository/storage/interface.py b/lib/portage/repository/storage/interface.py
index ce8a2a170..4f5be6dbc 100644
--- a/lib/portage/repository/storage/interface.py
+++ b/lib/portage/repository/storage/interface.py
@@ -33,7 +33,7 @@ class RepoStorageInterface:
raise NotImplementedError
@coroutine
- def init_update(self):
+ def init_update(self, loop=None):
"""
Create an update directory as a destination to sync updates to.
The directory will be populated with files from the previous
@@ -50,7 +50,7 @@ class RepoStorageInterface:
raise NotImplementedError
@property
- def current_update(self):
+ def current_update(self, loop=None):
"""
Get the current update directory which would have been returned
from the most recent call to the init_update method. This raises
@@ -63,7 +63,7 @@ class RepoStorageInterface:
raise NotImplementedError
@coroutine
- def commit_update(self):
+ def commit_update(self, loop=None):
"""
Commit the current update directory, so that it becomes the
latest immutable snapshot.
@@ -71,7 +71,7 @@ class RepoStorageInterface:
raise NotImplementedError
@coroutine
- def abort_update(self):
+ def abort_update(self, loop=None):
"""
Delete the current update directory. If there was not an update
in progress, or it has already been committed, then this has
@@ -80,7 +80,7 @@ class RepoStorageInterface:
raise NotImplementedError
@coroutine
- def garbage_collection(self):
+ def garbage_collection(self, loop=None):
"""
Remove expired snapshots.
"""
diff --git a/lib/portage/sync/syncbase.py b/lib/portage/sync/syncbase.py
index 5f18e5ba3..8e83b94fb 100644
--- a/lib/portage/sync/syncbase.py
+++ b/lib/portage/sync/syncbase.py
@@ -108,7 +108,7 @@ class SyncBase:
"""
if self._repo_storage is None:
storage_cls = portage.load_mod(self._select_storage_module())
- self._repo_storage = _sync_methods(storage_cls(self.repo, self.spawn_kwargs))
+ self._repo_storage = _sync_methods(storage_cls(self.repo, self.spawn_kwargs), loop=global_event_loop())
return self._repo_storage
@property
diff --git a/lib/portage/tests/dbapi/test_auxdb.py b/lib/portage/tests/dbapi/test_auxdb.py
index 907c289fb..1029de70d 100644
--- a/lib/portage/tests/dbapi/test_auxdb.py
+++ b/lib/portage/tests/dbapi/test_auxdb.py
@@ -63,8 +63,9 @@ class AuxdbTestCase(TestCase):
portdb = playground.trees[playground.eroot]["porttree"].dbapi
def test_func():
- return asyncio._wrap_loop().run_until_complete(self._test_mod_async(
- ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb))
+ loop = asyncio._wrap_loop()
+ return loop.run_until_complete(self._test_mod_async(
+ ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb, loop=loop))
self.assertTrue(test_func())
@@ -91,10 +92,10 @@ class AuxdbTestCase(TestCase):
self.assertEqual(auxdb[cpv]['RESTRICT'], 'test')
@coroutine
- def _test_mod_async(self, ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb):
+ def _test_mod_async(self, ebuilds, ebuild_inherited, eclass_defined_phases, eclass_depend, portdb, loop=None):
for cpv, metadata in ebuilds.items():
- defined_phases, depend, eapi, inherited = yield portdb.async_aux_get(cpv, ['DEFINED_PHASES', 'DEPEND', 'EAPI', 'INHERITED'])
+ defined_phases, depend, eapi, inherited = yield portdb.async_aux_get(cpv, ['DEFINED_PHASES', 'DEPEND', 'EAPI', 'INHERITED'], loop=loop)
self.assertEqual(defined_phases, eclass_defined_phases)
self.assertEqual(depend, eclass_depend)
self.assertEqual(eapi, metadata['EAPI'])
diff --git a/lib/portage/tests/emerge/test_simple.py b/lib/portage/tests/emerge/test_simple.py
index 94b3076c1..c24f5c603 100644
--- a/lib/portage/tests/emerge/test_simple.py
+++ b/lib/portage/tests/emerge/test_simple.py
@@ -225,10 +225,10 @@ call_has_and_best_version() {
loop = asyncio._wrap_loop()
loop.run_until_complete(asyncio.ensure_future(
- self._async_test_simple(loop, playground, metadata_xml_files), loop=loop))
+ self._async_test_simple(playground, metadata_xml_files, loop=loop), loop=loop))
@coroutine
- def _async_test_simple(self, loop, playground, metadata_xml_files):
+ def _async_test_simple(self, playground, metadata_xml_files, loop=None):
debug = playground.debug
settings = playground.settings
@@ -540,7 +540,7 @@ move dev-util/git dev-vcs/git
local_env = env
proc = yield asyncio.create_subprocess_exec(*args,
- env=local_env, stderr=None, stdout=stdout)
+ env=local_env, stderr=None, stdout=stdout, loop=loop)
if debug:
yield proc.wait()
diff --git a/lib/portage/tests/process/test_AsyncFunction.py b/lib/portage/tests/process/test_AsyncFunction.py
index 3b360e02f..b3f80b8ac 100644
--- a/lib/portage/tests/process/test_AsyncFunction.py
+++ b/lib/portage/tests/process/test_AsyncFunction.py
@@ -21,7 +21,7 @@ class AsyncFunctionTestCase(TestCase):
return ''.join(sys.stdin)
@coroutine
- def _testAsyncFunctionStdin(self, loop):
+ def _testAsyncFunctionStdin(self, loop=None):
test_string = '1\n2\n3\n'
pr, pw = os.pipe()
fd_pipes = {0:pr}
@@ -36,7 +36,7 @@ class AsyncFunctionTestCase(TestCase):
def testAsyncFunctionStdin(self):
loop = asyncio._wrap_loop()
- loop.run_until_complete(self._testAsyncFunctionStdin(loop))
+ loop.run_until_complete(self._testAsyncFunctionStdin(loop=loop))
def _test_getpid_fork(self):
"""
diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py
index 2bd94cf39..acc3b8af9 100644
--- a/lib/portage/tests/process/test_PipeLogger.py
+++ b/lib/portage/tests/process/test_PipeLogger.py
@@ -37,7 +37,7 @@ class PipeLoggerTestCase(TestCase):
# Before starting the reader, wait here for a moment, in order
# to exercise PipeLogger's handling of EAGAIN during write.
- yield asyncio.wait([writer], timeout=0.01)
+ yield asyncio.wait([writer], timeout=0.01, loop=loop)
reader = _reader(pr, loop=loop)
yield writer
diff --git a/lib/portage/tests/util/futures/asyncio/test_child_watcher.py b/lib/portage/tests/util/futures/asyncio/test_child_watcher.py
index 8a8fb3d4f..cd547f008 100644
--- a/lib/portage/tests/util/futures/asyncio/test_child_watcher.py
+++ b/lib/portage/tests/util/futures/asyncio/test_child_watcher.py
@@ -38,7 +38,7 @@ class ChildWatcherTestCase(TestCase):
future.set_result((pid, returncode, args))
@coroutine
- def watch_pid():
+ def watch_pid(loop=None):
with asyncio.get_child_watcher() as watcher:
pids = spawn([true_binary], returnpid=True)
@@ -47,7 +47,7 @@ class ChildWatcherTestCase(TestCase):
(yield future),
(pids[0], os.EX_OK, args_tuple))
- loop.run_until_complete(watch_pid())
+ loop.run_until_complete(watch_pid(loop=loop))
finally:
asyncio.set_event_loop_policy(initial_policy)
if loop not in (None, global_event_loop()):
diff --git a/lib/portage/tests/util/futures/test_compat_coroutine.py b/lib/portage/tests/util/futures/test_compat_coroutine.py
index 5a8230432..0fd459cbf 100644
--- a/lib/portage/tests/util/futures/test_compat_coroutine.py
+++ b/lib/portage/tests/util/futures/test_compat_coroutine.py
@@ -14,12 +14,13 @@ class CompatCoroutineTestCase(TestCase):
def test_returning_coroutine(self):
@coroutine
- def returning_coroutine():
- yield asyncio.sleep(0)
+ def returning_coroutine(loop=None):
+ yield asyncio.sleep(0, loop=loop)
coroutine_return('success')
+ loop = asyncio.get_event_loop()
self.assertEqual('success',
- asyncio.get_event_loop().run_until_complete(returning_coroutine()))
+ asyncio.get_event_loop().run_until_complete(returning_coroutine(loop=loop)))
def test_raising_coroutine(self):
@@ -27,12 +28,13 @@ class CompatCoroutineTestCase(TestCase):
pass
@coroutine
- def raising_coroutine():
- yield asyncio.sleep(0)
+ def raising_coroutine(loop=None):
+ yield asyncio.sleep(0, loop=loop)
raise TestException('exception')
+ loop = asyncio.get_event_loop()
self.assertRaises(TestException,
- asyncio.get_event_loop().run_until_complete, raising_coroutine())
+ loop.run_until_complete, raising_coroutine(loop=loop))
def test_catching_coroutine(self):
@@ -109,17 +111,18 @@ class CompatCoroutineTestCase(TestCase):
yield future
loop = asyncio.get_event_loop()
- future = loop.run_until_complete(asyncio.wait([cancelled_future_coroutine()]))[0].pop()
+ future = loop.run_until_complete(asyncio.wait([cancelled_future_coroutine(loop=loop)], loop=loop))[0].pop()
self.assertTrue(future.cancelled())
def test_yield_expression_result(self):
@coroutine
- def yield_expression_coroutine():
+ def yield_expression_coroutine(loop=None):
for i in range(3):
- x = yield asyncio.sleep(0, result=i)
+ x = yield asyncio.sleep(0, result=i, loop=loop)
self.assertEqual(x, i)
- asyncio.get_event_loop().run_until_complete(yield_expression_coroutine())
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(yield_expression_coroutine(loop=loop))
def test_method_coroutine(self):
@@ -144,7 +147,7 @@ class CompatCoroutineTestCase(TestCase):
return waiter
@coroutine
- def read(self):
+ def read(self, loop=None):
while self._value is self._empty:
yield self._wait()
@@ -154,7 +157,7 @@ class CompatCoroutineTestCase(TestCase):
coroutine_return(value)
@coroutine
- def write(self, value):
+ def write(self, value, loop=None):
while self._value is not self._empty:
yield self._wait()
@@ -162,16 +165,16 @@ class CompatCoroutineTestCase(TestCase):
self._notify()
@coroutine
- def writer_coroutine(cubby, values, sentinel):
+ def writer_coroutine(cubby, values, sentinel, loop=None):
for value in values:
- yield cubby.write(value)
- yield cubby.write(sentinel)
+ yield cubby.write(value, loop=loop)
+ yield cubby.write(sentinel, loop=loop)
@coroutine
- def reader_coroutine(cubby, sentinel):
+ def reader_coroutine(cubby, sentinel, loop=None):
results = []
while True:
- result = yield cubby.read()
+ result = yield cubby.read(loop=loop)
if result == sentinel:
break
results.append(result)
@@ -180,9 +183,9 @@ class CompatCoroutineTestCase(TestCase):
loop = asyncio.get_event_loop()
cubby = Cubby(loop)
values = list(range(3))
- writer = asyncio.ensure_future(writer_coroutine(cubby, values, None), loop=loop)
- reader = asyncio.ensure_future(reader_coroutine(cubby, None), loop=loop)
- loop.run_until_complete(asyncio.wait([writer, reader]))
+ writer = asyncio.ensure_future(writer_coroutine(cubby, values, None, loop=loop), loop=loop)
+ reader = asyncio.ensure_future(reader_coroutine(cubby, None, loop=loop), loop=loop)
+ loop.run_until_complete(asyncio.wait([writer, reader], loop=loop))
self.assertEqual(reader.result(), values)
@@ -191,7 +194,7 @@ class CompatCoroutineTestCase(TestCase):
# blend with synchronous code.
sync_cubby = _sync_methods(cubby, loop=loop)
sync_reader = _sync_decorator(reader_coroutine, loop=loop)
- writer = asyncio.ensure_future(writer_coroutine(cubby, values, None), loop=loop)
+ writer = asyncio.ensure_future(writer_coroutine(cubby, values, None, loop=loop), loop=loop)
self.assertEqual(sync_reader(cubby, None), values)
self.assertTrue(writer.done())
diff --git a/lib/portage/tests/util/test_socks5.py b/lib/portage/tests/util/test_socks5.py
index ca32651a7..44d522013 100644
--- a/lib/portage/tests/util/test_socks5.py
+++ b/lib/portage/tests/util/test_socks5.py
@@ -185,7 +185,7 @@ class Socks5ServerTestCase(TestCase):
}
proxy = socks5.get_socks5_proxy(settings)
- loop.run_until_complete(socks5.proxy.ready())
+ loop.run_until_complete(socks5.proxy.ready(loop=loop))
result = loop.run_until_complete(loop.run_in_executor(None,
self._fetch_via_proxy, proxy, host, server.server_port, path))
diff --git a/lib/portage/util/_async/BuildLogger.py b/lib/portage/util/_async/BuildLogger.py
index f25f70d5b..5a9c076b6 100644
--- a/lib/portage/util/_async/BuildLogger.py
+++ b/lib/portage/util/_async/BuildLogger.py
@@ -78,11 +78,11 @@ class BuildLogger(AsynchronousTask):
pipe_logger.start()
self._main_task_cancel = functools.partial(self._main_cancel, filter_proc, pipe_logger)
- self._main_task = asyncio.ensure_future(self._main(filter_proc, pipe_logger), loop=self.scheduler)
+ self._main_task = asyncio.ensure_future(self._main(filter_proc, pipe_logger, loop=self.scheduler), loop=self.scheduler)
self._main_task.add_done_callback(self._main_exit)
@coroutine
- def _main(self, filter_proc, pipe_logger):
+ def _main(self, filter_proc, pipe_logger, loop=None):
try:
if pipe_logger.poll() is None:
yield pipe_logger.async_wait()
diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index eb01a6232..3c9c6e22b 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -47,7 +47,7 @@ class ForkProcess(SpawnProcess):
os.close(stdin_dup)
self._proc_join_task = asyncio.ensure_future(
- self._proc_join(self._proc))
+ self._proc_join(self._proc, loop=self.scheduler), loop=self.scheduler)
self._proc_join_task.add_done_callback(
functools.partial(self._proc_join_done, self._proc))
@@ -68,7 +68,7 @@ class ForkProcess(SpawnProcess):
super(ForkProcess, self)._async_waitpid()
@coroutine
- def _proc_join(self, proc):
+ def _proc_join(self, proc, loop=None):
sentinel_reader = self.scheduler.create_future()
self.scheduler.add_reader(proc.sentinel,
lambda: sentinel_reader.done() or sentinel_reader.set_result(None))
@@ -93,7 +93,7 @@ class ForkProcess(SpawnProcess):
proc.join(0)
if proc.exitcode is not None:
break
- yield asyncio.sleep(self._proc_join_interval)
+ yield asyncio.sleep(self._proc_join_interval, loop=loop)
def _proc_join_done(self, proc, future):
future.cancelled() or future.result()
diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py
index 2bbdd3ddb..e8203268c 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -53,7 +53,7 @@ class PipeLogger(AbstractPollTask):
fcntl.fcntl(fd, fcntl.F_SETFL,
fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
- self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd), loop=self.scheduler)
+ self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd, loop=self.scheduler), loop=self.scheduler)
self._io_loop_task.add_done_callback(self._io_loop_done)
self._registered = True
@@ -63,7 +63,7 @@ class PipeLogger(AbstractPollTask):
self.returncode = self._cancelled_returncode
@coroutine
- def _io_loop(self, input_file):
+ def _io_loop(self, input_file, loop=None):
background = self.background
stdout_fd = self.stdout_fd
log_file = self._log_file
diff --git a/lib/portage/util/_async/SchedulerInterface.py b/lib/portage/util/_async/SchedulerInterface.py
index 3ff250d1d..2865266eb 100644
--- a/lib/portage/util/_async/SchedulerInterface.py
+++ b/lib/portage/util/_async/SchedulerInterface.py
@@ -57,7 +57,7 @@ class SchedulerInterface(SlotObject):
@coroutine
def async_output(self, msg, log_file=None, background=None,
- level=0, noiselevel=-1):
+ level=0, noiselevel=-1, loop=None):
"""
Output a msg to stdio (if not in background) and to a log file
if provided.
@@ -81,7 +81,7 @@ class SchedulerInterface(SlotObject):
writemsg_level(msg, level=level, noiselevel=noiselevel)
if log_file is not None:
- yield _writer(log_file, _unicode_encode(msg))
+ yield _writer(log_file, _unicode_encode(msg), loop=loop)
def output(self, msg, log_path=None, background=None,
level=0, noiselevel=-1):
diff --git a/lib/portage/util/futures/_asyncio/process.py b/lib/portage/util/futures/_asyncio/process.py
index 6ff156c9d..275c9031a 100644
--- a/lib/portage/util/futures/_asyncio/process.py
+++ b/lib/portage/util/futures/_asyncio/process.py
@@ -39,7 +39,7 @@ class _Process:
return self._proc.returncode
@coroutine
- def communicate(self, input=None): # pylint: disable=redefined-builtin
+ def communicate(self, input=None, loop=None): # pylint: disable=redefined-builtin
"""
Read data from stdout and stderr, until end-of-file is reached.
Wait for process to terminate.
@@ -49,13 +49,14 @@ class _Process:
@return: tuple (stdout_data, stderr_data)
@rtype: asyncio.Future (or compatible)
"""
+ loop = asyncio._wrap_loop(loop or self._loop)
futures = []
for input_file in (self._proc.stdout, self._proc.stderr):
if input_file is None:
- future = self._loop.create_future()
+ future = loop.create_future()
future.set_result(None)
else:
- future = _reader(input_file, loop=self._loop)
+ future = _reader(input_file, loop=loop)
futures.append(future)
writer = None
@@ -65,11 +66,11 @@ class _Process:
stdin = self._proc.stdin
stdin = os.fdopen(stdin, 'wb', 0) if isinstance(stdin, int) else stdin
_set_nonblocking(stdin.fileno())
- writer = asyncio.ensure_future(_writer(stdin, input, loop=self._loop), loop=self._loop)
+ writer = asyncio.ensure_future(_writer(stdin, input, loop=loop), loop=loop)
writer.add_done_callback(lambda writer: stdin.close())
try:
- yield asyncio.wait(futures + [self.wait()], loop=self._loop)
+ yield asyncio.wait(futures + [self.wait(loop=loop)], loop=loop)
finally:
if writer is not None:
if writer.done():
@@ -84,14 +85,15 @@ class _Process:
coroutine_return(tuple(future.result() for future in futures))
- def wait(self):
+ def wait(self, loop=None):
"""
Wait for child process to terminate. Set and return returncode attribute.
@return: returncode
@rtype: asyncio.Future (or compatible)
"""
- waiter = self._loop.create_future()
+ loop = asyncio._wrap_loop(loop or self._loop)
+ waiter = loop.create_future()
if self.returncode is None:
self._waiters.append(waiter)
waiter.add_done_callback(self._waiter_cancel)
diff --git a/lib/portage/util/futures/_sync_decorator.py b/lib/portage/util/futures/_sync_decorator.py
index 02a0963a7..3da065789 100644
--- a/lib/portage/util/futures/_sync_decorator.py
+++ b/lib/portage/util/futures/_sync_decorator.py
@@ -15,9 +15,10 @@ def _sync_decorator(func, loop=None):
function that returns a Future) with a wrapper that runs the function
synchronously.
"""
- loop = asyncio._wrap_loop(loop)
@functools.wraps(func)
def wrapper(*args, **kwargs):
+ nonlocal loop
+ loop = kwargs['loop'] = asyncio._wrap_loop(kwargs.get('loop') or loop)
return loop.run_until_complete(func(*args, **kwargs))
return wrapper
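[Editorial note: a rough usage sketch of the per-call loop resolution added above. Illustrative only; fetch_value is a hypothetical coroutine, not part of this commit.]

    from portage.util.futures import asyncio
    from portage.util.futures._sync_decorator import _sync_decorator
    from portage.util.futures.compat_coroutine import coroutine, coroutine_return

    @coroutine
    def fetch_value(loop=None):  # hypothetical coroutine
        yield asyncio.sleep(0, loop=loop)
        coroutine_return(42)

    loop = asyncio._wrap_loop()
    # The wrapper now injects its loop into kwargs, so the wrapped coroutine
    # receives an explicit loop= even when the synchronous caller omits it.
    sync_fetch = _sync_decorator(fetch_value, loop=loop)
    assert sync_fetch() == 42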
diff --git a/lib/portage/util/futures/compat_coroutine.py b/lib/portage/util/futures/compat_coroutine.py
index 79bd0da68..9a0c5c1c8 100644
--- a/lib/portage/util/futures/compat_coroutine.py
+++ b/lib/portage/util/futures/compat_coroutine.py
@@ -67,7 +67,12 @@ def _generator_future(generator_func, *args, **kwargs):
keyword argument named 'loop' is given, then it is used instead of
the default event loop.
"""
- loop = asyncio._wrap_loop(kwargs.get('loop'))
+ loop = kwargs.get('loop')
+ if loop is None and portage._internal_caller:
+ # Require an explicit loop parameter, in order to support
+ # local event loops (bug 737698).
+ raise AssertionError("Missing required argument 'loop'")
+ loop = asyncio._wrap_loop(loop)
result = loop.create_future()
_GeneratorTask(generator_func(*args, **kwargs), result, loop=loop)
return result
diff --git a/lib/portage/util/socks5.py b/lib/portage/util/socks5.py
index 65d2400e8..9f22c1dbe 100644
--- a/lib/portage/util/socks5.py
+++ b/lib/portage/util/socks5.py
@@ -76,7 +76,7 @@ class ProxyManager:
@coroutine
- def ready(self):
+ def ready(self, loop=None):
"""
Wait for the proxy socket to become ready. This method is a coroutine.
"""
@@ -98,7 +98,7 @@ class ProxyManager:
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise
- yield asyncio.sleep(0.2)
+ yield asyncio.sleep(0.2, loop=loop)
else:
break
finally:
diff --git a/repoman/lib/repoman/modules/scan/depend/profile.py b/repoman/lib/repoman/modules/scan/depend/profile.py
index 1eb69422a..468bc55e2 100644
--- a/repoman/lib/repoman/modules/scan/depend/profile.py
+++ b/repoman/lib/repoman/modules/scan/depend/profile.py
@@ -146,7 +146,7 @@ class ProfileDependsChecks(ScanBase):
% (ebuild.relative_path, mytype, ", ".join(sorted(atoms))))
@coroutine
- def _task(self, task):
+ def _task(self, task, loop=None):
yield task.future
coroutine_return((task, task.future.result()))
@@ -222,7 +222,7 @@ class ProfileDependsChecks(ScanBase):
yield (task, target())
else:
task.future = asyncio.ensure_future(loop.run_in_executor(executor, target), loop=loop)
- yield self._task(task)
+ yield self._task(task, loop=loop)
def _task_subprocess(self, task, pkg, dep_settings):