Skip to content

Commit d210791

Browse files
committed
improve
1 parent fba4728 commit d210791

File tree

3 files changed

+11
-7
lines changed

3 files changed

+11
-7
lines changed

cwltool/factory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class Factory(object):
3535
def __init__(self,
3636
makeTool=workflow.defaultMakeTool, # type: tCallable[[Any], Process]
3737
# should be tCallable[[Dict[Text, Any], Any], Process] ?
38-
executor=main.single_job_executor, # type: tCallable[...,Tuple[Dict[Text,Any], Text]]
38+
executor=main.job_executor, # type: tCallable[...,Tuple[Dict[Text,Any], Text]]
3939
**execkwargs # type: Any
4040
):
4141
# type: (...) -> None

cwltool/job.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import subprocess
1111
import sys
1212
import tempfile
13+
from threading import Lock
1314
from io import open
1415
from typing import (IO, Any, Callable, Dict, Iterable, List, MutableMapping, Text,
1516
Tuple, Union, cast)
@@ -30,6 +31,8 @@
3031

3132
needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
3233

34+
job_output_lock = Lock()
35+
3336
FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"
3437

3538
SHELL_COMMAND_TEMPLATE = """#!/bin/bash
@@ -265,7 +268,8 @@ def _execute(self, runtime, env, rm_tmpdir=True, move_outputs="move"):
265268
if _logger.isEnabledFor(logging.DEBUG):
266269
_logger.debug(u"[job %s] %s", self.name, json.dumps(outputs, indent=4))
267270

268-
self.output_callback(outputs, processStatus)
271+
with job_output_lock:
272+
self.output_callback(outputs, processStatus)
269273

270274
if self.stagedir and os.path.exists(self.stagedir):
271275
_logger.debug(u"[job %s] Removing input staging directory %s", self.name, self.stagedir)

cwltool/main.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -245,10 +245,10 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
245245
return parser
246246

247247

248-
def single_job_executor(t, # type: Process
249-
job_order_object, # type: Dict[Text, Any]
250-
**kwargs # type: Any
251-
):
248+
def job_executor(t, # type: Process
249+
job_order_object, # type: Dict[Text, Any]
250+
**kwargs # type: Any
251+
):
252252
# type: (...) -> Tuple[Dict[Text, Any], Text]
253253
final_output = []
254254
final_status = []
@@ -721,7 +721,7 @@ def supportedCWLversions(enable_dev):
721721

722722
def main(argsl=None, # type: List[str]
723723
args=None, # type: argparse.Namespace
724-
executor=single_job_executor, # type: Callable[..., Tuple[Dict[Text, Any], Text]]
724+
executor=job_executor, # type: Callable[..., Tuple[Dict[Text, Any], Text]]
725725
makeTool=workflow.defaultMakeTool, # type: Callable[..., Process]
726726
selectResources=None, # type: Callable[[Dict[Text, int]], Dict[Text, int]]
727727
stdin=sys.stdin, # type: IO[Any]

0 commit comments

Comments
 (0)