* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2012-10-07 18:17 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2012-10-07 18:17 UTC (permalink / raw)
To: gentoo-commits
commit: 63e329100d9b6c8bf1c6b87ab417882b9116047e
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Oct 7 18:17:35 2012 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Oct 7 18:17:35 2012 +0000
URL: http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=63e32910
PollScheduler: split out SchedulerInterface
---
pym/_emerge/PollScheduler.py | 78 ++---------------------
pym/_emerge/Scheduler.py | 9 ++-
pym/portage/util/_async/SchedulerInterface.py | 86 +++++++++++++++++++++++++
3 files changed, 97 insertions(+), 76 deletions(-)
diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
index 105943f..a341604 100644
--- a/pym/_emerge/PollScheduler.py
+++ b/pym/_emerge/PollScheduler.py
@@ -1,18 +1,12 @@
# Copyright 1999-2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
-import gzip
-import errno
-
try:
import threading
except ImportError:
import dummy_threading as threading
-from portage import _encodings
-from portage import _unicode_encode
-from portage.util import writemsg_level
-from portage.util.SlotObject import SlotObject
+from portage.util._async.SchedulerInterface import SchedulerInterface
from portage.util._eventloop.EventLoop import EventLoop
from portage.util._eventloop.global_event_loop import global_event_loop
@@ -23,13 +17,6 @@ class PollScheduler(object):
# max time between loadavg checks (milliseconds)
_loadavg_latency = 30000
- class _sched_iface_class(SlotObject):
- __slots__ = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT",
- "IO_PRI", "child_watch_add",
- "idle_add", "io_add_watch", "iteration",
- "output", "run",
- "source_remove", "timeout_add")
-
def __init__(self, main=False, event_loop=None):
"""
@param main: If True then use global_event_loop(), otherwise use
@@ -49,20 +36,11 @@ class PollScheduler(object):
self._event_loop = global_event_loop()
else:
self._event_loop = EventLoop(main=False)
- self.sched_iface = self._sched_iface_class(
- IO_ERR=self._event_loop.IO_ERR,
- IO_HUP=self._event_loop.IO_HUP,
- IO_IN=self._event_loop.IO_IN,
- IO_NVAL=self._event_loop.IO_NVAL,
- IO_OUT=self._event_loop.IO_OUT,
- IO_PRI=self._event_loop.IO_PRI,
- child_watch_add=self._event_loop.child_watch_add,
- idle_add=self._event_loop.idle_add,
- io_add_watch=self._event_loop.io_add_watch,
- iteration=self._event_loop.iteration,
- output=self._task_output,
- source_remove=self._event_loop.source_remove,
- timeout_add=self._event_loop.timeout_add)
+ self.sched_iface = SchedulerInterface(self._event_loop,
+ self._is_background)
+
+ def _is_background(self):
+ return self._background
def terminate(self):
"""
@@ -176,47 +154,3 @@ class PollScheduler(object):
return False
return True
-
- def _task_output(self, msg, log_path=None, background=None,
- level=0, noiselevel=-1):
- """
- Output msg to stdout if not self._background. If log_path
- is not None then append msg to the log (appends with
- compression if the filename extension of log_path
- corresponds to a supported compression type).
- """
-
- if background is None:
- # If the task does not have a local background value
- # (like for parallel-fetch), then use the global value.
- background = self._background
-
- msg_shown = False
- if not background:
- writemsg_level(msg, level=level, noiselevel=noiselevel)
- msg_shown = True
-
- if log_path is not None:
- try:
- f = open(_unicode_encode(log_path,
- encoding=_encodings['fs'], errors='strict'),
- mode='ab')
- f_real = f
- except IOError as e:
- if e.errno not in (errno.ENOENT, errno.ESTALE):
- raise
- if not msg_shown:
- writemsg_level(msg, level=level, noiselevel=noiselevel)
- else:
-
- if log_path.endswith('.gz'):
- # NOTE: The empty filename argument prevents us from
- # triggering a bug in python3 which causes GzipFile
- # to raise AttributeError if fileobj.name is bytes
- # instead of unicode.
- f = gzip.GzipFile(filename='', mode='ab', fileobj=f)
-
- f.write(_unicode_encode(msg))
- f.close()
- if f_real is not f:
- f_real.close()
diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index c5779cb..f62f6e7 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -28,6 +28,7 @@ from portage._sets import SETPREFIX
from portage._sets.base import InternalPackageSet
from portage.util import ensure_dirs, writemsg, writemsg_level
from portage.util.SlotObject import SlotObject
+from portage.util._async.SchedulerInterface import SchedulerInterface
from portage.package.ebuild.digestcheck import digestcheck
from portage.package.ebuild.digestgen import digestgen
from portage.package.ebuild.doebuild import (_check_temp_dir,
@@ -79,7 +80,7 @@ class Scheduler(PollScheduler):
_opts_no_self_update = frozenset(["--buildpkgonly",
"--fetchonly", "--fetch-all-uri", "--pretend"])
- class _iface_class(PollScheduler._sched_iface_class):
+ class _iface_class(SchedulerInterface):
__slots__ = ("fetch",
"scheduleSetup", "scheduleUnpack")
@@ -215,11 +216,11 @@ class Scheduler(PollScheduler):
fetch_iface = self._fetch_iface_class(log_file=self._fetch_log,
schedule=self._schedule_fetch)
self._sched_iface = self._iface_class(
+ self._event_loop,
+ self._is_background,
fetch=fetch_iface,
scheduleSetup=self._schedule_setup,
- scheduleUnpack=self._schedule_unpack,
- **dict((k, getattr(self.sched_iface, k))
- for k in self.sched_iface.__slots__))
+ scheduleUnpack=self._schedule_unpack)
self._prefetchers = weakref.WeakValueDictionary()
self._pkg_queue = []
diff --git a/pym/portage/util/_async/SchedulerInterface.py b/pym/portage/util/_async/SchedulerInterface.py
new file mode 100644
index 0000000..04c6efb
--- /dev/null
+++ b/pym/portage/util/_async/SchedulerInterface.py
@@ -0,0 +1,86 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import gzip
+import errno
+
+from portage import _encodings
+from portage import _unicode_encode
+from portage.util import writemsg_level
+from ..SlotObject import SlotObject
+
+class SchedulerInterface(SlotObject):
+
+ __slots__ = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT", "IO_PRI",
+ "child_watch_add", "idle_add", "io_add_watch", "iteration",
+ "source_remove", "timeout_add", "_event_loop", "_is_background")
+
+ def __init__(self, event_loop, is_background=None, **kwargs):
+ SlotObject.__init__(self, **kwargs)
+ self._event_loop = event_loop
+ if is_background is None:
+ is_background = self._return_false
+ self._is_background = is_background
+ self.IO_ERR = event_loop.IO_ERR
+ self.IO_HUP = event_loop.IO_HUP
+ self.IO_IN = event_loop.IO_IN
+ self.IO_NVAL = event_loop.IO_NVAL
+ self.IO_OUT = event_loop.IO_OUT
+ self.IO_PRI = event_loop.IO_PRI
+ self.child_watch_add = event_loop.child_watch_add
+ self.idle_add = event_loop.idle_add
+ self.io_add_watch = event_loop.io_add_watch
+ self.iteration = event_loop.iteration
+ self.source_remove = event_loop.source_remove
+ self.timeout_add = event_loop.timeout_add
+
+ @staticmethod
+ def _return_false(self):
+ return False
+
+ def output(self, msg, log_path=None, background=None,
+ level=0, noiselevel=-1):
+ """
+ Output msg to stdout if not self._background_cb(). If log_path
+ is not None then append msg to the log (appends with
+ compression if the filename extension of log_path corresponds
+ to a supported compression type).
+ """
+
+ global_background = self._is_background()
+ if background is None or global_background:
+ # Use the global value if the task does not have a local
+ # background value. For example, parallel-fetch tasks run
+ # in the background while other tasks concurrently run in
+ # the foreground.
+ background = global_background
+
+ msg_shown = False
+ if not background:
+ writemsg_level(msg, level=level, noiselevel=noiselevel)
+ msg_shown = True
+
+ if log_path is not None:
+ try:
+ f = open(_unicode_encode(log_path,
+ encoding=_encodings['fs'], errors='strict'),
+ mode='ab')
+ f_real = f
+ except IOError as e:
+ if e.errno not in (errno.ENOENT, errno.ESTALE):
+ raise
+ if not msg_shown:
+ writemsg_level(msg, level=level, noiselevel=noiselevel)
+ else:
+
+ if log_path.endswith('.gz'):
+ # NOTE: The empty filename argument prevents us from
+ # triggering a bug in python3 which causes GzipFile
+ # to raise AttributeError if fileobj.name is bytes
+ # instead of unicode.
+ f = gzip.GzipFile(filename='', mode='ab', fileobj=f)
+
+ f.write(_unicode_encode(msg))
+ f.close()
+ if f_real is not f:
+ f_real.close()
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2012-10-07 18:22 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2012-10-07 18:22 UTC (permalink / raw)
To: gentoo-commits
commit: 2932012a0e22856b14473f7ecb26fbee91a37c20
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Oct 7 18:17:35 2012 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Oct 7 18:21:52 2012 +0000
URL: http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=2932012a
PollScheduler: split out SchedulerInterface
---
pym/_emerge/PollScheduler.py | 78 ++---------------------
pym/_emerge/Scheduler.py | 9 ++-
pym/portage/util/_async/SchedulerInterface.py | 86 +++++++++++++++++++++++++
3 files changed, 97 insertions(+), 76 deletions(-)
diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
index 105943f..a341604 100644
--- a/pym/_emerge/PollScheduler.py
+++ b/pym/_emerge/PollScheduler.py
@@ -1,18 +1,12 @@
# Copyright 1999-2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
-import gzip
-import errno
-
try:
import threading
except ImportError:
import dummy_threading as threading
-from portage import _encodings
-from portage import _unicode_encode
-from portage.util import writemsg_level
-from portage.util.SlotObject import SlotObject
+from portage.util._async.SchedulerInterface import SchedulerInterface
from portage.util._eventloop.EventLoop import EventLoop
from portage.util._eventloop.global_event_loop import global_event_loop
@@ -23,13 +17,6 @@ class PollScheduler(object):
# max time between loadavg checks (milliseconds)
_loadavg_latency = 30000
- class _sched_iface_class(SlotObject):
- __slots__ = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT",
- "IO_PRI", "child_watch_add",
- "idle_add", "io_add_watch", "iteration",
- "output", "run",
- "source_remove", "timeout_add")
-
def __init__(self, main=False, event_loop=None):
"""
@param main: If True then use global_event_loop(), otherwise use
@@ -49,20 +36,11 @@ class PollScheduler(object):
self._event_loop = global_event_loop()
else:
self._event_loop = EventLoop(main=False)
- self.sched_iface = self._sched_iface_class(
- IO_ERR=self._event_loop.IO_ERR,
- IO_HUP=self._event_loop.IO_HUP,
- IO_IN=self._event_loop.IO_IN,
- IO_NVAL=self._event_loop.IO_NVAL,
- IO_OUT=self._event_loop.IO_OUT,
- IO_PRI=self._event_loop.IO_PRI,
- child_watch_add=self._event_loop.child_watch_add,
- idle_add=self._event_loop.idle_add,
- io_add_watch=self._event_loop.io_add_watch,
- iteration=self._event_loop.iteration,
- output=self._task_output,
- source_remove=self._event_loop.source_remove,
- timeout_add=self._event_loop.timeout_add)
+ self.sched_iface = SchedulerInterface(self._event_loop,
+ self._is_background)
+
+ def _is_background(self):
+ return self._background
def terminate(self):
"""
@@ -176,47 +154,3 @@ class PollScheduler(object):
return False
return True
-
- def _task_output(self, msg, log_path=None, background=None,
- level=0, noiselevel=-1):
- """
- Output msg to stdout if not self._background. If log_path
- is not None then append msg to the log (appends with
- compression if the filename extension of log_path
- corresponds to a supported compression type).
- """
-
- if background is None:
- # If the task does not have a local background value
- # (like for parallel-fetch), then use the global value.
- background = self._background
-
- msg_shown = False
- if not background:
- writemsg_level(msg, level=level, noiselevel=noiselevel)
- msg_shown = True
-
- if log_path is not None:
- try:
- f = open(_unicode_encode(log_path,
- encoding=_encodings['fs'], errors='strict'),
- mode='ab')
- f_real = f
- except IOError as e:
- if e.errno not in (errno.ENOENT, errno.ESTALE):
- raise
- if not msg_shown:
- writemsg_level(msg, level=level, noiselevel=noiselevel)
- else:
-
- if log_path.endswith('.gz'):
- # NOTE: The empty filename argument prevents us from
- # triggering a bug in python3 which causes GzipFile
- # to raise AttributeError if fileobj.name is bytes
- # instead of unicode.
- f = gzip.GzipFile(filename='', mode='ab', fileobj=f)
-
- f.write(_unicode_encode(msg))
- f.close()
- if f_real is not f:
- f_real.close()
diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index c5779cb..f62f6e7 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -28,6 +28,7 @@ from portage._sets import SETPREFIX
from portage._sets.base import InternalPackageSet
from portage.util import ensure_dirs, writemsg, writemsg_level
from portage.util.SlotObject import SlotObject
+from portage.util._async.SchedulerInterface import SchedulerInterface
from portage.package.ebuild.digestcheck import digestcheck
from portage.package.ebuild.digestgen import digestgen
from portage.package.ebuild.doebuild import (_check_temp_dir,
@@ -79,7 +80,7 @@ class Scheduler(PollScheduler):
_opts_no_self_update = frozenset(["--buildpkgonly",
"--fetchonly", "--fetch-all-uri", "--pretend"])
- class _iface_class(PollScheduler._sched_iface_class):
+ class _iface_class(SchedulerInterface):
__slots__ = ("fetch",
"scheduleSetup", "scheduleUnpack")
@@ -215,11 +216,11 @@ class Scheduler(PollScheduler):
fetch_iface = self._fetch_iface_class(log_file=self._fetch_log,
schedule=self._schedule_fetch)
self._sched_iface = self._iface_class(
+ self._event_loop,
+ self._is_background,
fetch=fetch_iface,
scheduleSetup=self._schedule_setup,
- scheduleUnpack=self._schedule_unpack,
- **dict((k, getattr(self.sched_iface, k))
- for k in self.sched_iface.__slots__))
+ scheduleUnpack=self._schedule_unpack)
self._prefetchers = weakref.WeakValueDictionary()
self._pkg_queue = []
diff --git a/pym/portage/util/_async/SchedulerInterface.py b/pym/portage/util/_async/SchedulerInterface.py
new file mode 100644
index 0000000..731f521
--- /dev/null
+++ b/pym/portage/util/_async/SchedulerInterface.py
@@ -0,0 +1,86 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import gzip
+import errno
+
+from portage import _encodings
+from portage import _unicode_encode
+from portage.util import writemsg_level
+from ..SlotObject import SlotObject
+
+class SchedulerInterface(SlotObject):
+
+ __slots__ = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT", "IO_PRI",
+ "child_watch_add", "idle_add", "io_add_watch", "iteration",
+ "source_remove", "timeout_add", "_event_loop", "_is_background")
+
+ def __init__(self, event_loop, is_background=None, **kwargs):
+ SlotObject.__init__(self, **kwargs)
+ self._event_loop = event_loop
+ if is_background is None:
+ is_background = self._return_false
+ self._is_background = is_background
+ self.IO_ERR = event_loop.IO_ERR
+ self.IO_HUP = event_loop.IO_HUP
+ self.IO_IN = event_loop.IO_IN
+ self.IO_NVAL = event_loop.IO_NVAL
+ self.IO_OUT = event_loop.IO_OUT
+ self.IO_PRI = event_loop.IO_PRI
+ self.child_watch_add = event_loop.child_watch_add
+ self.idle_add = event_loop.idle_add
+ self.io_add_watch = event_loop.io_add_watch
+ self.iteration = event_loop.iteration
+ self.source_remove = event_loop.source_remove
+ self.timeout_add = event_loop.timeout_add
+
+ @staticmethod
+ def _return_false():
+ return False
+
+ def output(self, msg, log_path=None, background=None,
+ level=0, noiselevel=-1):
+ """
+ Output msg to stdout if not self._is_background(). If log_path
+ is not None then append msg to the log (appends with
+ compression if the filename extension of log_path corresponds
+ to a supported compression type).
+ """
+
+ global_background = self._is_background()
+ if background is None or global_background:
+ # Use the global value if the task does not have a local
+ # background value. For example, parallel-fetch tasks run
+ # in the background while other tasks concurrently run in
+ # the foreground.
+ background = global_background
+
+ msg_shown = False
+ if not background:
+ writemsg_level(msg, level=level, noiselevel=noiselevel)
+ msg_shown = True
+
+ if log_path is not None:
+ try:
+ f = open(_unicode_encode(log_path,
+ encoding=_encodings['fs'], errors='strict'),
+ mode='ab')
+ f_real = f
+ except IOError as e:
+ if e.errno not in (errno.ENOENT, errno.ESTALE):
+ raise
+ if not msg_shown:
+ writemsg_level(msg, level=level, noiselevel=noiselevel)
+ else:
+
+ if log_path.endswith('.gz'):
+ # NOTE: The empty filename argument prevents us from
+ # triggering a bug in python3 which causes GzipFile
+ # to raise AttributeError if fileobj.name is bytes
+ # instead of unicode.
+ f = gzip.GzipFile(filename='', mode='ab', fileobj=f)
+
+ f.write(_unicode_encode(msg))
+ f.close()
+ if f_real is not f:
+ f_real.close()
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2012-10-07 18:51 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2012-10-07 18:51 UTC (permalink / raw)
To: gentoo-commits
commit: 2bc9b7c6147fb30838773b52674f53bb920bab72
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Oct 7 18:17:35 2012 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Oct 7 18:51:26 2012 +0000
URL: http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=2bc9b7c6
PollScheduler: split out SchedulerInterface
---
pym/_emerge/PollScheduler.py | 78 ++---------------------
pym/_emerge/Scheduler.py | 14 ++--
pym/portage/util/_async/SchedulerInterface.py | 86 +++++++++++++++++++++++++
3 files changed, 100 insertions(+), 78 deletions(-)
diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
index 105943f..8d5c290 100644
--- a/pym/_emerge/PollScheduler.py
+++ b/pym/_emerge/PollScheduler.py
@@ -1,18 +1,12 @@
# Copyright 1999-2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
-import gzip
-import errno
-
try:
import threading
except ImportError:
import dummy_threading as threading
-from portage import _encodings
-from portage import _unicode_encode
-from portage.util import writemsg_level
-from portage.util.SlotObject import SlotObject
+from portage.util._async.SchedulerInterface import SchedulerInterface
from portage.util._eventloop.EventLoop import EventLoop
from portage.util._eventloop.global_event_loop import global_event_loop
@@ -23,13 +17,6 @@ class PollScheduler(object):
# max time between loadavg checks (milliseconds)
_loadavg_latency = 30000
- class _sched_iface_class(SlotObject):
- __slots__ = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT",
- "IO_PRI", "child_watch_add",
- "idle_add", "io_add_watch", "iteration",
- "output", "run",
- "source_remove", "timeout_add")
-
def __init__(self, main=False, event_loop=None):
"""
@param main: If True then use global_event_loop(), otherwise use
@@ -49,20 +36,11 @@ class PollScheduler(object):
self._event_loop = global_event_loop()
else:
self._event_loop = EventLoop(main=False)
- self.sched_iface = self._sched_iface_class(
- IO_ERR=self._event_loop.IO_ERR,
- IO_HUP=self._event_loop.IO_HUP,
- IO_IN=self._event_loop.IO_IN,
- IO_NVAL=self._event_loop.IO_NVAL,
- IO_OUT=self._event_loop.IO_OUT,
- IO_PRI=self._event_loop.IO_PRI,
- child_watch_add=self._event_loop.child_watch_add,
- idle_add=self._event_loop.idle_add,
- io_add_watch=self._event_loop.io_add_watch,
- iteration=self._event_loop.iteration,
- output=self._task_output,
- source_remove=self._event_loop.source_remove,
- timeout_add=self._event_loop.timeout_add)
+ self.sched_iface = SchedulerInterface(self._event_loop,
+ is_background=self._is_background)
+
+ def _is_background(self):
+ return self._background
def terminate(self):
"""
@@ -176,47 +154,3 @@ class PollScheduler(object):
return False
return True
-
- def _task_output(self, msg, log_path=None, background=None,
- level=0, noiselevel=-1):
- """
- Output msg to stdout if not self._background. If log_path
- is not None then append msg to the log (appends with
- compression if the filename extension of log_path
- corresponds to a supported compression type).
- """
-
- if background is None:
- # If the task does not have a local background value
- # (like for parallel-fetch), then use the global value.
- background = self._background
-
- msg_shown = False
- if not background:
- writemsg_level(msg, level=level, noiselevel=noiselevel)
- msg_shown = True
-
- if log_path is not None:
- try:
- f = open(_unicode_encode(log_path,
- encoding=_encodings['fs'], errors='strict'),
- mode='ab')
- f_real = f
- except IOError as e:
- if e.errno not in (errno.ENOENT, errno.ESTALE):
- raise
- if not msg_shown:
- writemsg_level(msg, level=level, noiselevel=noiselevel)
- else:
-
- if log_path.endswith('.gz'):
- # NOTE: The empty filename argument prevents us from
- # triggering a bug in python3 which causes GzipFile
- # to raise AttributeError if fileobj.name is bytes
- # instead of unicode.
- f = gzip.GzipFile(filename='', mode='ab', fileobj=f)
-
- f.write(_unicode_encode(msg))
- f.close()
- if f_real is not f:
- f_real.close()
diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index c5779cb..d64468a 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -28,6 +28,8 @@ from portage._sets import SETPREFIX
from portage._sets.base import InternalPackageSet
from portage.util import ensure_dirs, writemsg, writemsg_level
from portage.util.SlotObject import SlotObject
+from portage.util._async.SchedulerInterface import SchedulerInterface
+from portage.util._eventloop.EventLoop import EventLoop
from portage.package.ebuild.digestcheck import digestcheck
from portage.package.ebuild.digestgen import digestgen
from portage.package.ebuild.doebuild import (_check_temp_dir,
@@ -79,7 +81,7 @@ class Scheduler(PollScheduler):
_opts_no_self_update = frozenset(["--buildpkgonly",
"--fetchonly", "--fetch-all-uri", "--pretend"])
- class _iface_class(PollScheduler._sched_iface_class):
+ class _iface_class(SchedulerInterface):
__slots__ = ("fetch",
"scheduleSetup", "scheduleUnpack")
@@ -215,11 +217,11 @@ class Scheduler(PollScheduler):
fetch_iface = self._fetch_iface_class(log_file=self._fetch_log,
schedule=self._schedule_fetch)
self._sched_iface = self._iface_class(
+ self._event_loop,
+ is_background=self._is_background,
fetch=fetch_iface,
scheduleSetup=self._schedule_setup,
- scheduleUnpack=self._schedule_unpack,
- **dict((k, getattr(self.sched_iface, k))
- for k in self.sched_iface.__slots__))
+ scheduleUnpack=self._schedule_unpack)
self._prefetchers = weakref.WeakValueDictionary()
self._pkg_queue = []
@@ -767,10 +769,10 @@ class Scheduler(PollScheduler):
failures = 0
- # Use a local PollScheduler instance here, since we don't
+ # Use a local EventLoop instance here, since we don't
# want tasks here to trigger the usual Scheduler callbacks
# that handle job scheduling and status display.
- sched_iface = PollScheduler().sched_iface
+ sched_iface = SchedulerInterface(EventLoop(main=False))
for x in self._mergelist:
if not isinstance(x, Package):
diff --git a/pym/portage/util/_async/SchedulerInterface.py b/pym/portage/util/_async/SchedulerInterface.py
new file mode 100644
index 0000000..731f521
--- /dev/null
+++ b/pym/portage/util/_async/SchedulerInterface.py
@@ -0,0 +1,86 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import gzip
+import errno
+
+from portage import _encodings
+from portage import _unicode_encode
+from portage.util import writemsg_level
+from ..SlotObject import SlotObject
+
+class SchedulerInterface(SlotObject):
+
+ __slots__ = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT", "IO_PRI",
+ "child_watch_add", "idle_add", "io_add_watch", "iteration",
+ "source_remove", "timeout_add", "_event_loop", "_is_background")
+
+ def __init__(self, event_loop, is_background=None, **kwargs):
+ SlotObject.__init__(self, **kwargs)
+ self._event_loop = event_loop
+ if is_background is None:
+ is_background = self._return_false
+ self._is_background = is_background
+ self.IO_ERR = event_loop.IO_ERR
+ self.IO_HUP = event_loop.IO_HUP
+ self.IO_IN = event_loop.IO_IN
+ self.IO_NVAL = event_loop.IO_NVAL
+ self.IO_OUT = event_loop.IO_OUT
+ self.IO_PRI = event_loop.IO_PRI
+ self.child_watch_add = event_loop.child_watch_add
+ self.idle_add = event_loop.idle_add
+ self.io_add_watch = event_loop.io_add_watch
+ self.iteration = event_loop.iteration
+ self.source_remove = event_loop.source_remove
+ self.timeout_add = event_loop.timeout_add
+
+ @staticmethod
+ def _return_false():
+ return False
+
+ def output(self, msg, log_path=None, background=None,
+ level=0, noiselevel=-1):
+ """
+ Output msg to stdout if not self._is_background(). If log_path
+ is not None then append msg to the log (appends with
+ compression if the filename extension of log_path corresponds
+ to a supported compression type).
+ """
+
+ global_background = self._is_background()
+ if background is None or global_background:
+ # Use the global value if the task does not have a local
+ # background value. For example, parallel-fetch tasks run
+ # in the background while other tasks concurrently run in
+ # the foreground.
+ background = global_background
+
+ msg_shown = False
+ if not background:
+ writemsg_level(msg, level=level, noiselevel=noiselevel)
+ msg_shown = True
+
+ if log_path is not None:
+ try:
+ f = open(_unicode_encode(log_path,
+ encoding=_encodings['fs'], errors='strict'),
+ mode='ab')
+ f_real = f
+ except IOError as e:
+ if e.errno not in (errno.ENOENT, errno.ESTALE):
+ raise
+ if not msg_shown:
+ writemsg_level(msg, level=level, noiselevel=noiselevel)
+ else:
+
+ if log_path.endswith('.gz'):
+ # NOTE: The empty filename argument prevents us from
+ # triggering a bug in python3 which causes GzipFile
+ # to raise AttributeError if fileobj.name is bytes
+ # instead of unicode.
+ f = gzip.GzipFile(filename='', mode='ab', fileobj=f)
+
+ f.write(_unicode_encode(msg))
+ f.close()
+ if f_real is not f:
+ f_real.close()
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2012-10-08 20:44 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2012-10-08 20:44 UTC (permalink / raw)
To: gentoo-commits
commit: 91b253ba2f249b19d6cc68f51cd6b909a765a715
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Oct 8 20:44:22 2012 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Oct 8 20:44:22 2012 +0000
URL: http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=91b253ba
PollScheduler: disable default _loadavg_latency
Move the 30 second default to the Scheduler class, since that's the
only place that it's currently needed (all other schedulers have
relatively short-running jobs).
---
pym/_emerge/PollScheduler.py | 2 +-
pym/_emerge/Scheduler.py | 7 ++++++-
pym/portage/util/_async/AsyncScheduler.py | 4 +++-
3 files changed, 10 insertions(+), 3 deletions(-)
diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
index 1a81e2d..d02b0da 100644
--- a/pym/_emerge/PollScheduler.py
+++ b/pym/_emerge/PollScheduler.py
@@ -15,7 +15,7 @@ from _emerge.getloadavg import getloadavg
class PollScheduler(object):
# max time between loadavg checks (milliseconds)
- _loadavg_latency = 30000
+ _loadavg_latency = None
def __init__(self, main=False, event_loop=None):
"""
diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index 40033c2..3b55b2b 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -66,6 +66,9 @@ if sys.hexversion >= 0x3000000:
class Scheduler(PollScheduler):
+ # max time between loadavg checks (milliseconds)
+ _loadavg_latency = 30000
+
# max time between display status updates (milliseconds)
_max_display_latency = 3000
@@ -1339,7 +1342,9 @@ class Scheduler(PollScheduler):
def _main_loop(self):
term_check_id = self._event_loop.idle_add(self._termination_check)
loadavg_check_id = None
- if self._max_load is not None:
+ if self._max_load is not None and \
+ self._loadavg_latency is not None and \
+ (self._max_jobs is True or self._max_jobs > 1):
# We have to schedule periodically, in case the load
# average has changed since the last call.
loadavg_check_id = self._event_loop.timeout_add(
diff --git a/pym/portage/util/_async/AsyncScheduler.py b/pym/portage/util/_async/AsyncScheduler.py
index c6a37f5..182e19e 100644
--- a/pym/portage/util/_async/AsyncScheduler.py
+++ b/pym/portage/util/_async/AsyncScheduler.py
@@ -66,7 +66,9 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
def _start(self):
self._term_check_id = self._event_loop.idle_add(self._termination_check)
- if self._max_load is not None:
+ if self._max_load is not None and \
+ self._loadavg_latency is not None and \
+ (self._max_jobs is True or self._max_jobs > 1):
# We have to schedule periodically, in case the load
# average has changed since the last call.
self._loadavg_check_id = self._event_loop.timeout_add(
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2012-10-16 19:28 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2012-10-16 19:28 UTC (permalink / raw)
To: gentoo-commits
commit: 6de6ecc0049160f049f889b81c128e682225c55b
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Oct 16 19:27:46 2012 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue Oct 16 19:27:46 2012 +0000
URL: http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=6de6ecc0
CompositeTask._start_task: propagate scheduler
---
pym/_emerge/CompositeTask.py | 4 ++++
pym/portage/util/_async/PopenProcess.py | 4 ++++
2 files changed, 8 insertions(+), 0 deletions(-)
diff --git a/pym/_emerge/CompositeTask.py b/pym/_emerge/CompositeTask.py
index 3e43478..40cf859 100644
--- a/pym/_emerge/CompositeTask.py
+++ b/pym/_emerge/CompositeTask.py
@@ -142,6 +142,10 @@ class CompositeTask(AsynchronousTask):
a task.
"""
+ try:
+ task.scheduler = self.scheduler
+ except AttributeError:
+ pass
task.addExitListener(exit_handler)
self._current_task = task
task.start()
diff --git a/pym/portage/util/_async/PopenProcess.py b/pym/portage/util/_async/PopenProcess.py
index c3ae6e9..37d32f2 100644
--- a/pym/portage/util/_async/PopenProcess.py
+++ b/pym/portage/util/_async/PopenProcess.py
@@ -14,6 +14,10 @@ class PopenProcess(SubProcess):
def _start(self):
if self.pipe_reader is not None:
+ try:
+ self.pipe_reader.scheduler = self.scheduler
+ except AttributeError:
+ pass
self.pipe_reader.addExitListener(self._pipe_reader_exit)
self.pipe_reader.start()
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2012-10-19 1:15 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2012-10-19 1:15 UTC (permalink / raw)
To: gentoo-commits
commit: 5a7690fd10a6935e756495590c9c2b6a19aa8139
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Fri Oct 19 01:15:00 2012 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Oct 19 01:15:00 2012 +0000
URL: http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=5a7690fd
SpawnProcess: split out a PipeLogger class
The copyright dates for these classes begin in 2008, since SpawnProcess
code is derived from the EbuildFetcherAsync class which was added in
commit e4edadf5ae7063f375d76be151c6d0e949980ecf in 2008.
---
pym/_emerge/SpawnProcess.py | 165 +++++++--------------------------
pym/portage/util/_async/PipeLogger.py | 149 +++++++++++++++++++++++++++++
2 files changed, 183 insertions(+), 131 deletions(-)
diff --git a/pym/_emerge/SpawnProcess.py b/pym/_emerge/SpawnProcess.py
index ab152c3..d18512b 100644
--- a/pym/_emerge/SpawnProcess.py
+++ b/pym/_emerge/SpawnProcess.py
@@ -1,17 +1,12 @@
-# Copyright 1999-2012 Gentoo Foundation
+# Copyright 2008-2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
from _emerge.SubProcess import SubProcess
import sys
-from portage.cache.mappings import slot_dict_class
import portage
-from portage import _encodings
-from portage import _unicode_encode
from portage import os
from portage.const import BASH_BINARY
-import fcntl
-import errno
-import gzip
+from portage.util._async.PipeLogger import PipeLogger
class SpawnProcess(SubProcess):
@@ -26,10 +21,7 @@ class SpawnProcess(SubProcess):
"path_lookup", "pre_exec")
__slots__ = ("args",) + \
- _spawn_kwarg_names + ("_log_file_real", "_selinux_type",)
-
- _file_names = ("log", "process", "stdout")
- _files_dict = slot_dict_class(_file_names, prefix="")
+ _spawn_kwarg_names + ("_pipe_logger", "_selinux_type",)
def _start(self):
@@ -37,17 +29,13 @@ class SpawnProcess(SubProcess):
self.fd_pipes = {}
fd_pipes = self.fd_pipes
- self._files = self._files_dict()
- files = self._files
-
master_fd, slave_fd = self._pipe(fd_pipes)
- fcntl.fcntl(master_fd, fcntl.F_SETFL,
- fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
- files.process = master_fd
- logfile = None
- if self._can_log(slave_fd):
- logfile = self.logfile
+ can_log = self._can_log(slave_fd)
+ if can_log:
+ log_file_path = self.logfile
+ else:
+ log_file_path = None
null_input = None
if not self.background or 0 in fd_pipes:
@@ -74,37 +62,19 @@ class SpawnProcess(SubProcess):
sys.__stderr__.flush()
break
- if logfile is not None:
+ fd_pipes_orig = fd_pipes.copy()
- fd_pipes_orig = fd_pipes.copy()
+ if log_file_path is not None:
fd_pipes[1] = slave_fd
fd_pipes[2] = slave_fd
- files.log = open(_unicode_encode(logfile,
- encoding=_encodings['fs'], errors='strict'), mode='ab')
- if logfile.endswith('.gz'):
- self._log_file_real = files.log
- files.log = gzip.GzipFile(filename='', mode='ab',
- fileobj=files.log)
-
- portage.util.apply_secpass_permissions(logfile,
- uid=portage.portage_uid, gid=portage.portage_gid,
- mode=0o660)
-
- if not self.background:
- files.stdout = os.dup(fd_pipes_orig[1])
-
- output_handler = self._output_handler
-
else:
-
# Create a dummy pipe so the scheduler can monitor
# the process from inside a poll() loop.
fd_pipes[self._dummy_pipe_fd] = slave_fd
if self.background:
fd_pipes[1] = slave_fd
fd_pipes[2] = slave_fd
- output_handler = self._dummy_handler
kwargs = {}
for k in self._spawn_kwarg_names:
@@ -116,10 +86,6 @@ class SpawnProcess(SubProcess):
kwargs["returnpid"] = True
kwargs.pop("logfile", None)
- self._reg_id = self.scheduler.io_add_watch(files.process,
- self._registered_events, output_handler)
- self._registered = True
-
retval = self._spawn(self.args, **kwargs)
os.close(slave_fd)
@@ -136,6 +102,18 @@ class SpawnProcess(SubProcess):
self.pid = retval[0]
portage.process.spawned_pids.remove(self.pid)
+ stdout_fd = None
+ if can_log and not self.background:
+ stdout_fd = os.dup(fd_pipes_orig[1])
+
+ self._pipe_logger = PipeLogger(background=self.background,
+ scheduler=self.scheduler, input_fd=master_fd,
+ log_file_path=log_file_path,
+ stdout_fd=stdout_fd)
+ self._pipe_logger.addExitListener(self._pipe_logger_exit)
+ self._pipe_logger.start()
+ self._registered = True
+
def _can_log(self, slave_fd):
return True
@@ -158,92 +136,17 @@ class SpawnProcess(SubProcess):
return spawn_func(args, **kwargs)
- def _output_handler(self, fd, event):
-
- files = self._files
- while True:
- buf = self._read_buf(fd, event)
-
- if buf is None:
- # not a POLLIN event, EAGAIN, etc...
- break
+ def _pipe_logger_exit(self, pipe_logger):
+ self._pipe_logger = None
+ self._unregister()
+ self.wait()
- if not buf:
- # EOF
- self._unregister()
- self.wait()
- break
-
- else:
- if not self.background:
- write_successful = False
- failures = 0
- while True:
- try:
- if not write_successful:
- os.write(files.stdout, buf)
- write_successful = True
- break
- except OSError as e:
- if e.errno != errno.EAGAIN:
- raise
- del e
- failures += 1
- if failures > 50:
- # Avoid a potentially infinite loop. In
- # most cases, the failure count is zero
- # and it's unlikely to exceed 1.
- raise
-
- # This means that a subprocess has put an inherited
- # stdio file descriptor (typically stdin) into
- # O_NONBLOCK mode. This is not acceptable (see bug
- # #264435), so revert it. We need to use a loop
- # here since there's a race condition due to
- # parallel processes being able to change the
- # flags on the inherited file descriptor.
- # TODO: When possible, avoid having child processes
- # inherit stdio file descriptors from portage
- # (maybe it can't be avoided with
- # PROPERTIES=interactive).
- fcntl.fcntl(files.stdout, fcntl.F_SETFL,
- fcntl.fcntl(files.stdout,
- fcntl.F_GETFL) ^ os.O_NONBLOCK)
-
- files.log.write(buf)
- files.log.flush()
-
- self._unregister_if_appropriate(event)
-
- return True
-
- def _dummy_handler(self, fd, event):
- """
- This method is mainly interested in detecting EOF, since
- the only purpose of the pipe is to allow the scheduler to
- monitor the process from inside a poll() loop.
- """
-
- while True:
- buf = self._read_buf(fd, event)
-
- if buf is None:
- # not a POLLIN event, EAGAIN, etc...
- break
-
- if not buf:
- # EOF
- self._unregister()
- self.wait()
- break
-
- self._unregister_if_appropriate(event)
-
- return True
+ def _waitpid_loop(self):
+ SubProcess._waitpid_loop(self)
- def _unregister(self):
- super(SpawnProcess, self)._unregister()
- if self._log_file_real is not None:
- # Avoid "ResourceWarning: unclosed file" since python 3.2.
- self._log_file_real.close()
- self._log_file_real = None
+ pipe_logger = self._pipe_logger
+ if pipe_logger is not None:
+ self._pipe_logger = None
+ pipe_logger.removeExitListener(self._pipe_logger_exit)
+ pipe_logger.cancel()
+ pipe_logger.wait()
diff --git a/pym/portage/util/_async/PipeLogger.py b/pym/portage/util/_async/PipeLogger.py
new file mode 100644
index 0000000..dbdd56f
--- /dev/null
+++ b/pym/portage/util/_async/PipeLogger.py
@@ -0,0 +1,149 @@
+# Copyright 2008-2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import fcntl
+import errno
+import gzip
+
+import portage
+from portage import os, _encodings, _unicode_encode
+from _emerge.AbstractPollTask import AbstractPollTask
+
+class PipeLogger(AbstractPollTask):
+
+ """
+ This can be used for logging output of a child process,
+ optionally outputting to log_file_path and/or stdout_fd. It can
+ also monitor for EOF on input_fd, which may be used to detect
+ termination of a child process. If log_file_path ends with
+ '.gz' then the log file is written with compression.
+ """
+
+ __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
+ ("_log_file", "_log_file_real", "_reg_id")
+
+ def _start(self):
+
+ log_file_path = self.log_file_path
+ if log_file_path is not None:
+
+ self._log_file = open(_unicode_encode(log_file_path,
+ encoding=_encodings['fs'], errors='strict'), mode='ab')
+ if log_file_path.endswith('.gz'):
+ self._log_file_real = self._log_file
+ self._log_file = gzip.GzipFile(filename='', mode='ab',
+ fileobj=self._log_file)
+
+ portage.util.apply_secpass_permissions(log_file_path,
+ uid=portage.portage_uid, gid=portage.portage_gid,
+ mode=0o660)
+
+ fcntl.fcntl(self.input_fd, fcntl.F_SETFL,
+ fcntl.fcntl(self.input_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+
+ self._reg_id = self.scheduler.io_add_watch(self.input_fd,
+ self._registered_events, self._output_handler)
+ self._registered = True
+
+ def isAlive(self):
+ return self._registered
+
+ def _cancel(self):
+ self._unregister()
+ if self.returncode is None:
+ self.returncode = self._cancelled_returncode
+
+ def _wait(self):
+ if self.returncode is not None:
+ return self.returncode
+ self._wait_loop()
+ self.returncode = os.EX_OK
+ return self.returncode
+
+ def _output_handler(self, fd, event):
+
+ background = self.background
+ stdout_fd = self.stdout_fd
+ log_file = self._log_file
+
+ while True:
+ buf = self._read_buf(fd, event)
+
+ if buf is None:
+ # not a POLLIN event, EAGAIN, etc...
+ break
+
+ if not buf:
+ # EOF
+ self._unregister()
+ self.wait()
+ break
+
+ else:
+ if not background and stdout_fd is not None:
+ write_successful = False
+ failures = 0
+ while True:
+ try:
+ if not write_successful:
+ os.write(stdout_fd, buf)
+ write_successful = True
+ break
+ except OSError as e:
+ if e.errno != errno.EAGAIN:
+ raise
+ del e
+ failures += 1
+ if failures > 50:
+ # Avoid a potentially infinite loop. In
+ # most cases, the failure count is zero
+ # and it's unlikely to exceed 1.
+ raise
+
+ # This means that a subprocess has put an inherited
+ # stdio file descriptor (typically stdin) into
+ # O_NONBLOCK mode. This is not acceptable (see bug
+ # #264435), so revert it. We need to use a loop
+ # here since there's a race condition due to
+ # parallel processes being able to change the
+ # flags on the inherited file descriptor.
+ # TODO: When possible, avoid having child processes
+ # inherit stdio file descriptors from portage
+ # (maybe it can't be avoided with
+ # PROPERTIES=interactive).
+ fcntl.fcntl(stdout_fd, fcntl.F_SETFL,
+ fcntl.fcntl(stdout_fd,
+ fcntl.F_GETFL) ^ os.O_NONBLOCK)
+
+ if log_file is not None:
+ log_file.write(buf)
+ log_file.flush()
+
+ self._unregister_if_appropriate(event)
+
+ return True
+
+ def _unregister(self):
+
+ if self._reg_id is not None:
+ self.scheduler.source_remove(self._reg_id)
+ self._reg_id = None
+
+ if self.input_fd is not None:
+ os.close(self.input_fd)
+ self.input_fd = None
+
+ if self.stdout_fd is not None:
+ os.close(self.stdout_fd)
+ self.stdout_fd = None
+
+ if self._log_file is not None:
+ self._log_file.close()
+ self._log_file = None
+
+ if self._log_file_real is not None:
+ # Avoid "ResourceWarning: unclosed file" since python 3.2.
+ self._log_file_real.close()
+ self._log_file_real = None
+
+ self._registered = False
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2012-10-19 1:23 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2012-10-19 1:23 UTC (permalink / raw)
To: gentoo-commits
commit: 14f4235b940700f9d6470a216d6fe3dce37ea148
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Fri Oct 19 01:23:02 2012 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Oct 19 01:23:02 2012 +0000
URL: http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=14f4235b
PipeLogger/Reader: remove redundant isAlive()
Parent class AbstractPollTask has equivalent implementation.
---
pym/_emerge/PipeReader.py | 3 ---
pym/portage/util/_async/PipeLogger.py | 3 ---
2 files changed, 0 insertions(+), 6 deletions(-)
diff --git a/pym/_emerge/PipeReader.py b/pym/_emerge/PipeReader.py
index 3eb9d36..fcdefb4 100644
--- a/pym/_emerge/PipeReader.py
+++ b/pym/_emerge/PipeReader.py
@@ -33,9 +33,6 @@ class PipeReader(AbstractPollTask):
self._registered_events, output_handler))
self._registered = True
- def isAlive(self):
- return self._registered
-
def _cancel(self):
if self.returncode is None:
self.returncode = 1
diff --git a/pym/portage/util/_async/PipeLogger.py b/pym/portage/util/_async/PipeLogger.py
index dbdd56f..0905e47 100644
--- a/pym/portage/util/_async/PipeLogger.py
+++ b/pym/portage/util/_async/PipeLogger.py
@@ -45,9 +45,6 @@ class PipeLogger(AbstractPollTask):
self._registered_events, self._output_handler)
self._registered = True
- def isAlive(self):
- return self._registered
-
def _cancel(self):
self._unregister()
if self.returncode is None:
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2013-01-06 11:16 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2013-01-06 11:16 UTC (permalink / raw)
To: gentoo-commits
commit: 4ac9adbb25e1f83853976cba374d94b2a82ef124
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Jan 6 11:15:49 2013 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Jan 6 11:15:49 2013 +0000
URL: http://git.overlays.gentoo.org/gitweb/?p=proj/portage.git;a=commit;h=4ac9adbb
_keep_scheduling: check _terminated.is_set()
This ensures that scheduling loops terminate as soon as possible after
an interrupt is received.
---
pym/_emerge/Scheduler.py | 4 ++--
pym/portage/util/_async/AsyncScheduler.py | 4 ++--
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index a423274..c2f2459 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -1,4 +1,4 @@
-# Copyright 1999-2012 Gentoo Foundation
+# Copyright 1999-2013 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
from __future__ import print_function
@@ -1531,7 +1531,7 @@ class Scheduler(PollScheduler):
self._config_pool[settings['EROOT']].append(settings)
def _keep_scheduling(self):
- return bool(not self._terminated_tasks and self._pkg_queue and \
+ return bool(not self._terminated.is_set() and self._pkg_queue and \
not (self._failed_pkgs and not self._build_opts.fetchonly))
def _is_work_scheduled(self):
diff --git a/pym/portage/util/_async/AsyncScheduler.py b/pym/portage/util/_async/AsyncScheduler.py
index 182e19e..9b96c6f 100644
--- a/pym/portage/util/_async/AsyncScheduler.py
+++ b/pym/portage/util/_async/AsyncScheduler.py
@@ -1,4 +1,4 @@
-# Copyright 2012 Gentoo Foundation
+# Copyright 2012-2013 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
from portage import os
@@ -38,7 +38,7 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
raise NotImplementedError(self)
def _keep_scheduling(self):
- return self._remaining_tasks and not self._terminated_tasks
+ return self._remaining_tasks and not self._terminated.is_set()
def _running_job_count(self):
return len(self._running_tasks)
^ permalink raw reply related [flat|nested] 9+ messages in thread
* [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
@ 2017-03-24 20:33 Zac Medico
0 siblings, 0 replies; 9+ messages in thread
From: Zac Medico @ 2017-03-24 20:33 UTC (permalink / raw)
To: gentoo-commits
commit: 86400e9f864e86f8f677ccda9ce4103d6d02ef87
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Mar 21 06:56:55 2017 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Mar 24 20:32:25 2017 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=86400e9f
PollScheduler: terminate via call_soon for asyncio compat
Use call_soon to schedule the _termination_check callback when needed.
The previous idle_add usage was relatively inefficient, because it
scheduled the _termination_check callback to be called in every
iteration of the event loop.
Add a _cleanup method to handle cleanup of callbacks registered with
the global event loop. Since the terminate method is thread safe and it
interacts with self._term_callback_handle, use this variable only while
holding a lock.
pym/_emerge/PollScheduler.py | 57 +++++++++++++++++++++++--------
pym/_emerge/Scheduler.py | 7 ++--
pym/portage/util/_async/AsyncScheduler.py | 16 ++++-----
3 files changed, 54 insertions(+), 26 deletions(-)
diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
index b118ac157..569879b36 100644
--- a/pym/_emerge/PollScheduler.py
+++ b/pym/_emerge/PollScheduler.py
@@ -25,8 +25,10 @@ class PollScheduler(object):
a non-main thread)
@type main: bool
"""
+ self._term_rlock = threading.RLock()
self._terminated = threading.Event()
self._terminated_tasks = False
+ self._term_check_handle = None
self._max_jobs = 1
self._max_load = None
self._scheduling = False
@@ -44,6 +46,21 @@ class PollScheduler(object):
def _is_background(self):
return self._background
+ def _cleanup(self):
+ """
+ Cleanup any callbacks that have been registered with the global
+ event loop.
+ """
+ # The self._term_check_handle attribute requires locking
+ # since it's modified by the thread safe terminate method.
+ with self._term_rlock:
+ if self._term_check_handle not in (None, False):
+ self._term_check_handle.cancel()
+ # This prevents the terminate method from scheduling
+ # any more callbacks (since _cleanup must eliminate all
+ # callbacks in order to ensure complete cleanup).
+ self._term_check_handle = False
+
def terminate(self):
"""
Schedules asynchronous, graceful termination of the scheduler
@@ -51,26 +68,36 @@ class PollScheduler(object):
This method is thread-safe (and safe for signal handlers).
"""
- self._terminated.set()
+ with self._term_rlock:
+ if self._term_check_handle is None:
+ self._terminated.set()
+ self._term_check_handle = self._event_loop.call_soon_threadsafe(
+ self._termination_check, True)
- def _termination_check(self):
+ def _termination_check(self, retry=False):
"""
Calls _terminate_tasks() if appropriate. It's guaranteed not to
- call it while _schedule_tasks() is being called. The check should
- be executed for each iteration of the event loop, for response to
- termination signals at the earliest opportunity. It always returns
- True, for continuous scheduling via idle_add.
+ call it while _schedule_tasks() is being called. This method must
+ only be called via the event loop thread.
+
+ @param retry: If True then reschedule if scheduling state prevents
+ immediate termination.
+ @type retry: bool
"""
- if not self._scheduling and \
- self._terminated.is_set() and \
+ if self._terminated.is_set() and \
not self._terminated_tasks:
- self._scheduling = True
- try:
- self._terminated_tasks = True
- self._terminate_tasks()
- finally:
- self._scheduling = False
- return True
+ if not self._scheduling:
+ self._scheduling = True
+ try:
+ self._terminated_tasks = True
+ self._terminate_tasks()
+ finally:
+ self._scheduling = False
+
+ elif retry:
+ with self._term_rlock:
+ self._term_check_handle = self._event_loop.call_soon(
+ self._termination_check, True)
def _terminate_tasks(self):
"""
diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index 71fe75f62..58ff97139 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -1055,6 +1055,7 @@ class Scheduler(PollScheduler):
else:
signal.signal(signal.SIGCONT, signal.SIG_DFL)
+ self._termination_check()
if received_signal:
sys.exit(received_signal[0])
@@ -1091,6 +1092,10 @@ class Scheduler(PollScheduler):
if isinstance(x, Package) and x.operation == "merge"])
self._status_display.maxval = self._pkg_count.maxval
+ # Cleanup any callbacks that have been registered with the global
+ # event loop by calls to the terminate method.
+ self._cleanup()
+
self._logger.log(" *** Finished. Cleaning up...")
if failed_pkgs:
@@ -1393,7 +1398,6 @@ class Scheduler(PollScheduler):
blocker_db.discardBlocker(pkg)
def _main_loop(self):
- term_check_id = self._event_loop.idle_add(self._termination_check)
loadavg_check_id = None
if self._max_load is not None and \
self._loadavg_latency is not None and \
@@ -1420,7 +1424,6 @@ class Scheduler(PollScheduler):
while self._is_work_scheduled():
self._event_loop.iteration()
finally:
- self._event_loop.source_remove(term_check_id)
if loadavg_check_id is not None:
self._event_loop.source_remove(loadavg_check_id)
diff --git a/pym/portage/util/_async/AsyncScheduler.py b/pym/portage/util/_async/AsyncScheduler.py
index 9b96c6f36..3deb6cb04 100644
--- a/pym/portage/util/_async/AsyncScheduler.py
+++ b/pym/portage/util/_async/AsyncScheduler.py
@@ -18,7 +18,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
self._error_count = 0
self._running_tasks = set()
self._remaining_tasks = True
- self._term_check_id = None
self._loadavg_check_id = None
def _poll(self):
@@ -65,7 +64,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
self._schedule()
def _start(self):
- self._term_check_id = self._event_loop.idle_add(self._termination_check)
if self._max_load is not None and \
self._loadavg_latency is not None and \
(self._max_jobs is True or self._max_jobs > 1):
@@ -75,6 +73,12 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
self._loadavg_latency, self._schedule)
self._schedule()
+ def _cleanup(self):
+ super(AsyncScheduler, self)._cleanup()
+ if self._loadavg_check_id is not None:
+ self._event_loop.source_remove(self._loadavg_check_id)
+ self._loadavg_check_id = None
+
def _wait(self):
# Loop while there are jobs to be scheduled.
while self._keep_scheduling():
@@ -86,13 +90,7 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
while self._is_work_scheduled():
self._event_loop.iteration()
- if self._term_check_id is not None:
- self._event_loop.source_remove(self._term_check_id)
- self._term_check_id = None
-
- if self._loadavg_check_id is not None:
- self._event_loop.source_remove(self._loadavg_check_id)
- self._loadavg_check_id = None
+ self._cleanup()
if self._error_count > 0:
self.returncode = 1
^ permalink raw reply related [flat|nested] 9+ messages in thread
end of thread, other threads:[~2017-03-24 20:33 UTC | newest]
Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2013-01-06 11:16 [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/ Zac Medico
-- strict thread matches above, loose matches on Subject: below --
2017-03-24 20:33 Zac Medico
2012-10-19 1:23 Zac Medico
2012-10-19 1:15 Zac Medico
2012-10-16 19:28 Zac Medico
2012-10-08 20:44 Zac Medico
2012-10-07 18:51 Zac Medico
2012-10-07 18:22 Zac Medico
2012-10-07 18:17 Zac Medico
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.