Skip to content

Commit ccb183b

Browse files
committed
add parallel execution
1 parent 7457766 commit ccb183b

File tree

1 file changed

+43
-17
lines changed

1 file changed

+43
-17
lines changed

cwltool/main.py

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import requests
1818
import six
1919
import string
20+
import threading
2021

2122
import ruamel.yaml as yaml
2223
import schema_salad.validate as validate
@@ -275,27 +276,52 @@ def output_callback(out, processStatus):
275276
for req in jobReqs:
276277
t.requirements.append(req)
277278

278-
jobiter = t.job(job_order_object,
279-
output_callback,
280-
**kwargs)
279+
fetch_iter_lock = threading.Lock()
280+
threads = set()
281+
exception = None
281282

282-
try:
283-
for r in jobiter:
284-
if r:
285-
builder = kwargs.get("builder", None) # type: Builder
286-
if builder is not None:
287-
r.builder = builder
288-
if r.outdir:
289-
output_dirs.add(r.outdir)
290-
r.run(**kwargs)
283+
def run_job(job):
284+
def runner():
285+
try:
286+
job.run(**kwargs)
287+
except WorkflowException as e:
288+
exception = e
289+
except Exception as e:
290+
exception = WorkflowException(Text(e))
291+
292+
threads.remove(thread)
293+
294+
if fetch_iter_lock.locked():
295+
fetch_iter_lock.release()
296+
297+
thread = threading.Thread(target=runner)
298+
thread.daemon = True
299+
threads.add(thread)
300+
thread.start()
301+
302+
303+
304+
jobiter = t.job(job_order_object, output_callback, **kwargs)
305+
306+
for r in jobiter:
307+
if r:
308+
builder = kwargs.get("builder", None) # type: Builder
309+
if builder is not None:
310+
r.builder = builder
311+
if r.outdir:
312+
output_dirs.add(r.outdir)
313+
run_job(r)
314+
else:
315+
if exception is not None:
316+
raise exception
317+
if len(threads):
318+
# wait until one job completes to try fetching jobiter again
319+
fetch_iter_lock.acquire()
320+
fetch_iter_lock.acquire()
321+
fetch_iter_lock.release()
291322
else:
292323
_logger.error("Workflow cannot make any more progress.")
293324
break
294-
except WorkflowException:
295-
raise
296-
except Exception as e:
297-
_logger.exception("Got workflow error")
298-
raise WorkflowException(Text(e))
299325

300326
if final_output and final_output[0] and finaloutdir:
301327
final_output[0] = relocateOutputs(final_output[0], finaloutdir,

0 commit comments

Comments
 (0)