public inbox for gentoo-portage-dev@lists.gentoo.org
 help / color / mirror / Atom feed
From: Zac Medico <zmedico@gentoo.org>
To: gentoo-portage-dev@lists.gentoo.org
Cc: Zac Medico <zmedico@gentoo.org>
Subject: [gentoo-portage-dev] [PATCH] Log changes between vdb_metadata.pickle updates
Date: Fri,  7 Nov 2014 00:45:55 -0800	[thread overview]
Message-ID: <1415349955-10728-1-git-send-email-zmedico@gentoo.org> (raw)
In-Reply-To: <5459F511.9020409@gentoo.org>

This adds support for generating a vdb_metadata_delta.json file
which tracks package merges / unmerges that occur between updates to
vdb_metadata.pickle. IndexedVardb can use the delta together with
vdb_metadata.pickle to reconstruct a complete view of /var/db/pkg,
so that it can avoid expensive listdir calls in /var/db/pkg/*.
Note that vdb_metadata.pickle is only updated periodically, in
order to avoid excessive re-writes of a large file.

In order to test the performance gains from this patch, you need to
generate /var/cache/edb/vdb_metadata_delta.json first, which will
happen automatically if you run 'emerge -p anything' with root
privileges.
---
 pym/portage/dbapi/IndexedVardb.py |  35 ++++++++-
 pym/portage/dbapi/vartree.py      | 161 +++++++++++++++++++++++++++++++++++---
 2 files changed, 185 insertions(+), 11 deletions(-)

diff --git a/pym/portage/dbapi/IndexedVardb.py b/pym/portage/dbapi/IndexedVardb.py
index 424defc..e225ca1 100644
--- a/pym/portage/dbapi/IndexedVardb.py
+++ b/pym/portage/dbapi/IndexedVardb.py
@@ -3,6 +3,7 @@
 
 import portage
 from portage.dep import Atom
+from portage.exception import InvalidData
 from portage.versions import _pkg_str
 
 class IndexedVardb(object):
@@ -42,7 +43,39 @@ class IndexedVardb(object):
 		if self._cp_map is not None:
 			return iter(sorted(self._cp_map))
 
-		return self._iter_cp_all()
+		cache_delta = self._vardb._cache_delta_load_race()
+		if cache_delta is None:
+			return self._iter_cp_all()
+
+		packages = self._vardb._aux_cache["packages"]
+		for delta in cache_delta["deltas"]:
+			cpv = delta["package"] + "-" + delta["version"]
+			event = delta["event"]
+			if event == "add":
+				# Use aux_get to populate the cache
+				# for this cpv.
+				if cpv not in packages:
+					try:
+						self._vardb.aux_get(cpv, ["DESCRIPTION"])
+					except KeyError:
+						pass
+			elif event == "remove":
+				packages.pop(cpv, None)
+
+		self._cp_map = cp_map = {}
+		for cpv in packages:
+			try:
+				cpv = _pkg_str(cpv)
+			except InvalidData:
+				continue
+
+			cp_list = cp_map.get(cpv.cp)
+			if cp_list is None:
+				cp_list = []
+				cp_map[cpv.cp] = cp_list
+			cp_list.append(cpv)
+
+		return iter(sorted(self._cp_map))
 
 	def _iter_cp_all(self):
 		self._cp_map = cp_map = {}
diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
index 6ab4b92..fd4b099 100644
--- a/pym/portage/dbapi/vartree.py
+++ b/pym/portage/dbapi/vartree.py
@@ -76,6 +76,7 @@ import gc
 import grp
 import io
 from itertools import chain
+import json
 import logging
 import os as _os
 import platform
@@ -109,6 +110,7 @@ class vardbapi(dbapi):
 		"|".join(_excluded_dirs) + r')$')
 
 	_aux_cache_version        = "1"
+	_aux_cache_delta_version  = "1"
 	_owners_cache_version     = "1"
 
 	# Number of uncached packages to trigger cache update, since
@@ -177,6 +179,8 @@ class vardbapi(dbapi):
 		self._aux_cache_obj = None
 		self._aux_cache_filename = os.path.join(self._eroot,
 			CACHE_PATH, "vdb_metadata.pickle")
+		self._cache_delta_filename = os.path.join(self._eroot,
+			CACHE_PATH, "vdb_metadata_delta.json")
 		self._counter_path = os.path.join(self._eroot,
 			CACHE_PATH, "counter")
 
@@ -511,6 +515,120 @@ class vardbapi(dbapi):
 		self.cpcache.pop(pkg_dblink.mysplit[0], None)
 		dircache.pop(pkg_dblink.dbcatdir, None)
 
+	def _cache_delta(self, event, cpv, slot, counter):
+
+		self.lock()
+		try:
+			deltas_obj = self._cache_delta_load()
+
+			if deltas_obj is None:
+				# We can't record meaningful deltas without
+				# a pre-existing state.
+				return
+
+			delta_node = {
+				"event": event,
+				"package": cpv.cp,
+				"version": cpv.version,
+				"slot": slot,
+				"counter": "%s" % counter
+			}
+
+			deltas_obj["deltas"].append(delta_node)
+
+			# Eliminate earlier nodes cancelled out by later nodes
+			# that have identical package and slot attributes.
+			filtered_list = []
+			slot_keys = set()
+			version_keys = set()
+			for delta_node in reversed(deltas_obj["deltas"]):
+				slot_key = (delta_node["package"],
+					delta_node["slot"])
+				version_key = (delta_node["package"],
+					delta_node["version"])
+				if not (slot_key in slot_keys or \
+					version_key in version_keys):
+					filtered_list.append(delta_node)
+					slot_keys.add(slot_key)
+					version_keys.add(version_key)
+
+			filtered_list.reverse()
+			deltas_obj["deltas"] = filtered_list
+
+			f = atomic_ofstream(self._cache_delta_filename,
+				mode='w', encoding=_encodings['repo.content'])
+			json.dump(deltas_obj, f, ensure_ascii=False)
+			f.close()
+
+		finally:
+			self.unlock()
+
+	def _cache_delta_load(self):
+
+		if not os.path.exists(self._aux_cache_filename):
+			# If the primary cache doesn't exist yet, then
+			# we can't record a delta against it.
+			return None
+
+		try:
+			with io.open(self._cache_delta_filename, 'r',
+				encoding=_encodings['repo.content'],
+				errors='strict') as f:
+				cache_obj = json.load(f)
+		except EnvironmentError as e:
+			if e.errno not in (errno.ENOENT, errno.ESTALE):
+				raise
+		except (SystemExit, KeyboardInterrupt):
+			raise
+		except Exception:
+			# Corrupt, or not json format.
+			pass
+		else:
+			try:
+				version = cache_obj["version"]
+			except KeyError:
+				pass
+			else:
+				# If the timestamp recorded in the deltas file
+				# doesn't match aux_cache_timestamp, then the
+				# deltas are not valid. This means that deltas
+				# cannot be recorded until after the next
+				# vdb_metadata.pickle update, in order to
+				# guarantee consistency.
+				if version == self._aux_cache_delta_version:
+					try:
+						deltas = cache_obj["deltas"]
+					except KeyError:
+						cache_obj["deltas"] = deltas = []
+
+					if isinstance(deltas, list):
+						return cache_obj
+
+		return None
+
+	def _cache_delta_load_race(self):
+		"""
+		This calls _cache_delta_load and validates the timestamp
+		against the currently loaded _aux_cache. If a concurrent
+		update causes the timestamps to be inconsistent, then
+		it reloads the caches and tries one more time before
+		it aborts. In practice, the race is very unlikely, so
+		this will usually succeed on the first try.
+		"""
+
+		tries = 2
+		while tries:
+			tries -= 1
+			cache_delta = self._cache_delta_load()
+			if cache_delta is not None and \
+				cache_delta.get("timestamp") != \
+				self._aux_cache.get("timestamp", False):
+				self._aux_cache_obj = None
+			else:
+				return cache_delta
+
+		return None
+
 	def match(self, origdep, use_cache=1):
 		"caching match function"
 		mydep = dep_expand(
@@ -556,22 +674,37 @@ class vardbapi(dbapi):
 		long as at least part of the cache is still valid)."""
 		if self._flush_cache_enabled and \
 			self._aux_cache is not None and \
-			len(self._aux_cache["modified"]) >= self._aux_cache_threshold and \
-			secpass >= 2:
+			secpass >= 2 and \
+			(len(self._aux_cache["modified"]) >= self._aux_cache_threshold or
+			not os.path.exists(self._cache_delta_filename)):
+
+			ensure_dirs(os.path.dirname(self._aux_cache_filename))
+
 			self._owners.populate() # index any unindexed contents
 			valid_nodes = set(self.cpv_all())
 			for cpv in list(self._aux_cache["packages"]):
 				if cpv not in valid_nodes:
 					del self._aux_cache["packages"][cpv]
 			del self._aux_cache["modified"]
-			try:
-				f = atomic_ofstream(self._aux_cache_filename, 'wb')
-				pickle.dump(self._aux_cache, f, protocol=2)
-				f.close()
-				apply_secpass_permissions(
-					self._aux_cache_filename, gid=portage_gid, mode=0o644)
-			except (IOError, OSError) as e:
-				pass
+			timestamp = time.time()
+			self._aux_cache["timestamp"] = timestamp
+
+			f = atomic_ofstream(self._aux_cache_filename, 'wb')
+			pickle.dump(self._aux_cache, f, protocol=2)
+			f.close()
+			apply_secpass_permissions(
+				self._aux_cache_filename, mode=0o644)
+
+			f = atomic_ofstream(self._cache_delta_filename, 'w',
+				encoding=_encodings['repo.content'], errors='strict')
+			json.dump({
+				"version": self._aux_cache_delta_version,
+				"timestamp": timestamp
+				}, f, ensure_ascii=False)
+			f.close()
+			apply_secpass_permissions(
+				self._cache_delta_filename, mode=0o644)
+
 			self._aux_cache["modified"] = set()
 
 	@property
@@ -1590,6 +1723,12 @@ class dblink(object):
 				self.dbdir, noiselevel=-1)
 			return
 
+		if self.dbdir is self.dbpkgdir:
+			counter, = self.vartree.dbapi.aux_get(
+				self.mycpv, ["COUNTER"])
+			self.vartree.dbapi._cache_delta("remove", self.mycpv,
+				self.settings["SLOT"].split("/")[0], counter)
+
 		shutil.rmtree(self.dbdir)
 		# If empty, remove parent category directory.
 		try:
@@ -4196,6 +4335,8 @@ class dblink(object):
 			self.delete()
 			_movefile(self.dbtmpdir, self.dbpkgdir, mysettings=self.settings)
 			self._merged_path(self.dbpkgdir, os.lstat(self.dbpkgdir))
+			self.vartree.dbapi._cache_delta("add",
+				self.mycpv, slot, counter)
 		finally:
 			self.unlockdb()
 
-- 
2.0.4



  reply	other threads:[~2014-11-07  8:46 UTC|newest]

Thread overview: 29+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2014-10-18  3:28 [gentoo-portage-dev] [PATCH] emerge --search: use description index Zac Medico
2014-10-18  5:59 ` [gentoo-portage-dev] " Zac Medico
2014-10-19 21:51   ` Zac Medico
2014-10-23  8:55     ` Brian Dolbec
2014-10-23  9:22       ` Zac Medico
2014-11-01  6:15         ` Zac Medico
2014-11-01 22:46 ` [gentoo-portage-dev] Zac Medico
2014-11-01 22:46   ` [gentoo-portage-dev] [PATCH 1/5] Add egencache --update-pkg-desc-index action Zac Medico
2014-11-04  9:03     ` [gentoo-portage-dev] [PATCH 1/5 v2] " Zac Medico
2014-11-01 22:46   ` [gentoo-portage-dev] [PATCH 2/5] Add IndexStreamIterator and MultiIterGroupBy Zac Medico
2014-11-02  0:18     ` Zac Medico
2014-11-02 22:50     ` [gentoo-portage-dev] [PATCH 2/5 v3] " Zac Medico
2014-11-03  3:07     ` [gentoo-portage-dev] [PATCH 2/5 v4] " Zac Medico
2014-11-01 22:46   ` [gentoo-portage-dev] [PATCH 3/5] Add IndexedPortdb class Zac Medico
2014-11-04  5:07     ` [gentoo-portage-dev] [PATCH 3/5 v2] " Zac Medico
2014-11-04 20:34       ` [gentoo-portage-dev] [PATCH 3/5 v3] " Zac Medico
2014-11-01 22:46   ` [gentoo-portage-dev] [PATCH 4/5] Add IndexedVardb class Zac Medico
2014-11-05  9:59     ` [gentoo-portage-dev] " Zac Medico
2014-11-07  8:45       ` Zac Medico [this message]
2014-11-07 16:51         ` [gentoo-portage-dev] [PATCH] Log changes between vdb_metadata.pickle updates Brian Dolbec
2014-11-07 20:17           ` Zac Medico
2014-11-08  9:16         ` [gentoo-portage-dev] [PATCH v2] " Zac Medico
2014-11-01 22:46   ` [gentoo-portage-dev] [PATCH 5/5] Add emerge --search-index option Zac Medico
2014-11-01 23:04     ` Zac Medico
2014-11-04  5:42       ` [gentoo-portage-dev] [PATCH 5/5 v3] " Zac Medico
2014-11-04  9:10         ` [gentoo-portage-dev] " Zac Medico
2014-11-04 22:09     ` [gentoo-portage-dev] [PATCH 5/5 v4] " Zac Medico
2014-11-03 21:42   ` [gentoo-portage-dev] Brian Dolbec
2014-11-04  9:19     ` [gentoo-portage-dev] Zac Medico

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1415349955-10728-1-git-send-email-zmedico@gentoo.org \
    --to=zmedico@gentoo.org \
    --cc=gentoo-portage-dev@lists.gentoo.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox