
Commit 4690e84

Merge pull request #141 from TomAugspurger/use-distributed
Use distributed scheduler
2 parents 0005cf3 + af36d42

13 files changed (+162, -61 lines)

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@ data/weather-big
 data/myfile.hdf5
 data/flightjson
 data/nycflights
+data/myfile.zarr
 profile.html
 log
 .idea/

01_dask.delayed.ipynb

Lines changed: 39 additions & 3 deletions
@@ -16,6 +16,24 @@
 "This is a simple way to use `dask` to parallelize existing codebases or build [complex systems](http://matthewrocklin.com/blog/work/2018/02/09/credit-models-with-dask). This will also help us to develop an understanding for later sections."
 ]
 },
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"As we'll see in the [distributed scheduler notebook](05_distributed.ipynb), Dask has several ways of executing code in parallel. We'll use the distributed scheduler by creating a `dask.distributed.Client`. For now, this will provide us with some nice diagnostics. We'll talk about schedulers in depth later."
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"from dask.distributed import Client\n",
+"\n",
+"client = Client()"
+]
+},
 {
 "cell_type": "markdown",
 "metadata": {},
@@ -117,7 +135,7 @@
 "outputs": [],
 "source": [
 "%%time\n",
-"# This actually runs our computation using a local thread pool\n",
+"# This actually runs our computation using a local process pool\n",
 "\n",
 "z.compute()"
 ]
@@ -625,6 +643,24 @@
 "- Experiment with delaying the call to `sum`. What does the graph look like if `sum` is delayed? What does the graph look like if it isn't?\n",
 "- Can you think of any reason why you'd want to do the reduction one way over the other?"
 ]
+},
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"# Close the Client\n",
+"\n",
+"Before moving on to the next exercise, make sure to close your client or stop this kernel."
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"client.close()"
+]
 }
 ],
 "metadata": {
@@ -643,9 +679,9 @@
 "name": "python",
 "nbconvert_exporter": "python",
 "pygments_lexer": "ipython3",
-"version": "3.6.7"
+"version": "3.7.3"
 }
 },
 "nbformat": 4,
-"nbformat_minor": 2
+"nbformat_minor": 4
 }
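
Taken together, the changes to this notebook follow one pattern: open a `dask.distributed.Client` at the top, compute as usual, close it at the end. A minimal runnable sketch of that pattern; the `inc`/`add` functions are illustrative stand-ins, not part of this diff:

```python
# Minimal sketch of the pattern this diff adds to the notebook:
# start a local distributed Client, compute a delayed graph, close it.
from dask import delayed
from dask.distributed import Client

@delayed
def inc(x):
    # Illustrative slow function; @delayed defers its execution.
    return x + 1

@delayed
def add(x, y):
    return x + y

if __name__ == "__main__":  # guard needed because Client() spawns worker processes
    # Client() with no arguments starts a local cluster and registers
    # itself as the default scheduler, so compute() below runs on it
    # and shows up in the diagnostics dashboard.
    client = Client()

    z = add(inc(1), inc(2))  # builds a task graph; nothing runs yet
    print(z.compute())       # executes the graph on the cluster -> 5

    # Mirrors the notebook's new closing cell.
    client.close()
```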

02_bag.ipynb

Lines changed: 32 additions & 9 deletions
@@ -35,6 +35,24 @@
 "* [Bag API](http://dask.pydata.org/en/latest/bag-api.html)"
 ]
 },
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"Again, we'll use the distributed scheduler. Schedulers will be explained in depth [later](05_distributed.ipynb)."
+]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"from dask.distributed import Client\n",
+"\n",
+"client = Client()"
+]
+},
 {
 "cell_type": "markdown",
 "metadata": {},
@@ -422,9 +440,7 @@
 {
 "cell_type": "code",
 "execution_count": null,
-"metadata": {
-"scrolled": true
-},
+"metadata": {},
 "outputs": [],
 "source": [
 "%%time\n",
@@ -552,9 +568,7 @@
 {
 "cell_type": "code",
 "execution_count": null,
-"metadata": {
-"scrolled": true
-},
+"metadata": {},
 "outputs": [],
 "source": [
 "def denormalize(record):\n",
@@ -612,12 +626,21 @@
 " a normalised dataframe."
 ]
 },
+{
+"cell_type": "markdown",
+"metadata": {},
+"source": [
+"## Shutdown"
+]
+},
 {
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
-"source": []
+"source": [
+"client.shutdown()"
+]
 }
 ],
 "metadata": {
@@ -637,9 +660,9 @@
 "name": "python",
 "nbconvert_exporter": "python",
 "pygments_lexer": "ipython3",
-"version": "3.6.7"
+"version": "3.7.3"
 }
 },
 "nbformat": 4,
-"nbformat_minor": 2
+"nbformat_minor": 4
 }
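
This notebook ends with `client.shutdown()` rather than the `client.close()` used in 01: `shutdown()` also stops the scheduler and workers, while `close()` only disconnects the client (though closing a client that started its own local cluster tears that cluster down too). A hedged sketch of the bag workflow under a distributed client; the word-count bag is illustrative and unrelated to the notebook's dataset:

```python
import dask.bag as db
from dask.distributed import Client

if __name__ == "__main__":
    client = Client()  # bag operations now run on the local cluster's workers

    # Illustrative data split across two partitions.
    b = db.from_sequence(["apple", "banana", "apple", "cherry"], npartitions=2)
    print(dict(b.frequencies().compute()))  # {'apple': 2, 'banana': 1, 'cherry': 1}

    # shutdown() stops the scheduler and workers as well as this client.
    client.shutdown()
```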

03_array.ipynb

Lines changed: 32 additions & 11 deletions
@@ -31,6 +31,17 @@
 "* [API reference](http://dask.readthedocs.io/en/latest/array-api.html)"
 ]
 },
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"from dask.distributed import Client\n",
+"\n",
+"client = Client(processes=False)"
+]
+},
 {
 "cell_type": "markdown",
 "metadata": {},
@@ -292,9 +303,7 @@
 },
 {
 "cell_type": "markdown",
-"metadata": {
-"collapsed": true
-},
+"metadata": {},
 "source": [
 "Does this match your result from before?"
 ]
@@ -505,6 +514,13 @@
 "dsets[0]"
 ]
 },
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": []
+},
 {
 "cell_type": "code",
 "execution_count": null,
@@ -757,9 +773,7 @@
 {
 "cell_type": "code",
 "execution_count": null,
-"metadata": {
-"scrolled": true
-},
+"metadata": {},
 "outputs": [],
 "source": [
 "%time potential(cluster)"
@@ -846,9 +860,7 @@
 {
 "cell_type": "code",
 "execution_count": null,
-"metadata": {
-"scrolled": true
-},
+"metadata": {},
 "outputs": [],
 "source": [
 "e = potential_dask(dcluster)\n",
@@ -881,6 +893,15 @@
 " functions, like ``np.full_like`` have not been implemented purely out of\n",
 " laziness. These would make excellent community contributions."
 ]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"client.shutdown()"
+]
 }
 ],
 "metadata": {
"metadata": {
@@ -900,9 +921,9 @@
900921
"name": "python",
901922
"nbconvert_exporter": "python",
902923
"pygments_lexer": "ipython3",
903-
"version": "3.6.7"
924+
"version": "3.7.3"
904925
}
905926
},
906927
"nbformat": 4,
907-
"nbformat_minor": 1
928+
"nbformat_minor": 4
908929
}
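
Unlike the other notebooks, this one creates its client with `Client(processes=False)`, which keeps the scheduler and workers as threads inside the current process. That suits `dask.array`: NumPy releases the GIL, and in-process workers avoid serializing large blocks between processes. A small sketch under those assumptions; the random-mean computation is illustrative:

```python
import dask.array as da
from dask.distributed import Client

# Threads in this process: no inter-process copies of array blocks,
# and no __main__ guard is needed since nothing is spawned.
client = Client(processes=False)

x = da.random.random((5_000, 5_000), chunks=(1_000, 1_000))
print(x.mean().compute())  # computed chunk-by-chunk on worker threads; ~0.5

client.shutdown()
```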

04_dataframe.ipynb

Lines changed: 23 additions & 3 deletions
@@ -46,6 +46,17 @@
 "## Setup"
 ]
 },
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"from dask.distributed import Client\n",
+"\n",
+"client = Client()"
+]
+},
 {
 "cell_type": "markdown",
 "metadata": {},
@@ -60,7 +71,7 @@
 "outputs": [],
 "source": [
 "from prep import accounts_csvs\n",
-"accounts_csvs(3, 1000000, 500)\n",
+"accounts_csvs()\n",
 "\n",
 "import os\n",
 "import dask\n",
@@ -775,6 +786,15 @@
 " * From any set of functions creating sub dataframes via ``dd.from_delayed``.\n",
 " * Dask.bag: ``mybag.to_dataframe(columns=[...])``"
 ]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"client.shutdown()"
+]
 }
 ],
 "metadata": {
@@ -794,9 +814,9 @@
 "name": "python",
 "nbconvert_exporter": "python",
 "pygments_lexer": "ipython3",
-"version": "3.6.7"
+"version": "3.7.3"
 }
 },
 "nbformat": 4,
-"nbformat_minor": 2
+"nbformat_minor": 4
 }
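
The setup change calls `accounts_csvs()` with no arguments, so the sizes previously passed as `(3, 1000000, 500)` presumably now default inside `prep.py`. The surrounding pattern is again client, compute, shutdown; a self-contained sketch using a hypothetical miniature CSV in place of the tutorial's generated data:

```python
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

if __name__ == "__main__":
    client = Client()

    # Hypothetical stand-in for the tutorial's accounts CSVs.
    pd.DataFrame({"id": [1, 2, 1], "amount": [100, 200, 300]}).to_csv(
        "accounts.0.csv", index=False
    )

    df = dd.read_csv("accounts.*.csv")  # one partition per matching file
    print(df.groupby("id").amount.sum().compute())

    client.shutdown()
```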

05_distributed.ipynb

Lines changed: 16 additions & 12 deletions
@@ -94,16 +94,13 @@
 "- How much faster was using threads over a single thread? Why does this differ from the optimal speedup?\n",
 "- Why is the multiprocessing scheduler so much slower here?\n",
 "\n",
+"The `threaded` scheduler is a fine choice for working with large datasets out-of-core on a single machine, as long as the functions being used release the [GIL](https://wiki.python.org/moin/GlobalInterpreterLock) most of the time. NumPy and pandas release the GIL in most places, so the `threaded` scheduler is the default for `dask.array` and `dask.dataframe`. The distributed scheduler, perhaps with `processes=False`, will also work well for these workloads on a single machine.\n",
 "\n",
-"For single-machine use, the threaded and multiprocessing schedulers are fine choices. They are solid, mature and performant, and require absolutely no set-up. As a rule of thumb, threaded will work well when the functions called release the [GIL](https://wiki.python.org/moin/GlobalInterpreterLock), whereas multiprocessing will always have a slower start-up time and suffer where a lot of communication is required between tasks. The *number* of workers is, in general, also important.\n",
-"\n"
-]
-},
-{
-"cell_type": "markdown",
-"metadata": {},
-"source": [
-"For scaling out work across a cluster, the distributed scheduler is required. Indeed, this is now generally preferred for all work, because it gives you additional monitoring information not available in the other schedulers. (Some of this monitoring is also available with an explicit progress bar and profiler, see [here](https://docs.dask.org/en/latest/diagnostics-local.html).)"
+"For workloads that do hold the GIL, as is common with `dask.bag` and custom code wrapped with `dask.delayed`, we recommend using the distributed scheduler, even on a single machine. Generally speaking, it's more intelligent and provides better diagnostics than the `processes` scheduler.\n",
+"\n",
+"https://docs.dask.org/en/latest/scheduling.html provides some additional details on choosing a scheduler.\n",
+"\n",
+"For scaling out work across a cluster, the distributed scheduler is required."
 ]
 },
 {
@@ -147,9 +144,7 @@
 "cell_type": "markdown",
 "metadata": {},
 "source": [
-"Be sure to click the `Dashboard` link to open up the diagnostics dashboard.\n",
-"\n",
-"\n",
+"If you aren't in jupyterlab and using the `dask-labextension`, be sure to click the `Dashboard` link to open up the diagnostics dashboard.\n",
 "\n",
 "## Executing with the distributed client"
@@ -299,6 +294,15 @@
 "# Average departure delay per day-of-week\n",
 "_ = df.groupby(df.Date.dt.dayofweek).DepDelay.mean().compute()"
 ]
+},
+{
+"cell_type": "code",
+"execution_count": null,
+"metadata": {},
+"outputs": [],
+"source": [
+"client.shutdown()"
+]
 }
 ],
 "metadata": {

06_distributed_advanced.ipynb

Lines changed: 2 additions & 2 deletions
@@ -655,9 +655,9 @@
 "name": "python",
 "nbconvert_exporter": "python",
 "pygments_lexer": "ipython3",
-"version": "3.7.2"
+"version": "3.7.3"
 }
 },
 "nbformat": 4,
-"nbformat_minor": 2
+"nbformat_minor": 4
 }

binder/environment.yml

Lines changed: 1 addition & 0 deletions
@@ -30,3 +30,4 @@ dependencies:
 - ipywidgets>=7.5
 - cachey
 - python-graphviz
+- zarr
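
zarr joins the binder environment, and `data/myfile.zarr` joins `.gitignore`, presumably to support a chunked-storage example in the array notebook. A hedged sketch of the dask/zarr round trip; the path matches the newly ignored file, and the `overwrite=True` keyword is assumed so the write can be rerun:

```python
import os
import dask.array as da

os.makedirs("data", exist_ok=True)

x = da.random.random((1_000, 1_000), chunks=(250, 250))

# Each dask chunk maps onto a zarr chunk, so the write runs in parallel.
x.to_zarr("data/myfile.zarr", overwrite=True)

# Reading back yields a dask array whose chunking comes from the store.
y = da.from_zarr("data/myfile.zarr")
print(y.chunks)
```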
