|
1 | | -"""Generic asynchronous performers.""" |
2 | | - |
3 | | -from functools import partial |
4 | | -from itertools import count |
5 | | - |
6 | | -from ._base import perform |
7 | | -from ._intents import FirstError |
8 | | - |
9 | | - |
10 | | -def perform_parallel_async(dispatcher, intent, box): |
11 | | - """ |
12 | | - A performer for :obj:`ParallelEffects` which works if all child Effects are |
13 | | - already asynchronous. Use this for things like Twisted, asyncio, etc. |
14 | | -
|
15 | | - WARNING: If this is used when child Effects have blocking performers, it |
16 | | - will run them in serial, not parallel. |
17 | | - """ |
18 | | - effects = list(intent.effects) |
19 | | - if not effects: |
20 | | - box.succeed([]) |
21 | | - return |
22 | | - num_results = count() |
23 | | - results = [None] * len(effects) |
24 | | - |
25 | | - def succeed(index, result): |
26 | | - results[index] = result |
27 | | - if next(num_results) + 1 == len(effects): |
28 | | - box.succeed(results) |
29 | | - |
30 | | - def fail(index, result): |
31 | | - box.fail((FirstError, |
32 | | - FirstError(exc_info=result, index=index), |
33 | | - result[2])) |
34 | | - |
35 | | - for index, effect in enumerate(effects): |
36 | | - perform( |
37 | | - dispatcher, |
38 | | - effect.on( |
39 | | - success=partial(succeed, index), |
40 | | - error=partial(fail, index))) |
| 1 | +from .parallel_async import perform_parallel_async |
| 2 | +__all__ = ['perform_parallel_async'] |
0 commit comments