public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
From: "Zac Medico" <zmedico@gentoo.org>
To: gentoo-commits@lists.gentoo.org
Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/
Date: Fri, 24 Mar 2017 20:33:40 +0000 (UTC)	[thread overview]
Message-ID: <1490387545.86400e9f864e86f8f677ccda9ce4103d6d02ef87.zmedico@gentoo> (raw)

commit:     86400e9f864e86f8f677ccda9ce4103d6d02ef87
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Mar 21 06:56:55 2017 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Mar 24 20:32:25 2017 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=86400e9f

PollScheduler: terminate via call_soon for asyncio compat

Use call_soon to schedule the _termination_check callback when needed.
The previous idle_add usage was relatively inefficient, because it
scheduled the _termination_check callback to be called in every
iteration of the event loop.

Add a _cleanup method to handle cleanup of callbacks registered with
the global event loop. Since the terminate method is thread safe and it
interacts with self._term_callback_handle, use this variable only while
holding a lock.

 pym/_emerge/PollScheduler.py              | 57 +++++++++++++++++++++++--------
 pym/_emerge/Scheduler.py                  |  7 ++--
 pym/portage/util/_async/AsyncScheduler.py | 16 ++++-----
 3 files changed, 54 insertions(+), 26 deletions(-)

diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
index b118ac157..569879b36 100644
--- a/pym/_emerge/PollScheduler.py
+++ b/pym/_emerge/PollScheduler.py
@@ -25,8 +25,10 @@ class PollScheduler(object):
 			a non-main thread)
 		@type main: bool
 		"""
+		self._term_rlock = threading.RLock()
 		self._terminated = threading.Event()
 		self._terminated_tasks = False
+		self._term_check_handle = None
 		self._max_jobs = 1
 		self._max_load = None
 		self._scheduling = False
@@ -44,6 +46,21 @@ class PollScheduler(object):
 	def _is_background(self):
 		return self._background
 
+	def _cleanup(self):
+		"""
+		Cleanup any callbacks that have been registered with the global
+		event loop.
+		"""
+		# The self._term_check_handle attribute requires locking
+		# since it's modified by the thread safe terminate method.
+		with self._term_rlock:
+			if self._term_check_handle not in (None, False):
+				self._term_check_handle.cancel()
+			# This prevents the terminate method from scheduling
+			# any more callbacks (since _cleanup must eliminate all
+			# callbacks in order to ensure complete cleanup).
+			self._term_check_handle = False
+
 	def terminate(self):
 		"""
 		Schedules asynchronous, graceful termination of the scheduler
@@ -51,26 +68,36 @@ class PollScheduler(object):
 
 		This method is thread-safe (and safe for signal handlers).
 		"""
-		self._terminated.set()
+		with self._term_rlock:
+			if self._term_check_handle is None:
+				self._terminated.set()
+				self._term_check_handle = self._event_loop.call_soon_threadsafe(
+					self._termination_check, True)
 
-	def _termination_check(self):
+	def _termination_check(self, retry=False):
 		"""
 		Calls _terminate_tasks() if appropriate. It's guaranteed not to
-		call it while _schedule_tasks() is being called. The check should
-		be executed for each iteration of the event loop, for response to
-		termination signals at the earliest opportunity. It always returns
-		True, for continuous scheduling via idle_add.
+		call it while _schedule_tasks() is being called. This method must
+		only be called via the event loop thread.
+
+		@param retry: If True then reschedule if scheduling state prevents
+			immediate termination.
+		@type retry: bool
 		"""
-		if not self._scheduling and \
-			self._terminated.is_set() and \
+		if self._terminated.is_set() and \
 			not self._terminated_tasks:
-			self._scheduling = True
-			try:
-				self._terminated_tasks = True
-				self._terminate_tasks()
-			finally:
-				self._scheduling = False
-		return True
+			if not self._scheduling:
+				self._scheduling = True
+				try:
+					self._terminated_tasks = True
+					self._terminate_tasks()
+				finally:
+					self._scheduling = False
+
+			elif retry:
+				with self._term_rlock:
+					self._term_check_handle = self._event_loop.call_soon(
+						self._termination_check, True)
 
 	def _terminate_tasks(self):
 		"""

diff --git a/pym/_emerge/Scheduler.py b/pym/_emerge/Scheduler.py
index 71fe75f62..58ff97139 100644
--- a/pym/_emerge/Scheduler.py
+++ b/pym/_emerge/Scheduler.py
@@ -1055,6 +1055,7 @@ class Scheduler(PollScheduler):
 				else:
 					signal.signal(signal.SIGCONT, signal.SIG_DFL)
 
+			self._termination_check()
 			if received_signal:
 				sys.exit(received_signal[0])
 
@@ -1091,6 +1092,10 @@ class Scheduler(PollScheduler):
 				if isinstance(x, Package) and x.operation == "merge"])
 			self._status_display.maxval = self._pkg_count.maxval
 
+		# Cleanup any callbacks that have been registered with the global
+		# event loop by calls to the terminate method.
+		self._cleanup()
+
 		self._logger.log(" *** Finished. Cleaning up...")
 
 		if failed_pkgs:
@@ -1393,7 +1398,6 @@ class Scheduler(PollScheduler):
 		blocker_db.discardBlocker(pkg)
 
 	def _main_loop(self):
-		term_check_id = self._event_loop.idle_add(self._termination_check)
 		loadavg_check_id = None
 		if self._max_load is not None and \
 			self._loadavg_latency is not None and \
@@ -1420,7 +1424,6 @@ class Scheduler(PollScheduler):
 			while self._is_work_scheduled():
 				self._event_loop.iteration()
 		finally:
-			self._event_loop.source_remove(term_check_id)
 			if loadavg_check_id is not None:
 				self._event_loop.source_remove(loadavg_check_id)
 

diff --git a/pym/portage/util/_async/AsyncScheduler.py b/pym/portage/util/_async/AsyncScheduler.py
index 9b96c6f36..3deb6cb04 100644
--- a/pym/portage/util/_async/AsyncScheduler.py
+++ b/pym/portage/util/_async/AsyncScheduler.py
@@ -18,7 +18,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
 		self._error_count = 0
 		self._running_tasks = set()
 		self._remaining_tasks = True
-		self._term_check_id = None
 		self._loadavg_check_id = None
 
 	def _poll(self):
@@ -65,7 +64,6 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
 		self._schedule()
 
 	def _start(self):
-		self._term_check_id = self._event_loop.idle_add(self._termination_check)
 		if self._max_load is not None and \
 			self._loadavg_latency is not None and \
 			(self._max_jobs is True or self._max_jobs > 1):
@@ -75,6 +73,12 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
 				self._loadavg_latency, self._schedule)
 		self._schedule()
 
+	def _cleanup(self):
+		super(AsyncScheduler, self)._cleanup()
+		if self._loadavg_check_id is not None:
+			self._event_loop.source_remove(self._loadavg_check_id)
+			self._loadavg_check_id = None
+
 	def _wait(self):
 		# Loop while there are jobs to be scheduled.
 		while self._keep_scheduling():
@@ -86,13 +90,7 @@ class AsyncScheduler(AsynchronousTask, PollScheduler):
 		while self._is_work_scheduled():
 			self._event_loop.iteration()
 
-		if self._term_check_id is not None:
-			self._event_loop.source_remove(self._term_check_id)
-			self._term_check_id = None
-
-		if self._loadavg_check_id is not None:
-			self._event_loop.source_remove(self._loadavg_check_id)
-			self._loadavg_check_id = None
+		self._cleanup()
 
 		if self._error_count > 0:
 			self.returncode = 1


             reply	other threads:[~2017-03-24 20:33 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2017-03-24 20:33 Zac Medico [this message]
  -- strict thread matches above, loose matches on Subject: below --
2013-01-06 11:16 [gentoo-commits] proj/portage:master commit in: pym/portage/util/_async/, pym/_emerge/ Zac Medico
2012-10-19  1:23 Zac Medico
2012-10-19  1:15 Zac Medico
2012-10-16 19:28 Zac Medico
2012-10-08 20:44 Zac Medico
2012-10-07 18:51 Zac Medico
2012-10-07 18:22 Zac Medico
2012-10-07 18:17 Zac Medico

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=1490387545.86400e9f864e86f8f677ccda9ce4103d6d02ef87.zmedico@gentoo \
    --to=zmedico@gentoo.org \
    --cc=gentoo-commits@lists.gentoo.org \
    --cc=gentoo-dev@lists.gentoo.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox