Skip to content

Commit 2ba677a

Browse files
authored
Merge pull request #676 from common-workflow-language/provenance
Provenance Support for cwltool - Single Job Executor
2 parents 639229b + 1a68957 commit 2ba677a

24 files changed

+2539
-127
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ typeshed/2and3/ruamel/yaml
3030

3131
#mypy
3232
.mypy_cache/
33+
bin/
34+
lib/
3335

3436
# Files generated by Makefile
3537
.cache/

CWLProv.rst

Lines changed: 361 additions & 0 deletions
Large diffs are not rendered by default.

Makefile

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ DEBDEVPKGS=pep8 python-autopep8 pylint python-coverage pydocstyle sloccount \
3232
VERSION=1.0.$(shell date +%Y%m%d%H%M%S --utc --date=`git log --first-parent \
3333
--max-count=1 --format=format:%cI`)
3434
mkfile_dir := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
35+
UNAME_S=$(shell uname -s)
36+
ifeq ($(UNAME_S),Linux)
37+
nproc=$(shell nproc)
38+
endif
39+
ifeq ($(UNAME_S),Darwin)
40+
nproc=$(shell sysctl -n hw.physicalcpu)
41+
endif
3542

3643
## all : default task
3744
all:
@@ -105,11 +112,11 @@ format: autopep8
105112
## pylint : run static code analysis on Python code
106113
pylint: $(PYSOURCES)
107114
pylint --msg-template="{path}:{line}: [{msg_id}({symbol}), {obj}] {msg}" \
108-
$^ -j$(shell nproc)|| true
115+
$^ -j$(nproc)|| true
109116

110117
pylint_report.txt: ${PYSOURCES}
111118
pylint --msg-template="{path}:{line}: [{msg_id}({symbol}), {obj}] {msg}" \
112-
$^ -j$(shell nproc)> pylint_report.txt || true
119+
$^ -j$(nproc)> pylint_report.txt || true
113120

114121
diff_pylint_report: pylint_report.txt
115122
diff-quality --violations=pylint pylint_report.txt

cwltool/argparser.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,43 @@ def arg_parser(): # type: () -> argparse.ArgumentParser
116116
type=float,
117117
default=20)
118118

119+
provgroup = parser.add_argument_group("Options for recording provenance "
120+
"information of the execution")
121+
provgroup.add_argument("--provenance",
122+
help="Save provenance to specified folder as a "
123+
"Research Object that captures and aggregates "
124+
"workflow execution and data products.",
125+
type=Text)
126+
127+
provgroup.add_argument("--enable-user-provenance", default=False,
128+
action="store_true",
129+
help="Record user account info as part of provenance.",
130+
dest="user_provenance")
131+
provgroup.add_argument("--disable-user-provenance", default=False,
132+
action="store_false",
133+
help="Do not record user account info in provenance.",
134+
dest="user_provenance")
135+
provgroup.add_argument("--enable-host-provenance", default=False,
136+
action="store_true",
137+
help="Record host info as part of provenance.",
138+
dest="host_provenance")
139+
provgroup.add_argument("--disable-host-provenance", default=False,
140+
action="store_false",
141+
help="Do not record host info in provenance.",
142+
dest="host_provenance")
143+
provgroup.add_argument(
144+
"--orcid", help="Record user ORCID identifier as part of "
145+
"provenance, e.g. https://orcid.org/0000-0002-1825-0097 "
146+
"or 0000-0002-1825-0097. Alternatively the environment variable "
147+
"ORCID may be set.", dest="orcid", default=os.environ.get("ORCID"),
148+
type=Text)
149+
provgroup.add_argument(
150+
"--full-name", help="Record full name of user as part of provenance, "
151+
"e.g. Josiah Carberry. You may need to use shell quotes to preserve "
152+
"spaces. Alternatively the environment variable CWL_FULL_NAME may "
153+
"be set.", dest="cwl_full_name", default=os.environ.get("CWL_FULL_NAME"),
154+
type=Text)
155+
119156
exgroup = parser.add_mutually_exclusive_group()
120157
exgroup.add_argument("--print-rdf", action="store_true",
121158
help="Print corresponding RDF graph for workflow and exit")

cwltool/builder.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import copy
44
import logging
55
from typing import (Any, Callable, Dict, List, # pylint: disable=unused-import
6-
Optional, Set, Text, Type, Union, Tuple)
6+
Optional, Set, Text, Type, Union, Tuple, TYPE_CHECKING)
77

88
from rdflib import Graph, URIRef # pylint: disable=unused-import
99
from rdflib.namespace import OWL, RDFS
@@ -22,7 +22,8 @@
2222
from .stdfsaccess import StdFsAccess # pylint: disable=unused-import
2323
from .utils import (aslist, docker_windows_path_adjust,
2424
json_dumps, onWindows)
25-
25+
if TYPE_CHECKING:
26+
from .provenance import CreateProvProfile # pylint: disable=unused-import
2627
CONTENT_LIMIT = 64 * 1024
2728

2829

@@ -178,6 +179,7 @@ def __init__(self,
178179

179180
# One of "no_listing", "shallow_listing", "deep_listing"
180181
self.loadListing = loadListing
182+
self.prov_obj = None # type: Optional[CreateProvProfile]
181183

182184
self.find_default_container = None # type: Optional[Callable[[], Text]]
183185
self.job_script_provider = job_script_provider

cwltool/command_line_tool.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
import tempfile
1212
from functools import cmp_to_key, partial
1313
from typing import (Any, Callable, Dict, # pylint: disable=unused-import
14-
Generator, List, Optional, Set, Text, Type, Union, cast)
14+
Generator, List, Optional, Set, Text, Type, TYPE_CHECKING,
15+
Union, cast)
1516

1617
import schema_salad.validate as validate
1718
from schema_salad.ref_resolver import file_uri, uri_file_path
@@ -40,7 +41,10 @@
4041
from .utils import (aslist, convert_pathsep_to_unix,
4142
docker_windows_path_adjust, json_dumps, onWindows,
4243
windows_default_container_id)
43-
from .context import LoadingContext, RuntimeContext, getdefault
44+
from .context import (LoadingContext, # pylint: disable=unused-import
45+
RuntimeContext, getdefault)
46+
if TYPE_CHECKING:
47+
from .provenance import CreateProvProfile # pylint: disable=unused-import
4448

4549
ACCEPTLIST_EN_STRICT_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
4650
ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*") # Accept anything
@@ -80,6 +84,7 @@ def __init__(self,
8084
self.outdir = outdir
8185
self.tmpdir = tmpdir
8286
self.script = script
87+
self.prov_obj = None # type: Optional[CreateProvProfile]
8388

8489
def run(self, runtimeContext): # type: (RuntimeContext) -> None
8590
try:
@@ -99,9 +104,11 @@ def job(self,
99104
# type: (...) -> Generator[ExpressionTool.ExpressionJob, None, None]
100105
builder = self._init_job(job_order, runtimeContext)
101106

102-
yield ExpressionTool.ExpressionJob(
107+
job = ExpressionTool.ExpressionJob(
103108
builder, self.tool["expression"], output_callbacks,
104109
self.requirements, self.hints)
110+
job.prov_obj = runtimeContext.prov_obj
111+
yield job
105112

106113

107114
def remove_path(f): # type: (Dict[Text, Any]) -> None
@@ -167,6 +174,7 @@ def __init__(self, job, output_callback, cachebuilder, jobcache):
167174
self.output_callback = output_callback
168175
self.cachebuilder = cachebuilder
169176
self.outdir = jobcache
177+
self.prov_obj = None # type: Optional[CreateProvProfile]
170178

171179
def run(self, runtimeContext):
172180
# type: (RuntimeContext) -> None
@@ -213,8 +221,8 @@ def check_valid_locations(fs_access, ob):
213221
class CommandLineTool(Process):
214222
def __init__(self, toolpath_object, loadingContext):
215223
# type: (Dict[Text, Any], LoadingContext) -> None
216-
super(CommandLineTool, self).__init__(
217-
toolpath_object, loadingContext)
224+
super(CommandLineTool, self).__init__(toolpath_object, loadingContext)
225+
self.prov_obj = loadingContext.prov_obj
218226

219227
def make_job_runner(self,
220228
runtimeContext # type: RuntimeContext
@@ -264,7 +272,7 @@ def job(self,
264272
job_order, # type: Dict[Text, Text]
265273
output_callbacks, # type: Callable[[Any, Any], Any]
266274
runtimeContext # type: RuntimeContext
267-
):
275+
):
268276
# type: (...) -> Generator[Union[JobBase, CallbackJob], None, None]
269277

270278
require_prefix = ""
@@ -369,6 +377,7 @@ def rm_pending_output_callback(output_callbacks, jobcachepending,
369377
j = self.make_job_runner(runtimeContext)(
370378
builder, builder.job, self.make_path_mapper, self.requirements,
371379
self.hints, jobname)
380+
j.prov_obj = self.prov_obj
372381
j.successCodes = self.tool.get("successCodes")
373382
j.temporaryFailCodes = self.tool.get("temporaryFailCodes")
374383
j.permanentFailCodes = self.tool.get("permanentFailCodes")

cwltool/context.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
from typing import TYPE_CHECKING
1616
if TYPE_CHECKING:
1717
from .process import Process
18+
from .provenance import (ResearchObject, # pylint: disable=unused-import
19+
CreateProvProfile)
1820

1921
class ContextBase(object):
2022
def __init__(self, kwargs=None):
@@ -50,6 +52,12 @@ def __init__(self, kwargs=None):
5052
self.resolver = None
5153
self.fetcher_constructor = None
5254
self.construct_tool_object = default_make_tool
55+
self.research_obj = None # type: Optional[ResearchObject]
56+
self.orcid = None
57+
self.cwl_full_name = None
58+
self.host_provenance = False # type: bool
59+
self.user_provenance = False # type: bool
60+
self.prov_obj = None # type: Optional[CreateProvProfile]
5361

5462
super(LoadingContext, self).__init__(kwargs)
5563

@@ -108,6 +116,12 @@ def __init__(self, kwargs=None):
108116
self.cidfile_dir = None
109117
self.cidfile_prefix = None
110118

119+
self.research_obj = None # type: Optional[ResearchObject]
120+
self.orcid = None
121+
self.cwl_full_name = None
122+
self.process_run_id = None # type: Optional[str]
123+
self.prov_obj = None # type: Optional[CreateProvProfile]
124+
self.reference_locations = {} # type: Dict[Text, Text]
111125
super(RuntimeContext, self).__init__(kwargs)
112126

113127

cwltool/executors.py

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import logging
21
import os
32
import tempfile
43
import threading
54
from abc import ABCMeta, abstractmethod
5+
import datetime
66
from typing import (Any, Dict, List, Optional, # pylint: disable=unused-import
77
Set, Text, Tuple)
88

@@ -12,14 +12,15 @@
1212

1313
from .builder import Builder # pylint: disable=unused-import
1414
from .errors import WorkflowException
15+
from .loghandler import _logger
1516
from .job import JobBase # pylint: disable=unused-import
1617
from .mutation import MutationManager
18+
from .provenance import CreateProvProfile
1719
from .process import (Process, # pylint: disable=unused-import
1820
cleanIntermediate, relocateOutputs)
1921
from .utils import DEFAULT_TMP_PREFIX
20-
from .context import LoadingContext, RuntimeContext, getdefault
21-
22-
_logger = logging.getLogger("cwltool")
22+
from .context import RuntimeContext, getdefault # pylint: disable=unused-import
23+
from .workflow import Workflow, WorkflowJob, WorkflowJobStep
2324

2425
class JobExecutor(six.with_metaclass(ABCMeta, object)):
2526
""" Abstract base job executor. """
@@ -92,6 +93,18 @@ def execute(self,
9293
cleanIntermediate(self.output_dirs)
9394

9495
if self.final_output and self.final_status:
96+
97+
if runtimeContext.research_obj is not None and \
98+
isinstance(process, (JobBase, Process, WorkflowJobStep,
99+
WorkflowJob)) and process.parent_wf:
100+
process_run_id = None
101+
name = "primary"
102+
process.parent_wf.generate_output_prov(self.final_output[0],
103+
process_run_id, name)
104+
process.parent_wf.document.wasEndedBy(
105+
process.parent_wf.workflow_run_uri, None, process.parent_wf.engine_uuid,
106+
datetime.datetime.now())
107+
process.parent_wf.finalize_prov_profile(name)
95108
return (self.final_output[0], self.final_status[0])
96109
return (None, "permanentFail")
97110

@@ -104,6 +117,18 @@ def run_jobs(self,
104117
logger,
105118
runtimeContext # type: RuntimeContext
106119
): # type: (...) -> None
120+
121+
process_run_id = None # type: Optional[str]
122+
reference_locations = {} # type: Dict[Text,Text]
123+
124+
# Define the provenance profile when the process is a single command-line tool (not a Workflow)
125+
if not isinstance(process, Workflow) \
126+
and runtimeContext.research_obj is not None:
127+
orcid = runtimeContext.orcid
128+
full_name = runtimeContext.cwl_full_name
129+
process.provenance_object = CreateProvProfile(
130+
runtimeContext.research_obj, orcid, full_name)
131+
process.parent_wf = process.provenance_object
107132
jobiter = process.job(job_order_object, self.output_callback, runtimeContext)
108133

109134
try:
@@ -113,6 +138,21 @@ def run_jobs(self,
113138
job.builder = runtimeContext.builder
114139
if job.outdir:
115140
self.output_dirs.add(job.outdir)
141+
if runtimeContext.research_obj is not None:
142+
if not isinstance(process, Workflow):
143+
runtimeContext.prov_obj = process.provenance_object
144+
else:
145+
runtimeContext.prov_obj = job.prov_obj
146+
assert runtimeContext.prov_obj
147+
process_run_id, reference_locations = \
148+
runtimeContext.prov_obj.evaluate(
149+
process, job, job_order_object,
150+
runtimeContext.make_fs_access,
151+
runtimeContext)
152+
runtimeContext = runtimeContext.copy()
153+
runtimeContext.process_run_id = process_run_id
154+
runtimeContext.reference_locations = \
155+
reference_locations
116156
job.run(runtimeContext)
117157
else:
118158
logger.error("Workflow cannot make any more progress.")

0 commit comments

Comments
 (0)