From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
Received: from lists.gentoo.org (pigeon.gentoo.org [208.92.234.80])
	by finch.gentoo.org (Postfix) with ESMTP id E0864138825
	for ; Fri, 7 Nov 2014 08:46:03 +0000 (UTC)
Received: from pigeon.gentoo.org (localhost [127.0.0.1])
	by pigeon.gentoo.org (Postfix) with SMTP id 82C40E08D3;
	Fri, 7 Nov 2014 08:46:02 +0000 (UTC)
Received: from smtp.gentoo.org (smtp.gentoo.org [140.211.166.183])
	(using TLSv1.2 with cipher AECDH-AES256-SHA (256/256 bits))
	(No client certificate requested)
	by pigeon.gentoo.org (Postfix) with ESMTPS id F141CE08D1
	for ; Fri, 7 Nov 2014 08:46:01 +0000 (UTC)
Received: from localhost.localdomain (ip70-181-96-121.oc.oc.cox.net [70.181.96.121])
	(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
	(No client certificate requested)
	(Authenticated sender: zmedico)
	by smtp.gentoo.org (Postfix) with ESMTPSA id 8F86A3404B9;
	Fri, 7 Nov 2014 08:46:00 +0000 (UTC)
From: Zac Medico
To: gentoo-portage-dev@lists.gentoo.org
Cc: Zac Medico
Subject: [gentoo-portage-dev] [PATCH] Log changes between vdb_metadata.pickle updates
Date: Fri, 7 Nov 2014 00:45:55 -0800
Message-Id: <1415349955-10728-1-git-send-email-zmedico@gentoo.org>
X-Mailer: git-send-email 2.0.4
In-Reply-To: <5459F511.9020409@gentoo.org>
References: <5459F511.9020409@gentoo.org>
Precedence: bulk
List-Post:
List-Help:
List-Unsubscribe:
List-Subscribe:
List-Id: Gentoo Linux mail
X-BeenThere: gentoo-portage-dev@lists.gentoo.org
Reply-to: gentoo-portage-dev@lists.gentoo.org
X-Archives-Salt: 1f79ee7c-ffc3-4f7d-99d5-f12783ed7c45
X-Archives-Hash: e5881b30ed1b4461209e4c715a336534

This adds support for generating a vdb_metadata_delta.json file which
tracks package merges / unmerges that occur between updates to
vdb_metadata.pickle. IndexedVardb can use the delta together with
vdb_metadata.pickle to reconstruct a complete view of /var/db/pkg,
so that it can avoid expensive listdir calls in /var/db/pkg/*.

Note that vdb_metadata.pickle is only updated periodically, in order
to avoid excessive re-writes of a large file.

In order to test the performance gains from this patch, you need to
generate /var/cache/edb/vdb_metadata_delta.json first, which will
happen automatically if you run 'emerge -p anything' with root
privileges.
---
(An illustrative sketch of how the generated delta file can be
replayed is appended after the patch.)

 pym/portage/dbapi/IndexedVardb.py |  35 ++++++++-
 pym/portage/dbapi/vartree.py      | 161 +++++++++++++++++++++++++++++++++++---
 2 files changed, 185 insertions(+), 11 deletions(-)

diff --git a/pym/portage/dbapi/IndexedVardb.py b/pym/portage/dbapi/IndexedVardb.py
index 424defc..e225ca1 100644
--- a/pym/portage/dbapi/IndexedVardb.py
+++ b/pym/portage/dbapi/IndexedVardb.py
@@ -3,6 +3,7 @@
 
 import portage
 from portage.dep import Atom
+from portage.exception import InvalidData
 from portage.versions import _pkg_str
 
 class IndexedVardb(object):
@@ -42,7 +43,39 @@ class IndexedVardb(object):
 		if self._cp_map is not None:
 			return iter(sorted(self._cp_map))
 
-		return self._iter_cp_all()
+		cache_delta = self._vardb._cache_delta_load_race()
+		if cache_delta is None:
+			return self._iter_cp_all()
+
+		packages = self._vardb._aux_cache["packages"]
+		for delta in cache_delta["deltas"]:
+			cpv = delta["package"] + "-" + delta["version"]
+			event = delta["event"]
+			if event == "add":
+				# Use aux_get to populate the cache
+				# for this cpv.
+				if cpv not in packages:
+					try:
+						self._vardb.aux_get(cpv, ["DESCRIPTION"])
+					except KeyError:
+						pass
+			elif event == "remove":
+				packages.pop(cpv, None)
+
+		self._cp_map = cp_map = {}
+		for cpv in packages:
+			try:
+				cpv = _pkg_str(cpv)
+			except InvalidData:
+				continue
+
+			cp_list = cp_map.get(cpv.cp)
+			if cp_list is None:
+				cp_list = []
+				cp_map[cpv.cp] = cp_list
+			cp_list.append(cpv)
+
+		return iter(sorted(self._cp_map))
 
 	def _iter_cp_all(self):
 		self._cp_map = cp_map = {}
diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
index 6ab4b92..fd4b099 100644
--- a/pym/portage/dbapi/vartree.py
+++ b/pym/portage/dbapi/vartree.py
@@ -76,6 +76,7 @@ import gc
 import grp
 import io
 from itertools import chain
+import json
 import logging
 import os as _os
 import platform
@@ -109,6 +110,7 @@ class vardbapi(dbapi):
 		"|".join(_excluded_dirs) + r')$')
 
 	_aux_cache_version = "1"
+	_aux_cache_delta_version = "1"
 	_owners_cache_version = "1"
 
 	# Number of uncached packages to trigger cache update, since
@@ -177,6 +179,8 @@ class vardbapi(dbapi):
 		self._aux_cache_obj = None
 		self._aux_cache_filename = os.path.join(self._eroot,
 			CACHE_PATH, "vdb_metadata.pickle")
+		self._cache_delta_filename = os.path.join(self._eroot,
+			CACHE_PATH, "vdb_metadata_delta.json")
 		self._counter_path = os.path.join(self._eroot,
 			CACHE_PATH, "counter")
 
@@ -511,6 +515,120 @@ class vardbapi(dbapi):
 		self.cpcache.pop(pkg_dblink.mysplit[0], None)
 		dircache.pop(pkg_dblink.dbcatdir, None)
 
+	def _cache_delta(self, event, cpv, slot, counter):
+
+		self.lock()
+		try:
+			deltas_obj = self._cache_delta_load()
+
+			if deltas_obj is None:
+				# We can't record meaningful deltas without
+				# a pre-existing state.
+				return
+
+			delta_node = {
+				"event": event,
+				"package": cpv.cp,
+				"version": cpv.version,
+				"slot": slot,
+				"counter": "%s" % counter
+			}
+
+			deltas_obj["deltas"].append(delta_node)
+
+			# Eliminate earlier nodes cancelled out by later nodes
+			# that have identical package and slot attributes.
+			filtered_list = []
+			slot_keys = set()
+			version_keys = set()
+			for delta_node in reversed(deltas_obj["deltas"]):
+				slot_key = (delta_node["package"],
+					delta_node["slot"])
+				version_key = (delta_node["package"],
+					delta_node["version"])
+				if not (slot_key in slot_keys or \
+					version_key in version_keys):
+					filtered_list.append(delta_node)
+					slot_keys.add(slot_key)
+					version_keys.add(version_key)
+
+			filtered_list.reverse()
+			deltas_obj["deltas"] = filtered_list
+
+			f = atomic_ofstream(self._cache_delta_filename,
+				mode='w', encoding=_encodings['repo.content'])
+			json.dump(deltas_obj, f, ensure_ascii=False)
+			f.close()
+
+		finally:
+			self.unlock()
+
+	def _cache_delta_load(self):
+
+		if not os.path.exists(self._aux_cache_filename):
+			# If the primary cache doesn't exist yet, then
+			# we can't record a delta against it.
+			return None
+
+		try:
+			with io.open(self._cache_delta_filename, 'r',
+				encoding=_encodings['repo.content'],
+				errors='strict') as f:
+				cache_obj = json.load(f)
+		except EnvironmentError as e:
+			if e.errno not in (errno.ENOENT, errno.ESTALE):
+				raise
+		except (SystemExit, KeyboardInterrupt):
+			raise
+		except Exception:
+			# Corrupt, or not json format.
+			pass
+		else:
+			try:
+				version = cache_obj["version"]
+			except KeyError:
+				pass
+			else:
+				# If the timestamp recorded in the deltas file
+				# doesn't match aux_cache_timestamp, then the
+				# deltas are not valid. This means that deltas
+				# cannot be recorded until after the next
+				# vdb_metadata.pickle update, in order to
+				# guarantee consistency.
+				if version == self._aux_cache_delta_version:
+					try:
+						deltas = cache_obj["deltas"]
+					except KeyError:
+						cache_obj["deltas"] = deltas = []
+
+					if isinstance(deltas, list):
+						return cache_obj
+
+		return None
+
+	def _cache_delta_load_race(self):
+		"""
+		This calls _cache_delta_load and validates the timestamp
+		against the currently loaded _aux_cache. If a concurrent
+		update causes the timestamps to be inconsistent, then
+		it reloads the caches and tries one more time before
+		it aborts. In practice, the race is very unlikely, so
+		this will usually succeed on the first try.
+		"""
+
+		tries = 2
+		while tries:
+			tries -= 1
+			cache_delta = self._cache_delta_load()
+			if cache_delta is not None and \
+				cache_delta.get("timestamp") != \
+				self._aux_cache.get("timestamp", False):
+				self._aux_cache_obj = None
+			else:
+				return cache_delta
+
+		return None
+
 	def match(self, origdep, use_cache=1):
 		"caching match function"
 		mydep = dep_expand(
@@ -556,22 +674,37 @@ class vardbapi(dbapi):
 		long as at least part of the cache is still valid)."""
 		if self._flush_cache_enabled and \
 			self._aux_cache is not None and \
-			len(self._aux_cache["modified"]) >= self._aux_cache_threshold and \
-			secpass >= 2:
+			secpass >= 2 and \
+			(len(self._aux_cache["modified"]) >= self._aux_cache_threshold or
+			not os.path.exists(self._cache_delta_filename)):
+
+			ensure_dirs(os.path.dirname(self._aux_cache_filename))
+
 			self._owners.populate() # index any unindexed contents
 			valid_nodes = set(self.cpv_all())
 			for cpv in list(self._aux_cache["packages"]):
 				if cpv not in valid_nodes:
 					del self._aux_cache["packages"][cpv]
 			del self._aux_cache["modified"]
-			try:
-				f = atomic_ofstream(self._aux_cache_filename, 'wb')
-				pickle.dump(self._aux_cache, f, protocol=2)
-				f.close()
-				apply_secpass_permissions(
-					self._aux_cache_filename, gid=portage_gid, mode=0o644)
-			except (IOError, OSError) as e:
-				pass
+			timestamp = time.time()
+			self._aux_cache["timestamp"] = timestamp
+
+			f = atomic_ofstream(self._aux_cache_filename, 'wb')
+			pickle.dump(self._aux_cache, f, protocol=2)
+			f.close()
+			apply_secpass_permissions(
+				self._aux_cache_filename, mode=0o644)
+
+			f = atomic_ofstream(self._cache_delta_filename, 'w',
+				encoding=_encodings['repo.content'], errors='strict')
+			json.dump({
+				"version": self._aux_cache_delta_version,
+				"timestamp": timestamp
+			}, f, ensure_ascii=False)
+			f.close()
+			apply_secpass_permissions(
+				self._cache_delta_filename, mode=0o644)
+
 			self._aux_cache["modified"] = set()
 
 	@property
@@ -1590,6 +1723,12 @@ class dblink(object):
 				self.dbdir, noiselevel=-1)
 			return
 
+		if self.dbdir is self.dbpkgdir:
+			counter, = self.vartree.dbapi.aux_get(
+				self.mycpv, ["COUNTER"])
+			self.vartree.dbapi._cache_delta("remove", self.mycpv,
+				self.settings["SLOT"].split("/")[0], counter)
+
 		shutil.rmtree(self.dbdir)
 		# If empty, remove parent category directory.
 		try:
@@ -4196,6 +4335,8 @@ class dblink(object):
 				self.delete()
 			_movefile(self.dbtmpdir, self.dbpkgdir, mysettings=self.settings)
 			self._merged_path(self.dbpkgdir, os.lstat(self.dbpkgdir))
+			self.vartree.dbapi._cache_delta("add",
+				self.mycpv, slot, counter)
 		finally:
 			self.unlockdb()
 
-- 
2.0.4
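
For illustration only (not part of the patch): a minimal sketch of how
the delta file written by flush_cache() and _cache_delta() could be
replayed against vdb_metadata.pickle, mirroring the add/remove
bookkeeping that IndexedVardb.cp_all() performs above. It assumes the
default EROOT of "/", so the files live under /var/cache/edb/, and it
skips the aux_get() cache population that the real code does.

#!/usr/bin/env python
# Illustrative sketch: reconstruct the installed-package list from the
# pickled aux cache plus the JSON delta, without listing /var/db/pkg/*.
# Paths assume EROOT="/"; field names match those used in the patch.
import json
import pickle

AUX_CACHE = "/var/cache/edb/vdb_metadata.pickle"
CACHE_DELTA = "/var/cache/edb/vdb_metadata_delta.json"

with open(AUX_CACHE, "rb") as f:
	aux_cache = pickle.load(f)

with open(CACHE_DELTA) as f:
	cache_delta = json.load(f)

# The delta is only valid for the exact pickle snapshot it was recorded
# against; both files carry a matching "timestamp" field.
if cache_delta.get("timestamp") != aux_cache.get("timestamp"):
	raise SystemExit("delta does not match vdb_metadata.pickle")

installed = set(aux_cache["packages"])
for delta in cache_delta.get("deltas", []):
	cpv = delta["package"] + "-" + delta["version"]
	if delta["event"] == "add":
		installed.add(cpv)
	elif delta["event"] == "remove":
		installed.discard(cpv)

for cpv in sorted(installed):
	print(cpv)

Run as root after an 'emerge -p anything' (per the testing note in the
commit message), this should print the same category/package-version
entries that a listdir-based walk of /var/db/pkg would find.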