* [gentoo-portage-dev] [PATCH 1/2] Add iter_gather function (bug 653946)
From: Zac Medico @ 2018-04-24 23:45 UTC
To: gentoo-portage-dev; +Cc: Zac Medico
This is similar to asyncio.gather, but takes an iterator of
futures as input, and includes support for max_jobs and max_load
parameters. For bug 653946, this will be used to asynchronously
gather the results of the portdbapi.async_fetch_map calls that
are required to generate a Manifest, while using the max_jobs
parameter to limit the number of concurrent async_aux_get calls.
Bug: https://bugs.gentoo.org/653946
---
pym/portage/util/futures/iter_completed.py | 68 ++++++++++++++++++++++++++++++
1 file changed, 68 insertions(+)
diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py
index 5ad075305..4e52a499f 100644
--- a/pym/portage/util/futures/iter_completed.py
+++ b/pym/portage/util/futures/iter_completed.py
@@ -112,3 +112,71 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
 		# cleanup in case of interruption by SIGINT, etc
 		scheduler.cancel()
 		scheduler.wait()
+
+
+def iter_gather(futures, max_jobs=None, max_load=None, loop=None):
+	"""
+	This is similar to asyncio.gather, but takes an iterator of
+	futures as input, and includes support for max_jobs and max_load
+	parameters.
+
+	@param futures: iterator of asyncio.Future (or compatible)
+	@type futures: iterator
+	@param max_jobs: max number of futures to process concurrently (default
+		is multiprocessing.cpu_count())
+	@type max_jobs: int
+	@param max_load: max load allowed when scheduling a new future,
+		otherwise schedule no more than 1 future at a time (default
+		is multiprocessing.cpu_count())
+	@type max_load: int or float
+	@param loop: event loop
+	@type loop: EventLoop
+	@return: a Future resulting in a list of done input futures, in the
+		same order that they were yielded from the input iterator
+	@rtype: asyncio.Future (or compatible)
+	"""
+	loop = loop or global_event_loop()
+	loop = getattr(loop, '_asyncio_wrapper', loop)
+	result = loop.create_future()
+	futures_list = []
+
+	def future_generator():
+		for future in futures:
+			futures_list.append(future)
+			yield future
+
+	completed_iter = async_iter_completed(
+		future_generator(),
+		max_jobs=max_jobs,
+		max_load=max_load,
+		loop=loop,
+	)
+
+	def handle_result(future_done_set):
+		if result.cancelled():
+			return
+
+		try:
+			handle_result.current_task = next(completed_iter)
+		except StopIteration:
+			result.set_result(futures_list)
+		else:
+			handle_result.current_task.add_done_callback(handle_result)
+
+	try:
+		handle_result.current_task = next(completed_iter)
+	except StopIteration:
+		handle_result.current_task = None
+		result.set_result(futures_list)
+	else:
+		handle_result.current_task.add_done_callback(handle_result)
+
+	def cancel_callback(result):
+		if (result.cancelled() and
+			handle_result.current_task is not None and
+			not handle_result.current_task.done()):
+			handle_result.current_task.cancel()
+
+	result.add_done_callback(cancel_callback)
+
+	return result
--
2.13.6
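
For readers without a Portage checkout, the behaviour described in the commit message (pull work lazily from an iterator, cap concurrency at max_jobs, hand back the finished items in input order) can be approximated with plain asyncio. This is only a sketch under stated assumptions, not the code in this patch: bounded_gather, demo and job are hypothetical names, the cap is enforced with asyncio.wait rather than Portage's TaskScheduler, and max_load is not modelled at all.

```python
import asyncio


async def bounded_gather(coro_iter, max_jobs=2):
    """Hypothetical asyncio-only analogue of iter_gather (not Portage code).

    Pulls coroutines lazily from coro_iter, keeps at most max_jobs of
    them running, and returns the finished tasks in input order.
    """
    tasks = []       # every task, in the order it was pulled
    pending = set()  # tasks that have not finished yet
    for coro in coro_iter:
        task = asyncio.ensure_future(coro)
        tasks.append(task)
        pending.add(task)
        if len(pending) >= max_jobs:
            # Wait for a free slot before pulling the next item.
            _done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED)
    if pending:
        # Drain whatever is still running once the iterator is exhausted.
        await asyncio.wait(pending)
    return tasks


def demo():
    running = 0
    peak = 0

    async def job(n):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        # Later jobs sleep less, so completion order differs from input order.
        await asyncio.sleep(0.01 * (5 - n))
        running -= 1
        return n * n

    done = asyncio.run(
        bounded_gather((job(n) for n in range(5)), max_jobs=2))
    return [task.result() for task in done], peak
```

Like iter_gather, the sketch returns the done tasks rather than their results, so a caller decides per item whether to call result() or inspect exception(). Unlike the patch, it does not propagate cancellation of the outer future to the task currently being waited on.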
2018-04-24 23:45 [gentoo-portage-dev] [PATCH 0/2] ManifestScheduler: async fetchlist_dict (bug 653946) Zac Medico
2018-04-24 23:45 99% ` [gentoo-portage-dev] [PATCH 1/2] Add iter_gather function " Zac Medico