@@ -367,47 +367,68 @@ await runner.task # This is not needed in a notebook environment!
timer.result()
```
- ## Executing an asynchronous runner
+ ## Custom parallelization using coroutines
- In this example we will show how to send complex tasks to adaptive as coroutines.
- We require an asynchronous client to perform the execution of asynchronous tasks.
- Here we will use `dask.distributed`.
+ Adaptive by itself does not implement a way of sharing partial results between function executions.
+ Its implementation of parallel computation using executors is minimal by design.
+ Instead, the appropriate way to implement custom parallelization is by using coroutines (asynchronous functions).
+
+ We illustrate this approach using `dask.distributed` for the parallel computations, in part because it supports asynchronous operation out of the box.
+ Let us consider the computation below, which has a slow but reusable part and a fast part that cannot be reused.
```{code-cell} ipython3
- from dask.distributed import Client
+ import time
- client = await Client(asynchronous=True)
- ```
- Once we have an asynchronous client, all its instances will be coroutines and they need to be called accordingly.
- For example, `await client.close()`.
+ def g(x):
+     """Slow but reusable function"""
+     time.sleep(random.randrange(5))
+     return x**3
+
+
+ def h(x):
+     """Fast function"""
+     return x**4
+ ```
- In this case, we consider a function `h` that has some internal dependency on a function `g`.
- The function to be learned is `async_h`, which submits `h` as a coroutine to the client.
+ We need to convert these functions into dask graphs by using `dask.delayed`.
+ In this example we will show how to send complex tasks to adaptive as coroutines.
```{code-cell} ipython3
- import time
+ from dask import delayed
+ # Convert g and h to dask.Delayed objects
+ g, h = delayed(g), delayed(h)
- def h(x, offset=offset):
-     a = 0.01
-     x = g(x)
-     return x + a**2 / (a**2 + (x - offset) ** 2)
+ @delayed
+ def f(x, y):
+     return (x + y)**2
+ ```
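To build intuition for what `dask.delayed` does, here is a toy stand-in (a sketch only, not dask's actual implementation; the names `Toy` and `toy_delayed` are made up): calling a delayed function records the call instead of executing it, and `.compute()` evaluates the recorded graph.

```python
class Toy:
    """Toy stand-in for a dask.Delayed node: records a call, runs it on compute()."""

    def __init__(self, fn, args):
        self.fn, self.args = fn, args

    def compute(self):
        # Recursively evaluate child nodes, then apply this node's function
        args = [a.compute() if isinstance(a, Toy) else a for a in self.args]
        return self.fn(*args)


def toy_delayed(fn):
    return lambda *args: Toy(fn, args)


g = toy_delayed(lambda x: x**3)
h = toy_delayed(lambda x: x**4)
f = toy_delayed(lambda x, y: (x + y) ** 2)

node = f(g(2), h(0.5))  # nothing runs yet; a small graph is built
print(node.compute())   # → (8 + 0.0625) ** 2 = 65.00390625
```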
+ Next we define a computation using coroutines that reuses previously submitted tasks.
- def g(x):
-     time.sleep(random.randrange(5))
-     return x**2
+ ```{code-cell} ipython3
+ from dask.distributed import Client
+
+ client = await Client(asynchronous=True)
+
+ g_futures = {}
+
+ async def f_parallel(x):
+     # Get or submit the slow-function future
+     if (g_future := g_futures.get(int(x))) is None:
+         g_futures[int(x)] = g_future = client.compute(g(int(x)))
+     future_f = client.compute(f(g_future, h(x % 1)))
- async def async_h(x):
-     return await client.submit(h, x)
+     return await future_f
```
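The key idea in `f_parallel`, caching futures so that concurrent evaluations share one slow computation, can be sketched without dask using only asyncio (the names `slow_cubed` and `compute` below are illustrative, not part of adaptive or dask):

```python
import asyncio

slow_calls = 0
slow_tasks = {}  # int(x) -> asyncio.Task for the slow, reusable part


async def slow_cubed(n):
    """Stand-in for the slow reusable function."""
    global slow_calls
    slow_calls += 1
    await asyncio.sleep(0.01)
    return n**3


async def compute(x):
    # Reuse an in-flight or finished task if one exists for this key
    if (task := slow_tasks.get(int(x))) is None:
        slow_tasks[int(x)] = task = asyncio.ensure_future(slow_cubed(int(x)))
    return (await task + (x % 1) ** 4) ** 2


async def main():
    return await asyncio.gather(compute(2.25), compute(2.5), compute(2.75))


results = asyncio.run(main())
print(slow_calls)  # 1: the slow part ran once for all three points
```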
- When providing the asynchronous function to the `learner` and run it via `AsyncRunner`.
+ Finally, we provide the asynchronous function to the `learner` and run it via `AsyncRunner`.
```{code-cell} ipython3
- learner = adaptive.Learner1D(async_h, bounds=(-1, 1))
+ learner = adaptive.Learner1D(f_parallel, bounds=(-3.5, 3.5))
runner = adaptive.AsyncRunner(learner, goal=lambda l: l.loss() < 0.01, ntasks=20)
```
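Conceptually, the `AsyncRunner` keeps up to `ntasks` calls of the learner's coroutine in flight at once and collects results as they complete. A minimal sketch of that scheduling pattern (the names `run_async` and `func` are illustrative, not adaptive's API):

```python
import asyncio


async def func(x):
    """Stand-in for a learner's asynchronous function."""
    await asyncio.sleep(0.001)
    return x**2


async def run_async(func, points, ntasks=4):
    # Allow at most ntasks evaluations in flight at any time
    sem = asyncio.Semaphore(ntasks)
    results = {}

    async def worker(x):
        async with sem:
            results[x] = await func(x)

    await asyncio.gather(*(worker(x) for x in points))
    return results


results = asyncio.run(run_async(func, range(5)))
print(results[3])  # → 9
```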