* [gentoo-commits] proj/R_overlay:master commit in: roverlay/ebuild/, roverlay/depres/, roverlay/overlay/
@ 2012-07-02 16:52 André Erdmann
0 siblings, 0 replies; only message in thread
From: André Erdmann @ 2012-07-02 16:52 UTC (permalink / raw
To: gentoo-commits
commit: 8a174d86bba8fa67d38935da2b7b7c51f53606fa
Author: André Erdmann <dywi <AT> mailerd <DOT> de>
AuthorDate: Mon Jul 2 16:48:31 2012 +0000
Commit: André Erdmann <dywi <AT> mailerd <DOT> de>
CommitDate: Mon Jul 2 16:48:31 2012 +0000
URL: http://git.overlays.gentoo.org/gitweb/?p=proj/R_overlay.git;a=commit;h=8a174d86
threaded overlay writing fix some threading issues
* the overlay produced can now be written to fs using per-pacackage dir threads
* fixed some thread locking issues
* added an error queue used by threads to pass exceptions to the main module
modified: roverlay/depres/channels.py
modified: roverlay/depres/depresolver.py
modified: roverlay/ebuild/creation.py
modified: roverlay/ebuild/depres.py
modified: roverlay/overlay/__init__.py
modified: roverlay/overlay/category.py
modified: roverlay/overlay/creator.py
modified: roverlay/overlay/package.py
modified: roverlay/overlay/worker.py
---
roverlay/depres/channels.py | 16 ++-
roverlay/depres/depresolver.py | 296 +++++++++++++++++++++++++---------------
roverlay/ebuild/creation.py | 17 ++-
roverlay/ebuild/depres.py | 9 +-
roverlay/overlay/__init__.py | 18 ++-
roverlay/overlay/category.py | 76 +++++++++--
roverlay/overlay/creator.py | 119 ++++++++++++----
roverlay/overlay/package.py | 165 +++++++++++------------
roverlay/overlay/worker.py | 81 ++++++++----
9 files changed, 510 insertions(+), 287 deletions(-)
diff --git a/roverlay/depres/channels.py b/roverlay/depres/channels.py
index 65d4d45..56491f4 100644
--- a/roverlay/depres/channels.py
+++ b/roverlay/depres/channels.py
@@ -19,7 +19,7 @@ class EbuildJobChannel ( DependencyResolverChannel ):
add deps, then satisfy_request(): collect/lookup
"""
- def __init__ ( self, name=None, logger=None ):
+ def __init__ ( self, err_queue, name=None, logger=None ):
"""EbuildJobChannel
arguments:
@@ -32,6 +32,8 @@ class EbuildJobChannel ( DependencyResolverChannel ):
# in the join()-method
self._depdone = 0
+ self.err_queue = err_queue
+
# set of portage packages (resolved deps)
# this is None unless all deps have been successfully resolved
self._collected_deps = None
@@ -170,12 +172,15 @@ class EbuildJobChannel ( DependencyResolverChannel ):
# DEPEND/RDEPEND/.. later, seewave requires sci-libs/fftw
# in both DEPEND and RDEPEND for example
dep_collected = set()
- satisfiable = True
+ satisfiable = self.err_queue.empty()
def handle_queue_item ( dep_env ):
self._depdone += 1
- if dep_env.is_resolved():
+ if dep_env is None:
+ # could used to unblock the queue
+ return self.err_queue.empty()
+ elif dep_env.is_resolved():
### and dep_env in self.dep_env_list
# successfully resolved
dep_collected.add ( dep_env.get_result() [1] )
@@ -190,7 +195,8 @@ class EbuildJobChannel ( DependencyResolverChannel ):
# loop until
# (a) at least one dependency could not be resolved or
- # (b) all deps processed
+ # (b) all deps processed or
+ # (c) error queue not empty
while self._depdone < len ( self.dep_env_list ) and satisfiable:
# tell the resolver to start
self._depres_master.start()
@@ -204,7 +210,7 @@ class EbuildJobChannel ( DependencyResolverChannel ):
# --- end while
if satisfiable:
- self._collected_deps = dep_collected
+ self._collected_deps = frozenset ( dep_collected )
return self._collected_deps
else:
if close_if_unresolvable: self.close()
diff --git a/roverlay/depres/depresolver.py b/roverlay/depres/depresolver.py
index 26040c0..7f91983 100644
--- a/roverlay/depres/depresolver.py
+++ b/roverlay/depres/depresolver.py
@@ -29,12 +29,11 @@ class DependencyResolver ( object ):
"""Main object for dependency resolution."""
- NUMTHREADS = config.get ( "DEPRES.jobcount", 0 )
+ NUMTHREADS = config.get ( "DEPRES.jobcount", 15 )
def __init__ ( self ):
"""Initializes a DependencyResolver."""
- # these loggers are temporary helpers
self.logger = logging.getLogger ( self.__class__.__name__ )
self.logger_unresolvable = self.logger.getChild ( "UNRESOLVABLE" )
self.logger_resolved = self.logger.getChild ( "RESOLVED" )
@@ -46,6 +45,7 @@ class DependencyResolver ( object ):
# this lock tells whether a dep res 'master' thread is running (locked)
self.runlock = threading.Lock()
+ self.startlock = threading.Lock()
# the dep res main thread
self._mainthread = None
# the dep res worker threads
@@ -86,6 +86,9 @@ class DependencyResolver ( object ):
self.all_channel_ids = set()
# --- end of __init__ (...) ---
+ def set_exception_queue ( self, equeue ):
+ self.err_queue = equeue
+
def _sort ( self ):
"""Sorts the rule pools of this resolver."""
for pool in self.static_rule_pools: pool.sort()
@@ -188,7 +191,7 @@ class DependencyResolver ( object ):
* channel -- channel to be registered
automatically sets channel's resolver to self if it is None
- raises: Exception if channels is already registered with this resolver
+ raises: Exception if channel is already registered with this resolver
returns: channel
"""
@@ -242,7 +245,6 @@ class DependencyResolver ( object ):
except KeyError as expected:
# ok
pass
-
# --- end of channel_closed (...) ---
def _queue_previously_failed ( self ):
@@ -259,81 +261,138 @@ class DependencyResolver ( object ):
# --- end of _queue_previously_failed (...) ---
def start ( self ):
- if not self.runlock.acquire ( False ):
- # already running
- return True
- # --
+ # -- verify whether resolver has to be started
+ if self._depqueue.empty():
+ # nothing to resolve
+ return
+
+ if not self.startlock.acquire ( False ):
+ # another channel/.. is starting the resolver
+ return
+ elif self._depqueue.empty():
+ self.startlock.release()
+ return
+
+ # -- verify...
+
+ # acquire the run lock (that locks _run_main)
+ try:
+ self.runlock.acquire()
+ finally:
+ self.startlock.release()
+
+ if self._depqueue.empty():
+ self.runlock.release()
+ return
if DependencyResolver.NUMTHREADS > 0:
# no need to wait for the old thread
+ # FIXME: could remove the following block
+ if self._mainthread is not None:
+ self._mainthread.join()
+ del self._mainthread
+
self._mainthread = threading.Thread ( target=self._thread_run_main )
self._mainthread.start()
+
else:
self._thread_run_main()
# self.runlock is released when _thread_run_main is done
# --- end of start (...) ---
+ def unblock_channels ( self ):
+ # unblock all channels by processing all remaining deps as
+ # unresolved
+ ## other option: select channel, but this may interfere with
+ ## channel_closed()
+ channel_gone = set()
+ while not self._depqueue.empty():
+ chan, dep_env = self._depqueue.get_nowait()
+ dep_env.set_unresolvable()
+
+ if chan not in channel_gone:
+ try:
+ self._depqueue_done [chan].put_nowait ( dep_env )
+ except KeyError:
+ channel_gone.add ( chan )
+ # --- end of unblock_channels (...) ---
+
def _thread_run_main ( self ):
"""Tells the resolver to run."""
-
jobcount = DependencyResolver.NUMTHREADS
- if jobcount < 1:
- ( self.logger.warning if jobcount < 0 else self.logger.debug ) (
- "Running in sequential mode."
- )
- self._thread_run_resolve()
- else:
+ try:
+ if jobcount < 1:
+ ( self.logger.warning if jobcount < 0 else self.logger.debug ) (
+ "Running in sequential mode."
+ )
+ self._thread_run_resolve()
+ else:
- # wait for old threads
- if not self._threads is None:
- self.logger.warning ( "Waiting for old threads..." )
+ # wait for old threads
+ if not self._threads is None:
+ self.logger.warning ( "Waiting for old threads..." )
+ for t in self._threads: t.join()
+
+ self.logger.debug (
+ "Running in concurrent mode with %i jobs." % jobcount
+ )
+
+ # create threads,
+ self._threads = tuple (
+ threading.Thread ( target=self._thread_run_resolve )
+ for n in range (jobcount)
+ )
+ # run them
+ for t in self._threads: t.start()
+ # and wait until done
for t in self._threads: t.join()
- self.logger.warning (
- "Running in concurrent mode with %i jobs." % jobcount
- )
+ # finally remove them
+ del self._threads
+ self._threads = None
- # create threads,
- self._threads = [
- threading.Thread ( target=self._thread_run_resolve )
- for n in range (jobcount)
- ]
- # run them
- for t in self._threads: t.start()
- # and wait until done
- for t in self._threads: t.join()
- # finally remove them
- del self._threads
- self._threads = None
+ # iterate over _depqueue_failed and report unresolved
+ ## todo can thread this
+ while not self._depqueue_failed.empty():
+ try:
+ channel_id, dep_env = self._depqueue_failed.get_nowait()
+ except queue.Empty:
+ # race cond empty() <-> get_nowait()
+ return
- # iterate over _depqueue_failed and report unresolved
- ## todo can thread this
- while not self._depqueue_failed.empty():
- try:
- channel_id, dep_env = self._depqueue_failed.get_nowait()
dep_env.set_unresolvable()
self._report_event ( 'UNRESOLVABLE', dep_env )
- if channel_id in self._depqueue_done:
- ## todo/fixme/whatever: this 'if' can filter out channels
- ## that have been added again
- self._depqueue_done [channel_id].put ( dep_env )
- except queue.Empty:
- # race cond empty() <-> get_nowait()
- break
- except KeyError:
- # channel has been closed before calling put, ignore this err
- pass
+ try:
+ if channel_id in self._depqueue_done:
+ self._depqueue_done [channel_id].put_nowait ( dep_env )
+ except KeyError:
+ # channel has been closed before calling put, ignore this
+ pass
+
+ if not self.err_queue.empty():
+ self.unblock_channels()
+
+ except ( Exception, KeyboardInterrupt ) as e:
+
+ self.unblock_channels()
- # release the lock
- self.runlock.release()
+ if jobcount > 0 and hasattr ( self, 'err_queue' ):
+ self.err_queue.put_nowait ( id ( self ), e )
+ return
+ else:
+ raise e
+
+ finally:
+ # release the lock
+ self.runlock.release()
# --- end of _thread_run_main (...) ---
@@ -342,77 +401,90 @@ class DependencyResolver ( object ):
returns: None (implicit)
"""
+ try:
+ while self.err_queue.empty() and not self._depqueue.empty():
- while not self._depqueue.empty():
-
- try:
- to_resolve = self._depqueue.get_nowait()
- except queue.Empty:
- # this thread is done when the queue is empty, so this is
- # no error, but just the result of the race condition between
- # queue.empty() and queue.get(False)
- return None
-
- channel_id, dep_env = to_resolve
-
- if channel_id in self._depqueue_done:
- # else channel has been closed, drop dep
-
- self.logger.debug ( "Trying to resolve '%s'." % dep_env.dep_str )
-
- #have_new_rule = False
+ try:
+ to_resolve = self._depqueue.get_nowait()
+ except queue.Empty:
+ # this thread is done when the queue is empty, so this is
+ # no error, but just the result of the race condition between
+ # queue.empty() and queue.get(False)
+ return
- resolved = None
- # resolved can be None, so use a tri-state int for checking
- # 0 -> unresolved, but resolvable
- # 1 -> unresolved and (currently, new rules may change this)
- # not resolvable
- # 2 -> resolved
- is_resolved = 0
+ channel_id, dep_env = to_resolve
- # TODO:
- # (threading: could search the pools in parallel)
+ if channel_id in self._depqueue_done:
+ # else channel has been closed, drop dep
- if USING_DEPRES_CACHE:
- if dep_env.dep_str_low in self._dep_unresolvable:
- # cannot resolve
- is_resolved = 1
+ self.logger.debug (
+ "Trying to resolve '%s'." % dep_env.dep_str
+ )
- if is_resolved == 0:
- # search for a match in the rule pools
- for rulepool in self.static_rule_pools:
- result = rulepool.matches ( dep_env )
- if not result is None and result [0] > 0:
- resolved = result [1]
- is_resolved = 2
- break
+ #have_new_rule = False
+ resolved = None
+ # resolved can be None, so use a tri-state int for checking
+ # 0 -> unresolved, but resolvable
+ # 1 -> unresolved and (currently, new rules may change this)
+ # not resolvable
+ # 2 -> resolved
+ is_resolved = 0
+ # TODO:
+ # (threading: could search the pools in parallel)
- if is_resolved == 2:
- dep_env.set_resolved ( resolved, append=False )
- self._report_event ( 'RESOLVED', dep_env )
- self._depqueue_done [channel_id].put ( dep_env )
- else:
- self._depqueue_failed.put ( to_resolve )
-
- if USING_DEPRES_CACHE:
- # does not work when adding new rules is possible
- self._dep_unresolvable.add ( dep_env.dep_str_low )
-
- """
- ## only useful if new rules can be created
- # new rule found, requeue all previously failed dependency searches
- if have_new_rule:
- self._queue_previously_failed
if USING_DEPRES_CACHE:
- self._dep_unresolvable.clear() #?
- """
- # --- end if channel_id in self._depqueue_done
-
- self._depqueue.task_done()
- # --- end while
-
+ if dep_env.dep_str_low in self._dep_unresolvable:
+ # cannot resolve
+ is_resolved = 1
+
+ if is_resolved == 0:
+ # search for a match in the rule pools
+ for rulepool in self.static_rule_pools:
+ result = rulepool.matches ( dep_env )
+ if not result is None and result [0] > 0:
+ resolved = result [1]
+ is_resolved = 2
+ break
+
+
+
+ if is_resolved == 2:
+ dep_env.set_resolved ( resolved, append=False )
+ self._report_event ( 'RESOLVED', dep_env )
+ try:
+ self._depqueue_done [channel_id].put ( dep_env )
+ except KeyError:
+ # channel gone while resolving
+ pass
+ else:
+ self._depqueue_failed.put ( to_resolve )
+
+ if USING_DEPRES_CACHE:
+ # does not work when adding new rules is possible
+ self._dep_unresolvable.add ( dep_env.dep_str_low )
+
+ """
+ ## only useful if new rules can be created
+ # new rule found, requeue all previously
+ # failed dependency searches
+ if have_new_rule:
+ self._queue_previously_failed
+ if USING_DEPRES_CACHE:
+ self._dep_unresolvable.clear() #?
+ """
+ # --- end if channel_id in self._depqueue_done
+
+ self._depqueue.task_done()
+ # --- end while
+
+ except ( Exception, KeyboardInterrupt ) as e:
+ if jobcount > 0 and hasattr ( self, 'err_queue' ):
+ self.err_queue.put_nowait ( id ( self ), e )
+ return
+ else:
+ raise e
# --- end of _thread_run_resolve (...) ---
diff --git a/roverlay/ebuild/creation.py b/roverlay/ebuild/creation.py
index 056b1a2..b606ae9 100644
--- a/roverlay/ebuild/creation.py
+++ b/roverlay/ebuild/creation.py
@@ -22,7 +22,7 @@ FALLBACK_DESCRIPTION = "<none>"
class EbuildCreation ( object ):
"""Used to create an ebuild using DESCRIPTION data."""
- def __init__ ( self, package_info, depres_channel_spawner=None ):
+ def __init__ ( self, package_info, err_queue, depres_channel_spawner=None ):
"""Initializes the creation of an ebuild.
arguments:
@@ -39,6 +39,8 @@ class EbuildCreation ( object ):
self.depres_channel_spawner = depres_channel_spawner
+ self.err_queue = err_queue
+
self.package_info.set_readonly()
# --- end of __init__ (...) ---
@@ -64,10 +66,10 @@ class EbuildCreation ( object ):
self.logger.info ( "Cannot create an ebuild for this package." )
self.status = -1
- except Exception as e:
- # log this and set status to fail
+ except ( Exception, KeyboardInterrupt ):
+ # set status to fail
self.status = -10
- self.logger.exception ( e )
+ raise
# --- end of run (...) ---
def _lazyimport_desc_data ( self ):
@@ -82,8 +84,8 @@ class EbuildCreation ( object ):
logger=self.logger,
read_now=True
)
- self.package_info.set_writeable()
- self.package_info.update (
+
+ self.package_info.update_now (
desc_data=reader.get_desc ( run_if_unset=False )
)
del reader
@@ -133,7 +135,8 @@ class EbuildCreation ( object ):
_dep_resolution = depres.EbuildDepRes (
self.package_info, self.logger,
create_iuse=True, run_now=True,
- depres_channel_spawner=self.depres_channel_spawner
+ depres_channel_spawner=self.depres_channel_spawner,
+ err_queue=self.err_queue
)
if not _dep_resolution.success():
# log here? (FIXME)
diff --git a/roverlay/ebuild/depres.py b/roverlay/ebuild/depres.py
index 383831e..1c19a00 100644
--- a/roverlay/ebuild/depres.py
+++ b/roverlay/ebuild/depres.py
@@ -22,8 +22,8 @@ class EbuildDepRes ( object ):
"""Handles dependency resolution for a single ebuild."""
def __init__ (
- self, package_info, logger, depres_channel_spawner,
- create_iuse=True, run_now=True
+ self, package_info, logger, depres_channel_spawner, err_queue,
+ create_iuse=True, run_now=True,
):
"""Initializes an EbuildDepRes.
@@ -45,6 +45,8 @@ class EbuildDepRes ( object ):
self.has_suggests = None
self.create_iuse = create_iuse
+ self.err_queue = err_queue
+
self._channels = None
if run_now:
@@ -90,7 +92,8 @@ class EbuildDepRes ( object ):
if dependency_type not in self._channels:
self._channels [dependency_type] = self.request_resolver (
name=dependency_type,
- logger=self.logger
+ logger=self.logger,
+ err_queue=self.err_queue
)
return self._channels [dependency_type]
# --- end of get_channel (...) ---
diff --git a/roverlay/overlay/__init__.py b/roverlay/overlay/__init__.py
index 9ca6abe..a8a939d 100644
--- a/roverlay/overlay/__init__.py
+++ b/roverlay/overlay/__init__.py
@@ -77,14 +77,16 @@ class Overlay ( object ):
"""
if not category in self._categories:
self._catlock.acquire()
- if not category in self._categories:
- self._categories [category] = Category (
- category,
- self.logger,
- None if self.physical_location is None else \
- os.path.join ( self.physical_location, category )
- )
- self._catlock.release()
+ try:
+ if not category in self._categories:
+ self._categories [category] = Category (
+ category,
+ self.logger,
+ None if self.physical_location is None else \
+ os.path.join ( self.physical_location, category )
+ )
+ finally:
+ self._catlock.release()
return self._categories [category]
# --- end of _get_category (...) ---
diff --git a/roverlay/overlay/category.py b/roverlay/overlay/category.py
index 910f194..e938dbf 100644
--- a/roverlay/overlay/category.py
+++ b/roverlay/overlay/category.py
@@ -5,12 +5,19 @@
import threading
import os.path
+try:
+ import queue
+except ImportError:
+ import Queue as queue
+
from roverlay.overlay.package import PackageDir
import roverlay.util
class Category ( object ):
+ WRITE_JOBCOUNT = 3
+
def __init__ ( self, name, logger, directory ):
"""Initializes a overlay/portage category (such as 'app-text', 'sci-R').
@@ -45,14 +52,16 @@ class Category ( object ):
if not pkg_name in self._subdirs:
self._lock.acquire()
- if not pkg_name in self._subdirs:
- self._subdirs [pkg_name] = PackageDir (
- pkg_name,
- self.logger,
- None if self.physical_location is None else \
- os.path.join ( self.physical_location, pkg_name )
- )
- self._lock.release()
+ try:
+ if not pkg_name in self._subdirs:
+ self._subdirs [pkg_name] = PackageDir (
+ pkg_name,
+ self.logger,
+ None if self.physical_location is None else \
+ os.path.join ( self.physical_location, pkg_name )
+ )
+ finally:
+ self._lock.release()
self._subdirs [pkg_name].add ( package_info )
# --- end of add (...) ---
@@ -92,13 +101,56 @@ class Category ( object ):
package.show ( **show_kw )
# --- end of show (...) ---
+ def _run_write_queue ( self, q, write_kw ):
+ try:
+ while not q.empty():
+ pkg = q.get_nowait()
+ pkg.write ( **write_kw )
+
+ except queue.Empty:
+ pass
+ except ( Exception, KeyboardInterrupt ) as e:
+ self.RERAISE_EXCEPTION = e
+
+ # --- end of _run_write_queue (...) ---
+
def write ( self, **write_kw ):
"""Writes this category to its filesystem location.
returns: None (implicit)
"""
- for package in self._subdirs.values():
- if package.physical_location and not package.empty():
- roverlay.util.dodir ( package.physical_location )
- package.write ( **write_kw )
+
+ max_jobs = self.__class__.WRITE_JOBCOUNT
+
+ # todo len.. > 3: what's an reasonable number of min package dirs to
+ # start threaded writing?
+ if max_jobs > 1 and len ( self._subdirs ) > 3:
+
+ # writing 1..self.__class__.WRITE_JOBCOUNT package dirs at once
+
+ write_queue = queue.Queue()
+ for package in self._subdirs.values():
+ if package.physical_location and not package.empty():
+ roverlay.util.dodir ( package.physical_location )
+ write_queue.put_nowait ( package )
+
+
+ if not write_queue.empty():
+ workers = (
+ threading.Thread (
+ target=self._run_write_queue,
+ args=( write_queue, write_kw )
+ ) for n in range ( max_jobs )
+ )
+
+ for w in workers: w.start()
+ for w in workers: w.join()
+
+ if hasattr ( self, 'RERAISE_EXCEPTION' ):
+ raise self.RERAISE_EXCEPTION
+ else:
+ for package in self._subdirs.values():
+ if package.physical_location and not package.empty():
+ roverlay.util.dodir ( package.physical_location )
+ package.write ( **write_kw )
# --- end of write (...) ---
diff --git a/roverlay/overlay/creator.py b/roverlay/overlay/creator.py
index 608ba77..aa1a3a8 100644
--- a/roverlay/overlay/creator.py
+++ b/roverlay/overlay/creator.py
@@ -5,6 +5,9 @@
import time
import logging
import threading
+import signal
+import traceback
+import sys
try:
import queue
@@ -40,18 +43,26 @@ class OverlayCreator ( object ):
self.depresolver = easyresolver.setup()
- self.NUMTHREADS = config.get ( 'EBUILD.jobcount', 0 )
+ self.NUMTHREADS = config.get ( 'EBUILD.jobcount', 10 )
# --
- self._pkg_queue = queue.Queue()
- self._workers = None
- self._runlock = threading.RLock()
+ self._pkg_queue = queue.Queue()
+
+ # this queue is used to propagate exceptions from threads
+ # it's
+ self._err_queue = queue.Queue()
+
+
+ self._workers = None
+ self._runlock = threading.RLock()
self.can_write_overlay = OVERLAY_WRITE_ALLOWED
# this is a method that adds PackageInfo objects to the pkg queue
self.add_package = self._pkg_queue.put
+ self.depresolver.set_exception_queue ( self._err_queue )
+
# --- end of __init__ (...) ---
def _timestamp ( self, description, start, stop=None ):
@@ -109,17 +120,24 @@ class OverlayCreator ( object ):
def run ( self ):
"""Starts ebuild creation and waits until done."""
self._runlock.acquire()
- self.start()
- self.join()
- self._runlock.release()
+ try:
+ self.start()
+ self.join()
+ finally:
+ self._runlock.release()
# --- end of run (...) ---
def start ( self ):
"""Starts ebuild creation."""
self._runlock.acquire()
- self.join()
- self._make_workers()
- self._runlock.release()
+ try:
+ self.join()
+ self._make_workers()
+ except:
+ self._err_queue.put_nowait ( ( -1, None ) )
+ raise
+ finally:
+ self._runlock.release()
# --- end of start (...) ---
def join ( self ):
@@ -142,13 +160,14 @@ class OverlayCreator ( object ):
if self.depresolver is None: return
self._runlock.acquire()
- if self.depresolver is None: return
- self.depresolver.close()
- del self.depresolver
- self.depresolver = None
-
- self._runlock.release()
+ try:
+ if self.depresolver is not None:
+ self.depresolver.close()
+ del self.depresolver
+ self.depresolver = None
+ finally:
+ self._runlock.release()
# --- end of _close_resolver (...) ---
def _waitfor_workers ( self, do_close ):
@@ -157,26 +176,65 @@ class OverlayCreator ( object ):
arguments:
* do_close -- close (exit) workers if True, else wait until done.
"""
- if self._workers is None: return
- self._runlock.acquire()
if self._workers is None: return
+ start = None
+ self._runlock.acquire()
- if self.NUMTHREADS > 0: start = time.time()
+ try:
+ if self._workers is not None:
+ if self.NUMTHREADS > 0: start = time.time()
+
+ if do_close:
+ self._err_queue.put_nowait ( ( -1, None ) )
+ # fixme: remove enabled?
+ for w in self._workers: w.enabled = False
+ else:
+ for w in self._workers: w.stop_when_empty()
+
+ while True in ( w.active() for w in self._workers ):
+ self._pkg_queue.put ( None )
+
+ del self._workers
+ self._workers = None
+
+ while not self._err_queue.empty():
+ e = self._err_queue.get_nowait()
+ self._err_queue.put_nowait ( ( -2, None ) )
+ if isinstance ( e [1], ( Exception, KeyboardInterrupt ) ):
+ self._err_queue.put ( e )
+ self.logger.warning ( "Reraising thread exception." )
+ raise e [1]
+
+
+
+ except ( Exception, KeyboardInterrupt ) as err:
+ # catch interrupt here: still wait until all workers have been closed
+ # and reraise after that
+# SIGINT_RESTORE = signal.signal (
+# signal.SIGINT,
+# lambda sig, frame : sys.stderr.write ( "Please wait ...\n" )
+# )
+
+ try:
+ self._err_queue.put_nowait ( ( -1, None ) )
+ self.depresolver.unblock_channels()
+ while hasattr ( self, '_workers' ) and self._workers is not None:
- if do_close:
- for w in self._workers: w.enabled = False
- else:
- for w in self._workers: w.stop_when_empty()
- while True in ( w.active() for w in self._workers ):
- self._pkg_queue.put ( None )
+ if True in ( w.active() for w in self._workers ):
+ self._pkg_queue.put_nowait ( None )
+ else:
+ del self._workers
+ self._workers = None
+ finally:
+# signal.signal ( signal.SIGINT, SIGINT_RESTORE )
+ raise
- del self._workers
- self._workers = None
+ finally:
+ self._runlock.release()
- self._runlock.release()
- if self.NUMTHREADS > 0: return start
+ return start
# --- end of _waitfor_workers (...) ---
def _join_workers ( self ):
@@ -221,7 +279,8 @@ class OverlayCreator ( object ):
"""
w = OverlayWorker (
self._pkg_queue, self.depresolver, self.logger, self._pkg_done,
- use_threads=use_threads
+ use_threads=use_threads,
+ err_queue=self._err_queue
)
if start_now: w.start()
return w
diff --git a/roverlay/overlay/package.py b/roverlay/overlay/package.py
index e77d4a2..77fcbee 100644
--- a/roverlay/overlay/package.py
+++ b/roverlay/overlay/package.py
@@ -69,64 +69,60 @@ class PackageDir ( object ):
raise Exception ( "cannot write - no directory assigned!" )
self._lock.acquire()
- self._regen_metadata()
+ try:
+ self._regen_metadata()
- # mkdir not required here, overlay.Category does this
+ # mkdir not required here, overlay.Category does this
- # write ebuilds
- for ver, p_info in self._packages.items():
- fh = None
- try:
- efile = self._get_ebuild_filepath ( ver )
+ # write ebuilds
+ for ver, p_info in self._packages.items():
+ fh = None
+ try:
+ efile = self._get_ebuild_filepath ( ver )
- ebuild = p_info ['ebuild']
+ ebuild = p_info ['ebuild']
- fh = open ( efile, 'w' )
- if isinstance ( ebuild, str ):
- if default_header is not None:
- fh.write ( default_header )
- fh.write ( '\n\n' )
- fh.write ( ebuild )
+ fh = open ( efile, 'w' )
+ fh.write ( default_header )
+ fh.write ( '\n\n' )
+ fh.write ( str ( ebuild ) )
fh.write ( '\n' )
- else:
- ebuild.write (
- fh,
- header=default_header, header_is_fallback=True
- )
- if fh: fh.close()
- # adjust owner/perm? TODO
- # chmod 0644 or 0444
- # chown 250.250
+ if fh: fh.close()
- # this marks the package as 'written to fs'
- p_info.set_writeable()
- p_info ['ebuild_file'] = efile
- p_info.set_readonly()
+ # adjust owner/perm? TODO
+ # chmod 0644 or 0444
+ # chown 250.250
- self.logger.info ( "Wrote ebuild %s." % efile )
- except IOError as e:
- if fh: fh.close()
- self.logger.error ( "Couldn't write ebuild %s." % efile )
- self.logger.exception ( e )
+ # this marks the package as 'written to fs'
+ p_info.set_writeable()
+ p_info ['ebuild_file'] = efile
+ p_info.set_readonly()
- # write metadata
- fh = None
- try:
- mfile = self._get_metadata_filepath()
+ self.logger.info ( "Wrote ebuild %s." % efile )
+ except IOError as e:
+ if fh: fh.close()
+ self.logger.error ( "Couldn't write ebuild %s." % efile )
+ self.logger.exception ( e )
- fh = open ( mfile, 'w' )
- self._metadata.write ( fh )
- if fh: fh.close()
+ # write metadata
+ fh = None
+ try:
+ mfile = self._get_metadata_filepath()
- except IOError as e:
- if fh: fh.close()
- self.logger.error ( "Failed to write metadata at %s." % mfile )
- self.logger.exception ( e )
+ fh = open ( mfile, 'w' )
+ self._metadata.write ( fh )
+ if fh: fh.close()
- self.generate_manifest()
+ except IOError as e:
+ if fh: fh.close()
+ self.logger.error ( "Failed to write metadata at %s." % mfile )
+ self.logger.exception ( e )
- self._lock.release()
+ self.generate_manifest()
+
+ finally:
+ self._lock.release()
# --- end of write (...) ---
def show ( self, stream=sys.stderr, default_header=None ):
@@ -141,35 +137,31 @@ class PackageDir ( object ):
* IOError
"""
self._lock.acquire()
- self._regen_metadata()
+ try:
+ self._regen_metadata()
- for ver, p_info in self._packages.items():
- efile = self._get_ebuild_filepath ( ver )
- ebuild = p_info ['ebuild']
+ for ver, p_info in self._packages.items():
+ efile = self._get_ebuild_filepath ( ver )
+ ebuild = p_info ['ebuild']
- stream.write ( "[BEGIN ebuild %s]\n" % efile )
- if isinstance ( ebuild, str ):
- if default_header is not None:
- stream.write ( default_header )
- stream.write ( '\n\n' )
- stream.write ( ebuild )
+ stream.write ( "[BEGIN ebuild %s]\n" % efile )
+
+ stream.write ( default_header )
+ stream.write ( '\n\n' )
+ stream.write ( str ( ebuild ) )
stream.write ( '\n' )
- else:
- ebuild.write (
- stream,
- header=default_header, header_is_fallback=True
- )
- stream.write ( "[END ebuild %s]\n" % efile )
- mfile = self._get_metadata_filepath()
+ stream.write ( "[END ebuild %s]\n" % efile )
- stream.write ( "[BEGIN %s]\n" % mfile )
- self._metadata.write ( stream )
- stream.write ( "[END %s]\n" % mfile )
+ mfile = self._get_metadata_filepath()
+ stream.write ( "[BEGIN %s]\n" % mfile )
+ self._metadata.write ( stream )
+ stream.write ( "[END %s]\n" % mfile )
- self._lock.release()
+ finally:
+ self._lock.release()
# --- end of show (...) ---
def _latest_package ( self, pkg_filter=None, use_lock=False ):
@@ -186,15 +178,16 @@ class PackageDir ( object ):
retpkg = None
if use_lock: self._lock.acquire()
- for p in self._packages.values():
- if pkg_filter is None or pkg_filter ( p ):
- newver = p ['version']
- if first or newver > retver:
- retver = newver
- retpkg = p
- first = False
-
- if use_lock: self._lock.release()
+ try:
+ for p in self._packages.values():
+ if pkg_filter is None or pkg_filter ( p ):
+ newver = p ['version']
+ if first or newver > retver:
+ retver = newver
+ retpkg = p
+ first = False
+ finally:
+ if use_lock: self._lock.release()
return retpkg
# --- end of _latest_package (...) ---
@@ -219,7 +212,7 @@ class PackageDir ( object ):
self.name, shortver
)
if SUPPRESS_EXCEPTIONS:
- self.logger.warning ( msg )
+ self.logger.info ( msg )
else:
raise Exception ( msg )
@@ -265,18 +258,20 @@ class PackageDir ( object ):
return
self._lock.acquire()
+ try:
- if self._metadata is None or not use_old_metadata:
- del self._metadata
- self._metadata = MetadataJob ( self.logger )
+ if self._metadata is None or not use_old_metadata:
+ del self._metadata
+ self._metadata = MetadataJob ( self.logger )
- if use_all_packages:
- for p_info in self._packages:
- self._metadata.update ( p_info )
- else:
- self._metadata.update ( self._latest_package() )
+ if use_all_packages:
+ for p_info in self._packages:
+ self._metadata.update ( p_info )
+ else:
+ self._metadata.update ( self._latest_package() )
- self._lock.release()
+ finally:
+ self._lock.release()
# --- end of generate_metadata (...) ---
def generate_manifest ( self ):
diff --git a/roverlay/overlay/worker.py b/roverlay/overlay/worker.py
index 471cb94..541de7f 100644
--- a/roverlay/overlay/worker.py
+++ b/roverlay/overlay/worker.py
@@ -3,16 +3,21 @@
# Distributed under the terms of the GNU General Public License v2
#import time
+import sys
import threading
from roverlay.depres.channels import EbuildJobChannel
from roverlay.ebuild.creation import EbuildCreation
+# this controls whether debug message from OverlayWorker.run() are printed
+# to stderr or suppressed
+DEBUG = True
+
class OverlayWorker ( object ):
"""Overlay package queue worker."""
def __init__ ( self,
- pkg_queue, depresolver, logger, pkg_done, use_threads
+ pkg_queue, depresolver, logger, pkg_done, use_threads, err_queue
):
"""Initializes a worker.
@@ -29,11 +34,12 @@ class OverlayWorker ( object ):
self.pkg_done = pkg_done
self.depresolver = depresolver
+ self.err_queue = err_queue
+
self._use_thread = use_threads
self._thread = None
-
- self.enabled = True
self.running = False
+ self.enabled = True
self.halting = False
# --- end of __init__ (...) ---
@@ -81,7 +87,8 @@ class OverlayWorker ( object ):
"""
job = EbuildCreation (
package_info,
- depres_channel_spawner=self._get_resolver_channel
+ depres_channel_spawner=self._get_resolver_channel,
+ err_queue=self.err_queue
)
job.run()
self.pkg_done ( package_info )
@@ -89,42 +96,66 @@ class OverlayWorker ( object ):
def _run ( self ):
"""Runs the worker (thread mode)."""
- self.running = True
- self.halting = False
- while self.enabled or (
- self.halting and not self.pkg_queue.empty()
- ):
- if not self.running:
- # exit now
- break
- p = self.pkg_queue.get()
- # drop empty requests that are used to unblock get()
- if p is not None:
- self._process ( p )
- elif self.halting:
- # receiving an empty request while halting means 'stop now',
- self.enabled = False
- self.halting = False
+ def debug ( msg ):
+ if DEBUG:
+ sys.stderr.write (
+ "%i WORKER: %s\n" % ( id ( self ), msg )
+ )
+
+ try:
+ self.running = True
+ self.halting = False
+ while self.enabled or (
+ self.halting and not self.pkg_queue.empty()
+ ):
+ if not self.err_queue.empty():
+ # other workers died (or exit request sent)
+ debug ( "STOPPING #1" )
+ break
+
+ debug ( "WAITING" )
+ p = self.pkg_queue.get()
+ debug ( "RECEIVED A TASK, " + str ( p ) )
+
+ if not self.err_queue.empty():
+ debug ( "STOPPING #2" )
+ break
+
+ # drop empty requests that are used to unblock get()
+ if p is not None:
+ debug ( "ENTER PROC" )
+ if self.err_queue.empty():
+ debug ( "__ empty exception/error queue!" )
+ self._process ( p )
+ elif self.halting:
+ # receiving an empty request while halting means 'stop now',
+ self.enabled = False
+ self.halting = False
+
+ self.pkg_queue.task_done()
+
+ debug ( "STOPPING - DONE" )
+ except ( Exception, KeyboardInterrupt ) as e:
+ self.logger.exception ( e )
+ self.err_queue.put_nowait ( ( id ( self ), e ) )
- self.pkg_queue.task_done()
self.running = False
+
# --- end of run (...) ---
def _run_nothread ( self ):
"""Runs the worker (no-thread mode)."""
self.running = True
while self.enabled and not self.pkg_queue.empty():
- if not self.running:
- # exit now
- break
- p = self.pkg_queue.get()
+ p = self.pkg_queue.get_nowait()
# drop empty requests that are used to unblock get()
if p is not None:
self._process ( p )
self.pkg_queue.task_done()
+
self.running = False
# --- end of _run_nothread (...) ---
^ permalink raw reply related [flat|nested] only message in thread
only message in thread, other threads:[~2012-07-02 16:52 UTC | newest]
Thread overview: (only message) (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2012-07-02 16:52 [gentoo-commits] proj/R_overlay:master commit in: roverlay/ebuild/, roverlay/depres/, roverlay/overlay/ André Erdmann
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox