From: Zac Medico <zmedico@gentoo.org>
To: gentoo-portage-dev@lists.gentoo.org
Cc: Zac Medico <zmedico@gentoo.org>
Subject: [gentoo-portage-dev] [PATCH] Log changes between vdb_metadata.pickle updates
Date: Fri, 7 Nov 2014 00:45:55 -0800
Message-ID: <1415349955-10728-1-git-send-email-zmedico@gentoo.org>
In-Reply-To: <5459F511.9020409@gentoo.org>
This adds support for generating a vdb_metadata_delta.json file
which tracks package merges/unmerges that occur between updates to
vdb_metadata.pickle. IndexedVardb can use the delta together with
vdb_metadata.pickle to reconstruct a complete view of /var/db/pkg,
so that it can avoid expensive listdir calls in /var/db/pkg/*.
Note that vdb_metadata.pickle is only updated periodically, in
order to avoid excessive re-writes of a large file.
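
For reference, the delta file written by the new
vardbapi._cache_delta method contains the format version, the
timestamp of the vdb_metadata.pickle state that the deltas are
relative to, and a list of delta nodes. A file might look roughly
like this (the package, slot and counter values are invented for
illustration):

    {
        "version": "1",
        "timestamp": 1415349955.0,
        "deltas": [
            {
                "event": "add",
                "package": "app-misc/foo",
                "version": "1.0",
                "slot": "0",
                "counter": "1234"
            }
        ]
    }
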
In order to test the performance gains from this patch, you first
need to generate /var/cache/edb/vdb_metadata_delta.json, which
happens automatically if you run 'emerge -p anything' with root
privileges.
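
Since merges and unmerges accumulate between pickle updates, the
delta list is pruned before each write: later events supersede
earlier ones for the same package. Here is a minimal standalone
sketch of that pruning step (the helper name filter_cancelled_deltas
is invented for illustration; the patch inlines this logic in
_cache_delta):

    def filter_cancelled_deltas(deltas):
        # Walk from newest to oldest, keeping only the most
        # recent node for each (package, slot) and
        # (package, version) pair; older nodes are superseded.
        filtered = []
        slot_keys = set()
        version_keys = set()
        for node in reversed(deltas):
            slot_key = (node["package"], node["slot"])
            version_key = (node["package"], node["version"])
            if slot_key not in slot_keys and \
                version_key not in version_keys:
                filtered.append(node)
                slot_keys.add(slot_key)
                version_keys.add(version_key)
        filtered.reverse()
        return filtered

For example, an "add" followed by a "remove" of the same package
and slot leaves only the "remove" node behind.
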
---
 pym/portage/dbapi/IndexedVardb.py |  35 ++++++++-
 pym/portage/dbapi/vartree.py      | 161 +++++++++++++++++++++++++++++++++++---
 2 files changed, 185 insertions(+), 11 deletions(-)
diff --git a/pym/portage/dbapi/IndexedVardb.py b/pym/portage/dbapi/IndexedVardb.py
index 424defc..e225ca1 100644
--- a/pym/portage/dbapi/IndexedVardb.py
+++ b/pym/portage/dbapi/IndexedVardb.py
@@ -3,6 +3,7 @@
 
 import portage
 from portage.dep import Atom
+from portage.exception import InvalidData
 from portage.versions import _pkg_str
 
 class IndexedVardb(object):
@@ -42,7 +43,39 @@ class IndexedVardb(object):
 		if self._cp_map is not None:
 			return iter(sorted(self._cp_map))
 
-		return self._iter_cp_all()
+		cache_delta = self._vardb._cache_delta_load_race()
+		if cache_delta is None:
+			return self._iter_cp_all()
+
+		packages = self._vardb._aux_cache["packages"]
+		for delta in cache_delta["deltas"]:
+			cpv = delta["package"] + "-" + delta["version"]
+			event = delta["event"]
+			if event == "add":
+				# Use aux_get to populate the cache
+				# for this cpv.
+				if cpv not in packages:
+					try:
+						self._vardb.aux_get(cpv, ["DESCRIPTION"])
+					except KeyError:
+						pass
+			elif event == "remove":
+				packages.pop(cpv, None)
+
+		self._cp_map = cp_map = {}
+		for cpv in packages:
+			try:
+				cpv = _pkg_str(cpv)
+			except InvalidData:
+				continue
+
+			cp_list = cp_map.get(cpv.cp)
+			if cp_list is None:
+				cp_list = []
+				cp_map[cpv.cp] = cp_list
+			cp_list.append(cpv)
+
+		return iter(sorted(self._cp_map))
 
 	def _iter_cp_all(self):
 		self._cp_map = cp_map = {}
diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
index 6ab4b92..fd4b099 100644
--- a/pym/portage/dbapi/vartree.py
+++ b/pym/portage/dbapi/vartree.py
@@ -76,6 +76,7 @@ import gc
 import grp
 import io
 from itertools import chain
+import json
 import logging
 import os as _os
 import platform
@@ -109,6 +110,7 @@ class vardbapi(dbapi):
 		"|".join(_excluded_dirs) + r')$')
 
 	_aux_cache_version = "1"
+	_aux_cache_delta_version = "1"
 	_owners_cache_version = "1"
 
 	# Number of uncached packages to trigger cache update, since
@@ -177,6 +179,8 @@ class vardbapi(dbapi):
 		self._aux_cache_obj = None
 		self._aux_cache_filename = os.path.join(self._eroot,
 			CACHE_PATH, "vdb_metadata.pickle")
+		self._cache_delta_filename = os.path.join(self._eroot,
+			CACHE_PATH, "vdb_metadata_delta.json")
 		self._counter_path = os.path.join(self._eroot,
 			CACHE_PATH, "counter")
@@ -511,6 +515,120 @@ class vardbapi(dbapi):
 		self.cpcache.pop(pkg_dblink.mysplit[0], None)
 		dircache.pop(pkg_dblink.dbcatdir, None)
 
+	def _cache_delta(self, event, cpv, slot, counter):
+
+		self.lock()
+		try:
+			deltas_obj = self._cache_delta_load()
+
+			if deltas_obj is None:
+				# We can't record meaningful deltas without
+				# a pre-existing state.
+				return
+
+			delta_node = {
+				"event": event,
+				"package": cpv.cp,
+				"version": cpv.version,
+				"slot": slot,
+				"counter": "%s" % counter
+			}
+
+			deltas_obj["deltas"].append(delta_node)
+
+			# Eliminate earlier nodes cancelled out by later nodes
+			# that have identical package and slot attributes.
+			filtered_list = []
+			slot_keys = set()
+			version_keys = set()
+			for delta_node in reversed(deltas_obj["deltas"]):
+				slot_key = (delta_node["package"],
+					delta_node["slot"])
+				version_key = (delta_node["package"],
+					delta_node["version"])
+				if not (slot_key in slot_keys or \
+					version_key in version_keys):
+					filtered_list.append(delta_node)
+					slot_keys.add(slot_key)
+					version_keys.add(version_key)
+
+			filtered_list.reverse()
+			deltas_obj["deltas"] = filtered_list
+
+			f = atomic_ofstream(self._cache_delta_filename,
+				mode='w', encoding=_encodings['repo.content'])
+			json.dump(deltas_obj, f, ensure_ascii=False)
+			f.close()
+
+		finally:
+			self.unlock()
+
+	def _cache_delta_load(self):
+
+		if not os.path.exists(self._aux_cache_filename):
+			# If the primary cache doesn't exist yet, then
+			# we can't record a delta against it.
+			return None
+
+		try:
+			with io.open(self._cache_delta_filename, 'r',
+				encoding=_encodings['repo.content'],
+				errors='strict') as f:
+				cache_obj = json.load(f)
+		except EnvironmentError as e:
+			if e.errno not in (errno.ENOENT, errno.ESTALE):
+				raise
+		except (SystemExit, KeyboardInterrupt):
+			raise
+		except Exception:
+			# Corrupt, or not json format.
+			pass
+		else:
+			try:
+				version = cache_obj["version"]
+			except KeyError:
+				pass
+			else:
+				# If the timestamp recorded in the deltas file
+				# doesn't match aux_cache_timestamp, then the
+				# deltas are not valid. This means that deltas
+				# cannot be recorded until after the next
+				# vdb_metadata.pickle update, in order to
+				# guarantee consistency.
+				if version == self._aux_cache_delta_version:
+					try:
+						deltas = cache_obj["deltas"]
+					except KeyError:
+						cache_obj["deltas"] = deltas = []
+
+					if isinstance(deltas, list):
+						return cache_obj
+
+		return None
+
+	def _cache_delta_load_race(self):
+		"""
+		This calls _cache_delta_load and validates the timestamp
+		against the currently loaded _aux_cache. If a concurrent
+		update causes the timestamps to be inconsistent, then
+		it reloads the caches and tries one more time before
+		it aborts. In practice, the race is very unlikely, so
+		this will usually succeed on the first try.
+		"""
+
+		tries = 2
+		while tries:
+			tries -= 1
+			cache_delta = self._cache_delta_load()
+			if cache_delta is not None and \
+				cache_delta.get("timestamp") != \
+				self._aux_cache.get("timestamp", False):
+				self._aux_cache_obj = None
+			else:
+				return cache_delta
+
+		return None
+
 	def match(self, origdep, use_cache=1):
 		"caching match function"
 		mydep = dep_expand(
@@ -556,22 +674,37 @@ class vardbapi(dbapi):
 		long as at least part of the cache is still valid)."""
 		if self._flush_cache_enabled and \
 			self._aux_cache is not None and \
-			len(self._aux_cache["modified"]) >= self._aux_cache_threshold and \
-			secpass >= 2:
+			secpass >= 2 and \
+			(len(self._aux_cache["modified"]) >= self._aux_cache_threshold or
+			not os.path.exists(self._cache_delta_filename)):
+
+			ensure_dirs(os.path.dirname(self._aux_cache_filename))
+
 			self._owners.populate() # index any unindexed contents
 			valid_nodes = set(self.cpv_all())
 			for cpv in list(self._aux_cache["packages"]):
 				if cpv not in valid_nodes:
 					del self._aux_cache["packages"][cpv]
 			del self._aux_cache["modified"]
-			try:
-				f = atomic_ofstream(self._aux_cache_filename, 'wb')
-				pickle.dump(self._aux_cache, f, protocol=2)
-				f.close()
-				apply_secpass_permissions(
-					self._aux_cache_filename, gid=portage_gid, mode=0o644)
-			except (IOError, OSError) as e:
-				pass
+			timestamp = time.time()
+			self._aux_cache["timestamp"] = timestamp
+
+			f = atomic_ofstream(self._aux_cache_filename, 'wb')
+			pickle.dump(self._aux_cache, f, protocol=2)
+			f.close()
+			apply_secpass_permissions(
+				self._aux_cache_filename, mode=0o644)
+
+			f = atomic_ofstream(self._cache_delta_filename, 'w',
+				encoding=_encodings['repo.content'], errors='strict')
+			json.dump({
+				"version": self._aux_cache_delta_version,
+				"timestamp": timestamp
+			}, f, ensure_ascii=False)
+			f.close()
+			apply_secpass_permissions(
+				self._cache_delta_filename, mode=0o644)
+
 			self._aux_cache["modified"] = set()
 
 	@property
@@ -1590,6 +1723,12 @@ class dblink(object):
 				self.dbdir, noiselevel=-1)
 			return
 
+		if self.dbdir is self.dbpkgdir:
+			counter, = self.vartree.dbapi.aux_get(
+				self.mycpv, ["COUNTER"])
+			self.vartree.dbapi._cache_delta("remove", self.mycpv,
+				self.settings["SLOT"].split("/")[0], counter)
+
 		shutil.rmtree(self.dbdir)
 		# If empty, remove parent category directory.
 		try:
@@ -4196,6 +4335,8 @@ class dblink(object):
 		try:
 			self.delete()
 			_movefile(self.dbtmpdir, self.dbpkgdir, mysettings=self.settings)
 			self._merged_path(self.dbpkgdir, os.lstat(self.dbpkgdir))
+			self.vartree.dbapi._cache_delta("add",
+				self.mycpv, slot, counter)
 		finally:
 			self.unlockdb()
--
2.0.4