Skip to content

Commit 4e61038

Browse files
committed
maintenance: refactor code in provenance
Remove make_fs_access from ProvenanceProfile Decouple adding start and stop times to profiles from the actual time it happens Decouple adding prospective provenance from the start of a process
1 parent 23b93e4 commit 4e61038

File tree

11 files changed

+72
-76
lines changed

11 files changed

+72
-76
lines changed

README.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -698,6 +698,6 @@ Workflow.make_workflow_step
698698
::
699699

700700
make_workflow_step(toolpath_object, pos, loadingContext, parentworkflowProv)
701-
(Dict[Text, Any], int, LoadingContext, Optional[CreateProvProfile]) -> WorkflowStep
701+
(Dict[Text, Any], int, LoadingContext, Optional[ProvenanceProfile]) -> WorkflowStep
702702

703703
Create and return a workflow step object.

cwltool/builder.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929

3030
if TYPE_CHECKING:
31-
from .provenance import CreateProvProfile # pylint: disable=unused-import
31+
from .provenance import ProvenanceProfile # pylint: disable=unused-import
3232
CONTENT_LIMIT = 64 * 1024
3333

3434

@@ -162,7 +162,7 @@ def __init__(self,
162162
self.stagedir = stagedir
163163

164164
self.pathmapper = None # type: Optional[PathMapper]
165-
self.prov_obj = None # type: Optional[CreateProvProfile]
165+
self.prov_obj = None # type: Optional[ProvenanceProfile]
166166
self.find_default_container = None # type: Optional[Callable[[], Text]]
167167

168168
def build_job_script(self, commands):

cwltool/command_line_tool.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
docker_windows_path_adjust, json_dumps, onWindows,
5050
random_outdir, windows_default_container_id)
5151
if TYPE_CHECKING:
52-
from .provenance import CreateProvProfile # pylint: disable=unused-import
52+
from .provenance import ProvenanceProfile # pylint: disable=unused-import
5353

5454
ACCEPTLIST_EN_STRICT_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
5555
ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*") # Accept anything
@@ -89,7 +89,7 @@ def __init__(self,
8989
self.outdir = outdir
9090
self.tmpdir = tmpdir
9191
self.script = script
92-
self.prov_obj = None # type: Optional[CreateProvProfile]
92+
self.prov_obj = None # type: Optional[ProvenanceProfile]
9393

9494
def run(self, runtimeContext): # type: (RuntimeContext) -> None
9595
try:
@@ -179,7 +179,7 @@ def __init__(self, job, output_callback, cachebuilder, jobcache):
179179
self.output_callback = output_callback
180180
self.cachebuilder = cachebuilder
181181
self.outdir = jobcache
182-
self.prov_obj = None # type: Optional[CreateProvProfile]
182+
self.prov_obj = None # type: Optional[ProvenanceProfile]
183183

184184
def run(self, runtimeContext):
185185
# type: (RuntimeContext) -> None

cwltool/context.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
if TYPE_CHECKING:
2222
from .process import Process
2323
from .provenance import (ResearchObject, # pylint: disable=unused-import
24-
CreateProvProfile)
24+
ProvenanceProfile)
2525

2626
class ContextBase(object):
2727
def __init__(self, kwargs=None):
@@ -63,7 +63,7 @@ def __init__(self, kwargs=None):
6363
self.cwl_full_name = "" # type: str
6464
self.host_provenance = False # type: bool
6565
self.user_provenance = False # type: bool
66-
self.prov_obj = None # type: Optional[CreateProvProfile]
66+
self.prov_obj = None # type: Optional[ProvenanceProfile]
6767

6868
super(LoadingContext, self).__init__(kwargs)
6969

@@ -131,7 +131,7 @@ def __init__(self, kwargs=None):
131131
self.orcid = '' # type: str
132132
self.cwl_full_name = "" # type: str
133133
self.process_run_id = None # type: Optional[str]
134-
self.prov_obj = None # type: Optional[CreateProvProfile]
134+
self.prov_obj = None # type: Optional[ProvenanceProfile]
135135
super(RuntimeContext, self).__init__(kwargs)
136136

137137

cwltool/executors.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from .mutation import MutationManager
2222
from .process import Process # pylint: disable=unused-import
2323
from .process import cleanIntermediate, relocateOutputs
24-
from .provenance import CreateProvProfile
24+
from .provenance import ProvenanceProfile
2525
from .utils import DEFAULT_TMP_PREFIX
2626
from .workflow import Workflow, WorkflowJob, WorkflowJobStep
2727

@@ -134,7 +134,7 @@ def run_jobs(self,
134134
# define provenance profile for single commandline tool
135135
if not isinstance(process, Workflow) \
136136
and runtime_context.research_obj is not None:
137-
process.provenance_object = CreateProvProfile(
137+
process.provenance_object = ProvenanceProfile(
138138
runtime_context.research_obj,
139139
full_name=runtime_context.cwl_full_name,
140140
host_provenance=False,
@@ -159,11 +159,12 @@ def run_jobs(self,
159159
else:
160160
runtime_context.prov_obj = job.prov_obj
161161
assert runtime_context.prov_obj
162-
process_run_id = \
163-
runtime_context.prov_obj.evaluate(
164-
process, job, job_order_object,
165-
runtime_context.make_fs_access,
166-
runtime_context.research_obj)
162+
runtime_context.prov_obj.evaluate(
163+
process, job, job_order_object,
164+
runtime_context.research_obj)
165+
process_run_id =\
166+
runtime_context.prov_obj.record_process_start(
167+
process, job)
167168
runtime_context = runtime_context.copy()
168169
runtime_context.process_run_id = process_run_id
169170
job.run(runtime_context)

cwltool/job.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
subprocess)
4141

4242
if TYPE_CHECKING:
43-
from .provenance import CreateProvProfile # pylint: disable=unused-import
43+
from .provenance import ProvenanceProfile # pylint: disable=unused-import
4444
needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""")
4545

4646
FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1"
@@ -194,8 +194,8 @@ def __init__(self,
194194
self.generatefiles = {"class": "Directory", "listing": [], "basename": ""} # type: Directory
195195
self.stagedir = None # type: Optional[Text]
196196
self.inplace_update = False
197-
self.prov_obj = None # type: Optional[CreateProvProfile]
198-
self.parent_wf = None # type: Optional[CreateProvProfile]
197+
self.prov_obj = None # type: Optional[ProvenanceProfile]
198+
self.parent_wf = None # type: Optional[ProvenanceProfile]
199199
self.timelimit = None # type: Optional[int]
200200
self.networkaccess = False # type: bool
201201

@@ -344,11 +344,8 @@ def _execute(self,
344344
if runtimeContext.research_obj is not None and self.prov_obj is not None and \
345345
runtimeContext.process_run_id is not None:
346346
#creating entities for the outputs produced by each step (in the provenance document)
347-
self.prov_obj.generate_output_prov(
348-
outputs, runtimeContext.process_run_id, str(self.name))
349-
self.prov_obj.document.wasEndedBy(
350-
runtimeContext.process_run_id, None, self.prov_obj.workflow_run_uri,
351-
datetime.datetime.now())
347+
self.prov_obj.record_process_end(str(self.name), runtimeContext.process_run_id,
348+
outputs, datetime.datetime.now())
352349
if processStatus != "success":
353350
_logger.warning(u"[job %s] completed %s", self.name, processStatus)
354351
else:

cwltool/main.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ def main(argsl=None, # type: List[str]
583583
if not args.compute_checksum:
584584
_logger.error("--provenance incompatible with --no-compute-checksum")
585585
return 1
586-
ro = ResearchObject(
586+
ro = ResearchObject(runtimeContext.make_fs_access,
587587
temp_prefix_ro=args.tmpdir_prefix, orcid=args.orcid,
588588
full_name=args.cwl_full_name)
589589
runtimeContext.research_obj = ro
@@ -808,7 +808,8 @@ def my_represent_none(self, data): # pylint: disable=unused-argument
808808

809809
if out is not None:
810810
if runtimeContext.research_obj is not None:
811-
runtimeContext.research_obj.create_job(out, None, True)
811+
runtimeContext.research_obj.create_job(
812+
out, None, True)
812813

813814
def loc_to_path(obj):
814815
for field in ("path", "nameext", "nameroot", "dirname"):

cwltool/process.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
except ImportError:
5353
from scandir import scandir # type: ignore
5454
if TYPE_CHECKING:
55-
from .provenance import CreateProvProfile # pylint: disable=unused-import
55+
from .provenance import ProvenanceProfile # pylint: disable=unused-import
5656

5757

5858
class LogAsDebugFilter(logging.Filter):
@@ -453,8 +453,8 @@ def __init__(self,
453453
loadingContext # type: LoadingContext
454454
): # type: (...) -> None
455455
self.metadata = getdefault(loadingContext.metadata, {}) # type: Dict[Text,Any]
456-
self.provenance_object = None # type: Optional[CreateProvProfile]
457-
self.parent_wf = None # type: Optional[CreateProvProfile]
456+
self.provenance_object = None # type: Optional[ProvenanceProfile]
457+
self.parent_wf = None # type: Optional[ProvenanceProfile]
458458
global SCHEMA_FILE, SCHEMA_DIR, SCHEMA_ANY # pylint: disable=global-statement
459459
if SCHEMA_FILE is None or SCHEMA_ANY is None or SCHEMA_DIR is None:
460460
get_schema("v1.0")

cwltool/provenance.py

Lines changed: 30 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ def _valid_orcid(orcid): # type: (Optional[Text]) -> Text
301301
return u"https://orcid.org/%s" % orcid_num
302302

303303

304-
class CreateProvProfile():
304+
class ProvenanceProfile():
305305
"""
306306
Provenance profile.
307307
@@ -338,7 +338,7 @@ def __init__(self,
338338
self.generate_prov_doc()
339339

340340
def __str__(self):
341-
return "CreateProvProfile <%s> in <%s>" % (
341+
return "ProvenanceProfile <%s> in <%s>" % (
342342
self.workflow_run_uri, self.research_object)
343343

344344
def generate_prov_doc(self):
@@ -448,41 +448,34 @@ def evaluate(self,
448448
process, # type: Process
449449
job, # type: Any
450450
job_order_object, # type: Dict[Text, Text]
451-
make_fs_access, # type: Callable[[Text], StdFsAccess]
452451
research_obj # type: ResearchObject
453-
): # type: (...) -> Optional[str]
454-
"""Evaluate the nature of job and initialize the activity start."""
455-
process_run_id = None
456-
research_obj.make_fs_access = make_fs_access
452+
): # type: (...) -> None
453+
"""Evaluate the nature of job"""
457454
if not hasattr(process, "steps"):
458-
# record provenance of an independent commandline tool execution
455+
# record provenance of independent commandline tool executions
459456
self.prospective_prov(job)
460457
customised_job = copy_job_order(job, job_order_object)
461458
self.used_artefacts(customised_job, self.workflow_run_uri)
462459
research_obj.create_job(customised_job, job)
463-
# self.used_artefacts(inputs, self.workflow_run_uri)
464-
name = ""
465-
if hasattr(job, "name"):
466-
name = str(job.name)
467-
process_name = urllib.parse.quote(name, safe=":/,#")
468-
process_run_id = self.workflow_run_uri
469460
elif hasattr(job, "workflow"):
470-
# record provenance for the workflow execution
461+
# record provenance of workflow executions
471462
self.prospective_prov(job)
472463
customised_job = copy_job_order(job, job_order_object)
473464
self.used_artefacts(customised_job, self.workflow_run_uri)
474-
# inputs = research_obj.create_job(customised_job)
475-
# self.used_artefacts(inputs, self.workflow_run_uri)
476-
else: # in case of commandline tool execution as part of workflow
477-
name = ""
478-
if hasattr(job, "name"):
479-
name = str(job.name)
465+
466+
def record_process_start(self, process, job, process_run_id=None):
467+
# type: (Process, Any, str) -> Optional[str]
468+
if not hasattr(process, 'steps'):
469+
process_run_id = self.workflow_run_uri
470+
elif not hasattr(job, 'workflow'):
471+
# commandline tool execution as part of workflow
472+
name = str(job.name) if hasattr(job, 'name') else ''
480473
process_name = urllib.parse.quote(name, safe=":/,#")
481-
process_run_id = self.start_process(process_name)
474+
process_run_id = self.start_process(process_name, datetime.datetime.now())
482475
return process_run_id
483476

484-
def start_process(self, process_name, process_run_id=None):
485-
# type: (Any, str, str) -> str
477+
def start_process(self, process_name, when, process_run_id=None):
478+
# type: (Any, Text, datetime.datetime, Optional[str]) -> str
486479
"""Record the start of each Process."""
487480
if process_run_id is None:
488481
process_run_id = uuid.uuid4().urn
@@ -495,11 +488,16 @@ def start_process(self, process_name, process_run_id=None):
495488
process_run_id, self.engine_uuid, str("wf:main/" + process_name))
496489
self.document.wasStartedBy(
497490
process_run_id, None, self.workflow_run_uri,
498-
datetime.datetime.now(), None, None)
491+
when, None, None)
499492
return process_run_id
500493

494+
def record_process_end(self, process_name, process_run_id, outputs, when):
495+
# type: (Text, str, Any, datetime.datetime) -> None
496+
self.generate_output_prov(outputs, process_run_id, process_name)
497+
self.document.wasEndedBy(process_run_id, None, self.workflow_run_uri, when)
498+
501499
def declare_file(self, value):
502-
# type: (MutableMapping) -> Tuple[ProvEntity,ProvEntity,str]
500+
# type: (MutableMapping) -> Tuple[ProvEntity, ProvEntity, str]
503501
if value["class"] != "File":
504502
raise ValueError("Must have class:File: %s" % value)
505503
# Need to determine file hash aka RO filename
@@ -856,11 +854,9 @@ def prospective_prov(self, job):
856854
"prov:type": PROV["Plan"],
857855
"prov:label":"Prospective provenance"})
858856

859-
steps = []
860857
for step in job.steps:
861-
stepnametemp = "wf:main/"+str(step.name)[5:]
858+
stepnametemp = "wf:main/" + str(step.name)[5:]
862859
stepname = urllib.parse.quote(stepnametemp, safe=":/,#")
863-
steps.append(stepname)
864860
step = self.document.entity(
865861
stepname, {provM.PROV_TYPE: WFDESC["Process"],
866862
"prov:type": PROV["Plan"]})
@@ -947,9 +943,10 @@ def finalize_prov_profile(self, name):
947943
class ResearchObject():
948944
"""CWLProv Research Object."""
949945

950-
def __init__(self, temp_prefix_ro="tmp", orcid='', full_name=''):
951-
# type: (str, Text, Text) -> None
946+
def __init__(self, make_fs_access, temp_prefix_ro="tmp", orcid='', full_name=''):
947+
# type: (Callable[[Text], StdFsAccess], str, Text, Text) -> None
952948

949+
self.make_fs_access = make_fs_access
953950
self.temp_prefix = temp_prefix_ro
954951
self.orcid = '' if not orcid else _valid_orcid(orcid)
955952
self.full_name = full_name
@@ -969,8 +966,6 @@ def __init__(self, temp_prefix_ro="tmp", orcid='', full_name=''):
969966
self.base_uri = "arcp://uuid,%s/" % self.ro_uuid
970967
self.cwltool_version = "cwltool %s" % versionstring().split()[-1]
971968
##
972-
# This function will be added by create_job()
973-
self.make_fs_access = None # type: Optional[Callable[[Text], StdFsAccess]]
974969
self.relativised_input_object = {} # type: Dict[Any, Any]
975970

976971
self._initialize()
@@ -1499,8 +1494,8 @@ def _add_to_bagit(self, rel_path, **checksums):
14991494
self.add_to_manifest(rel_path, checksums)
15001495

15011496
def create_job(self,
1502-
builder_job, # type: Dict[Text, Any]
1503-
wf_job=None, # type: Callable[[Dict[Text, Text], Callable[[Any, Any], Any], RuntimeContext], Generator[Any, None, None]]
1497+
builder_job, # type: Dict[Text, Any]
1498+
wf_job=None, # type: Callable[[Dict[Text, Text], Callable[[Any, Any], Any], RuntimeContext], Generator[Any, None, None]]
15041499
is_output=False
15051500
): # type: (...) -> Dict
15061501
#TODO customise the file

0 commit comments

Comments
 (0)