Skip to content

Commit 0005b46

Browse files
committed
Provenance: Register Directory in PROV
1 parent 836d684 commit 0005b46

File tree

1 file changed

+178
-82
lines changed

1 file changed

+178
-82
lines changed

cwltool/provenance.py

Lines changed: 178 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import prov.model as provM
3636
from prov.identifier import Namespace, Identifier
3737
from prov.model import (PROV, ProvDocument, # pylint: disable=unused-import
38-
ProvActivity)
38+
ProvActivity, ProvEntity)
3939

4040
# Disabled due to excessive transitive dependencies
4141
#from networkx.drawing.nx_agraph import graphviz_layout
@@ -150,7 +150,6 @@ def _whoami():
150150
fullname = pwnam.pw_gecos.split(",", 1)[0]
151151
return (username, fullname)
152152

153-
154153
class WritableBagFile(io.FileIO):
155154
'''
156155
writes files in research object
@@ -473,7 +472,8 @@ def copy_job_order(job, job_order_object):
473472
relativised_input_object2, reference_locations = \
474473
research_obj.create_job(
475474
customised_job, make_fs_access)
476-
self.declare_artefact(relativised_input_object2, job_order_object)
475+
entity = self.declare_artefact(relativised_input_object2)
476+
self.document.used(self.engine_uuid, entity)
477477
name = ""
478478
if hasattr(job, "name"):
479479
name = str(job.name)
@@ -485,7 +485,8 @@ def copy_job_order(job, job_order_object):
485485
relativised_input_object2, reference_locations = \
486486
research_obj.create_job(
487487
customised_job, make_fs_access)
488-
self.declare_artefact(relativised_input_object2, job_order_object)
488+
entity = self.declare_artefact(relativised_input_object2)
489+
self.document.used(self.engine_uuid, entity)
489490
else: # in case of commandline tool execution as part of workflow
490491
name = ""
491492
if hasattr(job, "name"):
@@ -506,7 +507,7 @@ def start_process(self, process_name, process_run_id=None):
506507
self.document.activity(
507508
process_run_id, None, None,
508509
{provM.PROV_TYPE: WFPROV["ProcessRun"],
509-
"prov:label": prov_label})
510+
provM.PROV_LABEL: prov_label})
510511
self.document.wasAssociatedWith(
511512
process_run_id, self.engine_uuid, str("wf:main/" + process_name))
512513
self.document.wasStartedBy(
@@ -517,14 +518,172 @@ def start_process(self, process_name, process_run_id=None):
517518
self.document.activity(
518519
process_run_id, None, None,
519520
{provM.PROV_TYPE: WFPROV["ProcessRun"],
520-
"prov:label": prov_label})
521+
provM.PROV_LABEL: prov_label})
521522
self.document.wasAssociatedWith(
522523
process_run_id, self.engine_uuid, str("wf:main/"+process_name))
523524
self.document.wasStartedBy(
524525
process_run_id, None, self.engine_uuid, datetime.datetime.now(),
525526
None, None)
526527
return process_run_id
527528

529+
def declare_artefact(self, value):
    # type: (Any) -> Optional[ProvEntity]
    """Create and return a PROV entity describing the data artefact *value*.

    Strings/bytes are stored in the Research Object's data/ folder and
    described as wfprov:Artifact entities; File dicts are stored (or
    looked up by sha1 checksum) the same way; Directory dicts and plain
    dictionaries become prov:Dictionary collections with one
    KeyEntityPair per member; other iterables become plain
    prov:Collection entities.  Anything unrecognised falls back to an
    entity labelled with its ``repr()``.

    May return None for a File dict that carries neither a known
    checksum, a location, nor content.
    """
    if value is None:
        # FIXME: If this can happen in CWL, we'll
        # need a better way to represent this in PROV
        return self.document.entity(CWLPROV["None"],
                                    {provM.PROV_LABEL: "None"})

    if isinstance(value, (str, Text, bytes)):
        if isinstance(value, bytes):
            # Leave as-is (Note: In Python2 this includes strings
            # which will be written with system encoding)
            byte_s = io.BytesIO(value)
        else:
            # Save as string in UTF-8
            byte_s = io.BytesIO(str(value).encode(ENCODING))

        data_file = self.research_object.add_data_file(byte_s)
        # FIXME: Don't naively assume add_data_file uses hash in filename!
        data_id = "data:%s" % posixpath.split(data_file)[1]
        return self.document.entity(data_id,
                                    {provM.PROV_TYPE: WFPROV["Artifact"],
                                     provM.PROV_VALUE: str(value)})

    if isinstance(value, dict):
        # Base case - we found a File we need to update
        if value.get("class") == "File":
            if 'checksum' in value:
                csum = value['checksum']
                (method, checksum) = csum.split("$", 1)
                if method == "sha1" and \
                        self.research_object.has_data_file(checksum):
                    # Already stored in the RO; reuse the
                    # content-addressed identifier
                    return self.document.entity("data:" + checksum)

            if 'location' in value:
                # FIXME: cope with file literals.
                location = str(value['location'])
                # If we made it here, we'll have to add it to the RO
                assert self.research_object.make_fs_access
                fsaccess = self.research_object.make_fs_access("")
                with fsaccess.open(location, "rb") as fhandle:
                    relative_path = self.research_object.add_data_file(fhandle)
                    checksum = posixpath.basename(relative_path)
                return self.document.entity(
                    "data:" + checksum,
                    {provM.PROV_TYPE: WFPROV["Artifact"]})

            if 'content' in value:
                # Anonymous file, add content as bytes.
                # BUG FIX: was a bare ``declare_artefact(...)`` call,
                # which would raise NameError at runtime.
                return self.declare_artefact(value["content"])

        elif value.get("class") == "Directory":
            # Register any nested files/directories

            # FIXME: Calculate a hash-like identifier for directory
            # so we get same value if it's the same filenames/hashes
            # in a different location.
            # For now, mint a new UUID to identify this directory, but
            # attempt to keep it inside the value dictionary
            dir_id = value.setdefault("id", uuid.uuid4().urn)
            coll = self.document.entity(
                dir_id,
                [(provM.PROV_TYPE, WFPROV["Artifact"]),
                 (provM.PROV_TYPE, PROV["Collection"]),
                 (provM.PROV_TYPE, PROV["Dictionary"]),
                 (provM.PROV_TYPE, CWLPROV["Directory"]),
                 ])
            coll_attribs = []  # type: List[Tuple[Identifier, ProvEntity]]
            # FIXME: .listing might not be populated yet - hopefully
            # a later call to this method will sort that
            for entry in value.get("listing", []):
                # Declare child-artifacts
                entity = self.declare_artefact(entry)
                self.document.membership(coll, entity)
                # Membership
                membership_pair = self.document.entity(uuid.uuid4().urn)
                # Note: only support PROV-O style dictionary
                # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
                # as prov.py do not easily allow PROV-N extensions
                membership_pair.add_asserted_type(PROV["KeyEntityPair"])
                membership_pair.add_attributes({
                    PROV["pairKey"]: entry["basename"],
                    PROV["pairEntity"]: entity,
                })
                coll_attribs.append(
                    (PROV["hadDictionaryMember"], membership_pair))
            coll.add_attributes(coll_attribs)
            if not coll_attribs:
                # Empty directory
                coll.add_asserted_type(PROV["EmptyCollection"])
                coll.add_asserted_type(PROV["EmptyDictionary"])
            return coll
        else:
            # some other kind of dictionary?
            coll = self.document.entity(
                uuid.uuid4().urn,
                [(provM.PROV_TYPE, WFPROV["Artifact"]),
                 (provM.PROV_TYPE, PROV["Collection"]),
                 (provM.PROV_TYPE, PROV["Dictionary"]),
                 ])

            if value.get("class"):
                # FIXME: The class might be "http://example.com/somethingelse"
                # NOTE: logger.warn is deprecated; use warning()
                _logger.warning("Unknown data class %s", value["class"])
                coll.add_asserted_type(CWLPROV[value["class"]])

            # Let's iterate and recurse
            coll_attribs = []  # type: List[Tuple[Identifier, ProvEntity]]
            for (key, val) in cast(Dict, value).items():
                v_ent = self.declare_artefact(val)
                self.document.membership(coll, v_ent)
                membership_pair = self.document.entity(uuid.uuid4().urn)
                # Note: only support PROV-O style dictionary
                # https://www.w3.org/TR/prov-dictionary/#dictionary-ontological-definition
                # as prov.py do not easily allow PROV-N extensions
                membership_pair.add_asserted_type(PROV["KeyEntityPair"])
                membership_pair.add_attributes({
                    PROV["pairKey"]: str(key),
                    PROV["pairEntity"]: v_ent,
                })
                coll_attribs.append(
                    (PROV["hadDictionaryMember"], membership_pair))
            coll.add_attributes(coll_attribs)
            return coll

    # some other kind of Collection?
    try:
        members = []
        for each_input_obj in iter(value):
            # Recurse and register any nested objects
            members.append(self.declare_artefact(each_input_obj))

        # If we reached this, then we were allowed to iterate
        coll = self.document.entity(
            uuid.uuid4().urn,
            [(provM.PROV_TYPE, WFPROV["Artifact"]),
             (provM.PROV_TYPE, PROV["Collection"])
             ])
        if not members:
            coll.add_asserted_type(PROV["EmptyCollection"])
        else:
            for member in members:
                # FIXME: This won't preserve order, for that
                # we would need to use PROV.Dictionary
                # with numeric keys
                # BUG FIX: was membership(coll, e), which re-linked
                # only the last declared member instead of each one.
                self.document.membership(coll, member)
        return coll
    except TypeError:
        _logger.warning("Unrecognized type %s of %r", type(value), value)
        # Let's just fall back to Python repr()
        return self.document.entity(uuid.uuid4().urn,
                                    {provM.PROV_LABEL: repr(value)})
686+
528687
def used_artefacts(self,
529688
job_order, # type: Dict
530689
process_run_id, # type: Optional[str]
@@ -534,56 +693,13 @@ def used_artefacts(self,
534693
adds used() for each data artefact
535694
'''
536695
for key, value in job_order.items():
696+
# FIXME: Use workflow name in packed.cwl, "main" is wrong for nested workflows
537697
prov_role = self.wf_ns["main/%s/%s" % (name, key)]
538-
if isinstance(value, dict) and 'class' in value \
539-
and value['class'] == 'File' and 'location' in value \
540-
and "contents" not in value:
541-
# FIXME: cope with file literals.
542-
# FIXME: process Directory.listing
543-
location = str(value['location'])
544-
545-
if 'checksum' in value:
546-
csum = value['checksum']
547-
_logger.info("[provenance] Used data w/ checksum %s", csum)
548-
(method, checksum) = csum.split("$", 1)
549-
if method == "sha1":
550-
self.document.used(
551-
process_run_id, "data:%s" % checksum,
552-
datetime.datetime.now(), None,
553-
{"prov:role": prov_role})
554-
return # successfully logged
555-
_logger.warning("[provenance] Unknown checksum algorithm %s", method)
556-
else:
557-
_logger.info("[provenance] Used data w/o checksum %s", location)
558-
# FIXME: Store manually
559-
560-
# If we made it here, then we didn't log it correctly with checksum above,
561-
# we'll have to hash it again (and potentially add it to RO)
562-
# TODO: Avoid duplication of code here and in
563-
# _relativise_files()
564-
# TODO: check we don't double-hash everything now
565-
assert self.research_object.make_fs_access
566-
fsaccess = self.research_object.make_fs_access("")
567-
with fsaccess.open(location, "rb") as fhandle:
568-
relative_path = self.research_object.add_data_file(fhandle)
569-
checksum = posixpath.basename(relative_path)
570-
self.document.used(
571-
process_run_id, "data:%s" % checksum,
698+
entity = self.declare_artefact(value)
699+
self.document.used(
700+
process_run_id, entity,
572701
datetime.datetime.now(), None, {"prov:role": prov_role})
573702

574-
else: # add the actual data value in the prov document
575-
# Convert to bytes so we can get a hash (and add to RO)
576-
byte_s = io.BytesIO(str(value).encode(ENCODING))
577-
data_file = self.research_object.add_data_file(byte_s)
578-
# FIXME: Don't naively assume add_data_file uses hash in filename!
579-
data_id = "data:%s" % posixpath.split(data_file)[1]
580-
self.document.entity(
581-
data_id, {provM.PROV_TYPE: WFPROV["Artifact"],
582-
provM.PROV_VALUE: str(value)})
583-
self.document.used(
584-
process_run_id, data_id, datetime.datetime.now(), None,
585-
{"prov:role": prov_role})
586-
587703
def generate_output_prov(self,
588704
final_output, # type: Optional[Dict[Text, Any]]
589705
process_run_id, # type: Optional[str]
@@ -657,35 +773,6 @@ def dict_output(key, current_dict):
657773
rel_path = self.research_object.add_data_file(cwl_output_file, when)
658774
_logger.info(u"[provenance] Adding output file %s to RO", rel_path)
659775

660-
661-
def declare_artefact(self, relativised_input_obj, job_order_object):
662-
# type: (Any, Dict) -> None
663-
'''
664-
create data artefact entities for all file objects.
665-
'''
666-
if isinstance(relativised_input_obj, dict):
667-
# Base case - we found a File we need to update
668-
if relativised_input_obj.get("class") == "File":
669-
#create an artefact
670-
shahash = "data:"+relativised_input_obj["location"].split("/")[-1]
671-
self.document.entity(shahash, {provM.PROV_TYPE:WFPROV["Artifact"]})
672-
673-
for each_input_obj in relativised_input_obj.values():
674-
self.declare_artefact(each_input_obj, job_order_object)
675-
return
676-
677-
if isinstance(relativised_input_obj, (str, Text)):
678-
# Just a string value, no need to iterate further
679-
# FIXME: Should these be added as PROV entities as well?
680-
return
681-
682-
try:
683-
for each_input_obj in iter(relativised_input_obj):
684-
# Recurse and rewrite any nested File objects
685-
self.declare_artefact(each_input_obj, job_order_object)
686-
except TypeError:
687-
pass
688-
689776
def prospective_prov(self, job):
690777
# type: (Any) -> None
691778
'''
@@ -1217,6 +1304,9 @@ def packed_workflow(self, packed): # type: (Text) -> None
12171304
write_pack.write(packed.encode(ENCODING))
12181305
_logger.info(u"[provenance] Added packed workflow: %s", rel_path)
12191306

1307+
def has_data_file(self, sha1hash):
    # type: (str) -> bool
    """Check whether this Research Object already stores the given data file.

    The data/ area is sharded into subfolders named by the first two
    characters of the sha1 checksum; the file itself is named by the
    full checksum.
    """
    candidate = os.path.join(self.folder, DATA, sha1hash[:2], sha1hash)
    return os.path.isfile(candidate)
12201310

12211311
def add_data_file(self, from_fp, when=None):
12221312
# type: (IO, Optional[datetime.datetime]) -> Text
@@ -1347,6 +1437,8 @@ def create_job(self,
13471437
if isinstance(value, dict):
13481438
if value.get("class") == "File":
13491439
relativised_input_objecttemp[key] = value
1440+
if value.get("class") == "Directory":
1441+
relativised_input_objecttemp[key] = value
13501442
else:
13511443
relativised_input_objecttemp[key] = value
13521444
relativised_input_object.update(
@@ -1376,6 +1468,10 @@ def _relativise_files(self, structure, relativised_input_objecttemp2):
13761468
structure["checksum"] = "sha1$%s" % posixpath.basename(relative_path)
13771469
relativised_input_objecttemp2[ref_location] = structure["location"]
13781470

1471+
if structure.get("class") == "Directory":
1472+
# TODO:
1473+
pass
1474+
13791475
for val in structure.values():
13801476
self._relativise_files(val, relativised_input_objecttemp2)
13811477
return

0 commit comments

Comments (0)