public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2012-10-07 18:17 Zac Medico
  0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2012-10-07 18:17 UTC (permalink / raw)
  To: gentoo-commits

commit:     63e329100d9b6c8bf1c6b87ab417882b9116047e
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Oct  7 18:17:35 2012 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Oct  7 18:17:35 2012 +0000
URL:        http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=63e32910

PollScheduler: split out SchedulerInterface

---
 pym/_emerge/PollScheduler.py                  |   78 ++---------------------
 pym/_emerge/Scheduler.py                      |    9 ++-
 pym/portage/util/_async/SchedulerInterface.py |   86 +++++++++++++++++++++++++
 3 files changed, 97 insertions(+), 76 deletions(-)

diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
index 105943f..a341604 100644
--- a/pym/_emerge/PollScheduler.py
+++ b/pym/_emerge/PollScheduler.py
@@ -1,18 +1,12 @@
 # Copyright 1999-2012 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
-import gzip
-import errno
-
 try:
 	import threading
 except ImportError:
 	import dummy_threading as threading
 
-from portage import _encodings
-from portage import _unicode_encode
-from portage.util import writemsg_level
-from portage.util.SlotObject import SlotObject
+from portage.util._async.SchedulerInterface import SchedulerInterface
 from portage.util._eventloop.EventLoop import EventLoop
 from portage.util._eventloop.global_event_loop import global_event_loop
 
@@ -23,13 +17,6 @@ class PollScheduler(object):
 	# max time between loadavg checks (milliseconds)
 	_loadavg_latency = 30000
 
-	class _sched_iface_class(SlotObject):
-		__slots__ = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT",
-			"IO_PRI", "child_watch_add",
-			"idle_add", "io_add_watch", "iteration",
-			"output", "run",
-			"source_remove", "timeout_add")
-
 	def __init__(self, main=False, event_loop=None):
 		"""
 		@param main: If True then use global_event_loop(), otherwise use
@@ -49,20 +36,11 @@ class PollScheduler(object):
 			self._event_loop = global_event_loop()
 		else:
 			self._event_loop = EventLoop(main=False)
-		self.sched_iface = self._sched_iface_class(
-			IO_ERR=self._event_loop.IO_ERR,
-			IO_HUP=self._event_loop.IO_HUP,
-			IO_IN=self._event_loop.IO_IN,
-			IO_NVAL=self._event_loop.IO_NVAL,
-			IO_OUT=self._event_loop.IO_OUT,
-			IO_PRI=self._event_loop.IO_PRI,
-			child_watch_add=self._event_loop.child_watch_add,
-			idle_add=self._event_loop.idle_add,
-			io_add_watch=self._event_loop.io_add_watch,
-			iteration=self._event_loop.iteration,
-			output=self._task_output,
-			source_remove=self._event_loop.source_remove,
-			timeout_add=self._event_loop.timeout_add)
+		self.sched_iface = SchedulerInterface(self._event_loop,
+			self._is_background)
+
+	def _is_background(self):
+		return self._background
 
 	def terminate(self):
 		"""
@@ -176,47 +154,3 @@ class PollScheduler(object):
 				return False
 
 		return True
-
-	def _task_output(self, msg, log_path=None, background=None,
-		level=0, noiselevel=-1):
-		"""
-		Output msg to stdout if not self._background. If log_path
-		is not None then append msg to the log (appends with
-		compression if the filename extension of log_path
-		corresponds to a supported compression type).
-		"""
-
-		if background is None:
-			# If the task does not have a local background value
-			# (like for parallel-fetch), then use the global value.
-			background = self._background
-
-		msg_shown = False
-		if not background:
-			writemsg_level(msg, level=level, noiselevel=noiselevel)
-			msg_shown = True
-
-		if log_path is not None:
-			try:
-				f = open(_unicode_encode(log_path,
-					encoding=_encodings['fs'], errors='strict'),
-					mode='ab')
-				f_real = f
-			except IOError as e:
-				if e.errno not in (errno.ENOENT, errno.ESTALE):
-					raise
-				if not msg_shown:
-					writemsg_level(msg, level=level, noiselevel=noiselevel)
-			else:
-
-				if log_path.endswith('.gz'):
-					# NOTE: The empty filename argument prevents us from
-					# triggering a bug in python3 which causes GzipFile
-					# to raise AttributeError if fileobj.name is bytes
-					# instead of unicode.
-					f =  gzip.GzipFile(filename='', mode='ab', fileobj=f)
-
-				f.write(_unicode_encode(msg))
-				f.close()
-				if f_real is not f:
-					f_real.close()

diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index c5779cb..f62f6e7 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -28,6 +28,7 @@ from portage._sets import SETPREFIX
 from portage._sets.base import InternalPackageSet
 from portage.util import ensure_dirs, writemsg, writemsg_level
 from portage.util.SlotObject import SlotObject
+from portage.util._async.SchedulerInterface import SchedulerInterface
 from portage.package.ebuild.digestcheck import digestcheck
 from portage.package.ebuild.digestgen import digestgen
 from portage.package.ebuild.doebuild import (_check_temp_dir,
@@ -79,7 +80,7 @@ class Scheduler(PollScheduler):
 	_opts_no_self_update = frozenset(["--buildpkgonly",
 		"--fetchonly", "--fetch-all-uri", "--pretend"])
 
-	class _iface_class(PollScheduler._sched_iface_class):
+	class _iface_class(SchedulerInterface):
 		__slots__ = ("fetch",
 			"scheduleSetup", "scheduleUnpack")
 
@@ -215,11 +216,11 @@ class Scheduler(PollScheduler):
 		fetch_iface = self._fetch_iface_class(log_file=self._fetch_log,
 			schedule=self._schedule_fetch)
 		self._sched_iface = self._iface_class(
+			self._event_loop,
+			self._is_background,
 			fetch=fetch_iface,
 			scheduleSetup=self._schedule_setup,
-			scheduleUnpack=self._schedule_unpack,
-			**dict((k, getattr(self.sched_iface, k))
-			for k in self.sched_iface.__slots__))
+			scheduleUnpack=self._schedule_unpack)
 
 		self._prefetchers = weakref.WeakValueDictionary()
 		self._pkg_queue = []

diff --git a/pym/portage/util/_async/SchedulerInterface.py b/pym/portage/util/_async/SchedulerInterface.py
new file mode 100644
index 0000000..04c6efb
--- /dev/null
+++ b/pym/portage/util/_async/SchedulerInterface.py
@@ -0,0 +1,86 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import gzip
+import errno
+
+from portage import _encodings
+from portage import _unicode_encode
+from portage.util import writemsg_level
+from ..SlotObject import SlotObject
+
+class SchedulerInterface(SlotObject):
+
+	__slots__ = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT", "IO_PRI",
+		"child_watch_add", "idle_add", "io_add_watch", "iteration",
+		"source_remove", "timeout_add", "_event_loop", "_is_background")
+
+	def __init__(self, event_loop, is_background=None, **kwargs):
+		SlotObject.__init__(self, **kwargs)
+		self._event_loop = event_loop
+		if is_background is None:
+			is_background = self._return_false
+		self._is_background = is_background
+		self.IO_ERR = event_loop.IO_ERR
+		self.IO_HUP = event_loop.IO_HUP
+		self.IO_IN = event_loop.IO_IN
+		self.IO_NVAL = event_loop.IO_NVAL
+		self.IO_OUT = event_loop.IO_OUT
+		self.IO_PRI = event_loop.IO_PRI
+		self.child_watch_add = event_loop.child_watch_add
+		self.idle_add = event_loop.idle_add
+		self.io_add_watch = event_loop.io_add_watch
+		self.iteration = event_loop.iteration
+		self.source_remove = event_loop.source_remove
+		self.timeout_add = event_loop.timeout_add
+
+	@staticmethod
+	def _return_false(self):
+		return False
+
+	def output(self, msg, log_path=None, background=None,
+		level=0, noiselevel=-1):
+		"""
+		Output msg to stdout if not self._background_cb(). If log_path
+		is not None then append msg to the log (appends with
+		compression if the filename extension of log_path corresponds
+		to a supported compression type).
+		"""
+
+		global_background = self._is_background()
+		if background is None or global_background:
+			# Use the global value if the task does not have a local
+			# background value. For example, parallel-fetch tasks run
+			# in the background while other tasks concurrently run in
+			# the foreground.
+			background = global_background
+
+		msg_shown = False
+		if not background:
+			writemsg_level(msg, level=level, noiselevel=noiselevel)
+			msg_shown = True
+
+		if log_path is not None:
+			try:
+				f = open(_unicode_encode(log_path,
+					encoding=_encodings['fs'], errors='strict'),
+					mode='ab')
+				f_real = f
+			except IOError as e:
+				if e.errno not in (errno.ENOENT, errno.ESTALE):
+					raise
+				if not msg_shown:
+					writemsg_level(msg, level=level, noiselevel=noiselevel)
+			else:
+
+				if log_path.endswith('.gz'):
+					# NOTE: The empty filename argument prevents us from
+					# triggering a bug in python3 which causes GzipFile
+					# to raise AttributeError if fileobj.name is bytes
+					# instead of unicode.
+					f =  gzip.GzipFile(filename='', mode='ab', fileobj=f)
+
+				f.write(_unicode_encode(msg))
+				f.close()
+				if f_real is not f:
+					f_real.close()


^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2012-10-07 18:22 Zac Medico
  0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2012-10-07 18:22 UTC (permalink / raw)
  To: gentoo-commits

commit:     2932012a0e22856b14473f7ecb26fbee91a37c20
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Oct  7 18:17:35 2012 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Oct  7 18:21:52 2012 +0000
URL:        http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=2932012a

PollScheduler: split out SchedulerInterface

---
 pym/_emerge/PollScheduler.py                  |   78 ++---------------------
 pym/_emerge/Scheduler.py                      |    9 ++-
 pym/portage/util/_async/SchedulerInterface.py |   86 +++++++++++++++++++++++++
 3 files changed, 97 insertions(+), 76 deletions(-)

diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
index 105943f..a341604 100644
--- a/pym/_emerge/PollScheduler.py
+++ b/pym/_emerge/PollScheduler.py
@@ -1,18 +1,12 @@
 # Copyright 1999-2012 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
-import gzip
-import errno
-
 try:
 	import threading
 except ImportError:
 	import dummy_threading as threading
 
-from portage import _encodings
-from portage import _unicode_encode
-from portage.util import writemsg_level
-from portage.util.SlotObject import SlotObject
+from portage.util._async.SchedulerInterface import SchedulerInterface
 from portage.util._eventloop.EventLoop import EventLoop
 from portage.util._eventloop.global_event_loop import global_event_loop
 
@@ -23,13 +17,6 @@ class PollScheduler(object):
 	# max time between loadavg checks (milliseconds)
 	_loadavg_latency = 30000
 
-	class _sched_iface_class(SlotObject):
-		__slots__ = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT",
-			"IO_PRI", "child_watch_add",
-			"idle_add", "io_add_watch", "iteration",
-			"output", "run",
-			"source_remove", "timeout_add")
-
 	def __init__(self, main=False, event_loop=None):
 		"""
 		@param main: If True then use global_event_loop(), otherwise use
@@ -49,20 +36,11 @@ class PollScheduler(object):
 			self._event_loop = global_event_loop()
 		else:
 			self._event_loop = EventLoop(main=False)
-		self.sched_iface = self._sched_iface_class(
-			IO_ERR=self._event_loop.IO_ERR,
-			IO_HUP=self._event_loop.IO_HUP,
-			IO_IN=self._event_loop.IO_IN,
-			IO_NVAL=self._event_loop.IO_NVAL,
-			IO_OUT=self._event_loop.IO_OUT,
-			IO_PRI=self._event_loop.IO_PRI,
-			child_watch_add=self._event_loop.child_watch_add,
-			idle_add=self._event_loop.idle_add,
-			io_add_watch=self._event_loop.io_add_watch,
-			iteration=self._event_loop.iteration,
-			output=self._task_output,
-			source_remove=self._event_loop.source_remove,
-			timeout_add=self._event_loop.timeout_add)
+		self.sched_iface = SchedulerInterface(self._event_loop,
+			self._is_background)
+
+	def _is_background(self):
+		return self._background
 
 	def terminate(self):
 		"""
@@ -176,47 +154,3 @@ class PollScheduler(object):
 				return False
 
 		return True
-
-	def _task_output(self, msg, log_path=None, background=None,
-		level=0, noiselevel=-1):
-		"""
-		Output msg to stdout if not self._background. If log_path
-		is not None then append msg to the log (appends with
-		compression if the filename extension of log_path
-		corresponds to a supported compression type).
-		"""
-
-		if background is None:
-			# If the task does not have a local background value
-			# (like for parallel-fetch), then use the global value.
-			background = self._background
-
-		msg_shown = False
-		if not background:
-			writemsg_level(msg, level=level, noiselevel=noiselevel)
-			msg_shown = True
-
-		if log_path is not None:
-			try:
-				f = open(_unicode_encode(log_path,
-					encoding=_encodings['fs'], errors='strict'),
-					mode='ab')
-				f_real = f
-			except IOError as e:
-				if e.errno not in (errno.ENOENT, errno.ESTALE):
-					raise
-				if not msg_shown:
-					writemsg_level(msg, level=level, noiselevel=noiselevel)
-			else:
-
-				if log_path.endswith('.gz'):
-					# NOTE: The empty filename argument prevents us from
-					# triggering a bug in python3 which causes GzipFile
-					# to raise AttributeError if fileobj.name is bytes
-					# instead of unicode.
-					f =  gzip.GzipFile(filename='', mode='ab', fileobj=f)
-
-				f.write(_unicode_encode(msg))
-				f.close()
-				if f_real is not f:
-					f_real.close()

diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index c5779cb..f62f6e7 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -28,6 +28,7 @@ from portage._sets import SETPREFIX
 from portage._sets.base import InternalPackageSet
 from portage.util import ensure_dirs, writemsg, writemsg_level
 from portage.util.SlotObject import SlotObject
+from portage.util._async.SchedulerInterface import SchedulerInterface
 from portage.package.ebuild.digestcheck import digestcheck
 from portage.package.ebuild.digestgen import digestgen
 from portage.package.ebuild.doebuild import (_check_temp_dir,
@@ -79,7 +80,7 @@ class Scheduler(PollScheduler):
 	_opts_no_self_update = frozenset(["--buildpkgonly",
 		"--fetchonly", "--fetch-all-uri", "--pretend"])
 
-	class _iface_class(PollScheduler._sched_iface_class):
+	class _iface_class(SchedulerInterface):
 		__slots__ = ("fetch",
 			"scheduleSetup", "scheduleUnpack")
 
@@ -215,11 +216,11 @@ class Scheduler(PollScheduler):
 		fetch_iface = self._fetch_iface_class(log_file=self._fetch_log,
 			schedule=self._schedule_fetch)
 		self._sched_iface = self._iface_class(
+			self._event_loop,
+			self._is_background,
 			fetch=fetch_iface,
 			scheduleSetup=self._schedule_setup,
-			scheduleUnpack=self._schedule_unpack,
-			**dict((k, getattr(self.sched_iface, k))
-			for k in self.sched_iface.__slots__))
+			scheduleUnpack=self._schedule_unpack)
 
 		self._prefetchers = weakref.WeakValueDictionary()
 		self._pkg_queue = []

diff --git a/pym/portage/util/_async/SchedulerInterface.py b/pym/portage/util/_async/SchedulerInterface.py
new file mode 100644
index 0000000..731f521
--- /dev/null
+++ b/pym/portage/util/_async/SchedulerInterface.py
@@ -0,0 +1,86 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import gzip
+import errno
+
+from portage import _encodings
+from portage import _unicode_encode
+from portage.util import writemsg_level
+from ..SlotObject import SlotObject
+
+class SchedulerInterface(SlotObject):
+
+	__slots__ = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT", "IO_PRI",
+		"child_watch_add", "idle_add", "io_add_watch", "iteration",
+		"source_remove", "timeout_add", "_event_loop", "_is_background")
+
+	def __init__(self, event_loop, is_background=None, **kwargs):
+		SlotObject.__init__(self, **kwargs)
+		self._event_loop = event_loop
+		if is_background is None:
+			is_background = self._return_false
+		self._is_background = is_background
+		self.IO_ERR = event_loop.IO_ERR
+		self.IO_HUP = event_loop.IO_HUP
+		self.IO_IN = event_loop.IO_IN
+		self.IO_NVAL = event_loop.IO_NVAL
+		self.IO_OUT = event_loop.IO_OUT
+		self.IO_PRI = event_loop.IO_PRI
+		self.child_watch_add = event_loop.child_watch_add
+		self.idle_add = event_loop.idle_add
+		self.io_add_watch = event_loop.io_add_watch
+		self.iteration = event_loop.iteration
+		self.source_remove = event_loop.source_remove
+		self.timeout_add = event_loop.timeout_add
+
+	@staticmethod
+	def _return_false():
+		return False
+
+	def output(self, msg, log_path=None, background=None,
+		level=0, noiselevel=-1):
+		"""
+		Output msg to stdout if not self._is_background(). If log_path
+		is not None then append msg to the log (appends with
+		compression if the filename extension of log_path corresponds
+		to a supported compression type).
+		"""
+
+		global_background = self._is_background()
+		if background is None or global_background:
+			# Use the global value if the task does not have a local
+			# background value. For example, parallel-fetch tasks run
+			# in the background while other tasks concurrently run in
+			# the foreground.
+			background = global_background
+
+		msg_shown = False
+		if not background:
+			writemsg_level(msg, level=level, noiselevel=noiselevel)
+			msg_shown = True
+
+		if log_path is not None:
+			try:
+				f = open(_unicode_encode(log_path,
+					encoding=_encodings['fs'], errors='strict'),
+					mode='ab')
+				f_real = f
+			except IOError as e:
+				if e.errno not in (errno.ENOENT, errno.ESTALE):
+					raise
+				if not msg_shown:
+					writemsg_level(msg, level=level, noiselevel=noiselevel)
+			else:
+
+				if log_path.endswith('.gz'):
+					# NOTE: The empty filename argument prevents us from
+					# triggering a bug in python3 which causes GzipFile
+					# to raise AttributeError if fileobj.name is bytes
+					# instead of unicode.
+					f =  gzip.GzipFile(filename='', mode='ab', fileobj=f)
+
+				f.write(_unicode_encode(msg))
+				f.close()
+				if f_real is not f:
+					f_real.close()


^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2012-10-07 18:51 Zac Medico
  0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2012-10-07 18:51 UTC (permalink / raw)
  To: gentoo-commits

commit:     2bc9b7c6147fb30838773b52674f53bb920bab72
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Oct  7 18:17:35 2012 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Oct  7 18:51:26 2012 +0000
URL:        http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=2bc9b7c6

PollScheduler: split out SchedulerInterface

---
 pym/_emerge/PollScheduler.py                  |   78 ++---------------------
 pym/_emerge/Scheduler.py                      |   14 ++--
 pym/portage/util/_async/SchedulerInterface.py |   86 +++++++++++++++++++++++++
 3 files changed, 100 insertions(+), 78 deletions(-)

diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
index 105943f..8d5c290 100644
--- a/pym/_emerge/PollScheduler.py
+++ b/pym/_emerge/PollScheduler.py
@@ -1,18 +1,12 @@
 # Copyright 1999-2012 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
-import gzip
-import errno
-
 try:
 	import threading
 except ImportError:
 	import dummy_threading as threading
 
-from portage import _encodings
-from portage import _unicode_encode
-from portage.util import writemsg_level
-from portage.util.SlotObject import SlotObject
+from portage.util._async.SchedulerInterface import SchedulerInterface
 from portage.util._eventloop.EventLoop import EventLoop
 from portage.util._eventloop.global_event_loop import global_event_loop
 
@@ -23,13 +17,6 @@ class PollScheduler(object):
 	# max time between loadavg checks (milliseconds)
 	_loadavg_latency = 30000
 
-	class _sched_iface_class(SlotObject):
-		__slots__ = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT",
-			"IO_PRI", "child_watch_add",
-			"idle_add", "io_add_watch", "iteration",
-			"output", "run",
-			"source_remove", "timeout_add")
-
 	def __init__(self, main=False, event_loop=None):
 		"""
 		@param main: If True then use global_event_loop(), otherwise use
@@ -49,20 +36,11 @@ class PollScheduler(object):
 			self._event_loop = global_event_loop()
 		else:
 			self._event_loop = EventLoop(main=False)
-		self.sched_iface = self._sched_iface_class(
-			IO_ERR=self._event_loop.IO_ERR,
-			IO_HUP=self._event_loop.IO_HUP,
-			IO_IN=self._event_loop.IO_IN,
-			IO_NVAL=self._event_loop.IO_NVAL,
-			IO_OUT=self._event_loop.IO_OUT,
-			IO_PRI=self._event_loop.IO_PRI,
-			child_watch_add=self._event_loop.child_watch_add,
-			idle_add=self._event_loop.idle_add,
-			io_add_watch=self._event_loop.io_add_watch,
-			iteration=self._event_loop.iteration,
-			output=self._task_output,
-			source_remove=self._event_loop.source_remove,
-			timeout_add=self._event_loop.timeout_add)
+		self.sched_iface = SchedulerInterface(self._event_loop,
+			is_background=self._is_background)
+
+	def _is_background(self):
+		return self._background
 
 	def terminate(self):
 		"""
@@ -176,47 +154,3 @@ class PollScheduler(object):
 				return False
 
 		return True
-
-	def _task_output(self, msg, log_path=None, background=None,
-		level=0, noiselevel=-1):
-		"""
-		Output msg to stdout if not self._background. If log_path
-		is not None then append msg to the log (appends with
-		compression if the filename extension of log_path
-		corresponds to a supported compression type).
-		"""
-
-		if background is None:
-			# If the task does not have a local background value
-			# (like for parallel-fetch), then use the global value.
-			background = self._background
-
-		msg_shown = False
-		if not background:
-			writemsg_level(msg, level=level, noiselevel=noiselevel)
-			msg_shown = True
-
-		if log_path is not None:
-			try:
-				f = open(_unicode_encode(log_path,
-					encoding=_encodings['fs'], errors='strict'),
-					mode='ab')
-				f_real = f
-			except IOError as e:
-				if e.errno not in (errno.ENOENT, errno.ESTALE):
-					raise
-				if not msg_shown:
-					writemsg_level(msg, level=level, noiselevel=noiselevel)
-			else:
-
-				if log_path.endswith('.gz'):
-					# NOTE: The empty filename argument prevents us from
-					# triggering a bug in python3 which causes GzipFile
-					# to raise AttributeError if fileobj.name is bytes
-					# instead of unicode.
-					f =  gzip.GzipFile(filename='', mode='ab', fileobj=f)
-
-				f.write(_unicode_encode(msg))
-				f.close()
-				if f_real is not f:
-					f_real.close()

diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index c5779cb..d64468a 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -28,6 +28,8 @@ from portage._sets import SETPREFIX
 from portage._sets.base import InternalPackageSet
 from portage.util import ensure_dirs, writemsg, writemsg_level
 from portage.util.SlotObject import SlotObject
+from portage.util._async.SchedulerInterface import SchedulerInterface
+from portage.util._eventloop.EventLoop import EventLoop
 from portage.package.ebuild.digestcheck import digestcheck
 from portage.package.ebuild.digestgen import digestgen
 from portage.package.ebuild.doebuild import (_check_temp_dir,
@@ -79,7 +81,7 @@ class Scheduler(PollScheduler):
 	_opts_no_self_update = frozenset(["--buildpkgonly",
 		"--fetchonly", "--fetch-all-uri", "--pretend"])
 
-	class _iface_class(PollScheduler._sched_iface_class):
+	class _iface_class(SchedulerInterface):
 		__slots__ = ("fetch",
 			"scheduleSetup", "scheduleUnpack")
 
@@ -215,11 +217,11 @@ class Scheduler(PollScheduler):
 		fetch_iface = self._fetch_iface_class(log_file=self._fetch_log,
 			schedule=self._schedule_fetch)
 		self._sched_iface = self._iface_class(
+			self._event_loop,
+			is_background=self._is_background,
 			fetch=fetch_iface,
 			scheduleSetup=self._schedule_setup,
-			scheduleUnpack=self._schedule_unpack,
-			**dict((k, getattr(self.sched_iface, k))
-			for k in self.sched_iface.__slots__))
+			scheduleUnpack=self._schedule_unpack)
 
 		self._prefetchers = weakref.WeakValueDictionary()
 		self._pkg_queue = []
@@ -767,10 +769,10 @@ class Scheduler(PollScheduler):
 
 		failures = 0
 
-		# Use a local PollScheduler instance here, since we don't
+		# Use a local EventLoop instance here, since we don't
 		# want tasks here to trigger the usual Scheduler callbacks
 		# that handle job scheduling and status display.
-		sched_iface = PollScheduler().sched_iface
+		sched_iface = SchedulerInterface(EventLoop(main=False))
 
 		for x in self._mergelist:
 			if not isinstance(x, Package):

diff --git a/pym/portage/util/_async/SchedulerInterface.py b/pym/portage/util/_async/SchedulerInterface.py
new file mode 100644
index 0000000..731f521
--- /dev/null
+++ b/pym/portage/util/_async/SchedulerInterface.py
@@ -0,0 +1,86 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import gzip
+import errno
+
+from portage import _encodings
+from portage import _unicode_encode
+from portage.util import writemsg_level
+from ..SlotObject import SlotObject
+
+class SchedulerInterface(SlotObject):
+
+	__slots__ = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT", "IO_PRI",
+		"child_watch_add", "idle_add", "io_add_watch", "iteration",
+		"source_remove", "timeout_add", "_event_loop", "_is_background")
+
+	def __init__(self, event_loop, is_background=None, **kwargs):
+		SlotObject.__init__(self, **kwargs)
+		self._event_loop = event_loop
+		if is_background is None:
+			is_background = self._return_false
+		self._is_background = is_background
+		self.IO_ERR = event_loop.IO_ERR
+		self.IO_HUP = event_loop.IO_HUP
+		self.IO_IN = event_loop.IO_IN
+		self.IO_NVAL = event_loop.IO_NVAL
+		self.IO_OUT = event_loop.IO_OUT
+		self.IO_PRI = event_loop.IO_PRI
+		self.child_watch_add = event_loop.child_watch_add
+		self.idle_add = event_loop.idle_add
+		self.io_add_watch = event_loop.io_add_watch
+		self.iteration = event_loop.iteration
+		self.source_remove = event_loop.source_remove
+		self.timeout_add = event_loop.timeout_add
+
+	@staticmethod
+	def _return_false():
+		return False
+
+	def output(self, msg, log_path=None, background=None,
+		level=0, noiselevel=-1):
+		"""
+		Output msg to stdout if not self._is_background(). If log_path
+		is not None then append msg to the log (appends with
+		compression if the filename extension of log_path corresponds
+		to a supported compression type).
+		"""
+
+		global_background = self._is_background()
+		if background is None or global_background:
+			# Use the global value if the task does not have a local
+			# background value. For example, parallel-fetch tasks run
+			# in the background while other tasks concurrently run in
+			# the foreground.
+			background = global_background
+
+		msg_shown = False
+		if not background:
+			writemsg_level(msg, level=level, noiselevel=noiselevel)
+			msg_shown = True
+
+		if log_path is not None:
+			try:
+				f = open(_unicode_encode(log_path,
+					encoding=_encodings['fs'], errors='strict'),
+					mode='ab')
+				f_real = f
+			except IOError as e:
+				if e.errno not in (errno.ENOENT, errno.ESTALE):
+					raise
+				if not msg_shown:
+					writemsg_level(msg, level=level, noiselevel=noiselevel)
+			else:
+
+				if log_path.endswith('.gz'):
+					# NOTE: The empty filename argument prevents us from
+					# triggering a bug in python3 which causes GzipFile
+					# to raise AttributeError if fileobj.name is bytes
+					# instead of unicode.
+					f =  gzip.GzipFile(filename='', mode='ab', fileobj=f)
+
+				f.write(_unicode_encode(msg))
+				f.close()
+				if f_real is not f:
+					f_real.close()


^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2012-10-08 20:44 Zac Medico
  0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2012-10-08 20:44 UTC (permalink / raw)
  To: gentoo-commits

commit:     91b253ba2f249b19d6cc68f51cd6b909a765a715
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Oct  8 20:44:22 2012 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Oct  8 20:44:22 2012 +0000
URL:        http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=91b253ba

PollScheduler: disable default _loadavg_latency

Move the 30 second default to the Scheduler class, since that's the
only place that it's currently needed (all other schedulers have
relatively short-running jobs).

---
 pym/_emerge/PollScheduler.py              |    2 +-
 pym/_emerge/Scheduler.py                  |    7 ++++++-
 pym/portage/util/_async/AsyncScheduler.py |    4 +++-
 3 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
index 1a81e2d..d02b0da 100644
--- a/pym/_emerge/PollScheduler.py
+++ b/pym/_emerge/PollScheduler.py
@@ -15,7 +15,7 @@ from _emerge.getloadavg import getloadavg
 class PollScheduler(object):
 
 	# max time between loadavg checks (milliseconds)
-	_loadavg_latency = 30000
+	_loadavg_latency = None
 
 	def __init__(self, main=False, event_loop=None):
 		"""

diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index 40033c2..3b55b2b 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -66,6 +66,9 @@ if sys.hexversion >= 0x3000000:
 
 class Scheduler(PollScheduler):
 
+	# max time between loadavg checks (milliseconds)
+	_loadavg_latency = 30000
+
 	# max time between display status updates (milliseconds)
 	_max_display_latency = 3000
 
@@ -1339,7 +1342,9 @@ class Scheduler(PollScheduler):
 	def _main_loop(self):
 		term_check_id = self._event_loop.idle_add(self._termination_check)
 		loadavg_check_id = None
-		if self._max_load is not None:
+		if self._max_load is not None and \
+			self._loadavg_latency is not None and \
+			(self._max_jobs is True or self._max_jobs > 1):
 			# We have to schedule periodically, in case the load
 			# average has changed since the last call.
 			loadavg_check_id = self._event_loop.timeout_add(

diff --git a/pym/portage/util/_async/AsyncScheduler.py b/pym/portage/util/_async/AsyncScheduler.py
index c6a37f5..182e19e 100644
--- a/pym/portage/util/_async/AsyncScheduler.py
+++ b/pym/portage/util/_async/AsyncScheduler.py
@@ -66,7 +66,9 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
 
 	def _start(self):
 		self._term_check_id = self._event_loop.idle_add(self._termination_check)
-		if self._max_load is not None:
+		if self._max_load is not None and \
+			self._loadavg_latency is not None and \
+			(self._max_jobs is True or self._max_jobs > 1):
 			# We have to schedule periodically, in case the load
 			# average has changed since the last call.
 			self._loadavg_check_id = self._event_loop.timeout_add(


^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2012-10-16 19:28 Zac Medico
  0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2012-10-16 19:28 UTC (permalink / raw)
  To: gentoo-commits

commit:     6de6ecc0049160f049f889b81c128e682225c55b
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Oct 16 19:27:46 2012 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue Oct 16 19:27:46 2012 +0000
URL:        http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=6de6ecc0

CompositeTask._start_task: propagate scheduler

---
 pym/_emerge/CompositeTask.py            |    4 ++++
 pym/portage/util/_async/PopenProcess.py |    4 ++++
 2 files changed, 8 insertions(+), 0 deletions(-)

diff --git a/pym/_emerge/CompositeTask.py b/pym/_emerge/CompositeTask.py
index 3e43478..40cf859 100644
--- a/pym/_emerge/CompositeTask.py
+++ b/pym/_emerge/CompositeTask.py
@@ -142,6 +142,10 @@ class CompositeTask(AsynchronousTask):
 		a task.
 
 		"""
+		try:
+			task.scheduler = self.scheduler
+		except AttributeError:
+			pass
 		task.addExitListener(exit_handler)
 		self._current_task = task
 		task.start()

diff --git a/pym/portage/util/_async/PopenProcess.py b/pym/portage/util/_async/PopenProcess.py
index c3ae6e9..37d32f2 100644
--- a/pym/portage/util/_async/PopenProcess.py
+++ b/pym/portage/util/_async/PopenProcess.py
@@ -14,6 +14,10 @@ class PopenProcess(SubProcess):
 
 	def _start(self):
 		if self.pipe_reader is not None:
+			try:
+				self.pipe_reader.scheduler = self.scheduler
+			except AttributeError:
+				pass
 			self.pipe_reader.addExitListener(self._pipe_reader_exit)
 			self.pipe_reader.start()
 


^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2012-10-19  1:15 Zac Medico
  0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2012-10-19  1:15 UTC (permalink / raw)
  To: gentoo-commits

commit:     5a7690fd10a6935e756495590c9c2b6a19aa8139
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Fri Oct 19 01:15:00 2012 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Oct 19 01:15:00 2012 +0000
URL:        http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=5a7690fd

SpawnProcess: split out a PipeLogger class

The copyright dates for these classes begin in 2008, since SpawnProcess
code is derived from the EbuildFetcherAsync class which was added in
commit e4edadf5ae7063f375d76be151c6d0e949980ecf in 2008.

---
 pym/_emerge/SpawnProcess.py           |  165 +++++++--------------------------
 pym/portage/util/_async/PipeLogger.py |  149 +++++++++++++++++++++++++++++
 2 files changed, 183 insertions(+), 131 deletions(-)

diff --git a/pym/_emerge/SpawnProcess.py b/pym/_emerge/SpawnProcess.py
index ab152c3..d18512b 100644
--- a/pym/_emerge/SpawnProcess.py
+++ b/pym/_emerge/SpawnProcess.py
@@ -1,17 +1,12 @@
-# Copyright 1999-2012 Gentoo Foundation
+# Copyright 2008-2012 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 from _emerge.SubProcess import SubProcess
 import sys
-from portage.cache.mappings import slot_dict_class
 import portage
-from portage import _encodings
-from portage import _unicode_encode
 from portage import os
 from portage.const import BASH_BINARY
-import fcntl
-import errno
-import gzip
+from portage.util._async.PipeLogger import PipeLogger
 
 class SpawnProcess(SubProcess):
 
@@ -26,10 +21,7 @@ class SpawnProcess(SubProcess):
 		"path_lookup", "pre_exec")
 
 	__slots__ = ("args",) + \
-		_spawn_kwarg_names + ("_log_file_real", "_selinux_type",)
-
-	_file_names = ("log", "process", "stdout")
-	_files_dict = slot_dict_class(_file_names, prefix="")
+		_spawn_kwarg_names + ("_pipe_logger", "_selinux_type",)
 
 	def _start(self):
 
@@ -37,17 +29,13 @@ class SpawnProcess(SubProcess):
 			self.fd_pipes = {}
 		fd_pipes = self.fd_pipes
 
-		self._files = self._files_dict()
-		files = self._files
-
 		master_fd, slave_fd = self._pipe(fd_pipes)
-		fcntl.fcntl(master_fd, fcntl.F_SETFL,
-			fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
-		files.process = master_fd
 
-		logfile = None
-		if self._can_log(slave_fd):
-			logfile = self.logfile
+		can_log = self._can_log(slave_fd)
+		if can_log:
+			log_file_path = self.logfile
+		else:
+			log_file_path = None
 
 		null_input = None
 		if not self.background or 0 in fd_pipes:
@@ -74,37 +62,19 @@ class SpawnProcess(SubProcess):
 				sys.__stderr__.flush()
 				break
 
-		if logfile is not None:
+		fd_pipes_orig = fd_pipes.copy()
 
-			fd_pipes_orig = fd_pipes.copy()
+		if log_file_path is not None:
 			fd_pipes[1] = slave_fd
 			fd_pipes[2] = slave_fd
 
-			files.log = open(_unicode_encode(logfile,
-				encoding=_encodings['fs'], errors='strict'), mode='ab')
-			if logfile.endswith('.gz'):
-				self._log_file_real = files.log
-				files.log = gzip.GzipFile(filename='', mode='ab',
-					fileobj=files.log)
-
-			portage.util.apply_secpass_permissions(logfile,
-				uid=portage.portage_uid, gid=portage.portage_gid,
-				mode=0o660)
-
-			if not self.background:
-				files.stdout = os.dup(fd_pipes_orig[1])
-
-			output_handler = self._output_handler
-
 		else:
-
 			# Create a dummy pipe so the scheduler can monitor
 			# the process from inside a poll() loop.
 			fd_pipes[self._dummy_pipe_fd] = slave_fd
 			if self.background:
 				fd_pipes[1] = slave_fd
 				fd_pipes[2] = slave_fd
-			output_handler = self._dummy_handler
 
 		kwargs = {}
 		for k in self._spawn_kwarg_names:
@@ -116,10 +86,6 @@ class SpawnProcess(SubProcess):
 		kwargs["returnpid"] = True
 		kwargs.pop("logfile", None)
 
-		self._reg_id = self.scheduler.io_add_watch(files.process,
-			self._registered_events, output_handler)
-		self._registered = True
-
 		retval = self._spawn(self.args, **kwargs)
 
 		os.close(slave_fd)
@@ -136,6 +102,18 @@ class SpawnProcess(SubProcess):
 		self.pid = retval[0]
 		portage.process.spawned_pids.remove(self.pid)
 
+		stdout_fd = None
+		if can_log and not self.background:
+			stdout_fd = os.dup(fd_pipes_orig[1])
+
+		self._pipe_logger = PipeLogger(background=self.background,
+			scheduler=self.scheduler, input_fd=master_fd,
+			log_file_path=log_file_path,
+			stdout_fd=stdout_fd)
+		self._pipe_logger.addExitListener(self._pipe_logger_exit)
+		self._pipe_logger.start()
+		self._registered = True
+
 	def _can_log(self, slave_fd):
 		return True
 
@@ -158,92 +136,17 @@ class SpawnProcess(SubProcess):
 
 		return spawn_func(args, **kwargs)
 
-	def _output_handler(self, fd, event):
-
-		files = self._files
-		while True:
-			buf = self._read_buf(fd, event)
-
-			if buf is None:
-				# not a POLLIN event, EAGAIN, etc...
-				break
+	def _pipe_logger_exit(self, pipe_logger):
+		self._pipe_logger = None
+		self._unregister()
+		self.wait()
 
-			if not buf:
-				# EOF
-				self._unregister()
-				self.wait()
-				break
-
-			else:
-				if not self.background:
-					write_successful = False
-					failures = 0
-					while True:
-						try:
-							if not write_successful:
-								os.write(files.stdout, buf)
-								write_successful = True
-							break
-						except OSError as e:
-							if e.errno != errno.EAGAIN:
-								raise
-							del e
-							failures += 1
-							if failures > 50:
-								# Avoid a potentially infinite loop. In
-								# most cases, the failure count is zero
-								# and it's unlikely to exceed 1.
-								raise
-
-							# This means that a subprocess has put an inherited
-							# stdio file descriptor (typically stdin) into
-							# O_NONBLOCK mode. This is not acceptable (see bug
-							# #264435), so revert it. We need to use a loop
-							# here since there's a race condition due to
-							# parallel processes being able to change the
-							# flags on the inherited file descriptor.
-							# TODO: When possible, avoid having child processes
-							# inherit stdio file descriptors from portage
-							# (maybe it can't be avoided with
-							# PROPERTIES=interactive).
-							fcntl.fcntl(files.stdout, fcntl.F_SETFL,
-								fcntl.fcntl(files.stdout,
-								fcntl.F_GETFL) ^ os.O_NONBLOCK)
-
-				files.log.write(buf)
-				files.log.flush()
-
-		self._unregister_if_appropriate(event)
-
-		return True
-
-	def _dummy_handler(self, fd, event):
-		"""
-		This method is mainly interested in detecting EOF, since
-		the only purpose of the pipe is to allow the scheduler to
-		monitor the process from inside a poll() loop.
-		"""
-
-		while True:
-			buf = self._read_buf(fd, event)
-
-			if buf is None:
-				# not a POLLIN event, EAGAIN, etc...
-				break
-
-			if not buf:
-				# EOF
-				self._unregister()
-				self.wait()
-				break
-
-		self._unregister_if_appropriate(event)
-
-		return True
+	def _waitpid_loop(self):
+		SubProcess._waitpid_loop(self)
 
-	def _unregister(self):
-		super(SpawnProcess, self)._unregister()
-		if self._log_file_real is not None:
-			# Avoid "ResourceWarning: unclosed file" since python 3.2.
-			self._log_file_real.close()
-			self._log_file_real = None
+		pipe_logger = self._pipe_logger
+		if pipe_logger is not None:
+			self._pipe_logger = None
+			pipe_logger.removeExitListener(self._pipe_logger_exit)
+			pipe_logger.cancel()
+			pipe_logger.wait()

diff --git a/pym/portage/util/_async/PipeLogger.py b/pym/portage/util/_async/PipeLogger.py
new file mode 100644
index 0000000..dbdd56f
--- /dev/null
+++ b/pym/portage/util/_async/PipeLogger.py
@@ -0,0 +1,149 @@
+# Copyright 2008-2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import fcntl
+import errno
+import gzip
+
+import portage
+from portage import os, _encodings, _unicode_encode
+from _emerge.AbstractPollTask import AbstractPollTask
+
+class PipeLogger(AbstractPollTask):
+
+	"""
+	This can be used for logging output of a child process,
+	optionally outputing to log_file_path and/or stdout_fd.  It can
+	also monitor for EOF on input_fd, which may be used to detect
+	termination of a child process. If log_file_path ends with
+	'.gz' then the log file is written with compression.
+	"""
+
+	__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
+		("_log_file", "_log_file_real", "_reg_id")
+
+	def _start(self):
+
+		log_file_path = self.log_file_path
+		if log_file_path is not None:
+
+			self._log_file = open(_unicode_encode(log_file_path,
+				encoding=_encodings['fs'], errors='strict'), mode='ab')
+			if log_file_path.endswith('.gz'):
+				self._log_file_real = self._log_file
+				self._log_file = gzip.GzipFile(filename='', mode='ab',
+					fileobj=self._log_file)
+
+			portage.util.apply_secpass_permissions(log_file_path,
+				uid=portage.portage_uid, gid=portage.portage_gid,
+				mode=0o660)
+
+		fcntl.fcntl(self.input_fd, fcntl.F_SETFL,
+			fcntl.fcntl(self.input_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+
+		self._reg_id = self.scheduler.io_add_watch(self.input_fd,
+			self._registered_events, self._output_handler)
+		self._registered = True
+
+	def isAlive(self):
+		return self._registered
+
+	def _cancel(self):
+		self._unregister()
+		if self.returncode is None:
+			self.returncode = self._cancelled_returncode
+
+	def _wait(self):
+		if self.returncode is not None:
+			return self.returncode
+		self._wait_loop()
+		self.returncode = os.EX_OK
+		return self.returncode
+
+	def _output_handler(self, fd, event):
+
+		background = self.background
+		stdout_fd = self.stdout_fd
+		log_file = self._log_file 
+
+		while True:
+			buf = self._read_buf(fd, event)
+
+			if buf is None:
+				# not a POLLIN event, EAGAIN, etc...
+				break
+
+			if not buf:
+				# EOF
+				self._unregister()
+				self.wait()
+				break
+
+			else:
+				if not background and stdout_fd is not None:
+					write_successful = False
+					failures = 0
+					while True:
+						try:
+							if not write_successful:
+								os.write(stdout_fd, buf)
+								write_successful = True
+							break
+						except OSError as e:
+							if e.errno != errno.EAGAIN:
+								raise
+							del e
+							failures += 1
+							if failures > 50:
+								# Avoid a potentially infinite loop. In
+								# most cases, the failure count is zero
+								# and it's unlikely to exceed 1.
+								raise
+
+							# This means that a subprocess has put an inherited
+							# stdio file descriptor (typically stdin) into
+							# O_NONBLOCK mode. This is not acceptable (see bug
+							# #264435), so revert it. We need to use a loop
+							# here since there's a race condition due to
+							# parallel processes being able to change the
+							# flags on the inherited file descriptor.
+							# TODO: When possible, avoid having child processes
+							# inherit stdio file descriptors from portage
+							# (maybe it can't be avoided with
+							# PROPERTIES=interactive).
+							fcntl.fcntl(stdout_fd, fcntl.F_SETFL,
+								fcntl.fcntl(stdout_fd,
+								fcntl.F_GETFL) ^ os.O_NONBLOCK)
+
+				if log_file is not None:
+					log_file.write(buf)
+					log_file.flush()
+
+		self._unregister_if_appropriate(event)
+
+		return True
+
+	def _unregister(self):
+
+		if self._reg_id is not None:
+			self.scheduler.source_remove(self._reg_id)
+			self._reg_id = None
+
+		if self.input_fd is not None:
+			os.close(self.input_fd)
+			self.input_fd = None
+
+		if self.stdout_fd is not None:
+			os.close(self.stdout_fd)
+			self.stdout_fd = None
+
+		if self._log_file is not None:
+			self._log_file.close()
+			self._log_file = None
+
+		if self._log_file_real is not None:
+			# Avoid "ResourceWarning: unclosed file" since python 3.2.
+			self._log_file_real.close()
+			self._log_file_real = None
+
+		self._registered = False


^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2012-10-19  1:23 Zac Medico
  0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2012-10-19  1:23 UTC (permalink / raw)
  To: gentoo-commits

commit:     14f4235b940700f9d6470a216d6fe3dce37ea148
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Fri Oct 19 01:23:02 2012 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Oct 19 01:23:02 2012 +0000
URL:        http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=14f4235b

PipeLogger/Reader: remove redundant isAlive()

Parent class AbstractPollTask has equivalent implementation.

---
 pym/_emerge/PipeReader.py             |    3 ---
 pym/portage/util/_async/PipeLogger.py |    3 ---
 2 files changed, 0 insertions(+), 6 deletions(-)

diff --git a/pym/_emerge/PipeReader.py b/pym/_emerge/PipeReader.py
index 3eb9d36..fcdefb4 100644
--- a/pym/_emerge/PipeReader.py
+++ b/pym/_emerge/PipeReader.py
@@ -33,9 +33,6 @@ class PipeReader(AbstractPollTask):
 				self._registered_events, output_handler))
 		self._registered = True
 
-	def isAlive(self):
-		return self._registered
-
 	def _cancel(self):
 		if self.returncode is None:
 			self.returncode = 1

diff --git a/pym/portage/util/_async/PipeLogger.py b/pym/portage/util/_async/PipeLogger.py
index dbdd56f..0905e47 100644
--- a/pym/portage/util/_async/PipeLogger.py
+++ b/pym/portage/util/_async/PipeLogger.py
@@ -45,9 +45,6 @@ class PipeLogger(AbstractPollTask):
 			self._registered_events, self._output_handler)
 		self._registered = True
 
-	def isAlive(self):
-		return self._registered
-
 	def _cancel(self):
 		self._unregister()
 		if self.returncode is None:


^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2013-01-06 11:16 Zac Medico
  0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2013-01-06 11:16 UTC (permalink / raw)
  To: gentoo-commits

commit:     4ac9adbb25e1f83853976cba374d94b2a82ef124
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Jan  6 11:15:49 2013 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Jan  6 11:15:49 2013 +0000
URL:        http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=4ac9adbb

_keep_scheduling: check _terminated.is_set()

This ensures that scheduling loops terminate as soon as possible after
an interrupt is received.

---
 pym/_emerge/Scheduler.py                  |    4 ++--
 pym/portage/util/_async/AsyncScheduler.py |    4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index a423274..c2f2459 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -1,4 +1,4 @@
-# Copyright 1999-2012 Gentoo Foundation
+# Copyright 1999-2013 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 from __future__ import print_function
@@ -1531,7 +1531,7 @@ class Scheduler(PollScheduler):
 		self._config_pool[settings['EROOT']].append(settings)
 
 	def _keep_scheduling(self):
-		return bool(not self._terminated_tasks and self._pkg_queue and \
+		return bool(not self._terminated.is_set() and self._pkg_queue and \
 			not (self._failed_pkgs and not self._build_opts.fetchonly))
 
 	def _is_work_scheduled(self):

diff --git a/pym/portage/util/_async/AsyncScheduler.py b/pym/portage/util/_async/AsyncScheduler.py
index 182e19e..9b96c6f 100644
--- a/pym/portage/util/_async/AsyncScheduler.py
+++ b/pym/portage/util/_async/AsyncScheduler.py
@@ -1,4 +1,4 @@
-# Copyright 2012 Gentoo Foundation
+# Copyright 2012-2013 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 from portage import os
@@ -38,7 +38,7 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
 		raise NotImplementedError(self)
 
 	def _keep_scheduling(self):
-		return self._remaining_tasks and not self._terminated_tasks
+		return self._remaining_tasks and not self._terminated.is_set()
 
 	def _running_job_count(self):
 		return len(self._running_tasks)


^ permalink raw reply related	[flat|nested] 9+ messages in thread

* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2017-03-24 20:33 Zac Medico
  0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2017-03-24 20:33 UTC (permalink / raw)
  To: gentoo-commits

commit:     86400e9f864e86f8f677ccda9ce4103d6d02ef87
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Mar 21 06:56:55 2017 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Mar 24 20:32:25 2017 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=86400e9f

PollScheduler: terminate via call_soon for asyncio compat

Use call_soon to schedule the _termination_check callback when needed.
The previous idle_add usage was relatively inefficient, because it
scheduled the _termination_check callback to be called in every
iteration of the event loop.

Add a _cleanup method to handle cleanup of callbacks registered with
the global event loop. Since the terminate method is thread safe and it
interacts with self._term_check_handle, use this variable only while
holding a lock.

 pym/_emerge/PollScheduler.py              | 57 +++++++++++++++++++++++--------
 pym/_emerge/Scheduler.py                  |  7 ++--
 pym/portage/util/_async/AsyncScheduler.py | 16 ++++-----
 3 files changed, 54 insertions(+), 26 deletions(-)

diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
index b118ac157..569879b36 100644
--- a/pym/_emerge/PollScheduler.py
+++ b/pym/_emerge/PollScheduler.py
@@ -25,8 +25,10 @@ class PollScheduler(object):
 			a non-main thread)
 		@type main: bool
 		"""
+		self._term_rlock = threading.RLock()
 		self._terminated = threading.Event()
 		self._terminated_tasks = False
+		self._term_check_handle = None
 		self._max_jobs = 1
 		self._max_load = None
 		self._scheduling = False
@@ -44,6 +46,21 @@ class PollScheduler(object):
 	def _is_background(self):
 		return self._background
 
+	def _cleanup(self):
+		"""
+		Cleanup any callbacks that have been registered with the global
+		event loop.
+		"""
+		# The self._term_check_handle attribute requires locking
+		# since it's modified by the thread safe terminate method.
+		with self._term_rlock:
+			if self._term_check_handle not in (None, False):
+				self._term_check_handle.cancel()
+			# This prevents the terminate method from scheduling
+			# any more callbacks (since _cleanup must eliminate all
+			# callbacks in order to ensure complete cleanup).
+			self._term_check_handle = False
+
 	def terminate(self):
 		"""
 		Schedules asynchronous, graceful termination of the scheduler
@@ -51,26 +68,36 @@ class PollScheduler(object):
 
 		This method is thread-safe (and safe for signal handlers).
 		"""
-		self._terminated.set()
+		with self._term_rlock:
+			if self._term_check_handle is None:
+				self._terminated.set()
+				self._term_check_handle = self._event_loop.call_soon_threadsafe(
+					self._termination_check, True)
 
-	def _termination_check(self):
+	def _termination_check(self, retry=False):
 		"""
 		Calls _terminate_tasks() if appropriate. It's guaranteed not to
-		call it while _schedule_tasks() is being called. The check should
-		be executed for each iteration of the event loop, for response to
-		termination signals at the earliest opportunity. It always returns
-		True, for continuous scheduling via idle_add.
+		call it while _schedule_tasks() is being called. This method must
+		only be called via the event loop thread.
+
+		@param retry: If True then reschedule if scheduling state prevents
+			immediate termination.
+		@type retry: bool
 		"""
-		if not self._scheduling and \
-			self._terminated.is_set() and \
+		if self._terminated.is_set() and \
 			not self._terminated_tasks:
-			self._scheduling = True
-			try:
-				self._terminated_tasks = True
-				self._terminate_tasks()
-			finally:
-				self._scheduling = False
-		return True
+			if not self._scheduling:
+				self._scheduling = True
+				try:
+					self._terminated_tasks = True
+					self._terminate_tasks()
+				finally:
+					self._scheduling = False
+
+			elif retry:
+				with self._term_rlock:
+					self._term_check_handle = self._event_loop.call_soon(
+						self._termination_check, True)
 
 	def _terminate_tasks(self):
 		"""

diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index 71fe75f62..58ff97139 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -1055,6 +1055,7 @@ class Scheduler(PollScheduler):
 				else:
 					signal.signal(signal.SIGCONT, signal.SIG_DFL)
 
+			self._termination_check()
 			if received_signal:
 				sys.exit(received_signal[0])
 
@@ -1091,6 +1092,10 @@ class Scheduler(PollScheduler):
 				if isinstance(x, Package) and x.operation == "merge"])
 			self._status_display.maxval = self._pkg_count.maxval
 
+		# Cleanup any callbacks that have been registered with the global
+		# event loop by calls to the terminate method.
+		self._cleanup()
+
 		self._logger.log(" *** Finished. Cleaning up...")
 
 		if failed_pkgs:
@@ -1393,7 +1398,6 @@ class Scheduler(PollScheduler):
 		blocker_db.discardBlocker(pkg)
 
 	def _main_loop(self):
-		term_check_id = self._event_loop.idle_add(self._termination_check)
 		loadavg_check_id = None
 		if self._max_load is not None and \
 			self._loadavg_latency is not None and \
@@ -1420,7 +1424,6 @@ class Scheduler(PollScheduler):
 			while self._is_work_scheduled():
 				self._event_loop.iteration()
 		finally:
-			self._event_loop.source_remove(term_check_id)
 			if loadavg_check_id is not None:
 				self._event_loop.source_remove(loadavg_check_id)
 

diff --git a/pym/portage/util/_async/AsyncScheduler.py b/pym/portage/util/_async/AsyncScheduler.py
index 9b96c6f36..3deb6cb04 100644
--- a/pym/portage/util/_async/AsyncScheduler.py
+++ b/pym/portage/util/_async/AsyncScheduler.py
@@ -18,7 +18,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
 		self._error_count = 0
 		self._running_tasks = set()
 		self._remaining_tasks = True
-		self._term_check_id = None
 		self._loadavg_check_id = None
 
 	def _poll(self):
@@ -65,7 +64,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
 		self._schedule()
 
 	def _start(self):
-		self._term_check_id = self._event_loop.idle_add(self._termination_check)
 		if self._max_load is not None and \
 			self._loadavg_latency is not None and \
 			(self._max_jobs is True or self._max_jobs > 1):
@@ -75,6 +73,12 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
 				self._loadavg_latency, self._schedule)
 		self._schedule()
 
+	def _cleanup(self):
+		super(AsyncScheduler, self)._cleanup()
+		if self._loadavg_check_id is not None:
+			self._event_loop.source_remove(self._loadavg_check_id)
+			self._loadavg_check_id = None
+
 	def _wait(self):
 		# Loop while there are jobs to be scheduled.
 		while self._keep_scheduling():
@@ -86,13 +90,7 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
 		while self._is_work_scheduled():
 			self._event_loop.iteration()
 
-		if self._term_check_id is not None:
-			self._event_loop.source_remove(self._term_check_id)
-			self._term_check_id = None
-
-		if self._loadavg_check_id is not None:
-			self._event_loop.source_remove(self._loadavg_check_id)
-			self._loadavg_check_id = None
+		self._cleanup()
 
 		if self._error_count > 0:
 			self.returncode = 1


^ permalink raw reply related	[flat|nested] 9+ messages in thread

end of thread, other threads:[~2017-03-24 20:33 UTC | newest]

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2012-10-19  1:15 [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/ Zac Medico
  -- strict thread matches above, loose matches on Subject: below --
2017-03-24 20:33 Zac Medico
2013-01-06 11:16 Zac Medico
2012-10-19  1:23 Zac Medico
2012-10-16 19:28 Zac Medico
2012-10-08 20:44 Zac Medico
2012-10-07 18:51 Zac Medico
2012-10-07 18:22 Zac Medico
2012-10-07 18:17 Zac Medico

This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.