public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
* [gentoo-commits] proj/portage:master commit in: man/, bin/, pym/portage/util/_async/, pym/portage/_emirrordist/, ...
@ 2013-01-09 18:48 Zac Medico
  0 siblings, 0 replies; only message in thread
From: Zac Medico @ 2013-01-09 18:48 UTC (permalink / raw
  To: gentoo-commits

commit:     12c795d81251b03f3d33d02696dc8db1299dee78
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Wed Jan  9 14:38:16 2013 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Jan  9 17:09:44 2013 +0000
URL:        http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=12c795d8

Add emirrordist, a tool for mirroring distfiles.

Special thanks to Brian Harring, author of the mirror-dist program from
which emirrordist is derived.

---
 bin/emirrordist                              |   13 +
 man/emirrordist.1                            |  143 ++++++
 pym/portage/_emirrordist/Config.py           |  132 ++++++
 pym/portage/_emirrordist/DeletionIterator.py |   83 ++++
 pym/portage/_emirrordist/DeletionTask.py     |  129 ++++++
 pym/portage/_emirrordist/FetchIterator.py    |  132 ++++++
 pym/portage/_emirrordist/FetchTask.py        |  615 ++++++++++++++++++++++++++
 pym/portage/_emirrordist/MirrorDistTask.py   |  218 +++++++++
 pym/portage/_emirrordist/__init__.py         |    2 +
 pym/portage/_emirrordist/main.py             |  438 ++++++++++++++++++
 pym/portage/util/_ShelveUnicodeWrapper.py    |   45 ++
 pym/portage/util/_async/FileCopier.py        |   17 +
 12 files changed, 1967 insertions(+), 0 deletions(-)

diff --git a/bin/emirrordist b/bin/emirrordist
new file mode 100755
index 0000000..8d93de9
--- /dev/null
+++ b/bin/emirrordist
@@ -0,0 +1,13 @@
+#!/usr/bin/python
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import sys
+
+import portage
+portage._internal_caller = True
+portage._disable_legacy_globals()
+from portage._emirrordist.main import emirrordist_main
+
+if __name__ == "__main__":
+	sys.exit(emirrordist_main(sys.argv[1:]))

diff --git a/man/emirrordist.1 b/man/emirrordist.1
new file mode 100644
index 0000000..6d02077
--- /dev/null
+++ b/man/emirrordist.1
@@ -0,0 +1,143 @@
+.TH "EMIRRORDIST" "1" "Jan 2013" "Portage VERSION" "Portage"
+.SH "NAME"
+emirrordist \- a fetch tool for mirroring of package distfiles
+.SH SYNOPSIS
+.B emirrordist
+[\fIoptions\fR] \fI<action>\fR
+.SH ACTIONS
+.TP
+\fB\-h\fR, \fB\-\-help\fR
+Show a help message and exit.
+.TP
+\fB\-\-version\fR
+Display portage version and exit.
+.TP
+\fB\-\-mirror\fR
+Mirror distfiles for the selected repository.
+.SH OPTIONS
+.TP
+\fB\-\-dry\-run\fR
+Perform a trial run with no changes made (typically combined
+with \fI\-v\fR or \fI\-vv\fR).
+.TP
+\fB\-v\fR, \fB\-\-verbose\fR
+Display extra information on stderr (multiple occurences
+increase verbosity).
+.TP
+\fB\-\-ignore\-default\-opts\fR
+Do not use the \fIEMIRRORDIST_DEFAULT_OPTS\fR environment
+variable.
+.TP
+\fB\-\-distfiles\fR=\fIDIR\fR
+Distfiles directory to use (required).
+.TP
+\fB\-j\fR JOBS, \fB\-\-jobs\fR=\fIJOBS\fR
+Number of concurrent jobs to run.
+.TP
+\fB\-l\fR LOAD, \fB\-\-load\-average\fR=\fILOAD\fR
+Load average limit for spawning of new concurrent jobs.
+.TP
+\fB\-\-tries\fR=\fITRIES\fR
+Maximum number of tries per file, 0 means unlimited
+(default is 10).
+.TP
+\fB\-\-repo\fR=\fIREPO\fR
+Name of repo to operate on (default repo is located at
+$PORTDIR).
+.TP
+\fB\-\-config\-root\fR=\fIDIR\fR
+Location of portage config files.
+.TP
+\fB\-\-portdir\fR=\fIDIR\fR
+Override the portage tree location.
+.TP
+\fB\-\-portdir\-overlay\fR=\fIPORTDIR_OVERLAY\fR
+Override the PORTDIR_OVERLAY variable (requires that
+\fI\-\-repo\fR is also specified).
+.TP
+\fB\-\-strict\-manifests=\fR<y|n>
+Manually override "strict" FEATURES setting.
+.TP
+\fB\-\-failure\-log\fR=\fIFILE\fR
+Log file for fetch failures, with tab\-delimited output, for
+reporting purposes. Opened in append mode.
+.TP
+\fB\-\-success\-log\fR=\fIFILE\fR
+Log file for fetch successes, with tab\-delimited output, for
+reporting purposes. Opened in append mode.
+.TP
+\fB\-\-scheduled\-deletion\-log\fR=\fIFILE\fR
+Log file for scheduled deletions, with tab\-delimited output, for
+reporting purposes. Overwritten with each run.
+.TP
+\fB\-\-delete\fR
+Enable deletion of unused distfiles.
+.TP
+\fB\-\-deletion\-db\fR=\fIFILE\fR
+Database file used to track lifetime of files scheduled for
+delayed deletion.
+.TP
+\fB\-\-deletion\-delay\fR=\fISECONDS\fR
+Delay time for deletion of unused distfiles, measured in seconds.
+.TP
+\fB\-\-temp\-dir\fR=\fIDIR\fR
+Temporary directory for downloads.
+.TP
+\fB\-\-mirror\-overrides\fR=\fIFILE\fR
+File holding a list of mirror overrides.
+.TP
+\fB\-\-mirror\-skip\fR=\fIMIRROR_SKIP\fR
+Comma delimited list of mirror targets to skip when
+fetching.
+.TP
+\fB\-\-restrict\-mirror\-exemptions\fR=\fIRESTRICT_MIRROR_EXEMPTIONS\fR
+Comma delimited list of mirror targets for which to ignore
+RESTRICT="mirror" (see \fBebuild\fR(5)).
+.TP
+\fB\-\-verify\-existing\-digest\fR
+Use digest as a verification of whether existing
+distfiles are valid.
+.TP
+\fB\-\-distfiles\-local\fR=\fIDIR\fR
+The distfiles\-local directory to use.
+.TP
+\fB\-\-distfiles\-db\fR=\fIFILE\fR
+Database file used to track which ebuilds a distfile belongs to.
+.TP
+\fB\-\-recycle\-dir\fR=\fIDIR\fR
+Directory for extended retention of files that are removed from
+distdir with the \-\-delete option. These files may be be recycled if
+they are needed again, instead of downloading them again.
+.TP
+\fB\-\-recycle\-db\fR=\fIFILE\fR
+Database file used to track lifetime of files in recycle dir.
+.TP
+\fB\-\-recycle\-deletion\-delay\fR=\fISECONDS\fR
+Delay time for deletion of unused files from recycle dir,
+measured in seconds (defaults to the equivalent of 60 days).
+.TP
+\fB\-\-fetch\-log\-dir\fR=\fIDIR\fR
+Directory for individual fetch logs.
+.TP
+\fB\-\-whitelist\-from\fR=\fIFILE\fR
+Specifies a file containing a list of files to whitelist, one per line,
+# prefixed lines ignored. Use this option multiple times in order to
+specify multiple whitelists.
+.SH "REPORTING BUGS"
+Please report bugs via http://bugs.gentoo.org/
+.SH "THANKS"
+Special thanks to Brian Harring, author of the mirror\-dist program from
+which emirrordist is derived.
+.SH "AUTHORS"
+.nf
+Zac Medico <zmedico@gentoo.org>
+.fi
+.SH "FILES"
+.TP
+.B /etc/portage/make.conf
+Contains variables.
+.SH "SEE ALSO"
+.BR ebuild (5),
+.BR egencache (1),
+.BR make.conf (5),
+.BR portage (5)

diff --git a/pym/portage/_emirrordist/Config.py b/pym/portage/_emirrordist/Config.py
new file mode 100644
index 0000000..db4bfeb
--- /dev/null
+++ b/pym/portage/_emirrordist/Config.py
@@ -0,0 +1,132 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import copy
+import io
+import logging
+import shelve
+import sys
+import time
+
+import portage
+from portage import os
+from portage.util import grabdict, grablines
+from portage.util._ShelveUnicodeWrapper import ShelveUnicodeWrapper
+
+class Config(object):
+	def __init__(self, options, portdb, event_loop):
+		self.options = options
+		self.portdb = portdb
+		self.event_loop = event_loop
+		self.added_byte_count = 0
+		self.added_file_count = 0
+		self.scheduled_deletion_count = 0
+		self.delete_count = 0
+		self.file_owners = {}
+		self.file_failures = {}
+		self.start_time = time.time()
+		self._open_files = []
+
+		self.log_success = self._open_log('success', options.success_log, 'a')
+		self.log_failure = self._open_log('failure', options.failure_log, 'a')
+
+		self.distfiles = None
+		if options.distfiles is not None:
+			self.distfiles = options.distfiles
+
+		self.mirrors = copy.copy(portdb.settings.thirdpartymirrors())
+
+		if options.mirror_overrides is not None:
+			self.mirrors.update(grabdict(options.mirror_overrides))
+
+		if options.mirror_skip is not None:
+			for x in options.mirror_skip.split(","):
+				self.mirrors[x] = []
+
+		self.whitelist = None
+		if options.whitelist_from is not None:
+			self.whitelist = set()
+			for filename in options.whitelist_from:
+				for line in grablines(filename):
+					line = line.strip()
+					if line and not line.startswith("#"):
+						self.whitelist.add(line)
+
+		self.restrict_mirror_exemptions = None
+		if options.restrict_mirror_exemptions is not None:
+			self.restrict_mirror_exemptions = frozenset(
+				options.restrict_mirror_exemptions.split(","))
+
+		self.recycle_db = None
+		if options.recycle_db is not None:
+			self.recycle_db = self._open_shelve(
+				options.recycle_db, 'recycle')
+
+		self.distfiles_db = None
+		if options.distfiles_db is not None:
+			self.distfiles_db = self._open_shelve(
+				options.distfiles_db, 'distfiles')
+
+		self.deletion_db = None
+		if options.deletion_db is not None:
+			self.deletion_db = self._open_shelve(
+				options.deletion_db, 'deletion')
+
+	def _open_log(self, log_desc, log_path, mode):
+
+		if log_path is None or self.options.dry_run:
+			log_func = logging.info
+			line_format = "%s: %%s" % log_desc
+			add_newline = False
+			if log_path is not None:
+				logging.warn(("dry-run: %s log "
+					"redirected to logging.info") % log_desc)
+		else:
+			self._open_files.append(io.open(log_path, mode=mode,
+				encoding='utf_8'))
+			line_format = "%s\n"
+			log_func = self._open_files[-1].write
+
+		return self._LogFormatter(line_format, log_func)
+
+	class _LogFormatter(object):
+
+		__slots__ = ('_line_format', '_log_func')
+
+		def __init__(self, line_format, log_func):
+			self._line_format = line_format
+			self._log_func = log_func
+
+		def __call__(self, msg):
+			self._log_func(self._line_format % (msg,))
+
+	def _open_shelve(self, db_file, db_desc):
+		if self.options.dry_run:
+			open_flag = "r"
+		else:
+			open_flag = "c"
+
+		if self.options.dry_run and not os.path.exists(db_file):
+			db = {}
+		else:
+			db = shelve.open(db_file, flag=open_flag)
+			if sys.hexversion < 0x3000000:
+				db = ShelveUnicodeWrapper(db)
+
+		if self.options.dry_run:
+			logging.warn("dry-run: %s db opened in readonly mode" % db_desc)
+			if not isinstance(db, dict):
+				volatile_db = dict((k, db[k]) for k in db)
+				db.close()
+				db = volatile_db
+		else:
+			self._open_files.append(db)
+
+		return db
+
+	def __enter__(self):
+		return self
+
+	def __exit__(self, exc_type, exc_value, traceback):
+		while self._open_files:
+			self._open_files.pop().close()

diff --git a/pym/portage/_emirrordist/DeletionIterator.py b/pym/portage/_emirrordist/DeletionIterator.py
new file mode 100644
index 0000000..dff52c0
--- /dev/null
+++ b/pym/portage/_emirrordist/DeletionIterator.py
@@ -0,0 +1,83 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import logging
+import stat
+
+from portage import os
+from .DeletionTask import DeletionTask
+
+class DeletionIterator(object):
+
+	def __init__(self, config):
+		self._config = config
+
+	def __iter__(self):
+		distdir = self._config.options.distfiles
+		file_owners = self._config.file_owners
+		whitelist = self._config.whitelist
+		distfiles_local = self._config.options.distfiles_local
+		deletion_db = self._config.deletion_db
+		deletion_delay = self._config.options.deletion_delay
+		start_time = self._config.start_time
+		distfiles_set = set(os.listdir(self._config.options.distfiles))
+		for filename in distfiles_set:
+			try:
+				st = os.stat(os.path.join(distdir, filename))
+			except OSError as e:
+				logging.error("stat failed on '%s' in distfiles: %s\n" %
+					(filename, e))
+				continue
+			if not stat.S_ISREG(st.st_mode):
+				continue
+			elif filename in file_owners:
+				if deletion_db is not None:
+					try:
+						del deletion_db[filename]
+					except KeyError:
+						pass
+			elif whitelist is not None and filename in whitelist:
+				if deletion_db is not None:
+					try:
+						del deletion_db[filename]
+					except KeyError:
+						pass
+			elif distfiles_local is not None and \
+				os.path.exists(os.path.join(distfiles_local, filename)):
+				if deletion_db is not None:
+					try:
+						del deletion_db[filename]
+					except KeyError:
+						pass
+			else:
+				self._config.scheduled_deletion_count += 1
+
+				if deletion_db is None or deletion_delay is None:
+
+					yield DeletionTask(background=True,
+						distfile=filename,
+						config=self._config)
+
+				else:
+					deletion_entry = deletion_db.get(filename)
+
+					if deletion_entry is None:
+						logging.debug("add '%s' to deletion db" % filename)
+						deletion_db[filename] = start_time
+
+					elif deletion_entry + deletion_delay <= start_time:
+
+						yield DeletionTask(background=True,
+							distfile=filename,
+							config=self._config)
+
+		if deletion_db is not None:
+			for filename in list(deletion_db):
+				if filename not in distfiles_set:
+					try:
+						del deletion_db[filename]
+					except KeyError:
+						pass
+					else:
+						logging.debug("drop '%s' from deletion db" %
+							filename)

diff --git a/pym/portage/_emirrordist/DeletionTask.py b/pym/portage/_emirrordist/DeletionTask.py
new file mode 100644
index 0000000..7d10957
--- /dev/null
+++ b/pym/portage/_emirrordist/DeletionTask.py
@@ -0,0 +1,129 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import errno
+import logging
+
+from portage import os
+from portage.util._async.FileCopier import FileCopier
+from _emerge.CompositeTask import CompositeTask
+
+class DeletionTask(CompositeTask):
+
+	__slots__ = ('distfile', 'config')
+
+	def _start(self):
+
+		distfile_path = os.path.join(
+			self.config.options.distfiles, self.distfile)
+
+		if self.config.options.recycle_dir is not None:
+			distfile_path = os.path.join(self.config.options.distfiles, self.distfile)
+			recycle_path = os.path.join(
+				self.config.options.recycle_dir, self.distfile)
+			if self.config.options.dry_run:
+				logging.info(("dry-run: move '%s' from "
+					"distfiles to recycle") % self.distfile)
+			else:
+				logging.debug(("move '%s' from "
+					"distfiles to recycle") % self.distfile)
+				try:
+					os.rename(distfile_path, recycle_path)
+				except OSError as e:
+					if e.errno != errno.EXDEV:
+						logging.error(("rename %s from distfiles to "
+							"recycle failed: %s") % (self.distfile, e))
+				else:
+					self.returncode = os.EX_OK
+					self._async_wait()
+					return
+
+				self._start_task(
+					FileCopier(src_path=distfile_path,
+						dest_path=recycle_path,
+						background=False),
+					self._recycle_copier_exit)
+				return
+
+		success = True
+
+		if self.config.options.dry_run:
+			logging.info(("dry-run: delete '%s' from "
+				"distfiles") % self.distfile)
+		else:
+			logging.debug(("delete '%s' from "
+				"distfiles") % self.distfile)
+			try:
+				os.unlink(distfile_path)
+			except OSError as e:
+				if e.errno not in (errno.ENOENT, errno.ESTALE):
+					logging.error("%s unlink failed in distfiles: %s" %
+						(self.distfile, e))
+					success = False
+
+		if success:
+			self._success()
+			self.returncode = os.EX_OK
+		else:
+			self.returncode = 1
+
+		self._async_wait()
+
+	def _recycle_copier_exit(self, copier):
+
+		self._assert_current(copier)
+		if self._was_cancelled():
+			self.wait()
+			return
+
+		success = True
+		if copier.returncode == os.EX_OK:
+
+			try:
+				os.unlink(copier.src_path)
+			except OSError as e:
+				if e.errno not in (errno.ENOENT, errno.ESTALE):
+					logging.error("%s unlink failed in distfiles: %s" %
+						(self.distfile, e))
+					success = False
+
+		else:
+			logging.error(("%s copy from distfiles "
+				"to recycle failed: %s") % (self.distfile, e))
+			success = False
+
+		if success:
+			self._success()
+			self.returncode = os.EX_OK
+		else:
+			self.returncode = 1
+
+		self._current_task = None
+		self.wait()
+
+	def _success(self):
+
+		cpv = "unknown"
+		if self.config.distfiles_db is not None:
+			cpv = self.config.distfiles_db.get(self.distfile, cpv)
+
+		self.config.delete_count += 1
+		self.config.log_success("%s\t%s\tremoved" % (cpv, self.distfile))
+
+		if self.config.distfiles_db is not None:
+			try:
+				del self.config.distfiles_db[self.distfile]
+			except KeyError:
+				pass
+			else:
+				logging.debug(("drop '%s' from "
+					"distfiles db") % self.distfile)
+
+		if self.config.deletion_db is not None:
+			try:
+				del self.config.deletion_db[self.distfile]
+			except KeyError:
+				pass
+			else:
+				logging.debug(("drop '%s' from "
+					"deletion db") % self.distfile)

diff --git a/pym/portage/_emirrordist/FetchIterator.py b/pym/portage/_emirrordist/FetchIterator.py
new file mode 100644
index 0000000..cc5d4be
--- /dev/null
+++ b/pym/portage/_emirrordist/FetchIterator.py
@@ -0,0 +1,132 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage.dep import use_reduce
+from portage.exception import PortageException
+from portage.manifest import Manifest
+from .FetchTask import FetchTask
+
+class FetchIterator(object):
+
+	def __init__(self, config):
+		self._config = config
+		self._log_failure = config.log_failure
+
+	def _iter_every_cp(self):
+		# List categories individually, in order to start yielding quicker,
+		# and in order to reduce latency in case of a signal interrupt.
+		cp_all = self._config.portdb.cp_all
+		for category in sorted(self._config.portdb.categories):
+			for cp in cp_all(categories=(category,)):
+				yield cp
+
+	def __iter__(self):
+
+		portdb = self._config.portdb
+		file_owners = self._config.file_owners
+		file_failures = self._config.file_failures
+		restrict_mirror_exemptions = self._config.restrict_mirror_exemptions
+
+		for cp in self._iter_every_cp():
+
+			for tree in portdb.porttrees:
+
+				# Reset state so the Manifest is pulled once
+				# for this cp / tree combination.
+				digests = None
+
+				for cpv in portdb.cp_list(cp, mytree=tree):
+
+					try:
+						restrict, = portdb.aux_get(cpv, ("RESTRICT",),
+							mytree=tree)
+					except (KeyError, PortageException) as e:
+						self._log_failure("%s\t\taux_get exception %s" %
+							(cpv, e))
+						continue
+
+					# Here we use matchnone=True to ignore conditional parts
+					# of RESTRICT since they don't apply unconditionally.
+					# Assume such conditionals only apply on the client side.
+					try:
+						restrict = frozenset(use_reduce(restrict,
+							flat=True, matchnone=True))
+					except PortageException as e:
+						self._log_failure("%s\t\tuse_reduce exception %s" %
+							(cpv, e))
+						continue
+
+					if "fetch" in restrict:
+						continue
+
+					try:
+						uri_map = portdb.getFetchMap(cpv)
+					except PortageException as e:
+						self._log_failure("%s\t\tgetFetchMap exception %s" %
+							(cpv, e))
+						continue
+
+					if not uri_map:
+						continue
+
+					if "mirror" in restrict:
+						skip = False
+						if restrict_mirror_exemptions is not None:
+							new_uri_map = {}
+							for filename, uri_tuple in uri_map.items():
+								for uri in uri_tuple:
+									if uri[:9] == "mirror://":
+										i = uri.find("/", 9)
+										if i != -1 and uri[9:i].strip("/") in \
+											restrict_mirror_exemptions:
+											new_uri_map[filename] = uri_tuple
+											break
+							if new_uri_map:
+								uri_map = new_uri_map
+							else:
+								skip = True
+						else:
+							skip = True
+
+						if skip:
+							continue
+
+					# Parse Manifest for this cp if we haven't yet.
+					if digests is None:
+						try:
+							digests = Manifest(os.path.join(
+								tree, cp)).getTypeDigests("DIST")
+						except (EnvironmentError, PortageException) as e:
+							for filename in uri_map:
+								self._log_failure(
+									"%s\t%s\tManifest exception %s" %
+									(cpv, filename, e))
+								file_failures[filename] = cpv
+							continue
+
+					if not digests:
+						for filename in uri_map:
+							self._log_failure("%s\t%s\tdigest entry missing" %
+								(cpv, filename))
+							file_failures[filename] = cpv
+						continue
+
+					for filename, uri_tuple in uri_map.items():
+						file_digests = digests.get(filename)
+						if file_digests is None:
+							self._log_failure("%s\t%s\tdigest entry missing" %
+								(cpv, filename))
+							file_failures[filename] = cpv
+							continue
+						if filename in file_owners:
+							continue
+						file_owners[filename] = cpv
+
+						yield FetchTask(cpv=cpv,
+							background=True,
+							digests=file_digests,
+							distfile=filename,
+							restrict=restrict,
+							uri_tuple=uri_tuple,
+							config=self._config)

diff --git a/pym/portage/_emirrordist/FetchTask.py b/pym/portage/_emirrordist/FetchTask.py
new file mode 100644
index 0000000..65fd672
--- /dev/null
+++ b/pym/portage/_emirrordist/FetchTask.py
@@ -0,0 +1,615 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import collections
+import errno
+import logging
+import stat
+import subprocess
+import sys
+
+import portage
+from portage import _encodings, _unicode_encode
+from portage import os
+from portage.exception import PortageException
+from portage.util._async.FileCopier import FileCopier
+from portage.util._async.FileDigester import FileDigester
+from portage.util._async.PipeLogger import PipeLogger
+from portage.util._async.PopenProcess import PopenProcess
+from _emerge.CompositeTask import CompositeTask
+
+default_hash_name = portage.const.MANIFEST2_REQUIRED_HASH
+
+default_fetchcommand = "wget -c -v -t 1 --passive-ftp --timeout=60 -O \"${DISTDIR}/${FILE}\" \"${URI}\""
+
+class FetchTask(CompositeTask):
+
+	__slots__ = ('distfile', 'digests', 'config', 'cpv',
+		'restrict', 'uri_tuple', '_current_mirror',
+		'_current_stat', '_fetch_tmp_dir_info', '_fetch_tmp_file',
+		'_fs_mirror_stack', '_mirror_stack',
+		'_previously_added',
+		'_primaryuri_stack', '_log_path', '_tried_uris')
+
+	def _start(self):
+
+		if self.config.options.fetch_log_dir is not None and \
+			not self.config.options.dry_run:
+			self._log_path = os.path.join(
+				self.config.options.fetch_log_dir,
+				self.distfile + '.log')
+
+		self._previously_added = True
+		if self.config.distfiles_db is not None and \
+			self.distfile not in self.config.distfiles_db:
+			self._previously_added = False
+			self.config.distfiles_db[self.distfile] = self.cpv
+
+		if not self._have_needed_digests():
+			msg = "incomplete digests: %s" % " ".join(self.digests)
+			self.scheduler.output(msg, background=self.background,
+				log_path=self._log_path)
+			self.config.log_failure("%s\t%s\t%s" %
+				(self.cpv, self.distfile, msg))
+			self.config.file_failures[self.distfile] = self.cpv
+			self.returncode = os.EX_OK
+			self._async_wait()
+			return
+
+		distfile_path = os.path.join(
+			self.config.options.distfiles, self.distfile)
+
+		st = None
+		size_ok = False
+		try:
+			st = os.stat(distfile_path)
+		except OSError as e:
+			if e.errno not in (errno.ENOENT, errno.ESTALE):
+				msg = "%s stat failed in %s: %s" % \
+					(self.distfile, "distfiles", e)
+				self.scheduler.output(msg + '\n', background=True,
+					log_path=self._log_path)
+				logging.error(msg)
+		else:
+			size_ok = st.st_size == self.digests["size"]
+
+		if not size_ok:
+			if self.config.options.dry_run:
+				if st is not None:
+					logging.info(("dry-run: delete '%s' with "
+						"wrong size from distfiles") % (self.distfile,))
+			else:
+				# Do the unlink in order to ensure that the path is clear,
+				# even if stat raised ENOENT, since a broken symlink can
+				# trigger ENOENT.
+				if self._unlink_file(distfile_path, "distfiles"):
+					if st is not None:
+						logging.debug(("delete '%s' with "
+							"wrong size from distfiles") % (self.distfile,))
+				else:
+					self.config.log_failure("%s\t%s\t%s" %
+						(self.cpv, self.distfile, "unlink failed in distfiles"))
+					self.returncode = os.EX_OK
+					self._async_wait()
+					return
+
+		if size_ok:
+			if self.config.options.verify_existing_digest:
+				self._start_task(
+					FileDigester(file_path=distfile_path,
+						hash_names=(self._select_hash(),),
+						background=self.background,
+						logfile=self._log_path), self._distfiles_digester_exit)
+				return
+
+			self._success()
+			self.returncode = os.EX_OK
+			self._async_wait()
+			return
+
+		self._start_fetch()
+
+	def _success(self):
+		if not self._previously_added:
+			size = self.digests["size"]
+			self.config.added_byte_count += size
+			self.config.added_file_count += 1
+			self.config.log_success("%s\t%s\tadded %i bytes" %
+				(self.cpv, self.distfile, size))
+
+		if self._log_path is not None:
+			if not self.config.options.dry_run:
+				try:
+					os.unlink(self._log_path)
+				except OSError:
+					pass
+
+		if self.config.options.recycle_dir is not None:
+
+			recycle_file = os.path.join(
+				self.config.options.recycle_dir, self.distfile)
+
+			if self.config.options.dry_run:
+				if os.path.exists(recycle_file):
+					logging.info("dry-run: delete '%s' from recycle" %
+						(self.distfile,))
+			else:
+				try:
+					os.unlink(recycle_file)
+				except OSError:
+					pass
+				else:
+					logging.debug("delete '%s' from recycle" %
+						(self.distfile,))
+
+	def _distfiles_digester_exit(self, digester):
+
+		self._assert_current(digester)
+		if self._was_cancelled():
+			self.wait()
+			return
+
+		if self._default_exit(digester) != os.EX_OK:
+			msg = "%s distfiles digester failed unexpectedly" % \
+				(self.distfile,)
+			self.scheduler.output(msg + '\n', background=True,
+				log_path=self._log_path)
+			logging.error(msg)
+			self.wait()
+			return
+
+		wrong_digest = self._find_bad_digest(digester.digests)
+		if wrong_digest is None:
+			self._success()
+			self.returncode = os.EX_OK
+			self.wait()
+			return
+
+		self._start_fetch()
+
+	_mirror_info = collections.namedtuple('_mirror_info',
+		'name location')
+
+	def _start_fetch(self):
+
+		self._previously_added = False
+		self._fs_mirror_stack = []
+		if self.config.options.distfiles_local is not None:
+			self._fs_mirror_stack.append(self._mirror_info(
+				'distfiles-local', self.config.options.distfiles_local))
+		if self.config.options.recycle_dir is not None:
+			self._fs_mirror_stack.append(self._mirror_info(
+				'recycle', self.config.options.recycle_dir))
+
+		self._primaryuri_stack = []
+		self._mirror_stack = []
+		for uri in reversed(self.uri_tuple):
+			if uri.startswith('mirror://'):
+				self._mirror_stack.append(
+					self._mirror_iterator(uri, self.config.mirrors))
+			else:
+				self._primaryuri_stack.append(uri)
+
+		self._tried_uris = set()
+		self._try_next_mirror()
+
+	@staticmethod
+	def _mirror_iterator(uri, mirrors_dict):
+
+		slash_index = uri.find("/", 9)
+		if slash_index != -1:
+			mirror_name = uri[9:slash_index].strip("/")
+			for mirror in mirrors_dict.get(mirror_name, []):
+				yield mirror.rstrip("/") + "/" + uri[slash_index+1:]
+
+	def _try_next_mirror(self):
+		if self._fs_mirror_stack:
+			self._fetch_fs(self._fs_mirror_stack.pop())
+			return
+		else:
+			uri = self._next_uri()
+			if uri is not None:
+				self._tried_uris.add(uri)
+				self._fetch_uri(uri)
+				return
+
+		if self._tried_uris:
+			msg = "all uris failed"
+		else:
+			msg = "no fetchable uris"
+
+		self.config.log_failure("%s\t%s\t%s" %
+			(self.cpv, self.distfile, msg))
+		self.config.file_failures[self.distfile] = self.cpv
+		self.returncode = os.EX_OK
+		self.wait()
+
+	def _next_uri(self):
+		remaining_tries = self.config.options.tries - len(self._tried_uris)
+		if remaining_tries > 0:
+
+			if remaining_tries <= self.config.options.tries / 2:
+				while self._primaryuri_stack:
+					uri = self._primaryuri_stack.pop()
+					if uri not in self._tried_uris:
+						return uri
+
+			while self._mirror_stack:
+				uri = next(self._mirror_stack[-1], None)
+				if uri is None:
+					self._mirror_stack.pop()
+				else:
+					if uri not in self._tried_uris:
+						return uri
+
+			if self._primaryuri_stack:
+				return self._primaryuri_stack.pop()
+
+		return None
+
+	def _fetch_fs(self, mirror_info):
+		file_path = os.path.join(mirror_info.location, self.distfile)
+
+		st = None
+		size_ok = False
+		try:
+			st = os.stat(file_path)
+		except OSError as e:
+			if e.errno not in (errno.ENOENT, errno.ESTALE):
+				msg = "%s stat failed in %s: %s" % \
+					(self.distfile, mirror_info.name, e)
+				self.scheduler.output(msg + '\n', background=True,
+					log_path=self._log_path)
+				logging.error(msg)
+		else:
+			size_ok = st.st_size == self.digests["size"]
+			self._current_stat = st
+
+		if size_ok:
+			self._current_mirror = mirror_info
+			self._start_task(
+				FileDigester(file_path=file_path,
+					hash_names=(self._select_hash(),),
+					background=self.background,
+					logfile=self._log_path),
+				self._fs_mirror_digester_exit)
+		else:
+			self._try_next_mirror()
+
+	def _fs_mirror_digester_exit(self, digester):
+
+		self._assert_current(digester)
+		if self._was_cancelled():
+			self.wait()
+			return
+
+		current_mirror = self._current_mirror
+		if digester.returncode != os.EX_OK:
+			msg = "%s %s digester failed unexpectedly" % \
+			(self.distfile, current_mirror.name)
+			self.scheduler.output(msg + '\n', background=True,
+				log_path=self._log_path)
+			logging.error(msg)
+		else:
+			bad_digest = self._find_bad_digest(digester.digests)
+			if bad_digest is not None:
+				msg = "%s %s has bad %s digest: expected %s, got %s" % \
+					(self.distfile, current_mirror.name, bad_digest,
+					self.digests[bad_digest], digester.digests[bad_digest])
+				self.scheduler.output(msg + '\n', background=True,
+					log_path=self._log_path)
+				logging.error(msg)
+			elif self.config.options.dry_run:
+				# Report success without actually touching any files
+				if self._same_device(current_mirror.location,
+					self.config.options.distfiles):
+					logging.info(("dry-run: hardlink '%s' from %s "
+						"to distfiles") % (self.distfile, current_mirror.name))
+				else:
+					logging.info("dry-run: copy '%s' from %s to distfiles" %
+						(self.distfile, current_mirror.name))
+				self._success()
+				self.returncode = os.EX_OK
+				self.wait()
+				return
+			else:
+				src = os.path.join(current_mirror.location, self.distfile)
+				dest = os.path.join(self.config.options.distfiles, self.distfile)
+				if self._hardlink_atomic(src, dest,
+					"%s to %s" % (current_mirror.name, "distfiles")):
+					logging.debug("hardlink '%s' from %s to distfiles" %
+						(self.distfile, current_mirror.name))
+					self._success()
+					self.returncode = os.EX_OK
+					self.wait()
+					return
+				else:
+					self._start_task(
+						FileCopier(src_path=src, dest_path=dest,
+							background=(self.background and
+								self._log_path is not None),
+							logfile=self._log_path),
+						self._fs_mirror_copier_exit)
+					return
+
+		self._try_next_mirror()
+
+	def _fs_mirror_copier_exit(self, copier):
+
+		self._assert_current(copier)
+		if self._was_cancelled():
+			self.wait()
+			return
+
+		current_mirror = self._current_mirror
+		if copier.returncode != os.EX_OK:
+			msg = "%s %s copy failed unexpectedly" % \
+				(self.distfile, current_mirror.name)
+			self.scheduler.output(msg + '\n', background=True,
+				log_path=self._log_path)
+			logging.error(msg)
+		else:
+
+			logging.debug("copy '%s' from %s to distfiles" %
+				(self.distfile, current_mirror.name))
+
+			try:
+				portage.util.apply_stat_permissions(
+					copier.dest_path, self._current_stat)
+			except (OSError, PortageException) as e:
+				msg = ("%s %s apply_stat_permissions "
+					"failed unexpectedly: %s") % \
+					(self.distfile, current_mirror.name, e)
+				self.scheduler.output(msg + '\n', background=True,
+					log_path=self._log_path)
+				logging.error(msg)
+
+			try:
+				if sys.hexversion >= 0x3030000:
+					os.utime(copier.dest_path,
+						ns=(self._current_stat.st_mtime_ns,
+						self._current_stat.st_mtime_ns))
+				else:
+					os.utime(copier.dest_path,
+						(self._current_stat[stat.ST_MTIME],
+						self._current_stat[stat.ST_MTIME]))
+			except OSError as e:
+				msg = "%s %s utime failed unexpectedly: %s" % \
+					(self.distfile, current_mirror.name, e)
+				self.scheduler.output(msg + '\n', background=True,
+					log_path=self._log_path)
+				logging.error(msg)
+
+			self._success()
+			self.returncode = os.EX_OK
+			self.wait()
+			return
+
+		self._try_next_mirror()
+
+	def _fetch_uri(self, uri):
+
+		if self.config.options.dry_run:
+			# Simply report success.
+			logging.info("dry-run: fetch '%s' from '%s'" %
+				(self.distfile, uri))
+			self._success()
+			self.returncode = os.EX_OK
+			self.wait()
+			return
+
+		if self.config.options.temp_dir:
+			self._fetch_tmp_dir_info = 'temp-dir'
+			distdir = self.config.options.temp_dir
+		else:
+			self._fetch_tmp_dir_info = 'distfiles'
+			distdir = self.config.options.distfiles
+
+		tmp_basename = self.distfile + '._emirrordist_fetch_.%s' % os.getpid()
+
+		variables = {
+			"DISTDIR": distdir,
+			"URI":     uri,
+			"FILE":    tmp_basename
+		}
+
+		self._fetch_tmp_file = os.path.join(distdir, tmp_basename)
+
+		try:
+			os.unlink(self._fetch_tmp_file)
+		except OSError:
+			pass
+
+		args = portage.util.shlex_split(default_fetchcommand)
+		args = [portage.util.varexpand(x, mydict=variables)
+			for x in args]
+
+		if sys.hexversion < 0x3000000 or sys.hexversion >= 0x3020000:
+			# Python 3.1 does not support bytes in Popen args.
+			args = [_unicode_encode(x,
+				encoding=_encodings['fs'], errors='strict') for x in args]
+
+		null_fd = os.open(os.devnull, os.O_RDONLY)
+		fetcher = PopenProcess(background=self.background,
+			proc=subprocess.Popen(args, stdin=null_fd,
+			stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
+			scheduler=self.scheduler)
+		os.close(null_fd)
+
+		fetcher.pipe_reader = PipeLogger(background=self.background,
+			input_fd=fetcher.proc.stdout, log_file_path=self._log_path,
+			scheduler=self.scheduler)
+
+		self._start_task(fetcher, self._fetcher_exit)
+
+	def _fetcher_exit(self, fetcher):
+
+		self._assert_current(fetcher)
+		if self._was_cancelled():
+			self.wait()
+			return
+		
+		if os.path.exists(self._fetch_tmp_file):
+			self._start_task(
+				FileDigester(file_path=self._fetch_tmp_file,
+					hash_names=(self._select_hash(),),
+					background=self.background,
+					logfile=self._log_path),
+					self._fetch_digester_exit)
+		else:
+			self._try_next_mirror()
+
+	def _fetch_digester_exit(self, digester):
+
+		self._assert_current(digester)
+		if self._was_cancelled():
+			self.wait()
+			return
+
+		if digester.returncode != os.EX_OK:
+			msg = "%s %s digester failed unexpectedly" % \
+			(self.distfile, self._fetch_tmp_dir_info)
+			self.scheduler.output(msg + '\n', background=True,
+				log_path=self._log_path)
+			logging.error(msg)
+		else:
+			bad_digest = self._find_bad_digest(digester.digests)
+			if bad_digest is not None:
+				msg = "%s %s has bad %s digest: expected %s, got %s" % \
+					(self.distfile, self._current_mirror.name, bad_digest,
+					self.digests[bad_digest], digester.digests[bad_digest])
+				self.scheduler.output(msg + '\n', background=True,
+					log_path=self._log_path)
+				try:
+					os.unlink(self._fetch_tmp_file)
+				except OSError:
+					pass
+			else:
+				dest = os.path.join(self.config.options.distfiles, self.distfile)
+				try:
+					os.rename(self._fetch_tmp_file, dest)
+				except OSError:
+					self._start_task(
+						FileCopier(src_path=self._fetch_tmp_file,
+							dest_path=dest,
+							background=(self.background and
+								self._log_path is not None),
+							logfile=self._log_path),
+						self._fetch_copier_exit)
+					return
+				else:
+					self._success()
+					self.returncode = os.EX_OK
+					self.wait()
+					return
+
+		self._try_next_mirror()
+
+	def _fetch_copier_exit(self, copier):
+
+		self._assert_current(copier)
+
+		try:
+			os.unlink(self._fetch_tmp_file)
+		except OSError:
+			pass
+
+		if self._was_cancelled():
+			self.wait()
+			return
+
+		if copier.returncode == os.EX_OK:
+			self._success()
+			self.returncode = os.EX_OK
+			self.wait()
+		else:
+			# out of space?
+			msg = "%s %s copy failed unexpectedly" % \
+				(self.distfile, self._fetch_tmp_dir_info)
+			self.scheduler.output(msg + '\n', background=True,
+				log_path=self._log_path)
+			logging.error(msg)
+			self.config.log_failure("%s\t%s\t%s" %
+				(self.cpv, self.distfile, msg))
+			self.config.file_failures[self.distfile] = self.cpv
+			self.returncode = 1
+			self.wait()
+
+	def _unlink_file(self, file_path, dir_info):
+		try:
+			os.unlink(file_path)
+		except OSError as e:
+			if e.errno not in (errno.ENOENT, errno.ESTALE):
+				msg = "unlink '%s' failed in %s: %s" % \
+					(self.distfile, dir_info, e)
+				self.scheduler.output(msg + '\n', background=True,
+					log_path=self._log_path)
+				logging.error(msg)
+				return False
+		return True
+
+	def _have_needed_digests(self):
+		return "size" in self.digests and \
+			self._select_hash() is not None
+
+	def _select_hash(self):
+		if default_hash_name in self.digests:
+			return default_hash_name
+		else:
+			for hash_name in self.digests:
+				if hash_name != "size" and \
+					hash_name in portage.checksum.hashfunc_map:
+					return hash_name
+
+		return None
+
+	def _find_bad_digest(self, digests):
+		for hash_name, hash_value in digests.items():
+			if self.digests[hash_name] != hash_value:
+				return hash_name
+		return None
+
+	@staticmethod
+	def _same_device(path1, path2):
+		try:
+			st1 = os.stat(path1)
+			st2 = os.stat(path2)
+		except OSError:
+			return False
+		else:
+			return st1.st_dev == st2.st_dev
+
+	def _hardlink_atomic(self, src, dest, dir_info):
+
+		head, tail = os.path.split(dest)
+		hardlink_tmp = os.path.join(head, ".%s._mirrordist_hardlink_.%s" % \
+			(tail, os.getpid()))
+
+		try:
+			try:
+				os.link(src, hardlink_tmp)
+			except OSError as e:
+				if e.errno != errno.EXDEV:
+					msg = "hardlink %s from %s failed: %s" % \
+						(self.distfile, dir_info, e)
+					self.scheduler.output(msg + '\n', background=True,
+						log_path=self._log_path)
+					logging.error(msg)
+				return False
+
+			try:
+				os.rename(hardlink_tmp, dest)
+			except OSError as e:
+				msg = "hardlink rename '%s' from %s failed: %s" % \
+					(self.distfile, dir_info, e)
+				self.scheduler.output(msg + '\n', background=True,
+					log_path=self._log_path)
+				logging.error(msg)
+				return False
+		finally:
+			try:
+				os.unlink(hardlink_tmp)
+			except OSError:
+				pass
+
+		return True

diff --git a/pym/portage/_emirrordist/MirrorDistTask.py b/pym/portage/_emirrordist/MirrorDistTask.py
new file mode 100644
index 0000000..b6f875d
--- /dev/null
+++ b/pym/portage/_emirrordist/MirrorDistTask.py
@@ -0,0 +1,218 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import errno
+import logging
+import sys
+import time
+
+try:
+	import threading
+except ImportError:
+	import dummy_threading as threading
+
+import portage
+from portage import os
+from portage.util._async.TaskScheduler import TaskScheduler
+from _emerge.CompositeTask import CompositeTask
+from .FetchIterator import FetchIterator
+from .DeletionIterator import DeletionIterator
+
+if sys.hexversion >= 0x3000000:
+	long = int
+
+class MirrorDistTask(CompositeTask):
+
+	__slots__ = ('_config', '_terminated', '_term_check_id')
+
+	def __init__(self, config):
+		CompositeTask.__init__(self, scheduler=config.event_loop)
+		self._config = config
+		self._terminated = threading.Event()
+
+	def _start(self):
+		self._term_check_id = self.scheduler.idle_add(self._termination_check)
+		fetch = TaskScheduler(iter(FetchIterator(self._config)),
+			max_jobs=self._config.options.jobs,
+			max_load=self._config.options.load_average,
+			event_loop=self._config.event_loop)
+		self._start_task(fetch, self._fetch_exit)
+
+	def _fetch_exit(self, fetch):
+
+		self._assert_current(fetch)
+		if self._was_cancelled():
+			self.wait()
+			return
+
+		if self._config.options.delete:
+			deletion = TaskScheduler(iter(DeletionIterator(self._config)),
+				max_jobs=self._config.options.jobs,
+				max_load=self._config.options.load_average,
+				event_loop=self._config.event_loop)
+			self._start_task(deletion, self._deletion_exit)
+			return
+
+		self._post_deletion()
+
+	def _deletion_exit(self, deletion):
+
+		self._assert_current(deletion)
+		if self._was_cancelled():
+			self.wait()
+			return
+
+		self._post_deletion()
+
+	def _post_deletion(self):
+
+		if self._config.options.recycle_db is not None:
+			self._update_recycle_db()
+
+		if self._config.options.scheduled_deletion_log is not None:
+			self._scheduled_deletion_log()
+
+		self._summary()
+
+		self.returncode = os.EX_OK
+		self._current_task = None
+		self.wait()
+
+	def _update_recycle_db(self):
+
+		start_time = self._config.start_time
+		recycle_dir = self._config.options.recycle_dir
+		recycle_db = self._config.recycle_db
+		r_deletion_delay = self._config.options.recycle_deletion_delay
+
+		# Use a dict optimize access.
+		recycle_db_cache = dict(recycle_db.items())
+
+		for filename in os.listdir(recycle_dir):
+
+			recycle_file = os.path.join(recycle_dir, filename)
+
+			try:
+				st = os.stat(recycle_file)
+			except OSError as e:
+				if e.errno not in (errno.ENOENT, errno.ESTALE):
+					logging.error(("stat failed for '%s' in "
+						"recycle: %s") % (filename, e))
+				continue
+
+			value = recycle_db_cache.pop(filename, None)
+			if value is None:
+				logging.debug(("add '%s' to "
+					"recycle db") % filename)
+				recycle_db[filename] = (st.st_size, start_time)
+			else:
+				r_size, r_time = value
+				if long(r_size) != st.st_size:
+					recycle_db[filename] = (st.st_size, start_time)
+				elif r_time + r_deletion_delay < start_time:
+					if self._config.options.dry_run:
+						logging.info(("dry-run: delete '%s' from "
+							"recycle") % filename)
+						logging.info(("drop '%s' from "
+							"recycle db") % filename)
+					else:
+						try:
+							os.unlink(recycle_file)
+						except OSError as e:
+							if e.errno not in (errno.ENOENT, errno.ESTALE):
+								logging.error(("delete '%s' from "
+									"recycle failed: %s") % (filename, e))
+						else:
+							logging.debug(("delete '%s' from "
+								"recycle") % filename)
+							try:
+								del recycle_db[filename]
+							except KeyError:
+								pass
+							else:
+								logging.debug(("drop '%s' from "
+									"recycle db") % filename)
+
+		# Existing files were popped from recycle_db_cache,
+		# so any remaining entries are for files that no
+		# longer exist.
+		for filename in recycle_db_cache:
+			try:
+				del recycle_db[filename]
+			except KeyError:
+				pass
+			else:
+				logging.debug(("drop non-existent '%s' from "
+					"recycle db") % filename)
+
+	def _scheduled_deletion_log(self):
+
+		start_time = self._config.start_time
+		dry_run = self._config.options.dry_run
+		deletion_delay = self._config.options.deletion_delay
+		distfiles_db = self._config.distfiles_db
+
+		date_map = {}
+		for filename, timestamp in self._config.deletion_db.items():
+			date = timestamp + deletion_delay
+			if date < start_time:
+				date = start_time
+			date = time.strftime("%Y-%m-%d", time.gmtime(date))
+			date_files = date_map.get(date)
+			if date_files is None:
+				date_files = []
+				date_map[date] = date_files
+			date_files.append(filename)
+
+		if dry_run:
+			logging.warn(("dry-run: scheduled-deletions log "
+				"will be summarized via logging.info"))
+
+		lines = []
+		for date in sorted(date_map):
+			date_files = date_map[date]
+			if dry_run:
+				logging.info(("dry-run: scheduled deletions for %s: %s files") %
+					(date, len(date_files)))
+			lines.append("%s\n" % date)
+			for filename in date_files:
+				cpv = "unknown"
+				if distfiles_db is not None:
+					cpv = distfiles_db.get(filename, cpv)
+				lines.append("\t%s\t%s\n" % (filename, cpv))
+
+		if not dry_run:
+			portage.util.write_atomic(
+				self._config.options.scheduled_deletion_log,
+				"".join(lines))
+
+	def _summary(self):
+		elapsed_time = time.time() - self._config.start_time
+		fail_count = len(self._config.file_failures)
+		delete_count = self._config.delete_count
+		scheduled_deletion_count = self._config.scheduled_deletion_count - delete_count
+		added_file_count = self._config.added_file_count
+		added_byte_count = self._config.added_byte_count
+
+		logging.info("finished in %i seconds" % elapsed_time)
+		logging.info("failed to fetch %i files" % fail_count)
+		logging.info("deleted %i files" % delete_count)
+		logging.info("deletion of %i files scheduled" %
+			scheduled_deletion_count)
+		logging.info("added %i files" % added_file_count)
+		logging.info("added %i bytes total" % added_byte_count)
+
+	def terminate(self):
+		self._terminated.set()
+
+	def _termination_check(self):
+		if self._terminated.is_set():
+			self.cancel()
+			self.wait()
+		return True
+
+	def _wait(self):
+		CompositeTask._wait(self)
+		if self._term_check_id is not None:
+			self.scheduler.source_remove(self._term_check_id)
+			self._term_check_id = None

diff --git a/pym/portage/_emirrordist/__init__.py b/pym/portage/_emirrordist/__init__.py
new file mode 100644
index 0000000..6cde932
--- /dev/null
+++ b/pym/portage/_emirrordist/__init__.py
@@ -0,0 +1,2 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2

diff --git a/pym/portage/_emirrordist/main.py b/pym/portage/_emirrordist/main.py
new file mode 100644
index 0000000..6b7c22f
--- /dev/null
+++ b/pym/portage/_emirrordist/main.py
@@ -0,0 +1,438 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import logging
+import optparse
+import sys
+
+import portage
+from portage import os
+from portage.util import normalize_path
+from portage.util._async.run_main_scheduler import run_main_scheduler
+from portage.util._async.SchedulerInterface import SchedulerInterface
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage._emirrordist.Config import Config
+from .Config import Config
+from .MirrorDistTask import MirrorDistTask
+
+if sys.hexversion >= 0x3000000:
+	long = int
+
+seconds_per_day = 24 * 60 * 60
+
+common_options = (
+	{
+		"longopt"  : "--dry-run",
+		"help"     : "perform a trial run with no changes made (usually combined "
+			"with --verbose)",
+		"action"   : "store_true"
+	},
+	{
+		"longopt"  : "--verbose",
+		"shortopt" : "-v",
+		"help"     : "display extra information on stderr "
+			"(multiple occurences increase verbosity)",
+		"action"   : "count",
+		"default"  : 0,
+	},
+	{
+		"longopt"  : "--ignore-default-opts",
+		"help"     : "do not use the EMIRRORDIST_DEFAULT_OPTS environment variable",
+		"action"   : "store_true"
+	},
+	{
+		"longopt"  : "--distfiles",
+		"help"     : "distfiles directory to use (required)",
+		"metavar"  : "DIR"
+	},
+	{
+		"longopt"  : "--jobs",
+		"shortopt" : "-j",
+		"help"     : "number of concurrent jobs to run"
+	},
+	{
+		"longopt"  : "--load-average",
+		"shortopt" : "-l",
+		"help"     : "load average limit for spawning of new concurrent jobs",
+		"metavar"  : "LOAD"
+	},
+	{
+		"longopt"  : "--tries",
+		"help"     : "maximum number of tries per file, 0 means unlimited (default is 10)",
+		"default"  : 10
+	},
+	{
+		"longopt"  : "--repo",
+		"help"     : "name of repo to operate on (default repo is located at $PORTDIR)"
+	},
+	{
+		"longopt"  : "--config-root",
+		"help"     : "location of portage config files",
+		"metavar"  : "DIR"
+	},
+	{
+		"longopt"  : "--portdir",
+		"help"     : "override the portage tree location",
+		"metavar"  : "DIR"
+	},
+	{
+		"longopt"  : "--portdir-overlay",
+		"help"     : "override the PORTDIR_OVERLAY variable (requires "
+			"that --repo is also specified)"
+	},
+	{
+		"longopt"  : "--strict-manifests",
+		"help"     : "manually override \"strict\" FEATURES setting",
+		"choices"  : ("y", "n"),
+		"metavar"  : "<y|n>",
+		"type"     : "choice"
+	},
+	{
+		"longopt"  : "--failure-log",
+		"help"     : "log file for fetch failures, with tab-delimited "
+			"output, for reporting purposes",
+		"metavar"  : "FILE"
+	},
+	{
+		"longopt"  : "--success-log",
+		"help"     : "log file for fetch successes, with tab-delimited "
+			"output, for reporting purposes",
+		"metavar"  : "FILE"
+	},
+	{
+		"longopt"  : "--scheduled-deletion-log",
+		"help"     : "log file for scheduled deletions, with tab-delimited "
+			"output, for reporting purposes",
+		"metavar"  : "FILE"
+	},
+	{
+		"longopt"  : "--delete",
+		"help"     : "enable deletion of unused distfiles",
+		"action"   : "store_true"
+	},
+	{
+		"longopt"  : "--deletion-db",
+		"help"     : "database file used to track lifetime of files "
+			"scheduled for delayed deletion",
+		"metavar"  : "FILE"
+	},
+	{
+		"longopt"  : "--deletion-delay",
+		"help"     : "delay time for deletion, measured in seconds",
+		"metavar"  : "SECONDS"
+	},
+	{
+		"longopt"  : "--temp-dir",
+		"help"     : "temporary directory for downloads",
+		"metavar"  : "DIR"
+	},
+	{
+		"longopt"  : "--mirror-overrides",
+		"help"     : "file holding a list of mirror overrides",
+		"metavar"  : "FILE"
+	},
+	{
+		"longopt"  : "--mirror-skip",
+		"help"     : "comma delimited list of mirror targets to skip "
+			"when fetching"
+	},
+	{
+		"longopt"  : "--restrict-mirror-exemptions",
+		"help"     : "comma delimited list of mirror targets for which to "
+			"ignore RESTRICT=\"mirror\""
+	},
+	{
+		"longopt"  : "--verify-existing-digest",
+		"help"     : "use digest as a verification of whether existing "
+			"distfiles are valid",
+		"action"   : "store_true"
+	},
+	{
+		"longopt"  : "--distfiles-local",
+		"help"     : "distfiles-local directory to use",
+		"metavar"  : "DIR"
+	},
+	{
+		"longopt"  : "--distfiles-db",
+		"help"     : "database file used to track which ebuilds a "
+			"distfile belongs to",
+		"metavar"  : "FILE"
+	},
+	{
+		"longopt"  : "--recycle-dir",
+		"help"     : "directory for extended retention of files that "
+			"are removed from distdir with the --delete option",
+		"metavar"  : "DIR"
+	},
+	{
+		"longopt"  : "--recycle-db",
+		"help"     : "database file used to track lifetime of files "
+			"in recycle dir",
+		"metavar"  : "FILE"
+	},
+	{
+		"longopt"  : "--recycle-deletion-delay",
+		"help"     : "delay time for deletion of unused files from "
+			"recycle dir, measured in seconds (defaults to "
+			"the equivalent of 60 days)",
+		"default"  : 60 * seconds_per_day,
+		"metavar"  : "SECONDS"
+	},
+	{
+		"longopt"  : "--fetch-log-dir",
+		"help"     : "directory for individual fetch logs",
+		"metavar"  : "DIR"
+	},
+	{
+		"longopt"  : "--whitelist-from",
+		"help"     : "specifies a file containing a list of files to "
+			"whitelist, one per line, # prefixed lines ignored",
+		"action"   : "append",
+		"metavar"  : "FILE"
+	},
+)
+
+def parse_args(args):
+	description = "emirrordist - a fetch tool for mirroring " \
+		"of package distfiles"
+	usage = "emirrordist [options] <action>"
+	parser = optparse.OptionParser(description=description, usage=usage)
+
+	actions = optparse.OptionGroup(parser, 'Actions')
+	actions.add_option("--version",
+		action="store_true",
+		help="display portage version and exit")
+	actions.add_option("--mirror",
+		action="store_true",
+		help="mirror distfiles for the selected repository")
+	parser.add_option_group(actions)
+
+	common = optparse.OptionGroup(parser, 'Common options')
+	for opt_info in common_options:
+		opt_pargs = [opt_info["longopt"]]
+		if opt_info.get("shortopt"):
+			opt_pargs.append(opt_info["shortopt"])
+		opt_kwargs = {"help" : opt_info["help"]}
+		for k in ("action", "choices", "default", "metavar", "type"):
+			if k in opt_info:
+				opt_kwargs[k] = opt_info[k]
+		common.add_option(*opt_pargs, **opt_kwargs)
+	parser.add_option_group(common)
+
+	options, args = parser.parse_args(args)
+
+	return (parser, options, args)
+
+def emirrordist_main(args):
+
+	parser, options, args = parse_args(args)
+
+	if options.version:
+		sys.stdout.write("Portage %s\n" % portage.VERSION)
+		return os.EX_OK
+
+	config_root = options.config_root
+
+	# The calling environment is ignored, so the program is
+	# completely controlled by commandline arguments.
+	env = {}
+
+	if options.repo is None:
+		env['PORTDIR_OVERLAY'] = ''
+	elif options.portdir_overlay:
+		env['PORTDIR_OVERLAY'] = options.portdir_overlay
+
+	if options.portdir is not None:
+		env['PORTDIR'] = options.portdir
+
+	settings = portage.config(config_root=config_root,
+		local_config=False, env=env)
+
+	default_opts = None
+	if not options.ignore_default_opts:
+		default_opts = settings.get('EMIRRORDIST_DEFAULT_OPTS', '').split()
+
+	if default_opts:
+		parser, options, args = parse_args(default_opts + args)
+
+		settings = portage.config(config_root=config_root,
+			local_config=False, env=env)
+
+	repo_path = None
+	if options.repo is not None:
+		repo_path = settings.repositories.treemap.get(options.repo)
+		if repo_path is None:
+			parser.error("Unable to locate repository named '%s'" % \
+				(options.repo,))
+	else:
+		repo_path = settings.repositories.mainRepoLocation()
+		if not repo_path:
+			parser.error("PORTDIR is undefined")
+
+	if options.jobs is not None:
+		options.jobs = int(options.jobs)
+
+	if options.load_average is not None:
+		options.load_average = float(options.load_average)
+
+	if options.failure_log is not None:
+		options.failure_log = normalize_path(
+			os.path.abspath(options.failure_log))
+
+		parent_dir = os.path.dirname(options.failure_log)
+		if not (os.path.isdir(parent_dir) and
+			os.access(parent_dir, os.W_OK|os.X_OK)):
+			parser.error(("--failure-log '%s' parent is not a "
+				"writable directory") % options.failure_log)
+
+	if options.success_log is not None:
+		options.success_log = normalize_path(
+			os.path.abspath(options.success_log))
+
+		parent_dir = os.path.dirname(options.success_log)
+		if not (os.path.isdir(parent_dir) and
+			os.access(parent_dir, os.W_OK|os.X_OK)):
+			parser.error(("--success-log '%s' parent is not a "
+				"writable directory") % options.success_log)
+
+	if options.scheduled_deletion_log is not None:
+		options.scheduled_deletion_log = normalize_path(
+			os.path.abspath(options.scheduled_deletion_log))
+
+		parent_dir = os.path.dirname(options.scheduled_deletion_log)
+		if not (os.path.isdir(parent_dir) and
+			os.access(parent_dir, os.W_OK|os.X_OK)):
+			parser.error(("--scheduled-deletion-log '%s' parent is not a "
+				"writable directory") % options.scheduled_deletion_log)
+
+		if options.deletion_db is None:
+			parser.error("--scheduled-deletion-log requires --deletion-db")
+
+	if options.deletion_delay is not None:
+		options.deletion_delay = long(options.deletion_delay)
+		if options.deletion_db is None:
+			parser.error("--deletion-delay requires --deletion-db")
+
+	if options.deletion_db is not None:
+		if options.deletion_delay is None:
+			parser.error("--deletion-db requires --deletion-delay")
+		options.deletion_db = normalize_path(
+			os.path.abspath(options.deletion_db))
+
+	if options.temp_dir is not None:
+		options.temp_dir = normalize_path(
+			os.path.abspath(options.temp_dir))
+
+		if not (os.path.isdir(options.temp_dir) and
+			os.access(options.temp_dir, os.W_OK|os.X_OK)):
+			parser.error(("--temp-dir '%s' is not a "
+				"writable directory") % options.temp_dir)
+
+	if options.distfiles is not None:
+		options.distfiles = normalize_path(
+			os.path.abspath(options.distfiles))
+
+		if not (os.path.isdir(options.distfiles) and
+			os.access(options.distfiles, os.W_OK|os.X_OK)):
+			parser.error(("--distfiles '%s' is not a "
+				"writable directory") % options.distfiles)
+	else:
+		parser.error("missing required --distfiles parameter")
+
+	if options.mirror_overrides is not None:
+		options.mirror_overrides = normalize_path(
+			os.path.abspath(options.mirror_overrides))
+
+		if not (os.access(options.mirror_overrides, os.R_OK) and
+			os.path.isfile(options.mirror_overrides)):
+			parser.error(
+				"--mirror-overrides-file '%s' is not a readable file" %
+				options.mirror_overrides)
+
+	if options.distfiles_local is not None:
+		options.distfiles_local = normalize_path(
+			os.path.abspath(options.distfiles_local))
+
+		if not (os.path.isdir(options.distfiles_local) and
+			os.access(options.distfiles_local, os.W_OK|os.X_OK)):
+			parser.error(("--distfiles-local '%s' is not a "
+				"writable directory") % options.distfiles_local)
+
+	if options.distfiles_db is not None:
+		options.distfiles_db = normalize_path(
+			os.path.abspath(options.distfiles_db))
+
+	if options.tries is not None:
+		options.tries = int(options.tries)
+
+	if options.recycle_dir is not None:
+		options.recycle_dir = normalize_path(
+			os.path.abspath(options.recycle_dir))
+		if not (os.path.isdir(options.recycle_dir) and
+			os.access(options.recycle_dir, os.W_OK|os.X_OK)):
+			parser.error(("--recycle-dir '%s' is not a "
+				"writable directory") % options.recycle_dir)
+
+	if options.recycle_db is not None:
+		if options.recycle_dir is None:
+			parser.error("--recycle-db requires "
+				"--recycle-dir to be specified")
+		options.recycle_db = normalize_path(
+			os.path.abspath(options.recycle_db))
+
+	if options.recycle_deletion_delay is not None:
+		options.recycle_deletion_delay = \
+			long(options.recycle_deletion_delay)
+
+	if options.fetch_log_dir is not None:
+		options.fetch_log_dir = normalize_path(
+			os.path.abspath(options.fetch_log_dir))
+
+		if not (os.path.isdir(options.fetch_log_dir) and
+			os.access(options.fetch_log_dir, os.W_OK|os.X_OK)):
+			parser.error(("--fetch-log-dir '%s' is not a "
+				"writable directory") % options.fetch_log_dir)
+
+	if options.whitelist_from:
+		normalized_paths = []
+		for x in options.whitelist_from:
+			path = normalize_path(os.path.abspath(x))
+			normalized_paths.append(path)
+			if not (os.access(path, os.R_OK) and os.path.isfile(path)):
+				parser.error(
+					"--whitelist-from '%s' is not a readable file" % x)
+		options.whitelist_from = normalized_paths
+
+	if options.strict_manifests is not None:
+		if options.strict_manifests == "y":
+			settings.features.add("strict")
+		else:
+			settings.features.discard("strict")
+
+	settings.lock()
+
+	portdb = portage.portdbapi(mysettings=settings)
+
+	# Limit ebuilds to the specified repo.
+	portdb.porttrees = [repo_path]
+
+	portage.util.initialize_logger()
+
+	if options.verbose > 0:
+		l = logging.getLogger()
+		l.setLevel(l.getEffectiveLevel() - 10 * options.verbose)
+
+	with Config(options, portdb,
+		SchedulerInterface(global_event_loop())) as config:
+
+		if not options.mirror:
+			parser.error('No action specified')
+
+		returncode = os.EX_OK
+
+		if options.mirror:
+			signum = run_main_scheduler(MirrorDistTask(config))
+			if signum is not None:
+				sys.exit(128 + signum)
+
+	return returncode

diff --git a/pym/portage/util/_ShelveUnicodeWrapper.py b/pym/portage/util/_ShelveUnicodeWrapper.py
new file mode 100644
index 0000000..adbd519
--- /dev/null
+++ b/pym/portage/util/_ShelveUnicodeWrapper.py
@@ -0,0 +1,45 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+class ShelveUnicodeWrapper(object):
+	"""
+	Convert unicode to str and back again, since python-2.x shelve
+	module doesn't support unicode.
+	"""
+	def __init__(self, shelve_instance):
+		self._shelve = shelve_instance
+
+	def _encode(self, s):
+		if isinstance(s, unicode):
+			s = s.encode('utf_8')
+		return s
+
+	def __len__(self):
+		return len(self._shelve)
+
+	def __contains__(self, k):
+		return self._encode(k) in self._shelve
+
+	def __iter__(self):
+		return self._shelve.__iter__()
+
+	def items(self):
+		return self._shelve.iteritems()
+
+	def __setitem__(self, k, v):
+		self._shelve[self._encode(k)] = self._encode(v)
+
+	def __getitem__(self, k):
+		return self._shelve[self._encode(k)]
+
+	def __delitem__(self, k):
+		del self._shelve[self._encode(k)]
+
+	def get(self, k, *args):
+		return self._shelve.get(self._encode(k), *args)
+
+	def close(self):
+		self._shelve.close()
+
+	def clear(self):
+		self._shelve.clear()

diff --git a/pym/portage/util/_async/FileCopier.py b/pym/portage/util/_async/FileCopier.py
new file mode 100644
index 0000000..27e5ab4
--- /dev/null
+++ b/pym/portage/util/_async/FileCopier.py
@@ -0,0 +1,17 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage import shutil
+from portage.util._async.ForkProcess import ForkProcess
+
+class FileCopier(ForkProcess):
+	"""
+	Asynchronously copy a file.
+	"""
+
+	__slots__ = ('src_path', 'dest_path')
+
+	def _run(self):
+		shutil.copy(self.src_path, self.dest_path)
+		return os.EX_OK


^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2013-01-09 18:48 UTC | newest]

Thread overview: (only message) (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2013-01-09 18:48 [gentoo-commits] proj/portage:master commit in: man/, bin/, pym/portage/util/_async/, pym/portage/_emirrordist/, Zac Medico

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox