Skip to content

Commit 0472d3c

Browse files
authored
Add more about retries in futures notebook (#258)
1 parent 44cf331 commit 0472d3c

File tree

1 file changed

+156
-8
lines changed

1 file changed

+156
-8
lines changed

05_futures.ipynb

Lines changed: 156 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111
"\n",
1212
"# Futures - non-blocking distributed calculations\n",
1313
"\n",
14-
"In the previous chapter, we showed that executing a calculation (created using delayed) with the distributed executor is identical to any other executor. However, we now have access to additional functionality, and control over what data is held in memory.\n",
14+
"Submit arbitrary functions for computation in a parallelized, eager, and non-blocking way. \n",
1515
"\n",
1616
"The `futures` interface (derived from the built-in `concurrent.futures`) provide fine-grained real-time execution for custom situations. We can submit individual functions for evaluation with one set of inputs, or evaluated over a sequence of inputs with `submit()` and `map()`. The call returns immediately, giving one or more *futures*, whose status begins as \"pending\" and later becomes \"finished\". There is no blocking of the local Python session.\n",
1717
"\n",
18+
"This is the important difference between futures and delayed. Both can be used to support arbitrary task scheduling, but delayed is lazy (it just constructs a graph) whereas futures are eager. With futures, as soon as the inputs are available and there is compute available, the computation starts. \n",
19+
"\n",
1820
"**Related Documentation**\n",
1921
"\n",
2022
"* [Futures documentation](https://docs.dask.org/en/latest/futures.html)\n",
@@ -183,9 +185,9 @@
183185
"cell_type": "markdown",
184186
"metadata": {},
185187
"source": [
186-
"### `client.compute`\n",
188+
"## `client.compute`\n",
187189
"\n",
188-
"Generally, any Dask operation that is executed using `.compute()` (or `dask.compute()` can be submitted for asynchronous execution using `client.compute()` instead, and this applies to all collections.\n",
190+
"Generally, any Dask operation that is executed using `.compute()` or `dask.compute()` can be submitted for asynchronous execution using `client.compute()` instead.\n",
189191
"\n",
190192
"Here is an example from the delayed notebook:"
191193
]
@@ -213,6 +215,13 @@
213215
"z = add(x, y)"
214216
]
215217
},
218+
{
219+
"cell_type": "markdown",
220+
"metadata": {},
221+
"source": [
222+
"So far we have a regular `dask.delayed` output. When we pass `z` to `client.compute` we get a future back and Dask starts evaluating the task graph. "
223+
]
224+
},
216225
{
217226
"cell_type": "code",
218227
"execution_count": null,
@@ -245,9 +254,9 @@
245254
"cell_type": "markdown",
246255
"metadata": {},
247256
"source": [
248-
"### `client.submit`\n",
257+
"## `client.submit`\n",
249258
"\n",
250-
"`client.submit` takes a function and arguments, pushes these to the cluster, returning a *Future* representing the result to be computed. The function is passed to a worker process for evaluation. This looks a lot like doing `client.compute()`, above, except now we are passing the function and arguments directly to the cluster."
259+
"`client.submit` takes a function and arguments, pushes these to the cluster, returning a `Future` representing the result to be computed. The function is passed to a worker process for evaluation. This looks a lot like doing `client.compute()`, above, except now we are passing the function and arguments directly to the cluster."
251260
]
252261
},
253262
{
@@ -290,11 +299,150 @@
290299
"\n",
291300
"Each future represents a result held, or being evaluated by the cluster. Thus we can control caching of intermediate values - when a future is no longer referenced, its value is forgotten. In the solution, above, futures are held for each of the function calls. These results would not need to be re-evaluated if we chose to submit more work that needed them.\n",
292301
"\n",
293-
"We can explicitly pass data from our local session into the cluster using `client.scatter()`, but usually it is better to construct functions that do the loading of data within the workers themselves, so that there is no need to serialize and communicate the data. Most of the loading functions within Dask, such as `dd.read_csv`, work this way. Similarly, we normally don't want to `gather()` results that are too big in memory.\n",
302+
"We can explicitly pass data from our local session into the cluster using `client.scatter()`, but usually it is better to construct functions that do the loading of data within the workers themselves, so that there is no need to serialize and communicate the data. Most of the loading functions within Dask, such as `dd.read_csv`, work this way. Similarly, we normally don't want to `gather()` results that are too big in memory."
303+
]
304+
},
305+
{
306+
"cell_type": "markdown",
307+
"metadata": {},
308+
"source": [
309+
"## Example: Sporadically failing task\n",
310+
"\n",
311+
"Let's imagine a task that sometimes fails. You might encounter this when dealing with input data where sometimes a file is malformed, or maybe a request times out."
312+
]
313+
},
314+
{
315+
"cell_type": "code",
316+
"execution_count": null,
317+
"metadata": {},
318+
"outputs": [],
319+
"source": [
320+
"from random import random\n",
321+
" \n",
322+
"def flaky_inc(i):\n",
323+
" if random() < 0.2:\n",
324+
" raise ValueError(\"You hit the error!\")\n",
325+
" return i + 1"
326+
]
327+
},
328+
{
329+
"cell_type": "markdown",
330+
"metadata": {},
331+
"source": [
332+
"If you run this function over and over again, it will sometimes fail. \n",
333+
"\n",
334+
"```python\n",
335+
">>> flaky_inc(2)\n",
336+
"---------------------------------------------------------------------------\n",
337+
"ValueError Traceback (most recent call last)\n",
338+
"Input In [65], in <cell line: 1>()\n",
339+
"----> 1 flaky_inc(2)\n",
340+
"\n",
341+
"Input In [61], in flaky_inc(i)\n",
342+
" 3 def flaky_inc(i):\n",
343+
" 4 if random() < 0.5:\n",
344+
"----> 5 raise ValueError(\"You hit the error!\")\n",
345+
" 6 return i + 1\n",
346+
"\n",
347+
"ValueError: You hit the error!\n",
348+
"```"
349+
]
350+
},
351+
{
352+
"cell_type": "markdown",
353+
"metadata": {},
354+
"source": [
355+
"We can run this function on a range of inputs using `client.map`."
356+
]
357+
},
358+
{
359+
"cell_type": "code",
360+
"execution_count": null,
361+
"metadata": {},
362+
"outputs": [],
363+
"source": [
364+
"futures = client.map(flaky_inc, range(10))"
365+
]
366+
},
367+
{
368+
"cell_type": "markdown",
369+
"metadata": {},
370+
"source": [
371+
"Notice how the cell returned even though some of the computations failed. We can inspect these futures one by one and find the ones that failed:"
372+
]
373+
},
374+
{
375+
"cell_type": "code",
376+
"execution_count": null,
377+
"metadata": {},
378+
"outputs": [],
379+
"source": [
380+
"for i, future in enumerate(futures):\n",
381+
" print(i, future.status)"
382+
]
383+
},
384+
{
385+
"cell_type": "markdown",
386+
"metadata": {},
387+
"source": [
388+
"You can rerun those specific futures to try to get the task to successfully complete:"
389+
]
390+
},
391+
{
392+
"cell_type": "code",
393+
"execution_count": null,
394+
"metadata": {},
395+
"outputs": [],
396+
"source": [
397+
"futures[5].retry()"
398+
]
399+
},
400+
{
401+
"cell_type": "code",
402+
"execution_count": null,
403+
"metadata": {},
404+
"outputs": [],
405+
"source": [
406+
"for i, future in enumerate(futures):\n",
407+
" print(i, future.status)"
408+
]
409+
},
410+
{
411+
"cell_type": "markdown",
412+
"metadata": {},
413+
"source": [
414+
"A more concise way of retrying in the case of sporadic failures is by setting the number of retries in the `client.compute`, `client.submit` or `client.map` method.\n",
415+
"\n",
416+
"**Note**: In this example we also need to set `pure=False` to let Dask know that the arguments to the function do not totally determine the output."
417+
]
418+
},
419+
{
420+
"cell_type": "code",
421+
"execution_count": null,
422+
"metadata": {},
423+
"outputs": [],
424+
"source": [
425+
"futures = client.map(flaky_inc, range(10), retries=5, pure=False)\n",
426+
"future_z = client.submit(sum, futures)\n",
427+
"future_z.result()"
428+
]
429+
},
430+
{
431+
"cell_type": "markdown",
432+
"metadata": {},
433+
"source": [
434+
"You will see a lot of warnings, but the computation should eventually succeed."
435+
]
436+
},
437+
{
438+
"cell_type": "markdown",
439+
"metadata": {},
440+
"source": [
441+
"## Why use Futures?\n",
294442
"\n",
295-
"The [full API](http://distributed.readthedocs.io/en/latest/api.html) of the distributed scheduler gives details of interacting with the cluster, which remember, can be on your local machine or possibly on a massive computational resource. \n",
443+
"The futures API offers a work submission style that can easily emulate the map/reduce paradigm. If that is familiar to you then futures might be the simplest entrypoint into Dask. \n",
296444
"\n",
297-
"The futures API offers a work submission style that can easily emulate the map/reduce paradigm (see `client.map()`) that may be familiar to many people. The intermediate results, represented by futures, can be passed to new tasks without having to bring the pull locally from the cluster, and new work can be assigned to work on the output of previous jobs that haven't even begun yet."
445+
"The other big benefit of futures is that the intermediate results, represented by futures, can be passed to new tasks without having to pull data locally from the cluster. New operations can be setup to work on the output of previous jobs that haven't even begun yet."
298446
]
299447
}
300448
],

0 commit comments

Comments
 (0)