public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
* [gentoo-commits] proj/portage:master commit in: man/, bin/, pym/portage/util/_async/, ...
@ 2012-10-03  9:37 Zac Medico
  0 siblings, 0 replies; only message in thread
From: Zac Medico @ 2012-10-03  9:37 UTC (permalink / raw)
  To: gentoo-commits

commit:     15a799b52155a47568f4b049ff8487a2718b270c
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Wed Oct  3 09:31:41 2012 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Oct  3 09:36:50 2012 +0000
URL:        http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=15a799b5

egencache: add --update-manifests, bug #436918

Update manifest files, and sign them if signing is enabled. This
supports parallelization if enabled via the --jobs option. The
--thin-manifests and --sign-manifests options may be used to manually
override layout.conf settings. There's also a new --strict-manifests
option that may be used to manually override the "strict" FEATURES
setting, a --gpg-key option to override PORTAGE_GPG_KEY, and a
--gpg-dir option to override PORTAGE_GPG_DIR.

---
 bin/egencache                                      |  160 +++++++++++++++++++-
 man/egencache.1                                    |   21 +++
 pym/_emerge/PollScheduler.py                       |    6 +-
 pym/portage/manifest.py                            |   10 +-
 .../ebuild/_parallel_manifest/ManifestProcess.py   |   43 ++++++
 .../ebuild/_parallel_manifest/ManifestScheduler.py |   79 ++++++++++
 .../ebuild/_parallel_manifest/ManifestTask.py      |   75 +++++++++
 .../package/ebuild/_parallel_manifest/__init__.py  |    2 +
 pym/portage/util/_async/AsyncScheduler.py          |   88 +++++++++++
 pym/portage/util/_async/ForkProcess.py             |   48 ++++++
 pym/portage/util/_async/__init__.py                |    2 +
 11 files changed, 527 insertions(+), 7 deletions(-)

diff --git a/bin/egencache b/bin/egencache
index ec62a8c..a72fff7 100755
--- a/bin/egencache
+++ b/bin/egencache
@@ -4,6 +4,7 @@
 
 from __future__ import print_function
 
+import platform
 import signal
 import sys
 # This block ensures that ^C interrupts are handled quietly.
@@ -20,6 +21,17 @@ try:
 except KeyboardInterrupt:
 	sys.exit(128 + signal.SIGINT)
 
+def debug_signal(signum, frame):
+	import pdb
+	pdb.set_trace()
+
+if platform.python_implementation() == 'Jython':
+	debug_signum = signal.SIGUSR2 # bug #424259
+else:
+	debug_signum = signal.SIGUSR1
+
+signal.signal(debug_signum, debug_signal)
+
 import io
 import logging
 import optparse
@@ -36,7 +48,9 @@ from portage import os, _encodings, _unicode_encode, _unicode_decode
 from _emerge.MetadataRegen import MetadataRegen
 from portage.cache.cache_errors import CacheError, StatCollision
 from portage.manifest import guessManifestFileType
+from portage.package.ebuild._parallel_manifest.ManifestScheduler import ManifestScheduler
 from portage.util import cmp_sort_key, writemsg_level
+from portage.util._eventloop.global_event_loop import global_event_loop
 from portage import cpv_getkey
 from portage.dep import Atom, isjustname
 from portage.versions import pkgsplit, vercmp
@@ -72,6 +86,9 @@ def parse_args(args):
 	actions.add_option("--update-changelogs",
 		action="store_true",
 		help="update the ChangeLog files from SCM logs")
+	actions.add_option("--update-manifests",
+		action="store_true",
+		help="update manifests")
 	parser.add_option_group(actions)
 
 	common = optparse.OptionGroup(parser, 'Common options')
@@ -81,12 +98,33 @@ def parse_args(args):
 	common.add_option("--config-root",
 		help="location of portage config files",
 		dest="portage_configroot")
+	common.add_option("--gpg-dir",
+		help="override the PORTAGE_GPG_DIR variable",
+		dest="gpg_dir")
+	common.add_option("--gpg-key",
+		help="override the PORTAGE_GPG_KEY variable",
+		dest="gpg_key")
 	common.add_option("--portdir",
 		help="override the portage tree location",
 		dest="portdir")
 	common.add_option("--portdir-overlay",
 		help="override the PORTDIR_OVERLAY variable (requires that --repo is also specified)",
 		dest="portdir_overlay")
+	common.add_option("--sign-manifests",
+		type="choice",
+		choices=('y', 'n'),
+		metavar="<y|n>",
+		help="manually override layout.conf sign-manifests setting")
+	common.add_option("--strict-manifests",
+		type="choice",
+		choices=('y', 'n'),
+		metavar="<y|n>",
+		help="manually override \"strict\" FEATURES setting")
+	common.add_option("--thin-manifests",
+		type="choice",
+		choices=('y', 'n'),
+		metavar="<y|n>",
+		help="manually override layout.conf thin-manifests setting")
 	common.add_option("--tolerant",
 		action="store_true",
 		help="exit successfully if only minor errors occurred")
@@ -865,8 +903,8 @@ def egencache_main(args):
 		settings = portage.config(config_root=config_root,
 			local_config=False, env=env)
 
-	if not options.update and not options.update_use_local_desc \
-			and not options.update_changelogs:
+	if not (options.update or options.update_use_local_desc or
+			options.update_changelogs or options.update_manifests):
 		parser.error('No action specified')
 		return 1
 
@@ -883,10 +921,17 @@ def egencache_main(args):
 			parser.error("PORTDIR is undefined")
 			return 1
 
+	repo_config = settings.repositories.get_repo_for_location(repo_path)
+
+	if options.strict_manifests is not None:
+		if options.strict_manifests == "y":
+			settings.features.add("strict")
+		else:
+			settings.features.add("discard")
+
 	if options.update and 'metadata-transfer' not in settings.features:
 		# Forcibly enable metadata-transfer if portdbapi has a pregenerated
 		# cache that does not support eclass validation.
-		repo_config = settings.repositories.get_repo_for_location(repo_path)
 		cache = repo_config.get_pregenerated_cache(
 			portage.dbapi.dbapi._known_keys, readonly=True)
 		if cache is not None and not cache.complete_eclass_entries:
@@ -915,6 +960,69 @@ def egencache_main(args):
 					level=logging.ERROR, noiselevel=-1)
 				return 1
 
+	if options.sign_manifests is not None:
+		repo_config.sign_manifest = options.sign_manifests == 'y'
+
+	if options.thin_manifests is not None:
+		repo_config.thin_manifest = options.thin_manifests == 'y'
+
+	gpg_cmd = None
+	gpg_vars = None
+
+	if options.update_manifests:
+		if repo_config.sign_manifest:
+
+			sign_problem = False
+			gpg_dir = None
+			gpg_cmd = settings.get("PORTAGE_GPG_SIGNING_COMMAND")
+			if gpg_cmd is None:
+				writemsg_level("egencache: error: "
+					"PORTAGE_GPG_SIGNING_COMMAND is unset! "
+					"Is make.globals missing?\n",
+					level=logging.ERROR, noiselevel=-1)
+				sign_problem = True
+			elif "${PORTAGE_GPG_KEY}" in gpg_cmd and \
+				options.gpg_key is None and \
+				"PORTAGE_GPG_KEY" not in settings:
+				writemsg_level("egencache: error: "
+					"PORTAGE_GPG_KEY is unset!\n",
+					level=logging.ERROR, noiselevel=-1)
+				sign_problem = True
+			elif "${PORTAGE_GPG_DIR}" in gpg_cmd:
+				if options.gpg_dir is not None:
+					gpg_dir = options.gpg_dir
+				elif "PORTAGE_GPG_DIR" not in settings:
+					gpg_dir = os.path.expanduser("~/.gnupg")
+				else:
+					gpg_dir = os.path.expanduser(settings["PORTAGE_GPG_DIR"])
+				if not os.access(gpg_dir, os.X_OK):
+					writemsg_level(("egencache: error: "
+						"Unable to access directory: "
+						"PORTAGE_GPG_DIR='%s'\n") % gpg_dir,
+						level=logging.ERROR, noiselevel=-1)
+					sign_problem = True
+
+			if sign_problem:
+				writemsg_level("egencache: You may disable manifest "
+					"signatures with --sign-manifests=n or by setting "
+					"\"sign-manifests = false\" in metadata/layout.conf\n",
+					level=logging.ERROR, noiselevel=-1)
+				return 1
+
+			gpg_vars = {}
+			if gpg_dir is not None:
+				gpg_vars["PORTAGE_GPG_DIR"] = gpg_dir
+			gpg_var_names = []
+			if options.gpg_key is None:
+				gpg_var_names.append("PORTAGE_GPG_KEY")
+			else:
+				gpg_vars["PORTAGE_GPG_KEY"] = options.gpg_key
+
+			for k in gpg_var_names:
+				v = settings.get(k)
+				if v is not None:
+					gpg_vars[k] = v
+
 	ret = [os.EX_OK]
 
 	if options.update:
@@ -932,6 +1040,52 @@ def egencache_main(args):
 		else:
 			ret.append(gen_cache.returncode)
 
+	if options.update_manifests:
+
+		cp_iter = None
+		if atoms:
+			cp_iter = iter(atoms)
+
+		event_loop = global_event_loop()
+		scheduler = ManifestScheduler(portdb, cp_iter=cp_iter,
+			gpg_cmd=gpg_cmd, gpg_vars=gpg_vars,
+			max_jobs=options.jobs,
+			max_load=options.load_average,
+			event_loop=event_loop)
+
+		received_signal = []
+
+		def sighandler(signum, frame):
+			signal.signal(signal.SIGINT, signal.SIG_IGN)
+			signal.signal(signal.SIGTERM, signal.SIG_IGN)
+			received_signal.append(128 + signum)
+			scheduler.terminate()
+
+		earlier_sigint_handler = signal.signal(signal.SIGINT, sighandler)
+		earlier_sigterm_handler = signal.signal(signal.SIGTERM, sighandler)
+
+		try:
+			scheduler.start()
+			scheduler.wait()
+		finally:
+			# Restore previous handlers
+			if earlier_sigint_handler is not None:
+				signal.signal(signal.SIGINT, earlier_sigint_handler)
+			else:
+				signal.signal(signal.SIGINT, signal.SIG_DFL)
+			if earlier_sigterm_handler is not None:
+				signal.signal(signal.SIGTERM, earlier_sigterm_handler)
+			else:
+				signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
+		if received_signal:
+			sys.exit(received_signal[0])
+
+		if options.tolerant:
+			ret.append(os.EX_OK)
+		else:
+			ret.append(scheduler.returncode)
+
 	if options.update_use_local_desc:
 		gen_desc = GenUseLocalDesc(portdb,
 			output=options.uld_output,

diff --git a/man/egencache.1 b/man/egencache.1
index 9094595..bc5db67 100644
--- a/man/egencache.1
+++ b/man/egencache.1
@@ -20,6 +20,12 @@ Update the ChangeLog files from SCM logs (supported only in git repos).
 .TP
 .BR "\-\-update\-use\-local\-desc"
 Update the \fIprofiles/use.local.desc\fR file from metadata.xml.
+.TP
+.BR "\-\-update\-manifests"
+Update manifest files, and sign them if signing is enabled. This supports
+parallelization if enabled via the \-\-jobs option. The \-\-thin\-manifests
+and \-\-sign\-manifests options may be used to manually override layout.conf
+settings.
 .SH OPTIONS
 .TP
 .BR "\-\-cache\-dir=CACHE_DIR"
@@ -34,6 +40,12 @@ Location of portage config files.
 .br
 Defaults to /.
 .TP
+.BR "\-\-gpg\-dir"
+Override the PORTAGE_GPG_DIR variable.
+.TP
+.BR "\-\-gpg\-key"
+Override the PORTAGE_GPG_KEY variable.
+.TP
 .BR "\-\-ignore-default-opts"
 Causes \fIEGENCACHE_DEFAULT_OPTS\fR to be ignored.
 .TP
@@ -72,6 +84,15 @@ This option should only be needed for distribution via something like
 more thorough mechanism which allows it to detect changed inode numbers
 (described in \fIracy-git.txt\fR in the git technical docs).
 .TP
+.BR "\-\-sign\-manifests< y | n >"
+Manually override layout.conf sign-manifests setting.
+.TP
+.BR "\-\-strict\-manifests< y | n >"
+Manually override "strict" FEATURES setting.
+.TP
+.BR "\-\-thin\-manifests< y | n >"
+Manually override layout.conf thin-manifests setting.
+.TP
 .BR "\-\-tolerant"
 Exit successfully if only minor errors occurred, such as skipped cache
 updates due to ebuilds that either fail to source or are not sourced

diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
index 808fa6e..bcf80ab 100644
--- a/pym/_emerge/PollScheduler.py
+++ b/pym/_emerge/PollScheduler.py
@@ -30,7 +30,7 @@ class PollScheduler(object):
 			"output", "register", "run",
 			"source_remove", "timeout_add", "unregister")
 
-	def __init__(self, main=False):
+	def __init__(self, main=False, event_loop=None):
 		"""
 		@param main: If True then use global_event_loop(), otherwise use
 			a local EventLoop instance (default is False, for safe use in
@@ -44,7 +44,9 @@ class PollScheduler(object):
 		self._jobs = 0
 		self._scheduling = False
 		self._background = False
-		if main:
+		if event_loop is not None:
+			self._event_loop = event_loop
+		elif main:
 			self._event_loop = global_event_loop()
 		else:
 			self._event_loop = EventLoop(main=False)

diff --git a/pym/portage/manifest.py b/pym/portage/manifest.py
index b81b580..9a85c8f 100644
--- a/pym/portage/manifest.py
+++ b/pym/portage/manifest.py
@@ -266,9 +266,12 @@ class Manifest(object):
 						(MANIFEST2_REQUIRED_HASH, t, f))
 
 	def write(self, sign=False, force=False):
-		""" Write Manifest instance to disk, optionally signing it """
+		""" Write Manifest instance to disk, optionally signing it. Returns
+		True if the Manifest is actually written, and False if the write
+		is skipped due to existing Manifest being identical."""
+		rval = False
 		if not self.allow_create:
-			return
+			return rval
 		self.checkIntegrity()
 		try:
 			myentries = list(self._createManifestEntries())
@@ -301,6 +304,7 @@ class Manifest(object):
 					# non-empty for all currently known use cases.
 					write_atomic(self.getFullname(), "".join("%s\n" %
 						_unicode(myentry) for myentry in myentries))
+					rval = True
 				else:
 					# With thin manifest, there's no need to have
 					# a Manifest file if there are no DIST entries.
@@ -309,6 +313,7 @@ class Manifest(object):
 					except OSError as e:
 						if e.errno != errno.ENOENT:
 							raise
+					rval = True
 
 			if sign:
 				self.sign()
@@ -316,6 +321,7 @@ class Manifest(object):
 			if e.errno == errno.EACCES:
 				raise PermissionDenied(str(e))
 			raise
+		return rval
 
 	def sign(self):
 		""" Sign the Manifest """

diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestProcess.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestProcess.py
new file mode 100644
index 0000000..44e2576
--- /dev/null
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestProcess.py
@@ -0,0 +1,43 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import portage
+from portage import os
+from portage.exception import (FileNotFound,
+	PermissionDenied, PortagePackageException)
+from portage.localization import _
+from portage.util._async.ForkProcess import ForkProcess
+
+class ManifestProcess(ForkProcess):
+
+	__slots__ = ("cp", "distdir", "fetchlist_dict", "repo_config")
+
+	MODIFIED = 16
+
+	def _run(self):
+		mf = self.repo_config.load_manifest(
+			os.path.join(self.repo_config.location, self.cp),
+			self.distdir, fetchlist_dict=self.fetchlist_dict)
+
+		try:
+			mf.create(assumeDistHashesAlways=True)
+		except FileNotFound as e:
+			portage.writemsg(_("!!! File %s doesn't exist, can't update "
+				"Manifest\n") % e, noiselevel=-1)
+			return 1
+
+		except PortagePackageException as e:
+			portage.writemsg(("!!! %s\n") % (e,), noiselevel=-1)
+			return 1
+
+		try:
+			modified = mf.write(sign=False)
+		except PermissionDenied as e:
+			portage.writemsg("!!! %s: %s\n" % (_("Permission Denied"), e,),
+				noiselevel=-1)
+			return 1
+		else:
+			if modified:
+				return self.MODIFIED
+			else:
+				return os.EX_OK

diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
new file mode 100644
index 0000000..b480e77
--- /dev/null
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
@@ -0,0 +1,79 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import portage
+from portage import os
+from portage.dep import _repo_separator
+from portage.localization import _
+from portage.util._async.AsyncScheduler import AsyncScheduler
+from .ManifestTask import ManifestTask
+
+class ManifestScheduler(AsyncScheduler):
+
+	def __init__(self, portdb, cp_iter=None,
+		gpg_cmd=None, gpg_vars=None, **kwargs):
+
+		AsyncScheduler.__init__(self, **kwargs)
+
+		self._portdb = portdb
+
+		if cp_iter is None:
+			cp_iter = self._iter_every_cp()
+		self._cp_iter = cp_iter
+		self._gpg_cmd = gpg_cmd
+		self._gpg_vars = gpg_vars
+		self._task_iter = self._iter_tasks()
+
+	def _next_task(self):
+		return next(self._task_iter)
+
+	def _iter_every_cp(self):
+		every_cp = self._portdb.cp_all()
+		every_cp.sort(reverse=True)
+		try:
+			while not self._terminated_tasks:
+				yield every_cp.pop()
+		except IndexError:
+			pass
+
+	def _iter_tasks(self):
+		portdb = self._portdb
+		distdir = portdb.settings["DISTDIR"]
+		disabled_repos = set()
+
+		for cp in self._cp_iter:
+			if self._terminated_tasks:
+				break
+			# We iterate over portdb.porttrees, since it's common to
+			# tweak this attribute in order to adjust repo selection.
+			for mytree in portdb.porttrees:
+				repo_config = portdb.repositories.get_repo_for_location(mytree)
+				if not repo_config.create_manifest:
+					if repo_config.name not in disabled_repos:
+						disabled_repos.add(repo_config.name)
+						portage.writemsg(
+							_(">>> Skipping creating Manifest for %s%s%s; "
+							"repository is configured to not use them\n") %
+							(cp, _repo_separator, repo_config.name),
+							noiselevel=-1)
+					continue
+				cpv_list = portdb.cp_list(cp, mytree=[repo_config.location])
+				if not cpv_list:
+					continue
+				fetchlist_dict = {}
+				for cpv in cpv_list:
+					fetchlist_dict[cpv] = \
+						list(portdb.getFetchMap(cpv, mytree=mytree))
+
+				yield ManifestTask(cp=cp, distdir=distdir,
+					fetchlist_dict=fetchlist_dict, repo_config=repo_config,
+					gpg_cmd=self._gpg_cmd, gpg_vars=self._gpg_vars)
+
+	def _task_exit(self, task):
+		AsyncScheduler._task_exit(self, task)
+		if task.returncode != os.EX_OK:
+			if not self._terminated_tasks:
+				portage.writemsg(
+					"Error processing %s%s%s, continuing...\n" %
+					(task.cp, _repo_separator, task.repo_config.name),
+					noiselevel=-1)

diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
new file mode 100644
index 0000000..53b85b2
--- /dev/null
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
@@ -0,0 +1,75 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage.util import shlex_split, varexpand, writemsg
+from _emerge.CompositeTask import CompositeTask
+from _emerge.SpawnProcess import SpawnProcess
+from .ManifestProcess import ManifestProcess
+
+class ManifestTask(CompositeTask):
+
+	__slots__ = ("cp", "distdir", "fetchlist_dict", "gpg_cmd",
+		"gpg_vars", "repo_config", "_manifest_path")
+
+	def _start(self):
+		self._manifest_path = os.path.join(self.repo_config.location,
+			self.cp, "Manifest")
+		manifest_proc = ManifestProcess(cp=self.cp, distdir=self.distdir,
+			fetchlist_dict=self.fetchlist_dict, repo_config=self.repo_config,
+			scheduler=self.scheduler)
+		self._start_task(manifest_proc, self._manifest_proc_exit)
+
+	def _manifest_proc_exit(self, manifest_proc):
+		self._assert_current(manifest_proc)
+		if manifest_proc.returncode not in (os.EX_OK, manifest_proc.MODIFIED):
+			self.returncode = manifest_proc.returncode
+			self._current_task = None
+			self.wait()
+			return
+
+		modified = manifest_proc.returncode == manifest_proc.MODIFIED
+
+		if self.gpg_cmd is None or not modified or \
+			not os.path.exists(self._manifest_path):
+			self.returncode = os.EX_OK
+			self._current_task = None
+			self.wait()
+			return
+
+		self._start_gpg_proc()
+
+	def _start_gpg_proc(self):
+		gpg_vars = self.gpg_vars
+		if gpg_vars is None:
+			gpg_vars = {}
+		else:
+			gpg_vars = gpg_vars.copy()
+		gpg_vars["FILE"] = self._manifest_path
+		gpg_cmd = varexpand(self.gpg_cmd, mydict=gpg_vars)
+		gpg_cmd = shlex_split(gpg_cmd)
+		gpg_proc = SpawnProcess(
+			args=gpg_cmd, env=os.environ, scheduler=self.scheduler)
+		self._start_task(gpg_proc, self._gpg_proc_exit)
+
+	def _gpg_proc_exit(self, gpg_proc):
+		if self._default_exit(gpg_proc) != os.EX_OK:
+			self.wait()
+			return
+
+		rename_args = (self._manifest_path + ".asc", self._manifest_path)
+		try:
+			os.rename(*rename_args)
+		except OSError as e:
+			writemsg("!!! rename('%s', '%s'): %s\n" % rename_args + (e,),
+				noiselevel=-1)
+			try:
+				os.unlink(self._manifest_path + ".asc")
+			except OSError:
+				pass
+			self.returncode = 1
+		else:
+			self.returncode = os.EX_OK
+
+		self._current_task = None
+		self.wait()

diff --git a/pym/portage/package/ebuild/_parallel_manifest/__init__.py b/pym/portage/package/ebuild/_parallel_manifest/__init__.py
new file mode 100644
index 0000000..418ad86
--- /dev/null
+++ b/pym/portage/package/ebuild/_parallel_manifest/__init__.py
@@ -0,0 +1,2 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2

diff --git a/pym/portage/util/_async/AsyncScheduler.py b/pym/portage/util/_async/AsyncScheduler.py
new file mode 100644
index 0000000..cae45fd
--- /dev/null
+++ b/pym/portage/util/_async/AsyncScheduler.py
@@ -0,0 +1,88 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from _emerge.AsynchronousTask import AsynchronousTask
+from _emerge.PollScheduler import PollScheduler
+
+class AsyncScheduler(AsynchronousTask, PollScheduler):
+
+	__slots__ = ('_error_count', '_loadavg_check_id',
+		'_max_jobs', '_max_load',
+		'_remaining_tasks', '_running_tasks', '_term_check_id')
+
+	def __init__(self, max_jobs=None, max_load=None, **kwargs):
+		AsynchronousTask.__init__(self)
+		PollScheduler.__init__(self, **kwargs)
+
+		if max_jobs is None:
+			max_jobs = 1
+		self._max_jobs = max_jobs
+		self._max_load = max_load
+		self._error_count = 0
+		self._running_tasks = set()
+		self._remaining_tasks = True
+		self._term_check_id = None
+		self._loadavg_check_id = None
+
+	def _cancel(self):
+		self._terminated.set()
+		self._terminate_tasks()
+
+	def _terminate_tasks(self):
+		for task in list(self._running_tasks):
+			task.cancel()
+
+	def _next_task(self):
+		raise NotImplementedError(self)
+
+	def _keep_scheduling(self):
+		return self._remaining_tasks and not self._terminated_tasks
+
+	def _running_job_count(self):
+		return len(self._running_tasks)
+
+	def _schedule_tasks(self):
+		while self._keep_scheduling() and self._can_add_job():
+			try:
+				task = self._next_task()
+			except StopIteration:
+				self._remaining_tasks = False
+			else:
+				self._running_tasks.add(task)
+				task.scheduler = self.sched_iface
+				task.addExitListener(self._task_exit)
+				task.start()
+
+	def _task_exit(self, task):
+		self._running_tasks.discard(task)
+		if task.returncode != os.EX_OK:
+			self._error_count += 1
+		self._schedule()
+
+	def _start(self):
+		self._term_check_id = self.sched_iface.idle_add(self._termination_check)
+		if self._max_load is not None:
+			# We have to schedule periodically, in case the load
+			# average has changed since the last call.
+			self._loadavg_check_id = self.sched_iface.timeout_add(
+				self._loadavg_latency, self._schedule)
+		self._schedule()
+
+	def _wait(self):
+		# Loop while there are jobs to be scheduled.
+		while self._keep_scheduling():
+			self.sched_iface.iteration()
+
+		# Clean shutdown of previously scheduled jobs. In the
+		# case of termination, this allows for basic cleanup
+		# such as flushing of buffered output to logs.
+		while self._is_work_scheduled():
+			self.sched_iface.iteration()
+
+		if self._error_count > 0:
+			self.returncode = 1
+		else:
+			self.returncode = os.EX_OK 
+
+		return self.returncode

diff --git a/pym/portage/util/_async/ForkProcess.py b/pym/portage/util/_async/ForkProcess.py
new file mode 100644
index 0000000..607d0ff
--- /dev/null
+++ b/pym/portage/util/_async/ForkProcess.py
@@ -0,0 +1,48 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import signal
+import traceback
+
+import portage
+from portage import os
+from _emerge.SpawnProcess import SpawnProcess
+
+class ForkProcess(SpawnProcess):
+
+	def _spawn(self, args, fd_pipes=None, **kwargs):
+		"""
+		Fork a subprocess, apply local settings, and call self._run().
+		"""
+
+		pid = os.fork()
+		if pid != 0:
+			if not isinstance(pid, int):
+				raise AssertionError(
+					"fork returned non-integer: %s" % (repr(pid),))
+			portage.process.spawned_pids.append(pid)
+			return [pid]
+
+		portage.locks._close_fds()
+		# Disable close_fds since we don't exec (see _setup_pipes docstring).
+		portage.process._setup_pipes(fd_pipes, close_fds=False)
+
+		# Use default signal handlers in order to avoid problems
+		# killing subprocesses as reported in bug #353239.
+		signal.signal(signal.SIGINT, signal.SIG_DFL)
+		signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
+		rval = 1
+		try:
+			rval = self._run()
+		except SystemExit:
+			raise
+		except:
+			traceback.print_exc()
+		finally:
+			# Call os._exit() from finally block, in order to suppress any
+			# finally blocks from earlier in the call stack. See bug #345289.
+			os._exit(rval)
+
+	def _run(self):
+		raise NotImplementedError(self)

diff --git a/pym/portage/util/_async/__init__.py b/pym/portage/util/_async/__init__.py
new file mode 100644
index 0000000..418ad86
--- /dev/null
+++ b/pym/portage/util/_async/__init__.py
@@ -0,0 +1,2 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2


^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2012-10-03  9:37 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2012-10-03  9:37 [gentoo-commits] proj/portage:master commit in: man/, bin/, pym/portage/util/_async/, Zac Medico

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox