Skip to content

Commit f2f4c26

Browse files
sispmartindurant
andauthored
Limit coroutines using a pool instead of chunks (#1544)
--------- Co-authored-by: Martin Durant <[email protected]>
1 parent 29c06ea commit f2f4c26

File tree

1 file changed

+29
-14
lines changed

1 file changed

+29
-14
lines changed

fsspec/asyn.py

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -239,20 +239,35 @@ async def _run_coros_in_chunks(
239239
batch_size = len(coros)
240240

241241
assert batch_size > 0
242-
results = []
243-
for start in range(0, len(coros), batch_size):
244-
chunk = [
245-
asyncio.Task(asyncio.wait_for(c, timeout=timeout))
246-
for c in coros[start : start + batch_size]
247-
]
248-
if callback is not DEFAULT_CALLBACK:
249-
[
250-
t.add_done_callback(lambda *_, **__: callback.relative_update(1))
251-
for t in chunk
252-
]
253-
results.extend(
254-
await asyncio.gather(*chunk, return_exceptions=return_exceptions),
255-
)
242+
243+
async def _run_coro(coro, i):
244+
try:
245+
return await asyncio.wait_for(coro, timeout=timeout), i
246+
except Exception as e:
247+
if not return_exceptions:
248+
raise
249+
return e, i
250+
finally:
251+
callback.relative_update(1)
252+
253+
i = 0
254+
n = len(coros)
255+
results = [None] * n
256+
pending = set()
257+
258+
while pending or i < n:
259+
while len(pending) < batch_size and i < n:
260+
pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
261+
i += 1
262+
263+
if not pending:
264+
break
265+
266+
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
267+
while done:
268+
result, k = await done.pop()
269+
results[k] = result
270+
256271
return results
257272

258273

0 commit comments

Comments
 (0)