
Commit d1c40b3

fixes #102
1 parent d5e43c9 commit d1c40b3

File tree

6 files changed: +60 −22 lines

fastcore/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = "1.0.15"
+__version__ = "1.0.16"

fastcore/_nbdev.py

Lines changed: 1 addition & 0 deletions
@@ -120,6 +120,7 @@
 "ContextManagers": "02_utils.ipynb",
 "set_num_threads": "02_utils.ipynb",
 "ProcessPoolExecutor": "02_utils.ipynb",
+"ThreadPoolExecutor": "02_utils.ipynb",
 "parallel": "02_utils.ipynb",
 "run_procs": "02_utils.ipynb",
 "parallel_gen": "02_utils.ipynb",

fastcore/all.py

Lines changed: 1 addition & 0 deletions
@@ -7,3 +7,4 @@
 from .meta import *
 from .imports import *
 from .logargs import *
+from .script import *

fastcore/utils.py

Lines changed: 22 additions & 3 deletions
@@ -9,7 +9,7 @@
            'partialler', 'mapped', 'instantiate', 'using_attr', 'Self', 'Self', 'remove_patches_path', 'bunzip',
            'join_path_file', 'urlread', 'urljson', 'run_proc', 'do_request', 'sort_by_run', 'PrettyString',
            'round_multiple', 'even_mults', 'num_cpus', 'add_props', 'ContextManagers', 'set_num_threads',
-           'ProcessPoolExecutor', 'parallel', 'run_procs', 'parallel_gen']
+           'ProcessPoolExecutor', 'ThreadPoolExecutor', 'parallel', 'run_procs', 'parallel_gen']

 # Cell
 from .imports import *
@@ -694,16 +694,35 @@ def map(self, f, items, timeout=None, chunksize=1, *args, **kwargs):
         try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
         except Exception as e: self.on_exc(e)

+# Cell
+class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
+    "Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution"
+    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
+        if max_workers is None: max_workers=defaults.cpus
+        store_attr()
+        self.not_parallel = max_workers==0
+        if self.not_parallel: max_workers=1
+        super().__init__(max_workers, **kwargs)
+
+    def map(self, f, items, timeout=None, chunksize=1, *args, **kwargs):
+        self.lock = Manager().Lock()
+        g = partial(f, *args, **kwargs)
+        if self.not_parallel: return map(g, items)
+        _g = partial(_call, self.lock, self.pause, self.max_workers, g)
+        try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
+        except Exception as e: self.on_exc(e)
+
 # Cell
 try: from fastprogress import progress_bar
 except: progress_bar = None

 # Cell
 def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,
-             timeout=None, chunksize=1, **kwargs):
+             threadpool=False, timeout=None, chunksize=1, **kwargs):
     "Applies `func` in parallel to `items`, using `n_workers`"
     if progress is None: progress = progress_bar is not None
-    with ProcessPoolExecutor(n_workers, pause=pause) as ex:
+    pool = ThreadPoolExecutor if threadpool else ProcessPoolExecutor
+    with pool(n_workers, pause=pause) as ex:
         r = ex.map(f,items, *args, timeout=timeout, chunksize=chunksize, **kwargs)
         if progress:
             if total is None: total = len(items)
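
For reference, a minimal usage sketch of the updated `parallel` API (not part of the commit). It assumes fastcore 1.0.16 or later is installed and mirrors the tests added in the notebook below; `add_one` is the same toy worker used there.

from fastcore.utils import parallel

def add_one(x, a=1):
    # same toy worker as in the notebook tests
    return x + a

if __name__ == "__main__":  # guard needed when the process pool uses the spawn start method
    inp, exp = range(50), range(1, 51)

    # default behaviour is unchanged: items are mapped across a ProcessPoolExecutor
    assert list(parallel(add_one, inp, n_workers=2, progress=False)) == list(exp)

    # new in this commit: threadpool=True swaps in the ThreadPoolExecutor added above
    assert list(parallel(add_one, inp, threadpool=True, n_workers=2, progress=False)) == list(exp)

    # n_workers=0 still means serial execution; extra kwargs are forwarded to the worker
    assert list(parallel(add_one, inp, n_workers=0, a=2)) == list(range(2, 52))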

nbs/02_utils.ipynb

Lines changed: 34 additions & 17 deletions
@@ -294,7 +294,7 @@
     {
      "data": {
       "text/plain": [
-       "<__main__._t at 0x7efe2eb5ef90>"
+       "<__main__._t at 0x7f756accd110>"
       ]
      },
      "execution_count": null,
@@ -1668,7 +1668,7 @@
     {
      "data": {
       "text/plain": [
-       "['a', 'e', 'b', 'h', 'c', 'd', 'g', 'f']"
+       "['c', 'a', 'f', 'e', 'g', 'h', 'd', 'b']"
       ]
      },
      "execution_count": null,
@@ -3570,6 +3570,31 @@
     "    except Exception as e: self.on_exc(e)"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "#export\n",
+    "class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):\n",
+    "    \"Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution\"\n",
+    "    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):\n",
+    "        if max_workers is None: max_workers=defaults.cpus\n",
+    "        store_attr()\n",
+    "        self.not_parallel = max_workers==0\n",
+    "        if self.not_parallel: max_workers=1\n",
+    "        super().__init__(max_workers, **kwargs)\n",
+    "\n",
+    "    def map(self, f, items, timeout=None, chunksize=1, *args, **kwargs):\n",
+    "        self.lock = Manager().Lock()\n",
+    "        g = partial(f, *args, **kwargs)\n",
+    "        if self.not_parallel: return map(g, items)\n",
+    "        _g = partial(_call, self.lock, self.pause, self.max_workers, g)\n",
+    "        try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)\n",
+    "        except Exception as e: self.on_exc(e)"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
@@ -3580,7 +3605,7 @@
      "text/markdown": [
       "<h4 id=\"ProcessPoolExecutor\" class=\"doc_header\"><code>class</code> <code>ProcessPoolExecutor</code><a href=\"\" class=\"source_link\" style=\"float:right\">[source]</a></h4>\n",
       "\n",
-      "> <code>ProcessPoolExecutor</code>(**`max_workers`**=*`64`*, **`on_exc`**=*`print`*, **`pause`**=*`0`*, **`mp_context`**=*`None`*, **`initializer`**=*`None`*, **`initargs`**=*`()`*) :: [`ProcessPoolExecutor`](/utils.html#ProcessPoolExecutor)\n",
+      "> <code>ProcessPoolExecutor</code>(**`max_workers`**=*`64`*, **`on_exc`**=*`print`*, **`pause`**=*`0`*, **\\*\\*`kwargs`**) :: [`ProcessPoolExecutor`](/utils.html#ProcessPoolExecutor)\n",
       "\n",
       "Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution"
      ],
@@ -3615,10 +3640,11 @@
   "source": [
    "#export \n",
    "def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,\n",
-   "             timeout=None, chunksize=1, **kwargs):\n",
+   "             threadpool=False, timeout=None, chunksize=1, **kwargs):\n",
    "    \"Applies `func` in parallel to `items`, using `n_workers`\"\n",
    "    if progress is None: progress = progress_bar is not None\n",
-   "    with ProcessPoolExecutor(n_workers, pause=pause) as ex:\n",
+   "    pool = ThreadPoolExecutor if threadpool else ProcessPoolExecutor\n",
+   "    with pool(n_workers, pause=pause) as ex:\n",
    "        r = ex.map(f,items, *args, timeout=timeout, chunksize=chunksize, **kwargs)\n",
    "        if progress:\n",
    "            if total is None: total = len(items)\n",
@@ -3651,16 +3677,6 @@
     "metadata": {},
     "output_type": "display_data"
    },
-   {
-    "data": {
-     "text/html": [],
-     "text/plain": [
-      "<IPython.core.display.HTML object>"
-     ]
-    },
-    "metadata": {},
-    "output_type": "display_data"
-   },
    {
     "data": {
      "text/html": [],
@@ -3678,7 +3694,8 @@
    "    return x+a\n",
    "\n",
    "inp,exp = range(50),range(1,51)\n",
-   "test_eq(parallel(add_one, inp, n_workers=2), exp)\n",
+   "test_eq(parallel(add_one, inp, n_workers=2, progress=False), exp)\n",
+   "test_eq(parallel(add_one, inp, threadpool=True, n_workers=2, progress=False), exp)\n",
    "test_eq(parallel(add_one, inp, n_workers=0), exp)\n",
    "test_eq(parallel(add_one, inp, n_workers=1, a=2), range(2,52))\n",
    "test_eq(parallel(add_one, inp, n_workers=0, a=2), range(2,52))"
@@ -3688,7 +3705,7 @@
   "cell_type": "markdown",
   "metadata": {},
   "source": [
-   "Use the `pause` parameter to ensure a pause of `pause` seconds between processes starting. This is in case there are race conditions in starting some process, or to stagger the time each process starts, for example when making many requests to a webserver."
+   "Use the `pause` parameter to ensure a pause of `pause` seconds between processes starting. This is in case there are race conditions in starting some process, or to stagger the time each process starts, for example when making many requests to a webserver. Set `threadpool=True` to use `ThreadPoolExecutor` instead of `ProcessPoolExecutor`."
   ]
  },
 {
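
The new executor can also be used directly; a small sketch (not part of the commit, assuming fastcore 1.0.16 or later; `square` is a made-up worker used only for illustration):

from fastcore.utils import ThreadPoolExecutor

def square(x):
    # trivial worker for illustration
    return x * x

# unlike the stdlib class, max_workers=0 falls back to serial execution instead of raising
with ThreadPoolExecutor(max_workers=0) as ex:
    assert list(ex.map(square, range(5))) == [0, 1, 4, 9, 16]

# with max_workers > 0, items are mapped across worker threads; result order is preserved
with ThreadPoolExecutor(max_workers=2) as ex:
    assert list(ex.map(square, range(5))) == [0, 1, 4, 9, 16]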

settings.ini

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@ author = Jeremy Howard and Sylvain Gugger
 author_email = [email protected]
 copyright = fast.ai
 branch = master
-version = 1.0.15
+version = 1.0.16
 min_python = 3.6
 audience = Developers
 language = English
