17
17
18
18
# --- helper functions for multiprocessing --
19
19
20
- def _initializer (table , jobs , populate_kwargs ):
20
+ def _initialize_populate (table , jobs , populate_kwargs ):
21
21
"""
22
- Process initializer for mulitprocessing.
22
+ Initialize the process for mulitprocessing.
23
23
Saves the unpickled copy of the table to the current process and reconnects.
24
24
"""
25
25
process = mp .current_process ()
@@ -159,9 +159,9 @@ def handler(signum, frame):
159
159
160
160
logger .info ('Found %d keys to populate' % len (keys ))
161
161
162
- if max_calls is not None :
163
- keys = keys [:max_calls ]
162
+ keys = keys [:max_calls ]
164
163
nkeys = len (keys )
164
+
165
165
if processes > 1 :
166
166
processes = min (processes , nkeys , mp .cpu_count ())
167
167
@@ -179,7 +179,7 @@ def handler(signum, frame):
179
179
# spawn multiple processes
180
180
self .connection .close () # disconnect parent process from MySQL server
181
181
del self .connection ._conn .ctx # SSLContext is not pickleable
182
- with mp .Pool (processes , _initializer , (self , populate_kwargs )) as pool :
182
+ with mp .Pool (processes , _initialize_populate , (self , populate_kwargs )) as pool :
183
183
if display_progress :
184
184
with tqdm (desc = "Processes: " , total = nkeys ) as pbar :
185
185
for error in pool .imap (_call_populate1 , keys , chunksize = 1 ):
@@ -201,12 +201,12 @@ def handler(signum, frame):
201
201
202
202
def _populate1 (self , key , jobs , suppress_errors , return_exception_objects ):
203
203
"""
204
- populates table for one key, calling self.make inside a transaction
204
+ populates table for one source key, calling self.make inside a transaction.
205
205
:param jobs: the jobs table or None if not reserve_jobs
206
206
:param key: dict specifying job to populate
207
207
:param suppress_errors: bool if errors should be suppressed and returned
208
208
:param return_exception_objects: if True, errors must be returned as objects
209
- :return: key and error when suppress_errors=True, otherwise None
209
+ :return: ( key, error) when suppress_errors=True, otherwise None
210
210
"""
211
211
make = self ._make_tuples if hasattr (self , '_make_tuples' ) else self .make
212
212
@@ -248,8 +248,8 @@ def _populate1(self, key, jobs, suppress_errors, return_exception_objects):
248
248
249
249
def progress (self , * restrictions , display = True ):
250
250
"""
251
- report progress of populating the table
252
- :return: remaining, total -- tuples to be populated
251
+ Report the progress of populating the table.
252
+ :return: ( remaining, total) -- numbers of tuples to be populated
253
253
"""
254
254
todo = self ._jobs_to_do (restrictions )
255
255
total = len (todo )
0 commit comments