public inbox for gentoo-portage-dev@lists.gentoo.org
* [gentoo-portage-dev] [PATCH 1/2] Add iter_gather function (bug 653946)
From: Zac Medico @ 2018-04-24 23:45 UTC
  To: gentoo-portage-dev; +Cc: Zac Medico

This is similar to asyncio.gather, but takes an iterator of
futures as input, and includes support for max_jobs and max_load
parameters. For bug 653946, this will be used to asynchronously
gather the results of the portdbapi.async_fetch_map calls that
are required to generate a Manifest, while using the max_jobs
parameter to limit the number of concurrent async_aux_get calls.

Bug: https://bugs.gentoo.org/653946
---
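For readers without a portage checkout, the behaviour described in the
commit message can be approximated with stock asyncio. This is only a
rough sketch, not the implementation in this patch: the names
gather_from_iterator, job and demo are hypothetical, asyncio.wait stands
in for portage's scheduler, and max_load and cancellation are not
modelled at all.

```python
import asyncio


async def gather_from_iterator(futures, max_jobs=2):
    """
    Hypothetical stand-in for iter_gather built on plain asyncio:
    pull futures lazily from an iterator, keep at most max_jobs of
    them pending, and return the done futures in the order yielded.
    """
    ordered = []
    pending = set()
    for future in futures:
        ordered.append(future)
        pending.add(future)
        if len(pending) >= max_jobs:
            # Stop pulling from the iterator until a slot frees up.
            _done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED)
    if pending:
        # The iterator is exhausted; wait for the remaining futures.
        await asyncio.wait(pending)
    return ordered


def demo():
    """Run five toy jobs with max_jobs=2 and report results and peak concurrency."""
    state = {"running": 0, "peak": 0}

    async def job(index):
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        # Later jobs sleep less, so completion order differs from input order.
        await asyncio.sleep(0.01 * (5 - index))
        state["running"] -= 1
        return index * index

    async def main():
        loop = asyncio.get_running_loop()
        # Generator expression: each task is created only when it is pulled.
        lazy_tasks = (loop.create_task(job(i)) for i in range(5))
        done = await gather_from_iterator(lazy_tasks, max_jobs=2)
        return [future.result() for future in done]

    results = asyncio.run(main())
    return results, state["peak"]


if __name__ == "__main__":
    print(demo())
```

Because the generator creates a task only when the next item is
requested, throttling how fast the iterator is consumed is what bounds
concurrency, and appending each future as it is yielded is what keeps
the returned list in input order.
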
 pym/portage/util/futures/iter_completed.py | 68 ++++++++++++++++++++++++++++++
 1 file changed, 68 insertions(+)

diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py
index 5ad075305..4e52a499f 100644
--- a/pym/portage/util/futures/iter_completed.py
+++ b/pym/portage/util/futures/iter_completed.py
@@ -112,3 +112,71 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
 		# cleanup in case of interruption by SIGINT, etc
 		scheduler.cancel()
 		scheduler.wait()
+
+
+def iter_gather(futures, max_jobs=None, max_load=None, loop=None):
+	"""
+	This is similar to asyncio.gather, but takes an iterator of
+	futures as input, and includes support for max_jobs and max_load
+	parameters.
+
+	@param futures: iterator of asyncio.Future (or compatible)
+	@type futures: iterator
+	@param max_jobs: max number of futures to process concurrently (default
+		is multiprocessing.cpu_count())
+	@type max_jobs: int
+	@param max_load: max load average allowed when scheduling a new
+		future; above it, no more than 1 future is scheduled at a
+		time (default is multiprocessing.cpu_count())
+	@type max_load: int or float
+	@param loop: event loop
+	@type loop: EventLoop
+	@return: a Future resulting in a list of done input futures, in the
+		same order that they were yielded from the input iterator
+	@rtype: asyncio.Future (or compatible)
+	"""
+	loop = loop or global_event_loop()
+	loop = getattr(loop, '_asyncio_wrapper', loop)
+	result = loop.create_future()
+	futures_list = []
+
+	def future_generator():
+		for future in futures:
+			futures_list.append(future)
+			yield future
+
+	completed_iter = async_iter_completed(
+		future_generator(),
+		max_jobs=max_jobs,
+		max_load=max_load,
+		loop=loop,
+	)
+
+	def handle_result(future_done_set):
+		if result.cancelled():
+			return
+
+		try:
+			handle_result.current_task = next(completed_iter)
+		except StopIteration:
+			result.set_result(futures_list)
+		else:
+			handle_result.current_task.add_done_callback(handle_result)
+
+	try:
+		handle_result.current_task = next(completed_iter)
+	except StopIteration:
+		handle_result.current_task = None
+		result.set_result(futures_list)
+	else:
+		handle_result.current_task.add_done_callback(handle_result)
+
+	def cancel_callback(result):
+		if (result.cancelled() and
+			handle_result.current_task is not None and
+			not handle_result.current_task.done()):
+			handle_result.current_task.cancel()
+
+	result.add_done_callback(cancel_callback)
+
+	return result
-- 
2.13.6



2018-04-24 23:45     [gentoo-portage-dev] [PATCH 0/2] ManifestScheduler: async fetchlist_dict (bug 653946) Zac Medico
2018-04-24 23:45 99% ` [gentoo-portage-dev] [PATCH 1/2] Add iter_gather function " Zac Medico