@@ -217,8 +217,10 @@ def _submit_mapnode(self, jobid):
217
217
return False
218
218
219
219
def _send_procs_to_workers (self , updatehash = False , graph = None ):
220
- """ Sends jobs to workers
221
220
"""
221
+ Sends jobs to workers
222
+ """
223
+
222
224
while not np .all (self .proc_done ):
223
225
num_jobs = len (self .pending_tasks )
224
226
if np .isinf (self .max_jobs ):
@@ -258,27 +260,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
258
260
(self .procs [jobid ]._id , jobid ))
259
261
if self ._status_callback :
260
262
self ._status_callback (self .procs [jobid ], 'start' )
261
- continue_with_submission = True
262
- if str2bool (self .procs [jobid ].config ['execution' ]
263
- ['local_hash_check' ]):
264
- logger .debug ('checking hash locally' )
265
- try :
266
- hash_exists , _ , _ , _ = self .procs [
267
- jobid ].hash_exists ()
268
- logger .debug ('Hash exists %s' % str (hash_exists ))
269
- if (hash_exists and (self .procs [jobid ].overwrite is False or
270
- (self .procs [jobid ].overwrite is None and not
271
- self .procs [jobid ]._interface .always_run ))):
272
- continue_with_submission = False
273
- self ._task_finished_cb (jobid )
274
- self ._remove_node_dirs ()
275
- except Exception :
276
- self ._clean_queue (jobid , graph )
277
- self .proc_pending [jobid ] = False
278
- continue_with_submission = False
279
- logger .debug ('Finished checking hash %s' %
280
- str (continue_with_submission ))
281
- if continue_with_submission :
263
+
264
+ if not self ._local_hash_check (jobid , graph ):
282
265
if self .procs [jobid ].run_without_submitting :
283
266
logger .debug ('Running node %s on master thread' %
284
267
self .procs [jobid ])
@@ -301,6 +284,31 @@ def _send_procs_to_workers(self, updatehash=False, graph=None):
301
284
else :
302
285
break
303
286
287
+ def _local_hash_check (self , jobid , graph ):
288
+ if not str2bool (self .procs [jobid ].config ['execution' ]['local_hash_check' ]):
289
+ return False
290
+
291
+ logger .debug ('Checking hash (%d) locally' , jobid )
292
+
293
+ hash_exists , _ , _ , _ = self .procs [jobid ].hash_exists ()
294
+ overwrite = self .procs [jobid ].overwrite
295
+ always_run = self .procs [jobid ]._interface .always_run
296
+
297
+ if hash_exists and (overwrite is False or
298
+ overwrite is None and not always_run ):
299
+ logger .debug ('Skipping cached node %s with ID %s.' ,
300
+ self .procs [jobid ]._id , jobid )
301
+ try :
302
+ self ._task_finished_cb (jobid )
303
+ self ._remove_node_dirs ()
304
+ except Exception :
305
+ logger .debug ('Error skipping cached node %s (%s).' ,
306
+ self .procs [jobid ]._id , jobid )
307
+ self ._clean_queue (jobid , graph )
308
+ self .proc_pending [jobid ] = False
309
+ return True
310
+ return False
311
+
304
312
def _task_finished_cb (self , jobid ):
305
313
""" Extract outputs and assign to inputs of dependent tasks
306
314
0 commit comments