Skip to content

Commit c6e3d5e

Browse files
committed
Support/traverse Directory as prov:Dictionary
.. as a side-effect this code should also now support arbitrary JSON objects as inputs, although this has not been covered by the current test code. Merge branch 'prov-fileset' into prov-nested-wf-fixes
2 parents b083ba7 + d137579 commit c6e3d5e

File tree

3 files changed

+331
-103
lines changed

3 files changed

+331
-103
lines changed

cwltool/provenance.py

Lines changed: 195 additions & 89 deletions
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,7 @@
3535
import prov.model as provM
3636
from prov.identifier import Namespace, Identifier
3737
from prov.model import (PROV, ProvDocument, # pylint: disable=unused-import
38-
ProvActivity)
38+
ProvActivity, ProvEntity)
3939

4040
# Disabled due to excessive transitive dependencies
4141
#from networkx.drawing.nx_agraph import graphviz_layout
@@ -150,7 +150,6 @@ def _whoami():
150150
fullname = pwnam.pw_gecos.split(",", 1)[0]
151151
return (username, fullname)
152152

153-
154153
class WritableBagFile(io.FileIO):
155154
'''
156155
writes files in research object
@@ -470,10 +469,10 @@ def copy_job_order(job, job_order_object):
470469
# record provenance of an independent commandline tool execution
471470
self.prospective_prov(job)
472471
customised_job = copy_job_order(job, job_order_object)
473-
relativised_input_object2, reference_locations = \
472+
inputs, reference_locations = \
474473
research_obj.create_job(
475474
customised_job, make_fs_access)
476-
self.declare_artefact(relativised_input_object2, job_order_object)
475+
self.used_artefacts(inputs, self.workflow_run_uri)
477476
name = ""
478477
if hasattr(job, "name"):
479478
name = str(job.name)
@@ -482,10 +481,10 @@ def copy_job_order(job, job_order_object):
482481
elif hasattr(job, "workflow"): # record provenance for the workflow execution
483482
self.prospective_prov(job)
484483
customised_job = copy_job_order(job, job_order_object)
485-
relativised_input_object2, reference_locations = \
484+
inputs, reference_locations = \
486485
research_obj.create_job(
487486
customised_job, make_fs_access)
488-
self.declare_artefact(relativised_input_object2, job_order_object)
487+
self.used_artefacts(inputs, self.workflow_run_uri)
489488
else: # in case of commandline tool execution as part of workflow
490489
name = ""
491490
if hasattr(job, "name"):
@@ -506,7 +505,7 @@ def start_process(self, process_name, process_run_id=None):
506505
self.document.activity(
507506
process_run_id, None, None,
508507
{provM.PROV_TYPE: WFPROV["ProcessRun"],
509-
"prov:label": prov_label})
508+
provM.PROV_LABEL: prov_label})
510509
self.document.wasAssociatedWith(
511510
process_run_id, self.engine_uuid, str("wf:main/" + process_name))
512511
self.document.wasStartedBy(
@@ -517,73 +516,200 @@ def start_process(self, process_name, process_run_id=None):
517516
self.document.activity(
518517
process_run_id, None, None,
519518
{provM.PROV_TYPE: WFPROV["ProcessRun"],
520-
"prov:label": prov_label})
519+
provM.PROV_LABEL: prov_label})
521520
self.document.wasAssociatedWith(
522521
process_run_id, self.engine_uuid, str("wf:main/"+process_name))
523522
self.document.wasStartedBy(
524523
process_run_id, None, self.engine_uuid, datetime.datetime.now(),
525524
None, None)
526525
return process_run_id
527526

527+
def declare_artefact(self, value):
528+
# type: (Any) -> Optional[ProvEntity]
529+
'''
530+
create data artefact entities for all file objects.
531+
'''
532+
533+
if value is None:
534+
# FIXME: If this can happen in CWL, we'll
535+
# need a better way to represent this in PROV
536+
return self.document.entity(CWLPROV["None"],
537+
{ provM.PROV_LABEL: "None" })
538+
539+
elif isinstance(value, (bool, int, float)):
540+
# Typically used in job documents for flags
541+
542+
# FIXME: Make consistent hash URIs for these
543+
# that somehow include the type
544+
# (so "1" != 1 != "1.0" != true)
545+
return self.document.entity(uuid.uuid4().urn,
546+
{ provM.PROV_VALUE: value })
547+
548+
elif isinstance(value, (str, Text, bytes)):
549+
if type(value) == bytes:
550+
# Leave as-is (Note: In Python2 this includes strings
551+
# which will be written with system encoding)
552+
byte_s = io.BytesIO(value)
553+
else:
554+
# Save as string in UTF-8
555+
byte_s = io.BytesIO(str(value).encode(ENCODING))
556+
557+
data_file = self.research_object.add_data_file(byte_s)
558+
# FIXME: Don't naively assume add_data_file uses hash in filename!
559+
data_id = "data:%s" % posixpath.split(data_file)[1]
560+
return self.document.entity(data_id,
561+
{provM.PROV_TYPE: WFPROV["Artifact"],
562+
provM.PROV_VALUE: str(value)})
563+
564+
elif isinstance(value, dict):
565+
# Base case - we found a File we need to update
566+
if value.get("class") == "File":
567+
if 'checksum' in value:
568+
csum = value['checksum']
569+
(method, checksum) = csum.split("$", 1)
570+
if method == "sha1" and \
571+
self.research_object.has_data_file(checksum):
572+
return self.document.entity("data:" + checksum)
573+
574+
if 'location' in value:
575+
# FIXME: cope with file literals.
576+
location = str(value['location'])
577+
# If we made it here, we'll have to add it to the RO
578+
assert self.research_object.make_fs_access
579+
fsaccess = self.research_object.make_fs_access("")
580+
with fsaccess.open(location, "rb") as fhandle:
581+
relative_path = self.research_object.add_data_file(fhandle)
582+
checksum = posixpath.basename(relative_path)
583+
return self.document.entity("data:" + checksum,
584+
{provM.PROV_TYPE: WFPROV["Artifact"]})
585+
586+
if 'content' in value:
587+
# Anonymous file, add content as bytes
588+
return declare_artefact(value["content"])
589+
590+
elif value.get("class") == "Directory":
591+
# Register any nested files/directories
592+
593+
# FIXME: Calculate a hash-like identifier for directory
594+
# so we get same value if it's the same filenames/hashes
595+
# in a different location.
596+
# For now, mint a new UUID to identify this directory, but
597+
# attempt to keep it inside the value dictionary
598+
dir_id = value.setdefault("id",
599+
uuid.uuid4().urn)
600+
coll = self.document.entity(dir_id,
601+
[ (provM.PROV_TYPE, WFPROV["Artifact"]),
602+
(provM.PROV_TYPE, PROV["Collection"]),
603+
(provM.PROV_TYPE, PROV["Dictionary"]),
604+
(provM.PROV_TYPE, CWLPROV["Directory"]),
605+
])
606+
coll_attribs = [] # type ( tuple(Identifier, ProvEntity) )
607+
# FIXME: .listing might not be populated yet - hopefully
608+
# a later call to this method will sort that
609+
for f in value.get("listing", []):
610+
# Declare child-artifacts
611+
entity = self.declare_artefact(f)
612+
# TODO: Add filename to PROV-dictionary
613+
self.document.membership(coll, entity)
614+
# Membership
615+
m = self.document.entity(uuid.uuid4().urn)
616+
# Note: only support PROV-O style dictionary
617+
# https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
618+
# as prov.py do not easily allow PROV-N extensions
619+
m.add_asserted_type(PROV["KeyEntityPair"])
620+
m.add_attributes({
621+
PROV["pairKey"]: f["basename"],
622+
PROV["pairEntity"]: entity
623+
})
624+
coll_attribs.append(
625+
(PROV["hadDictionaryMember"], m))
626+
coll.add_attributes(coll_attribs)
627+
if not coll_attribs:
628+
# Empty directory
629+
coll.add_asserted_type(PROV["EmptyCollection"])
630+
coll.add_asserted_type(PROV["EmptyDictionary"])
631+
return coll
632+
else:
633+
# some other kind of dictionary?
634+
coll = self.document.entity(uuid.uuid4().urn,
635+
[ (provM.PROV_TYPE, WFPROV["Artifact"]),
636+
(provM.PROV_TYPE, PROV["Collection"]),
637+
(provM.PROV_TYPE, PROV["Dictionary"]),
638+
])
639+
640+
if value.get("class"):
641+
_logger.warn("Unknown data class " + value["class"])
642+
# FIXME: The class might be "http://example.com/somethingelse"
643+
coll.add_asserted_type(CWLPROV[value["class"]])
644+
645+
# Let's iterate and recurse
646+
coll_attribs = [] # type ( tuple(Identifier, ProvEntity) )
647+
for (k,v) in cast(Dict, value).items():
648+
v_ent = self.declare_artefact(v)
649+
self.document.membership(coll, v_ent)
650+
m = self.document.entity(uuid.uuid4().urn)
651+
# Note: only support PROV-O style dictionary
652+
# https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
653+
# as prov.py do not easily allow PROV-N extensions
654+
m.add_asserted_type(PROV["KeyEntityPair"])
655+
m.add_attributes({
656+
PROV["pairKey"]: str(k),
657+
PROV["pairEntity"]: v_ent
658+
})
659+
coll_attribs.append(
660+
(PROV["hadDictionaryMember"], m))
661+
coll.add_attributes(coll_attribs)
662+
return coll
663+
664+
# some other kind of Collection?
665+
try:
666+
members = []
667+
for each_input_obj in iter(value):
668+
is_empty = False
669+
# Recurse and register any nested objects
670+
e = self.declare_artefact(each_input_obj)
671+
members.append(e)
672+
673+
# If we reached this, then we were allowed to iterate
674+
coll = self.document.entity(uuid.uuid4().urn,
675+
[ (provM.PROV_TYPE, WFPROV["Artifact"]),
676+
(provM.PROV_TYPE, PROV["Collection"])
677+
])
678+
if not members:
679+
coll.add_asserted_type(PROV["EmptyCollection"])
680+
else:
681+
for m in members:
682+
# FIXME: This won't preserve order, for that
683+
# we would need to use PROV.Dictionary
684+
# with numeric keys
685+
self.document.membership(coll, e)
686+
return coll
687+
except TypeError:
688+
_logger.warning("Unrecognized type %s of %r" %
689+
(type(value), value))
690+
# Let's just fall back to Python repr()
691+
return self.document.entity(uuid.uuid4().urn,
692+
{ provM.PROV_LABEL: repr(value) })
693+
528694
def used_artefacts(self,
529695
job_order, # type: Dict
530-
process_run_id, # type: Optional[str]
531-
name # type: str
696+
process_run_id, # type: str
697+
name=None # type: Optional[str]
532698
): # type: (...) -> None
533699
'''
534700
adds used() for each data artefact
535701
'''
702+
# FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows
703+
base = "main"
704+
if name:
705+
base += "/" + name
536706
for key, value in job_order.items():
537-
prov_role = self.wf_ns["main/%s/%s" % (name, key)]
538-
if isinstance(value, dict) and 'class' in value \
539-
and value['class'] == 'File' and 'location' in value \
540-
and "contents" not in value:
541-
# FIXME: cope with file literals.
542-
# FIXME: process Directory.listing
543-
location = str(value['location'])
544-
545-
if 'checksum' in value:
546-
csum = value['checksum']
547-
_logger.info("[provenance] Used data w/ checksum %s", csum)
548-
(method, checksum) = csum.split("$", 1)
549-
if method == "sha1":
550-
self.document.used(
551-
process_run_id, "data:%s" % checksum,
552-
datetime.datetime.now(), None,
553-
{"prov:role": prov_role})
554-
return # successfully logged
555-
_logger.warning("[provenance] Unknown checksum algorithm %s", method)
556-
else:
557-
_logger.info("[provenance] Used data w/o checksum %s", location)
558-
# FIXME: Store manually
559-
560-
# If we made it here, then we didn't log it correctly with checksum above,
561-
# we'll have to hash it again (and potentially add it to RO)
562-
# TODO: Avoid duplication of code here and in
563-
# _relativise_files()
564-
# TODO: check we don't double-hash everything now
565-
assert self.research_object.make_fs_access
566-
fsaccess = self.research_object.make_fs_access("")
567-
with fsaccess.open(location, "rb") as fhandle:
568-
relative_path = self.research_object.add_data_file(fhandle)
569-
checksum = posixpath.basename(relative_path)
570-
self.document.used(
571-
process_run_id, "data:%s" % checksum,
707+
prov_role = self.wf_ns["%s/%s" % (base, key)]
708+
entity = self.declare_artefact(value)
709+
self.document.used(
710+
process_run_id, entity,
572711
datetime.datetime.now(), None, {"prov:role": prov_role})
573712

574-
else: # add the actual data value in the prov document
575-
# Convert to bytes so we can get a hash (and add to RO)
576-
byte_s = io.BytesIO(str(value).encode(ENCODING))
577-
data_file = self.research_object.add_data_file(byte_s)
578-
# FIXME: Don't naively assume add_data_file uses hash in filename!
579-
data_id = "data:%s" % posixpath.split(data_file)[1]
580-
self.document.entity(
581-
data_id, {provM.PROV_TYPE: WFPROV["Artifact"],
582-
provM.PROV_VALUE: str(value)})
583-
self.document.used(
584-
process_run_id, data_id, datetime.datetime.now(), None,
585-
{"prov:role": prov_role})
586-
587713
def generate_output_prov(self,
588714
final_output, # type: Optional[Dict[Text, Any]]
589715
process_run_id, # type: Optional[str]
@@ -657,35 +783,6 @@ def dict_output(key, current_dict):
657783
rel_path = self.research_object.add_data_file(cwl_output_file, when)
658784
_logger.info(u"[provenance] Adding output file %s to RO", rel_path)
659785

660-
661-
def declare_artefact(self, relativised_input_obj, job_order_object):
662-
# type: (Any, Dict) -> None
663-
'''
664-
create data artefact entities for all file objects.
665-
'''
666-
if isinstance(relativised_input_obj, dict):
667-
# Base case - we found a File we need to update
668-
if relativised_input_obj.get("class") == "File":
669-
#create an artefact
670-
shahash = "data:"+relativised_input_obj["location"].split("/")[-1]
671-
self.document.entity(shahash, {provM.PROV_TYPE:WFPROV["Artifact"]})
672-
673-
for each_input_obj in relativised_input_obj.values():
674-
self.declare_artefact(each_input_obj, job_order_object)
675-
return
676-
677-
if isinstance(relativised_input_obj, (str, Text)):
678-
# Just a string value, no need to iterate further
679-
# FIXME: Should these be added as PROV entities as well?
680-
return
681-
682-
try:
683-
for each_input_obj in iter(relativised_input_obj):
684-
# Recurse and rewrite any nested File objects
685-
self.declare_artefact(each_input_obj, job_order_object)
686-
except TypeError:
687-
pass
688-
689786
def prospective_prov(self, job):
690787
# type: (Any) -> None
691788
'''
@@ -718,7 +815,7 @@ def prospective_prov(self, job):
718815
# TODO: Declare roles/parameters as well
719816

720817
def activity_has_provenance(self, activity, *prov_ids):
721-
# type: (str, List[Identifier]) -> None
818+
# type: (str, *List[Identifier]) -> None
722819

723820
# Add http://www.w3.org/TR/prov-aq/ relations to nested PROV files
724821
# NOTE: The below will only work if the corresponding metadata/provenance arcp URI
@@ -809,7 +906,7 @@ def __init__(self, temp_prefix_ro="tmp", orcid=None, full_name=None):
809906
self.bagged_size = {} # type: Dict
810907
self.tagfiles = set() # type: Set
811908
self._file_provenance = {} # type: Dict
812-
self.annotations = [] # type: Dict
909+
self.annotations = [] # type: List[Dict]
813910

814911
# These should be replaced by generate_prov_doc when workflow/run IDs are known:
815912
self.engine_uuid = "urn:uuid:%s" % uuid.uuid4()
@@ -1217,6 +1314,9 @@ def packed_workflow(self, packed): # type: (Text) -> None
12171314
write_pack.write(packed.encode(ENCODING))
12181315
_logger.info(u"[provenance] Added packed workflow: %s", rel_path)
12191316

1317+
def has_data_file(self, sha1hash):
1318+
folder = os.path.join(self.folder, DATA, sha1hash[0:2])
1319+
return os.path.isfile(os.path.join(folder, sha1hash))
12201320

12211321
def add_data_file(self, from_fp, when=None):
12221322
# type: (IO, Optional[datetime.datetime]) -> Text
@@ -1347,6 +1447,8 @@ def create_job(self,
13471447
if isinstance(value, dict):
13481448
if value.get("class") == "File":
13491449
relativised_input_objecttemp[key] = value
1450+
if value.get("class") == "Directory":
1451+
relativised_input_objecttemp[key] = value
13501452
else:
13511453
relativised_input_objecttemp[key] = value
13521454
relativised_input_object.update(
@@ -1376,6 +1478,10 @@ def _relativise_files(self, structure, relativised_input_objecttemp2):
13761478
structure["checksum"] = "sha1$%s" % posixpath.basename(relative_path)
13771479
relativised_input_objecttemp2[ref_location] = structure["location"]
13781480

1481+
if structure.get("class") == "Directory":
1482+
# TODO:
1483+
pass
1484+
13791485
for val in structure.values():
13801486
self._relativise_files(val, relativised_input_objecttemp2)
13811487
return

0 commit comments

Comments
 (0)