From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from lists.gentoo.org (pigeon.gentoo.org [208.92.234.80]) by finch.gentoo.org (Postfix) with ESMTP id A43ED1386F3 for ; Thu, 13 Aug 2015 08:35:11 +0000 (UTC) Received: from pigeon.gentoo.org (localhost [127.0.0.1]) by pigeon.gentoo.org (Postfix) with SMTP id 5A32BE086D; Thu, 13 Aug 2015 08:35:09 +0000 (UTC) Received: from smtp.gentoo.org (smtp.gentoo.org [140.211.166.183]) (using TLSv1.2 with cipher AECDH-AES256-SHA (256/256 bits)) (No client certificate requested) by pigeon.gentoo.org (Postfix) with ESMTPS id AE526E0853 for ; Thu, 13 Aug 2015 08:35:08 +0000 (UTC) Received: from localhost.localdomain (ip174-67-205-96.oc.oc.cox.net [174.67.205.96]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-SHA256 (128/128 bits)) (No client certificate requested) (Authenticated sender: zmedico) by smtp.gentoo.org (Postfix) with ESMTPSA id 8E5E13408F9; Thu, 13 Aug 2015 08:35:07 +0000 (UTC) From: Zac Medico To: gentoo-portage-dev@lists.gentoo.org Cc: Zac Medico Subject: [gentoo-portage-dev] [PATCH] sync repositories in parallel (bug 557426) Date: Thu, 13 Aug 2015 01:34:36 -0700 Message-Id: <1439454876-23070-1-git-send-email-zmedico@gentoo.org> X-Mailer: git-send-email 2.4.6 Precedence: bulk List-Post: List-Help: List-Unsubscribe: List-Subscribe: List-Id: Gentoo Linux mail X-BeenThere: gentoo-portage-dev@lists.gentoo.org Reply-to: gentoo-portage-dev@lists.gentoo.org X-Archives-Salt: c6cbcd10-c68b-49b9-a18d-1cf0137820a5 X-Archives-Hash: 93cfc4b36abc6bab56daec15cdfb93ba Repos are synced in parallel (includes their post-sync hooks). Output of concurrent processes is currently mixed (irrelevant with --quiet). Support for FEATURES=metadata-transfer is handled in the main process, which may be required for some backends (such as sqlite). Repos are synced only after their master(s) have synced (in case that matters for hooks). 
X-Gentoo-Bug: 557426 X-Gentoo-Bug-url: https://bugs.gentoo.org/show_bug.cgi?id=557426 --- pym/portage/emaint/modules/sync/sync.py | 125 ++++++++++++++++++++++++++++-- pym/portage/sync/controller.py | 31 ++++++-- pym/portage/tests/sync/test_sync_local.py | 6 +- pym/portage/util/_async/AsyncFunction.py | 62 +++++++++++++++ 4 files changed, 210 insertions(+), 14 deletions(-) create mode 100644 pym/portage/util/_async/AsyncFunction.py diff --git a/pym/portage/emaint/modules/sync/sync.py b/pym/portage/emaint/modules/sync/sync.py index b463073..8a28f17 100644 --- a/pym/portage/emaint/modules/sync/sync.py +++ b/pym/portage/emaint/modules/sync/sync.py @@ -13,6 +13,10 @@ from portage.output import bold, red, create_color_func from portage._global_updates import _global_updates from portage.sync.controller import SyncManager from portage.util import writemsg_level +from portage.util.digraph import digraph +from portage.util._async.AsyncScheduler import AsyncScheduler +from portage.util._eventloop.global_event_loop import global_event_loop +from portage.util._eventloop.EventLoop import EventLoop import _emerge from _emerge.emergelog import emergelog @@ -201,6 +205,7 @@ class SyncRepos(object): k = "--" + k.replace("_", "-") self.emerge_config.opts[k] = v + selected_repos = [repo for repo in selected_repos if repo.sync_type is not None] msgs = [] if not selected_repos: msgs.append("Emaint sync, nothing to sync... 
returning") @@ -213,13 +218,17 @@ class SyncRepos(object): sync_manager = SyncManager( self.emerge_config.target_config.settings, emergelog) - retvals = [] - for repo in selected_repos: - if repo.sync_type is not None: - returncode, message = sync_manager.sync(self.emerge_config, repo) - retvals.append((repo.name, returncode)) - if message: - msgs.append(message) + + sync_scheduler = SyncScheduler(emerge_config=self.emerge_config, + selected_repos=selected_repos, sync_manager=sync_manager, + max_jobs=self.emerge_config.opts.get('--jobs', 1), + event_loop=global_event_loop() if portage._internal_caller else + EventLoop(main=False)) + + sync_scheduler.start() + sync_scheduler.wait() + retvals = sync_scheduler.retvals + msgs.extend(sync_scheduler.msgs) # Reload the whole config. portage._sync_mode = False @@ -287,3 +296,105 @@ class SyncRepos(object): messages.append("Action: %s for repo: %s, returned code = %s" % (action, rval[0], rval[1])) return messages + + +class SyncScheduler(AsyncScheduler): + ''' + Sync repos in parallel, but don't sync a given repo until all + of its masters have synced. + ''' + def __init__(self, **kwargs): + ''' + @param emerge_config: an emerge_config instance + @param selected_repos: list of RepoConfig instances + @param sync_manager: a SyncManager instance + ''' + self._emerge_config = kwargs.pop('emerge_config') + self._selected_repos = kwargs.pop('selected_repos') + self._sync_manager = kwargs.pop('sync_manager') + AsyncScheduler.__init__(self, **kwargs) + self._init_graph() + self._leaf_nodes = self._sync_graph.leaf_nodes() + self.retvals = [] + self.msgs = [] + + def _init_graph(self): + ''' + Graph relationships between repos and their masters.
+ ''' + self._sync_graph = digraph() + self._repo_map = {} + self._running_repos = set() + for repo in self._selected_repos: + self._repo_map[repo.name] = repo + self._sync_graph.add(repo.name, None) + for master in repo.masters: + self._repo_map[master.name] = master + self._sync_graph.add(master.name, repo.name) + + def _task_exit(self, task): + ''' + Remove the task from the graph, in order to expose + more leaf nodes. + ''' + self._running_tasks.discard(task) + returncode = task.returncode + if task.returncode == os.EX_OK: + returncode, message, updatecache_flg = task.result + if message: + self.msgs.append(message) + repo = task.kwargs['repo'].name + self._running_repos.remove(repo) + self.retvals.append((repo, returncode)) + self._sync_graph.remove(repo) + self._update_leaf_nodes() + super(SyncScheduler, self)._task_exit(self) + + def _update_leaf_nodes(self): + ''' + Populate self._leaf_nodes with current leaves from + self._sync_graph. If a circular master relationship + is discovered, choose a random node to break the cycle. + ''' + if self._sync_graph and not self._leaf_nodes: + self._leaf_nodes = [obj for obj in + self._sync_graph.leaf_nodes() + if obj not in self._running_repos] + + if not (self._leaf_nodes or self._running_repos): + # If there is a circular master relationship, + # choose a random node to break the cycle. + self._leaf_nodes = [next(iter(self._sync_graph))] + + def _next_task(self): + ''' + Return a task for the next available leaf node. + ''' + if not self._sync_graph: + raise StopIteration() + # If self._sync_graph is non-empty, then self._leaf_nodes + # is guaranteed to be non-empty, since otherwise + # _can_add_job would have returned False and prevented + # _next_task from being immediately called. 
+ node = self._leaf_nodes.pop() + self._running_repos.add(node) + self._update_leaf_nodes() + + task = self._sync_manager.async( + self._emerge_config, self._repo_map[node]) + return task + + def _can_add_job(self): + ''' + Returns False if there are no leaf nodes available. + ''' + if not AsyncScheduler._can_add_job(self): + return False + return bool(self._leaf_nodes) and not self._terminated.is_set() + + def _keep_scheduling(self): + ''' + Schedule as long as the graph is non-empty, and we haven't + been terminated. + ''' + return bool(self._sync_graph) and not self._terminated.is_set() diff --git a/pym/portage/sync/controller.py b/pym/portage/sync/controller.py index 307487f..e992cc4 100644 --- a/pym/portage/sync/controller.py +++ b/pym/portage/sync/controller.py @@ -21,6 +21,7 @@ bad = create_color_func("BAD") warn = create_color_func("WARN") from portage.package.ebuild.doebuild import _check_temp_dir from portage.metadata import action_metadata +from portage.util._async.AsyncFunction import AsyncFunction from portage import OrderedDict from portage import _unicode_decode from portage import util @@ -113,12 +114,18 @@ class SyncManager(object): return desc return [] + def async(self, emerge_config=None, repo=None): + proc = AsyncFunction(target=self.sync, + kwargs=dict(emerge_config=emerge_config, repo=repo)) + proc.addExitListener(self._sync_callback) + return proc - def sync(self, emerge_config=None, repo=None, callback=None): + def sync(self, emerge_config=None, repo=None): self.emerge_config = emerge_config - self.callback = callback or self._sync_callback + self.callback = None self.repo = repo self.exitcode = 1 + self.updatecache_flg = False if repo.sync_type in self.module_names: tasks = [self.module_controller.get_class(repo.sync_type)] else: @@ -149,13 +156,14 @@ class SyncManager(object): self.perform_post_sync_hook(repo.name, repo.sync_uri, repo.location) - return self.exitcode, None + return self.exitcode, None, self.updatecache_flg def 
do_callback(self, result): #print("result:", result, "callback()", self.callback) exitcode, updatecache_flg = result self.exitcode = exitcode + self.updatecache_flg = updatecache_flg if exitcode == 0: msg = "=== Sync completed for %s" % self.repo.name self.logger(self.xterm_titles, msg) @@ -310,17 +318,28 @@ class SyncManager(object): os.umask(0o022) return os.EX_OK + def _sync_callback(self, proc): + """ + This is called in the parent process, serially, for each of the + sync jobs when they complete. Some cache backends such as sqlite + may require that cache access be performed serially in the + parent process like this. + """ + repo = proc.kwargs['repo'] + exitcode = proc.returncode + updatecache_flg = False + if proc.returncode == os.EX_OK: + exitcode, message, updatecache_flg = proc.result - def _sync_callback(self, exitcode, updatecache_flg): if updatecache_flg and "metadata-transfer" not in self.settings.features: updatecache_flg = False if updatecache_flg and \ os.path.exists(os.path.join( - self.repo.location, 'metadata', 'md5-cache')): + repo.location, 'metadata', 'md5-cache')): # Only update cache for repo.location since that's # the only one that's been synced here. 
action_metadata(self.settings, self.portdb, self.emerge_config.opts, - porttrees=[self.repo.location]) + porttrees=[repo.location]) diff --git a/pym/portage/tests/sync/test_sync_local.py b/pym/portage/tests/sync/test_sync_local.py index f50caba..7753a26 100644 --- a/pym/portage/tests/sync/test_sync_local.py +++ b/pym/portage/tests/sync/test_sync_local.py @@ -55,8 +55,12 @@ class SyncLocalTestCase(TestCase): "dev-libs/A-0": {} } + user_config = { + 'make.conf': ('FEATURES="metadata-transfer"',) + } + playground = ResolverPlayground(ebuilds=ebuilds, - profile=profile, user_config={}, debug=debug) + profile=profile, user_config=user_config, debug=debug) settings = playground.settings eprefix = settings["EPREFIX"] eroot = settings["EROOT"] diff --git a/pym/portage/util/_async/AsyncFunction.py b/pym/portage/util/_async/AsyncFunction.py new file mode 100644 index 0000000..fda1fe0 --- /dev/null +++ b/pym/portage/util/_async/AsyncFunction.py @@ -0,0 +1,62 @@ +# Copyright 2015 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import pickle +import traceback + +from portage import os +from portage.util._async.ForkProcess import ForkProcess +from _emerge.PipeReader import PipeReader + +class AsyncFunction(ForkProcess): + """ + Execute a function call in a fork, and retrieve the function + return value via pickling/unpickling, accessible as the + "result" attribute after the forked process has exited. 
+ """ + + __slots__ = ('args', 'kwargs', 'result', 'target', + '_async_func_reader', '_async_func_reader_pw') + + def _start(self): + pr, pw = os.pipe() + self.fd_pipes = {} + self.fd_pipes[pw] = pw + self._async_func_reader_pw = pw + self._async_func_reader = PipeReader( + input_files={"input":pr}, + scheduler=self.scheduler) + self._async_func_reader.addExitListener(self._async_func_reader_exit) + self._async_func_reader.start() + ForkProcess._start(self) + os.close(pw) + + def _run(self): + try: + result = self.target(*(self.args or []), **(self.kwargs or {})) + os.write(self._async_func_reader_pw, pickle.dumps(result)) + except Exception: + traceback.print_exc() + return 1 + + return os.EX_OK + + def _pipe_logger_exit(self, pipe_logger): + # Ignore this event, since we want to ensure that we exit + # only after _async_func_reader_exit has reached EOF. + self._pipe_logger = None + + def _async_func_reader_exit(self, pipe_reader): + self.result = pickle.loads(pipe_reader.getvalue()) + self._async_func_reader = None + self._unregister() + self.wait() + + def _unregister(self): + ForkProcess._unregister(self) + + pipe_reader = self._async_func_reader + if pipe_reader is not None: + self._async_func_reader = None + pipe_reader.removeExitListener(self._async_func_reader_exit) + pipe_reader.cancel() -- 2.4.6