File tree Expand file tree Collapse file tree 1 file changed +10
-7
lines changed Expand file tree Collapse file tree 1 file changed +10
-7
lines changed Original file line number Diff line number Diff line change @@ -299,7 +299,12 @@ def runner():
299
299
threads .add (thread )
300
300
thread .start ()
301
301
302
-
302
+ def wait_for_next_completion ():
303
+ fetch_iter_lock .acquire ()
304
+ fetch_iter_lock .acquire ()
305
+ fetch_iter_lock .release ()
306
+ if exception is not None :
307
+ raise exception
303
308
304
309
jobiter = t .job (job_order_object , output_callback , ** kwargs )
305
310
@@ -312,17 +317,15 @@ def runner():
312
317
output_dirs .add (r .outdir )
313
318
run_job (r )
314
319
else :
315
- if exception is not None :
316
- raise exception
317
320
if len (threads ):
318
- # wait until one job completes to try fetching jobiter again
319
- fetch_iter_lock .acquire ()
320
- fetch_iter_lock .acquire ()
321
- fetch_iter_lock .release ()
321
+ wait_for_next_completion ()
322
322
else :
323
323
_logger .error ("Workflow cannot make any more progress." )
324
324
break
325
325
326
+ while len (threads ) > 0 :
327
+ wait_for_next_completion ()
328
+
326
329
if final_output and final_output [0 ] and finaloutdir :
327
330
final_output [0 ] = relocateOutputs (final_output [0 ], finaloutdir ,
328
331
output_dirs , kwargs .get ("move_outputs" ),
You can’t perform that action at this time.
0 commit comments