@@ -218,7 +218,7 @@ def run(self, graph, config, updatehash=False):
218
218
# setup polling - TODO: change to threaded model
219
219
notrun = []
220
220
while np .any (self .proc_done == False ) | \
221
- np .any (self .proc_pending == True ):
221
+ np .any (self .proc_pending == True ):
222
222
toappend = []
223
223
# trigger callbacks for any pending results
224
224
while self .pending_tasks :
@@ -297,11 +297,12 @@ def _submit_mapnode(self, jobid):
297
297
self .procs .extend (mapnodesubids )
298
298
self .depidx = ssp .vstack ((self .depidx ,
299
299
ssp .lil_matrix (np .zeros ((numnodes ,
300
- self .depidx .shape [1 ])))),
300
+ self .depidx .shape [1 ])))),
301
301
'lil' )
302
302
self .depidx = ssp .hstack ((self .depidx ,
303
- ssp .lil_matrix (np .zeros ((self .depidx .shape [0 ],
304
- numnodes )))),
303
+ ssp .lil_matrix (
304
+ np .zeros ((self .depidx .shape [0 ],
305
+ numnodes )))),
305
306
'lil' )
306
307
self .depidx [- numnodes :, jobid ] = 1
307
308
self .proc_done = np .concatenate ((self .proc_done ,
@@ -315,7 +316,7 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
315
316
"""
316
317
while np .any (self .proc_done == False ):
317
318
# Check to see if a job is available
318
- jobids = np .flatnonzero ((self .proc_done == False ) & \
319
+ jobids = np .flatnonzero ((self .proc_done == False ) &
319
320
(self .depidx .sum (axis = 0 ) == 0 ).__array__ ())
320
321
if len (jobids ) > 0 :
321
322
# send all available jobs
@@ -336,20 +337,21 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
336
337
self .proc_done [jobid ] = True
337
338
self .proc_pending [jobid ] = True
338
339
# Send job to task manager and add to pending tasks
339
- logger .info ('Executing: %s ID: %d' % \
340
- (self .procs [jobid ]._id , jobid ))
340
+ logger .info ('Executing: %s ID: %d' %
341
+ (self .procs [jobid ]._id , jobid ))
341
342
if self ._status_callback :
342
343
self ._status_callback (self .procs [jobid ], 'start' )
343
344
continue_with_submission = True
344
345
if str2bool (self .procs [jobid ].config ['execution' ]['local_hash_check' ]):
345
346
logger .debug ('checking hash locally' )
346
347
try :
347
- hash_exists , _ , _ , _ = self .procs [jobid ].hash_exists ()
348
+ hash_exists , _ , _ , _ = self .procs [
349
+ jobid ].hash_exists ()
348
350
logger .debug ('Hash exists %s' % str (hash_exists ))
349
351
if (hash_exists and
350
- (self .procs [jobid ].overwrite == False or
351
- (self .procs [jobid ].overwrite == None and
352
- not self .procs [jobid ]._interface .always_run ))):
352
+ (self .procs [jobid ].overwrite == False or
353
+ (self .procs [jobid ].overwrite == None and
354
+ not self .procs [jobid ]._interface .always_run ))):
353
355
continue_with_submission = False
354
356
self ._task_finished_cb (jobid )
355
357
self ._remove_node_dirs ()
@@ -385,7 +387,7 @@ def _task_finished_cb(self, jobid):
385
387
386
388
This is called when a job is completed.
387
389
"""
388
- logger .info ('[Job finished] jobname: %s jobid: %d' % \
390
+ logger .info ('[Job finished] jobname: %s jobid: %d' %
389
391
(self .procs [jobid ]._id , jobid ))
390
392
if self ._status_callback :
391
393
self ._status_callback (self .procs [jobid ], 'end' )
@@ -431,7 +433,7 @@ def _remove_node_dirs(self):
431
433
self .refidx [idx , idx ] = - 1
432
434
outdir = self .procs [idx ]._output_directory ()
433
435
logger .info (('[node dependencies finished] '
434
- 'removing node: %s from directory %s' ) % \
436
+ 'removing node: %s from directory %s' ) %
435
437
(self .procs [idx ]._id , outdir ))
436
438
shutil .rmtree (outdir )
437
439
@@ -563,14 +565,14 @@ def run(self, graph, config, updatehash=False):
563
565
dependencies [idx ] = [nodes .index (prevnode ) for prevnode in
564
566
graph .predecessors (node )]
565
567
self ._submit_graph (pyfiles , dependencies , nodes )
566
-
568
+
567
569
def _get_args (self , node , keywords ):
568
570
values = ()
569
571
for keyword in keywords :
570
572
value = getattr (self , "_" + keyword )
571
573
if hasattr (node , "plugin_args" ) and isinstance (node .plugin_args , dict ) and keyword in node .plugin_args :
572
574
if 'overwrite' in node .plugin_args and node .plugin_args ['overwrite' ]:
573
- value = node .plugin_args [keyword ]
575
+ value = node .plugin_args [keyword ]
574
576
else :
575
577
value += node .plugin_args [keyword ]
576
578
else :
0 commit comments