Commit 915bd93

Merge pull request #294 from mszhanyi/zhanyi/moremp
parallel in win notebook
2 parents 5d311bf + e1536a5

File tree

4 files changed: 99 additions & 27 deletions


fastcore/_nbdev.py

Lines changed: 2 additions & 1 deletion
@@ -182,10 +182,11 @@
          "threaded": "03a_parallel.ipynb",
          "startthread": "03a_parallel.ipynb",
          "set_num_threads": "03a_parallel.ipynb",
-         "check_parallel_num": "03a_parallel.ipynb",
+         "parallelable": "03a_parallel.ipynb",
          "ThreadPoolExecutor": "03a_parallel.ipynb",
          "ProcessPoolExecutor": "03a_parallel.ipynb",
          "parallel": "03a_parallel.ipynb",
+         "add_one": "03a_parallel.ipynb",
          "run_procs": "03a_parallel.ipynb",
          "parallel_gen": "03a_parallel.ipynb",
          "url_default_headers": "03b_net.ipynb",

fastcore/parallel.py

Lines changed: 20 additions & 9 deletions
@@ -1,7 +1,7 @@
 # AUTOGENERATED! DO NOT EDIT! File to edit: nbs/03a_parallel.ipynb (unless otherwise specified).
 
-__all__ = ['threaded', 'startthread', 'set_num_threads', 'check_parallel_num', 'ThreadPoolExecutor',
-           'ProcessPoolExecutor', 'parallel', 'run_procs', 'parallel_gen']
+__all__ = ['threaded', 'startthread', 'set_num_threads', 'parallelable', 'ThreadPoolExecutor', 'ProcessPoolExecutor',
+           'parallel', 'add_one', 'run_procs', 'parallel_gen']
 
 # Cell
 from .imports import *
@@ -58,12 +58,13 @@ def _call(lock, pause, n, g, item):
     return g(item)
 
 # Cell
-def check_parallel_num(param_name, num_workers):
-    if sys.platform == "win32" and IN_NOTEBOOK and num_workers > 0:
+def parallelable(param_name, num_workers, f=None):
+    f_in_main = f == None or sys.modules[f.__module__].__name__ == "__main__"
+    if sys.platform == "win32" and IN_NOTEBOOK and num_workers > 0 and f_in_main:
         print("Due to IPython and Windows limitation, python multiprocessing isn't available now.")
-        print(f"So `{param_name}` is changed to 0 to avoid getting stuck")
-        num_workers = 0
-    return num_workers
+        print(f"So `{param_name}` has to be changed to 0 to avoid getting stuck")
+        return False
+    return True
 
 # Cell
 class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
@@ -88,13 +89,16 @@ class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
     "Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution"
     def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
         if max_workers is None: max_workers=defaults.cpus
-        max_workers = check_parallel_num('max_workers', max_workers)
         store_attr()
         self.not_parallel = max_workers==0
         if self.not_parallel: max_workers=1
         super().__init__(max_workers, **kwargs)
 
     def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
+        if not parallelable('max_workers', self.max_workers, f): self.max_workers = 0
+        self.not_parallel = self.max_workers==0
+        if self.not_parallel: self.max_workers=1
+
         if self.not_parallel == False: self.lock = Manager().Lock()
         g = partial(f, *args, **kwargs)
         if self.not_parallel: return map(g, items)
@@ -118,6 +122,13 @@ def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None
             r = progress_bar(r, total=total, leave=False)
     return L(r)
 
+# Cell
+def add_one(x, a=1):
+    # this import is necessary for multiprocessing in notebook on windows
+    import random
+    time.sleep(random.random()/80)
+    return x+a
+
 # Cell
 def run_procs(f, f_done, args):
     "Call `f` for each item in `args` in parallel, yielding `f_done`"
@@ -135,7 +146,7 @@ def _done_pg(queue, items): return (queue.get() for _ in items)
 # Cell
 def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
     "Instantiate `cls` in `n_workers` procs & call each on a subset of `items` in parallel."
-    n_workers = check_parallel_num('n_workers', n_workers)
+    if not parallelable('n_workers', n_workers): n_workers = 0
    if n_workers==0:
        yield from enumerate(list(cls(**kwargs)(items)))
        return
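
In short, `check_parallel_num` used to silently force the worker count to 0, whereas `parallelable` only reports whether multiprocessing is usable and leaves the fallback to the caller; because it now also inspects `f`, an importable worker such as `add_one` can still run in parallel in a Windows notebook, while a function defined in `__main__` cannot. A minimal sketch of the calling pattern, assuming only the APIs shown in this diff (the surrounding script is illustrative, not part of the commit):

    from fastcore.parallel import parallelable, parallel, add_one

    # Illustrative caller: drop to serial execution when multiprocessing is not
    # usable (Windows + IPython with a worker defined in __main__), mirroring
    # what ProcessPoolExecutor.map and parallel_gen now do internally.
    n_workers = 2
    if not parallelable('n_workers', n_workers, add_one): n_workers = 0

    # add_one is importable from fastcore.parallel, so the guard still allows
    # multiprocessing here; with n_workers=0 the same call simply runs serially.
    res = parallel(add_one, range(5), n_workers=n_workers, progress=False)
    print(list(res))  # [1, 2, 3, 4, 5]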

nbs/03a_parallel.ipynb

Lines changed: 39 additions & 17 deletions
@@ -193,12 +193,13 @@
    "outputs": [],
    "source": [
     "#export\n",
-    "def check_parallel_num(param_name, num_workers):\n",
-    "    if sys.platform == \"win32\" and IN_NOTEBOOK and num_workers > 0:\n",
+    "def parallelable(param_name, num_workers, f=None):\n",
+    "    f_in_main = f == None or sys.modules[f.__module__].__name__ == \"__main__\" \n",
+    "    if sys.platform == \"win32\" and IN_NOTEBOOK and num_workers > 0 and f_in_main:\n",
     "        print(\"Due to IPython and Windows limitation, python multiprocessing isn't available now.\")\n",
-    "        print(f\"So `{param_name}` is changed to 0 to avoid getting stuck\")\n",
-    "        num_workers = 0\n",
-    "    return num_workers"
+    "        print(f\"So `{param_name}` has to be changed to 0 to avoid getting stuck\")\n",
+    "        return False\n",
+    "    return True"
    ]
   },
   {
@@ -263,13 +264,16 @@
     "    \"Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution\"\n",
     "    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):\n",
     "        if max_workers is None: max_workers=defaults.cpus\n",
-    "        max_workers = check_parallel_num('max_workers', max_workers)\n",
     "        store_attr()\n",
     "        self.not_parallel = max_workers==0\n",
     "        if self.not_parallel: max_workers=1\n",
     "        super().__init__(max_workers, **kwargs)\n",
     "\n",
     "    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):\n",
+    "        if not parallelable('max_workers', self.max_workers, f): self.max_workers = 0\n",
+    "        self.not_parallel = self.max_workers==0\n",
+    "        if self.not_parallel: self.max_workers=1\n",
+    "        \n",
     "        if self.not_parallel == False: self.lock = Manager().Lock()\n",
     "        g = partial(f, *args, **kwargs)\n",
     "        if self.not_parallel: return map(g, items)\n",
@@ -340,10 +344,20 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "def add_one(x, a=1): \n",
+    "#export\n",
+    "def add_one(x, a=1):\n",
+    "    # this import is necessary for multiprocessing in notebook on windows\n",
+    "    import random\n",
     "    time.sleep(random.random()/80)\n",
-    "    return x+a\n",
-    "\n",
+    "    return x+a"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
     "inp,exp = range(50),range(1,51)\n",
     "\n",
     "test_eq(parallel(add_one, inp, n_workers=2, progress=False), exp)\n",
@@ -378,11 +392,11 @@
    "name": "stdout",
    "output_type": "stream",
    "text": [
-    "0 2021-02-03 09:51:30.561681\n",
-    "1 2021-02-03 09:51:30.812066\n",
-    "2 2021-02-03 09:51:31.063662\n",
-    "3 2021-02-03 09:51:31.313478\n",
-    "4 2021-02-03 09:51:31.564776\n"
+    "0 2021-02-23 06:38:58.778425\n",
+    "1 2021-02-23 06:38:59.028804\n",
+    "2 2021-02-23 06:38:59.280227\n",
+    "3 2021-02-23 06:38:59.530889\n",
+    "4 2021-02-23 06:38:59.781011\n"
    ]
   }
  ],
@@ -438,15 +452,15 @@
    "#export \n",
    "def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):\n",
    "    \"Instantiate `cls` in `n_workers` procs & call each on a subset of `items` in parallel.\"\n",
-   "    n_workers = check_parallel_num('n_workers', n_workers)\n",
+   "    if not parallelable('n_workers', n_workers): n_workers = 0\n",
    "    if n_workers==0:\n",
    "        yield from enumerate(list(cls(**kwargs)(items)))\n",
    "        return\n",
    "    batches = L(chunked(items, n_chunks=n_workers))\n",
    "    idx = L(itertools.accumulate(0 + batches.map(len)))\n",
    "    queue = Queue()\n",
    "    if progress_bar: items = progress_bar(items, leave=False)\n",
-   "    f=partial(_f_pg, cls(**kwargs), queue)\n",
+   "    f=partial(_f_pg, cls(**kwargs), queue) \n",
    "    done=partial(_done_pg, queue, items)\n",
    "    yield from run_procs(f, done, L(batches,idx).zip())"
    ]
@@ -546,7 +560,8 @@
    "Converted 05_transform.ipynb.\n",
    "Converted 07_meta.ipynb.\n",
    "Converted 08_script.ipynb.\n",
-   "Converted index.ipynb.\n"
+   "Converted index.ipynb.\n",
+   "Converted parallel_win.ipynb.\n"
    ]
   }
  ],
@@ -569,6 +584,13 @@
    "exit_code = process.wait()\n",
    "test_eq(exit_code, 0)"
    ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
   }
  ],
  "metadata": {

nbs/parallel_win.ipynb

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "informational-central",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from fastcore.test import *\n",
+    "from fastcore.parallel import *\n",
+    "\n",
+    "if __name__ == \"__main__\":\n",
+    "    inp,exp = range(50),range(1,51)\n",
+    "\n",
+    "    test_eq(parallel(add_one, inp, n_workers=2, progress=False), exp)\n",
+    "    test_eq(parallel(add_one, inp, n_workers=1, a=2), range(2,52))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "developing-darwin",
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
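
The shape of this new notebook follows from how multiprocessing starts child processes on Windows: with the "spawn" start method the worker must be importable (which is why `add_one` is now exported from `fastcore.parallel`), and the launching code must sit behind an `if __name__ == "__main__":` guard so that re-importing the entry point does not spawn recursively. The same pattern as a standalone script, purely for illustration (the file name is hypothetical):

    # win_parallel_demo.py - hypothetical standalone equivalent of parallel_win.ipynb
    from fastcore.test import test_eq
    from fastcore.parallel import parallel, add_one  # importable worker, so spawned children can find it

    if __name__ == "__main__":  # required on Windows: children re-import this file without re-running the body
        inp, exp = range(50), range(1, 51)
        test_eq(parallel(add_one, inp, n_workers=2, progress=False), exp)
        test_eq(parallel(add_one, inp, n_workers=1, a=2), range(2, 52))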
