11# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/03a_parallel.ipynb.
22
33# %% auto 0
4- __all__ = ['threaded' , 'startthread' , 'parallelable ' , 'ThreadPoolExecutor ' , 'ProcessPoolExecutor ' , 'parallel ' , 'add_one ' ,
5- 'run_procs' , 'parallel_gen' ]
4+ __all__ = ['threaded' , 'startthread' , 'startproc ' , 'parallelable ' , 'ThreadPoolExecutor ' , 'ProcessPoolExecutor ' , 'parallel ' ,
5+ 'add_one' , ' run_procs' , 'parallel_gen' ]
66
77# %% ../nbs/03a_parallel.ipynb 1
88from .imports import *
@@ -38,9 +38,14 @@ def _f(*args, **kwargs):
3838# %% ../nbs/03a_parallel.ipynb 6
3939def startthread (f ):
4040 "Like `threaded`, but start thread immediately"
41- threaded (f )()
41+ return threaded (f )()
4242
4343# %% ../nbs/03a_parallel.ipynb 8
44+ def startproc (f ):
45+ "Like `threaded(True)`, but start Process immediately"
46+ return threaded (True )(f )()
47+
48+ # %% ../nbs/03a_parallel.ipynb 10
4449def _call (lock , pause , n , g , item ):
4550 l = False
4651 if pause :
@@ -51,7 +56,7 @@ def _call(lock, pause, n, g, item):
5156 if l : lock .release ()
5257 return g (item )
5358
54- # %% ../nbs/03a_parallel.ipynb 9
59+ # %% ../nbs/03a_parallel.ipynb 11
5560def parallelable (param_name , num_workers , f = None ):
5661 f_in_main = f == None or sys .modules [f .__module__ ].__name__ == "__main__"
5762 if sys .platform == "win32" and IN_NOTEBOOK and num_workers > 0 and f_in_main :
@@ -60,7 +65,7 @@ def parallelable(param_name, num_workers, f=None):
6065 return False
6166 return True
6267
63- # %% ../nbs/03a_parallel.ipynb 10
68+ # %% ../nbs/03a_parallel.ipynb 12
6469class ThreadPoolExecutor (concurrent .futures .ThreadPoolExecutor ):
6570 "Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution"
6671 def __init__ (self , max_workers = defaults .cpus , on_exc = print , pause = 0 , ** kwargs ):
@@ -78,7 +83,7 @@ def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
7883 try : return super ().map (_g , items , timeout = timeout , chunksize = chunksize )
7984 except Exception as e : self .on_exc (e )
8085
81- # %% ../nbs/03a_parallel.ipynb 12
86+ # %% ../nbs/03a_parallel.ipynb 14
8287@delegates ()
8388class ProcessPoolExecutor (concurrent .futures .ProcessPoolExecutor ):
8489 "Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution"
@@ -101,11 +106,11 @@ def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
101106 try : return super ().map (_g , items , timeout = timeout , chunksize = chunksize )
102107 except Exception as e : self .on_exc (e )
103108
104- # %% ../nbs/03a_parallel.ipynb 14
109+ # %% ../nbs/03a_parallel.ipynb 16
105110try : from fastprogress import progress_bar
106111except : progress_bar = None
107112
108- # %% ../nbs/03a_parallel.ipynb 15
113+ # %% ../nbs/03a_parallel.ipynb 17
109114def parallel (f , items , * args , n_workers = defaults .cpus , total = None , progress = None , pause = 0 ,
110115 method = None , threadpool = False , timeout = None , chunksize = 1 , ** kwargs ):
111116 "Applies `func` in parallel to `items`, using `n_workers`"
@@ -122,28 +127,28 @@ def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None
122127 r = progress_bar (r , total = total , leave = False )
123128 return L (r )
124129
125- # %% ../nbs/03a_parallel.ipynb 16
130+ # %% ../nbs/03a_parallel.ipynb 18
126131def add_one (x , a = 1 ):
127132 # this import is necessary for multiprocessing in notebook on windows
128133 import random
129134 time .sleep (random .random ()/ 80 )
130135 return x + a
131136
132- # %% ../nbs/03a_parallel.ipynb 22
137+ # %% ../nbs/03a_parallel.ipynb 24
133138def run_procs (f , f_done , args ):
134139 "Call `f` for each item in `args` in parallel, yielding `f_done`"
135140 processes = L (args ).map (Process , args = arg0 , target = f )
136141 for o in processes : o .start ()
137142 yield from f_done ()
138143 processes .map (Self .join ())
139144
140- # %% ../nbs/03a_parallel.ipynb 23
145+ # %% ../nbs/03a_parallel.ipynb 25
141146def _f_pg (obj , queue , batch , start_idx ):
142147 for i ,b in enumerate (obj (batch )): queue .put ((start_idx + i ,b ))
143148
144149def _done_pg (queue , items ): return (queue .get () for _ in items )
145150
146- # %% ../nbs/03a_parallel.ipynb 24
151+ # %% ../nbs/03a_parallel.ipynb 26
147152def parallel_gen (cls , items , n_workers = defaults .cpus , ** kwargs ):
148153 "Instantiate `cls` in `n_workers` procs & call each on a subset of `items` in parallel."
149154 if not parallelable ('n_workers' , n_workers ): n_workers = 0
0 commit comments