Skip to content

Commit 875988a

Browse files
authored
Merge pull request #289 from mszhanyi/zhanyi/parallel
update parallel
2 parents dd3e161 + 07c7de4 commit 875988a

File tree

5 files changed

+87
-31
lines changed

5 files changed

+87
-31
lines changed

fastcore/_nbdev.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@
182182
"threaded": "03a_parallel.ipynb",
183183
"startthread": "03a_parallel.ipynb",
184184
"set_num_threads": "03a_parallel.ipynb",
185+
"check_parallel_num": "03a_parallel.ipynb",
185186
"ThreadPoolExecutor": "03a_parallel.ipynb",
186187
"ProcessPoolExecutor": "03a_parallel.ipynb",
187188
"parallel": "03a_parallel.ipynb",

fastcore/parallel.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/03a_parallel.ipynb (unless otherwise specified).
22

3-
__all__ = ['threaded', 'startthread', 'set_num_threads', 'ThreadPoolExecutor', 'ProcessPoolExecutor', 'parallel',
4-
'run_procs', 'parallel_gen']
3+
__all__ = ['threaded', 'startthread', 'set_num_threads', 'check_parallel_num', 'ThreadPoolExecutor',
4+
'ProcessPoolExecutor', 'parallel', 'run_procs', 'parallel_gen']
55

66
# Cell
77
from .imports import *
@@ -57,6 +57,14 @@ def _call(lock, pause, n, g, item):
5757
if l: lock.release()
5858
return g(item)
5959

60+
# Cell
61+
def check_parallel_num(param_name, num_workers):
62+
if sys.platform == "win32" and IN_NOTEBOOK and num_workers > 0:
63+
print("Due to IPython and Windows limitation, python multiprocessing isn't available now.")
64+
print(f"So `{param_name}` is changed to 0 to avoid getting stuck")
65+
num_workers = 0
66+
return num_workers
67+
6068
# Cell
6169
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
6270
"Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution"
@@ -80,6 +88,7 @@ class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
8088
"Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution"
8189
def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
8290
if max_workers is None: max_workers=defaults.cpus
91+
max_workers = check_parallel_num('max_workers', max_workers)
8392
store_attr()
8493
self.not_parallel = max_workers==0
8594
if self.not_parallel: max_workers=1
@@ -126,6 +135,7 @@ def _done_pg(queue, items): return (queue.get() for _ in items)
126135
# Cell
127136
def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
128137
"Instantiate `cls` in `n_workers` procs & call each on a subset of `items` in parallel."
138+
n_workers = check_parallel_num('n_workers', n_workers)
129139
if n_workers==0:
130140
yield from enumerate(list(cls(**kwargs)(items)))
131141
return

nbs/03_xtras.ipynb

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,7 @@
618618
{
619619
"data": {
620620
"text/plain": [
621-
"['e', 'f', 'h', 'a', 'd', 'c', 'g', 'b']"
621+
"['c', 'f', 'e', 'g', 'h', 'b', 'd', 'a']"
622622
]
623623
},
624624
"execution_count": null,
@@ -710,7 +710,7 @@
710710
" with maybe_open(fn) as f: return f.encoding\n",
711711
"\n",
712712
"fname = '00_test.ipynb'\n",
713-
"sys_encoding = 'cp1252' if sys.platform == \"win32\" else 'UTF-8'\n",
713+
"sys_encoding = 'cp1252' if sys.platform == 'win32' else 'UTF-8'\n",
714714
"test_eq(_f(fname), sys_encoding)\n",
715715
"with open(fname) as fh: test_eq(_f(fh), sys_encoding)"
716716
]
@@ -1264,7 +1264,7 @@
12641264
{
12651265
"data": {
12661266
"text/plain": [
1267-
"Path('08_script.ipynb')"
1267+
"Path('.gitattributes')"
12681268
]
12691269
},
12701270
"execution_count": null,
@@ -1298,7 +1298,7 @@
12981298
{
12991299
"data": {
13001300
"text/plain": [
1301-
"(Path('../fastcore/xtras.py'), Path('08_script.ipynb'))"
1301+
"(Path('../fastcore/__init__.py'), Path('01_basics.ipynb'))"
13021302
]
13031303
},
13041304
"execution_count": null,
@@ -1575,8 +1575,8 @@
15751575
"name": "stdout",
15761576
"output_type": "stream",
15771577
"text": [
1578-
"Num Events: 1, Freq/sec: 340.3\n",
1579-
"Most recent: ▅▁▇▁▅ 287.7 217.2 325.6 229.3 290.2\n"
1578+
"Num Events: 8, Freq/sec: 423.0\n",
1579+
"Most recent: ▂▂▁▁▇ 318.5 319.0 266.9 275.6 427.7\n"
15801580
]
15811581
}
15821582
],
@@ -1725,7 +1725,7 @@
17251725
"name": "stdout",
17261726
"output_type": "stream",
17271727
"text": [
1728-
"2000-01-01 12:00:00 UTC is 2000-01-01 04:00:00-08:00 local time\n"
1728+
"2000-01-01 12:00:00 UTC is 2000-01-01 12:00:00+00:00 local time\n"
17291729
]
17301730
}
17311731
],
@@ -1755,7 +1755,7 @@
17551755
"name": "stdout",
17561756
"output_type": "stream",
17571757
"text": [
1758-
"2000-01-01 12:00:00 local is 2000-01-01 20:00:00+00:00 UTC time\n"
1758+
"2000-01-01 12:00:00 local is 2000-01-01 12:00:00+00:00 UTC time\n"
17591759
]
17601760
}
17611761
],

nbs/03a_parallel.ipynb

Lines changed: 43 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,21 @@
186186
" return g(item)"
187187
]
188188
},
189+
{
190+
"cell_type": "code",
191+
"execution_count": null,
192+
"metadata": {},
193+
"outputs": [],
194+
"source": [
195+
"#export\n",
196+
"def check_parallel_num(param_name, num_workers):\n",
197+
" if sys.platform == \"win32\" and IN_NOTEBOOK and num_workers > 0:\n",
198+
" print(\"Due to IPython and Windows limitation, python multiprocessing isn't available now.\")\n",
199+
" print(f\"So `{param_name}` is changed to 0 to avoid getting stuck\")\n",
200+
" num_workers = 0\n",
201+
" return num_workers"
202+
]
203+
},
189204
{
190205
"cell_type": "code",
191206
"execution_count": null,
@@ -221,7 +236,7 @@
221236
"text/markdown": [
222237
"<h4 id=\"ThreadPoolExecutor\" class=\"doc_header\"><code>class</code> <code>ThreadPoolExecutor</code><a href=\"\" class=\"source_link\" style=\"float:right\">[source]</a></h4>\n",
223238
"\n",
224-
"> <code>ThreadPoolExecutor</code>(**`max_workers`**=*`16`*, **`on_exc`**=*`print`*, **`pause`**=*`0`*, **\\*\\*`kwargs`**) :: [`ThreadPoolExecutor`](/parallel.html#ThreadPoolExecutor)\n",
239+
"> <code>ThreadPoolExecutor</code>(**`max_workers`**=*`20`*, **`on_exc`**=*`print`*, **`pause`**=*`0`*, **\\*\\*`kwargs`**) :: [`ThreadPoolExecutor`](/parallel.html#ThreadPoolExecutor)\n",
225240
"\n",
226241
"Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution"
227242
],
@@ -248,6 +263,7 @@
248263
" \"Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution\"\n",
249264
" def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):\n",
250265
" if max_workers is None: max_workers=defaults.cpus\n",
266+
" max_workers = check_parallel_num('max_workers', max_workers)\n",
251267
" store_attr()\n",
252268
" self.not_parallel = max_workers==0\n",
253269
" if self.not_parallel: max_workers=1\n",
@@ -272,7 +288,7 @@
272288
"text/markdown": [
273289
"<h4 id=\"ProcessPoolExecutor\" class=\"doc_header\"><code>class</code> <code>ProcessPoolExecutor</code><a href=\"\" class=\"source_link\" style=\"float:right\">[source]</a></h4>\n",
274290
"\n",
275-
"> <code>ProcessPoolExecutor</code>(**`max_workers`**=*`16`*, **`on_exc`**=*`print`*, **`pause`**=*`0`*, **\\*\\*`kwargs`**) :: [`ProcessPoolExecutor`](/parallel.html#ProcessPoolExecutor)\n",
291+
"> <code>ProcessPoolExecutor</code>(**`max_workers`**=*`20`*, **`on_exc`**=*`print`*, **`pause`**=*`0`*, **\\*\\*`kwargs`**) :: [`ProcessPoolExecutor`](/parallel.html#ProcessPoolExecutor)\n",
276292
"\n",
277293
"Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution"
278294
],
@@ -305,7 +321,7 @@
305321
"metadata": {},
306322
"outputs": [],
307323
"source": [
308-
"#export \n",
324+
"#export\n",
309325
"def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,\n",
310326
" threadpool=False, timeout=None, chunksize=1, **kwargs):\n",
311327
" \"Applies `func` in parallel to `items`, using `n_workers`\"\n",
@@ -329,10 +345,10 @@
329345
" return x+a\n",
330346
"\n",
331347
"inp,exp = range(50),range(1,51)\n",
332-
"if sys.platform != \"win32\":\n",
333-
" test_eq(parallel(add_one, inp, n_workers=2, progress=False), exp)\n",
334-
" test_eq(parallel(add_one, inp, threadpool=True, n_workers=2, progress=False), exp)\n",
335-
" test_eq(parallel(add_one, inp, n_workers=1, a=2), range(2,52))\n",
348+
"\n",
349+
"test_eq(parallel(add_one, inp, n_workers=2, progress=False), exp)\n",
350+
"test_eq(parallel(add_one, inp, threadpool=True, n_workers=2, progress=False), exp)\n",
351+
"test_eq(parallel(add_one, inp, n_workers=1, a=2), range(2,52))\n",
336352
"test_eq(parallel(add_one, inp, n_workers=0), exp)\n",
337353
"test_eq(parallel(add_one, inp, n_workers=0, a=2), range(2,52))"
338354
]
@@ -362,11 +378,11 @@
362378
"name": "stdout",
363379
"output_type": "stream",
364380
"text": [
365-
"0 2021-01-22 21:17:38.942321\n",
366-
"1 2021-01-22 21:17:39.192929\n",
367-
"2 2021-01-22 21:17:39.444098\n",
368-
"3 2021-01-22 21:17:39.695087\n",
369-
"4 2021-01-22 21:17:39.946463\n"
381+
"0 2021-02-03 09:51:30.561681\n",
382+
"1 2021-02-03 09:51:30.812066\n",
383+
"2 2021-02-03 09:51:31.063662\n",
384+
"3 2021-02-03 09:51:31.313478\n",
385+
"4 2021-02-03 09:51:31.564776\n"
370386
]
371387
}
372388
],
@@ -375,8 +391,7 @@
375391
" time.sleep(random.random()/1000)\n",
376392
" print(i, datetime.now())\n",
377393
"\n",
378-
"test_n_workers = 0 if sys.platform == \"win32\" else 2\n",
379-
"parallel(print_time, range(5), n_workers=test_n_workers, pause=0.25);"
394+
"parallel(print_time, range(5), n_workers=2, pause=0.25);"
380395
]
381396
},
382397
{
@@ -423,6 +438,7 @@
423438
"#export \n",
424439
"def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):\n",
425440
" \"Instantiate `cls` in `n_workers` procs & call each on a subset of `items` in parallel.\"\n",
441+
" n_workers = check_parallel_num('n_workers', n_workers)\n",
426442
" if n_workers==0:\n",
427443
" yield from enumerate(list(cls(**kwargs)(items)))\n",
428444
" return\n",
@@ -461,10 +477,9 @@
461477
"idxs,dat1 = zip(*res.sorted(itemgetter(0)))\n",
462478
"test_eq(dat1, range(1,6))\n",
463479
"\n",
464-
"if sys.platform != \"win32\":\n",
465-
" res = L(parallel_gen(_C, items, n_workers=3))\n",
466-
" idxs,dat2 = zip(*res.sorted(itemgetter(0)))\n",
467-
" test_eq(dat2, dat1)"
480+
"res = L(parallel_gen(_C, items, n_workers=3))\n",
481+
"idxs,dat2 = zip(*res.sorted(itemgetter(0)))\n",
482+
"test_eq(dat2, dat1)"
468483
]
469484
},
470485
{
@@ -500,8 +515,8 @@
500515
" yield k+self.a\n",
501516
"\n",
502517
"x = np.linspace(0,0.99,20)\n",
503-
"test_n_workers = 0 if sys.platform == \"win32\" else 2\n",
504-
"res = L(parallel_gen(TestSleepyBatchFunc, x, n_workers=test_n_workers))\n",
518+
"\n",
519+
"res = L(parallel_gen(TestSleepyBatchFunc, x, n_workers=2))\n",
505520
"test_eq(res.sorted().itemgot(1), x+1)"
506521
]
507522
},
@@ -546,7 +561,14 @@
546561
"execution_count": null,
547562
"metadata": {},
548563
"outputs": [],
549-
"source": []
564+
"source": [
565+
"from subprocess import Popen, PIPE\n",
566+
"# test num_workers > 0 in scripts works when python process start method is spawn\n",
567+
"process = Popen([\"python\", \"parallel_test.py\"], stdout=PIPE)\n",
568+
"_, err = process.communicate(timeout=5)\n",
569+
"exit_code = process.wait()\n",
570+
"test_eq(exit_code, 0)"
571+
]
550572
}
551573
],
552574
"metadata": {

nbs/parallel_test.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from fastcore.parallel import *
2+
from datetime import *
3+
import random
4+
import os
5+
6+
# from contextlib import contextmanager,ExitStack
7+
from multiprocessing import Process, Queue
8+
import concurrent.futures,time
9+
from multiprocessing import Manager
10+
11+
def print_time(i):
12+
time.sleep(random.random()/1000)
13+
print(i, os.getpid(), datetime.now())
14+
15+
class _C:
16+
def __call__(self, o): return ((i+1) for i in o)
17+
18+
items = range(5)
19+
20+
if __name__ == "__main__":
21+
parallel(print_time, range(5), n_workers=2, pause=0.25);
22+
res = list(parallel_gen(_C, items, n_workers=2))
23+
print(res)

0 commit comments

Comments
 (0)