From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from lists.gentoo.org (pigeon.gentoo.org [208.92.234.80]) by finch.gentoo.org (Postfix) with ESMTP id 91AF959CA3 for ; Mon, 14 Mar 2016 00:24:16 +0000 (UTC) Received: from pigeon.gentoo.org (localhost [127.0.0.1]) by pigeon.gentoo.org (Postfix) with SMTP id EB1F5E0826; Mon, 14 Mar 2016 00:24:13 +0000 (UTC) Received: from smtp.gentoo.org (smtp.gentoo.org [140.211.166.183]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by pigeon.gentoo.org (Postfix) with ESMTPS id 44F0AE0825 for ; Mon, 14 Mar 2016 00:24:13 +0000 (UTC) Received: from localhost.localdomain (ip68-5-185-102.oc.oc.cox.net [68.5.185.102]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-SHA256 (128/128 bits)) (No client certificate requested) (Authenticated sender: zmedico) by smtp.gentoo.org (Postfix) with ESMTPSA id DCA39335DED; Mon, 14 Mar 2016 00:24:11 +0000 (UTC) From: Zac Medico To: gentoo-portage-dev@lists.gentoo.org Cc: Zac Medico Subject: [gentoo-portage-dev] [PATCH] dblink: add locks for parallel-install with blockers (bug 576888) Date: Sun, 13 Mar 2016 17:23:07 -0700 Message-Id: <1457914987-3048-1-git-send-email-zmedico@gentoo.org> X-Mailer: git-send-email 2.7.2 Precedence: bulk List-Post: List-Help: List-Unsubscribe: List-Subscribe: List-Id: Gentoo Linux mail X-BeenThere: gentoo-portage-dev@lists.gentoo.org Reply-to: gentoo-portage-dev@lists.gentoo.org X-Archives-Salt: 6228a31e-465e-439d-8e9e-3034ee56c94c X-Archives-Hash: 8687ecc13cc170a7b75c1da34a15d91d For parallel-install, lock package slots of the current package and blocked packages, in order to account for blocked packages being removed or replaced concurrently. Acquire locks in predictable order, preventing deadlocks with competitors that may be trying to acquire overlapping locks. 
X-Gentoo-Bug: 576888 X-Gentoo-Bug-url: https://bugs.gentoo.org/show_bug.cgi?id=576888 --- pym/_emerge/Scheduler.py | 9 +- pym/portage/dbapi/vartree.py | 99 +++++++++++- .../emerge/test_emerge_blocker_file_collision.py | 168 +++++++++++++++++++++ 3 files changed, 267 insertions(+), 9 deletions(-) create mode 100644 pym/portage/tests/emerge/test_emerge_blocker_file_collision.py diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py index 20a4e85..97b826a 100644 --- a/pym/_emerge/Scheduler.py +++ b/pym/_emerge/Scheduler.py @@ -586,18 +586,15 @@ class Scheduler(PollScheduler): blocker_db = self._blocker_db[new_pkg.root] - blocker_dblinks = [] + blocked_pkgs = [] for blocking_pkg in blocker_db.findInstalledBlockers(new_pkg): if new_pkg.slot_atom == blocking_pkg.slot_atom: continue if new_pkg.cpv == blocking_pkg.cpv: continue - blocker_dblinks.append(portage.dblink( - blocking_pkg.category, blocking_pkg.pf, blocking_pkg.root, - self.pkgsettings[blocking_pkg.root], treetype="vartree", - vartree=self.trees[blocking_pkg.root]["vartree"])) + blocked_pkgs.append(blocking_pkg) - return blocker_dblinks + return blocked_pkgs def _generate_digests(self): """ diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py index e7effca..6209a86 100644 --- a/pym/portage/dbapi/vartree.py +++ b/pym/portage/dbapi/vartree.py @@ -168,6 +168,7 @@ class vardbapi(dbapi): self._conf_mem_file = self._eroot + CONFIG_MEMORY_FILE self._fs_lock_obj = None self._fs_lock_count = 0 + self._slot_locks = {} if vartree is None: vartree = portage.db[settings['EROOT']]['vartree'] @@ -284,6 +285,38 @@ class vardbapi(dbapi): self._fs_lock_obj = None self._fs_lock_count -= 1 + def _slot_lock(self, slot_atom): + """ + Acquire a slot lock (reentrant). + + WARNING: The vardbapi._slot_lock method is not safe to call + in the main process when that process is scheduling + install/uninstall tasks in parallel, since the locks would + be inherited by child processes. 
In order to avoid this sort + of problem, this method should be called in a subprocess + (typically spawned by the MergeProcess class). + """ + lock, counter = self._slot_locks.get(slot_atom, (None, 0)) + if lock is None: + lock_path = self.getpath("%s:%s" % (slot_atom.cp, slot_atom.slot)) + ensure_dirs(os.path.dirname(lock_path)) + lock = lockfile(lock_path, wantnewlockfile=True) + self._slot_locks[slot_atom] = (lock, counter + 1) + + def _slot_unlock(self, slot_atom): + """ + Release a slot lock (or decrement the recursion level). + """ + lock, counter = self._slot_locks.get(slot_atom, (None, 0)) + if lock is None: + raise AssertionError("not locked") + counter -= 1 + if counter == 0: + unlockfile(lock) + del self._slot_locks[slot_atom] + else: + self._slot_locks[slot_atom] = (lock, counter) + def _bump_mtime(self, cpv): """ This is called before an after any modifications, so that consumers @@ -1590,6 +1623,7 @@ class dblink(object): # compliance with RESTRICT=preserve-libs. self._preserve_libs = "preserve-libs" in mysettings.features self._contents = ContentsCaseSensitivityManager(self) + self._slot_locks = [] def __hash__(self): return hash(self._hash_key) @@ -1623,6 +1657,58 @@ class dblink(object): def unlockdb(self): self.vartree.dbapi.unlock() + def _slot_locked(f): + """ + A decorator function which, when parallel-install is enabled, + acquires and releases slot locks for the current package and + blocked packages. This is required in order to account for + interactions with blocked packages (involving resolution of + file collisions). + """ + def wrapper(self, *args, **kwargs): + if "parallel-install" in self.settings.features: + self._acquire_slot_locks( + kwargs.get("mydbapi", self.vartree.dbapi)) + try: + return f(self, *args, **kwargs) + finally: + self._release_slot_locks() + return wrapper + + def _acquire_slot_locks(self, db): + """ + Acquire slot locks for the current package and blocked packages. 
+ """ + + slot_atoms = [] + + try: + slot = self.mycpv.slot + except AttributeError: + slot, = db.aux_get(self.mycpv, ["SLOT"]) + slot = slot.partition("/")[0] + + slot_atoms.append(portage.dep.Atom( + "%s:%s" % (self.mycpv.cp, slot))) + + for blocker in self._blockers or []: + slot_atoms.append(blocker.slot_atom) + + # Sort atoms so that locks are acquired in a predictable + # order, preventing deadlocks with competitors that may + # be trying to acquire overlapping locks. + slot_atoms.sort() + for slot_atom in slot_atoms: + self.vartree.dbapi._slot_lock(slot_atom) + self._slot_locks.append(slot_atom) + + def _release_slot_locks(self): + """ + Release all slot locks. + """ + while self._slot_locks: + self.vartree.dbapi._slot_unlock(self._slot_locks.pop()) + def getpath(self): "return path to location of db information (for >>> informational display)" return self.dbdir @@ -1863,6 +1949,7 @@ class dblink(object): plib_registry.unlock() self.vartree.dbapi._fs_unlock() + @_slot_locked def unmerge(self, pkgfiles=None, trimworld=None, cleanup=True, ldpath_mtimes=None, others_in_slot=None, needed=None, preserve_paths=None): @@ -3929,9 +4016,14 @@ class dblink(object): prepare_build_dirs(settings=self.settings, cleanup=cleanup) # check for package collisions - blockers = self._blockers - if blockers is None: - blockers = [] + blockers = [] + for blocker in self._blockers or []: + blocker = self.vartree.dbapi._dblink(blocker.cpv) + # It may have been unmerged before lock(s) + # were acquired. 
+ if blocker.exists(): + blockers.append(blocker) + collisions, dirs_ro, symlink_collisions, plib_collisions = \ self._collision_protect(srcroot, destroot, others_in_slot + blockers, filelist, linklist) @@ -4993,6 +5085,7 @@ class dblink(object): else: proc.wait() + @_slot_locked def merge(self, mergeroot, inforoot, myroot=None, myebuild=None, cleanup=0, mydbapi=None, prev_mtimes=None, counter=None): """ diff --git a/pym/portage/tests/emerge/test_emerge_blocker_file_collision.py b/pym/portage/tests/emerge/test_emerge_blocker_file_collision.py new file mode 100644 index 0000000..10d09d8 --- /dev/null +++ b/pym/portage/tests/emerge/test_emerge_blocker_file_collision.py @@ -0,0 +1,168 @@ +# Copyright 2016 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import subprocess +import sys + +import portage +from portage import os +from portage import _unicode_decode +from portage.const import PORTAGE_PYM_PATH, USER_CONFIG_PATH +from portage.process import find_binary +from portage.tests import TestCase +from portage.tests.resolver.ResolverPlayground import ResolverPlayground +from portage.util import ensure_dirs + +class BlockerFileCollisionEmergeTestCase(TestCase): + + def testBlockerFileCollision(self): + + debug = False + + install_something = """ +S="${WORKDIR}" + +src_install() { + einfo "installing something..." 
+ insinto /usr/lib + echo "${PN}" > "${T}/file-collision" + doins "${T}/file-collision" +} +""" + + ebuilds = { + "dev-libs/A-1" : { + "EAPI": "6", + "MISC_CONTENT": install_something, + "RDEPEND": "!dev-libs/B", + }, + "dev-libs/B-1" : { + "EAPI": "6", + "MISC_CONTENT": install_something, + "RDEPEND": "!dev-libs/A", + }, + } + + playground = ResolverPlayground(ebuilds=ebuilds, debug=debug) + settings = playground.settings + eprefix = settings["EPREFIX"] + eroot = settings["EROOT"] + var_cache_edb = os.path.join(eprefix, "var", "cache", "edb") + user_config_dir = os.path.join(eprefix, USER_CONFIG_PATH) + + portage_python = portage._python_interpreter + emerge_cmd = (portage_python, "-b", "-Wd", + os.path.join(self.bindir, "emerge")) + + file_collision = os.path.join(eroot, 'usr/lib/file-collision') + + test_commands = ( + emerge_cmd + ("--oneshot", "dev-libs/A",), + (lambda: portage.util.grablines(file_collision) == ["A\n"],), + emerge_cmd + ("--oneshot", "dev-libs/B",), + (lambda: portage.util.grablines(file_collision) == ["B\n"],), + emerge_cmd + ("--oneshot", "dev-libs/A",), + (lambda: portage.util.grablines(file_collision) == ["A\n"],), + ({"FEATURES":"parallel-install"},) + emerge_cmd + ("--oneshot", "dev-libs/B",), + (lambda: portage.util.grablines(file_collision) == ["B\n"],), + ({"FEATURES":"parallel-install"},) + emerge_cmd + ("-Cq", "dev-libs/B",), + (lambda: not os.path.exists(file_collision),), + ) + + fake_bin = os.path.join(eprefix, "bin") + portage_tmpdir = os.path.join(eprefix, "var", "tmp", "portage") + profile_path = settings.profile_path + + path = os.environ.get("PATH") + if path is not None and not path.strip(): + path = None + if path is None: + path = "" + else: + path = ":" + path + path = fake_bin + path + + pythonpath = os.environ.get("PYTHONPATH") + if pythonpath is not None and not pythonpath.strip(): + pythonpath = None + if pythonpath is not None and \ + pythonpath.split(":")[0] == PORTAGE_PYM_PATH: + pass + else: + if pythonpath is 
None: + pythonpath = "" + else: + pythonpath = ":" + pythonpath + pythonpath = PORTAGE_PYM_PATH + pythonpath + + env = { + "PORTAGE_OVERRIDE_EPREFIX" : eprefix, + "PATH" : path, + "PORTAGE_PYTHON" : portage_python, + "PORTAGE_REPOSITORIES" : settings.repositories.config_string(), + "PYTHONDONTWRITEBYTECODE" : os.environ.get("PYTHONDONTWRITEBYTECODE", ""), + "PYTHONPATH" : pythonpath, + } + + if "__PORTAGE_TEST_HARDLINK_LOCKS" in os.environ: + env["__PORTAGE_TEST_HARDLINK_LOCKS"] = \ + os.environ["__PORTAGE_TEST_HARDLINK_LOCKS"] + + dirs = [playground.distdir, fake_bin, portage_tmpdir, + user_config_dir, var_cache_edb] + true_symlinks = ["chown", "chgrp"] + true_binary = find_binary("true") + self.assertEqual(true_binary is None, False, + "true command not found") + try: + for d in dirs: + ensure_dirs(d) + for x in true_symlinks: + os.symlink(true_binary, os.path.join(fake_bin, x)) + with open(os.path.join(var_cache_edb, "counter"), 'wb') as f: + f.write(b"100") + # non-empty system set keeps --depclean quiet + with open(os.path.join(profile_path, "packages"), 'w') as f: + f.write("*dev-libs/token-system-pkg") + + if debug: + # The subprocess inherits both stdout and stderr, for + # debugging purposes. + stdout = None + else: + # The subprocess inherits stderr so that any warnings + # triggered by python -Wd will be visible. 
+ stdout = subprocess.PIPE + + for i, args in enumerate(test_commands): + + if hasattr(args[0], '__call__'): + self.assertTrue(args[0](), + "callable at index %s failed" % (i,)) + continue + + if isinstance(args[0], dict): + local_env = env.copy() + local_env.update(args[0]) + args = args[1:] + else: + local_env = env + + proc = subprocess.Popen(args, + env=local_env, stdout=stdout) + + if debug: + proc.wait() + else: + output = proc.stdout.readlines() + proc.wait() + proc.stdout.close() + if proc.returncode != os.EX_OK: + for line in output: + sys.stderr.write(_unicode_decode(line)) + + self.assertEqual(os.EX_OK, proc.returncode, + "emerge failed with args %s" % (args,)) + finally: + playground.debug = False + playground.cleanup() -- 2.7.2