
Commit 17a180c

provenance: refactored entity incl. file extension
file extensions needed for secondaryFiles
1 parent 8f864ff commit 17a180c

1 file changed: +101 −81 lines


cwltool/provenance.py

Lines changed: 101 additions & 81 deletions
@@ -93,6 +93,7 @@ class PermissionError(OSError):  # pylint: disable=redefined-builtin
 PROVENANCE = os.path.join(METADATA, "provenance")
 WFDESC = Namespace("wfdesc", 'http://purl.org/wf4ever/wfdesc#')
 WFPROV = Namespace("wfprov", 'http://purl.org/wf4ever/wfprov#')
+WF4EVER = Namespace("wf4ever", 'http://purl.org/wf4ever/wf4ever#')
 RO = Namespace("ro", 'http://purl.org/wf4ever/ro#')
 ORE = Namespace("ore", 'http://www.openarchives.org/ore/terms/')
 FOAF = Namespace("foaf", 'http://xmlns.com/foaf/0.1/')
@@ -109,6 +110,9 @@ class PermissionError(OSError):  # pylint: disable=redefined-builtin
 # e.g. "checksum" = "sha1$47a013e660d408619d894b20806b1d5086aab03b"
 # See ./cwltool/schemas/v1.0/Process.yml
 Hasher = hashlib.sha1
+SHA1="sha1"
+SHA256="sha256"
+SHA512="sha512"
 
 # TODO: Better identifiers for user, at least
 # these should be preserved in ~/.config/cwl for every execution
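The new SHA1/SHA256/SHA512 constants name the algorithms used in CWL checksum strings such as the sha1$47a013e... example quoted above. A minimal sketch of building one such value (a hypothetical helper, not part of this commit):

    import hashlib

    def checksum_field(path):
        # Stream the file through SHA-1 and format the result in the CWL
        # "checksum" convention: "<algorithm>$<hexdigest>".
        hasher = hashlib.sha1()
        with open(path, "rb") as handle:
            for block in iter(lambda: handle.read(8192), b""):
                hasher.update(block)
        return "sha1$%s" % hasher.hexdigest()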
@@ -160,9 +164,9 @@ def __init__(self, research_object, rel_path):
         if posixpath.isabs(rel_path):
             raise ValueError("rel_path must be relative: %s" % rel_path)
         self.rel_path = rel_path
-        self.hashes = {"sha1": hashlib.sha1(),
-                       "sha256": hashlib.sha256(),
-                       "sha512": hashlib.sha512()}
+        self.hashes = {SHA1: hashlib.sha1(),
+                       SHA256: hashlib.sha256(),
+                       SHA512: hashlib.sha512()}
         # Open file in Research Object folder
         if research_object.folder:
             path = os.path.abspath(os.path.join(research_object.folder, _local_path(rel_path)))
@@ -327,6 +331,7 @@ def __init__(self,
         self.workflow_run_uuid = run_uuid
         self.workflow_run_uri = run_uuid.urn
         self.generate_prov_doc()
+        self.secondaries = {}
 
     def __str__(self):
         return "CreateProvProfile <%s> in <%s>" % (
@@ -371,7 +376,7 @@ def host_provenance(document):
         # https://tools.ietf.org/html/rfc6920#section-7
         self.document.add_namespace('data', 'urn:hash::sha1:')
         # Also needed for docker images
-        self.document.add_namespace("sha256", "nih:sha-256;")
+        self.document.add_namespace(SHA256, "nih:sha-256;")
 
         # info only, won't really be used by prov as sub-resources use /
         self.document.add_namespace('researchobject', self.research_object.base_uri)
@@ -567,30 +572,83 @@ def declare_artefact(self, value):
                                      provM.PROV_VALUE: str(value)})
 
         elif isinstance(value, dict):
+            if "@id" in value:
+                # Already processed this value, but it might not be in this PROV
+                entities = self.document.get_record(value["@id"])
+                if entities:
+                    return entities[0]
+                # else, unknown in PROV, re-add below as if it's fresh
+
             # Base case - we found a File we need to update
             if value.get("class") == "File":
+                # Need to determine file hash aka RO filename
+                entity = None
                 if 'checksum' in value:
                     csum = value['checksum']
                     (method, checksum) = csum.split("$", 1)
-                    if method == "sha1" and \
+                    if method == SHA1 and \
                         self.research_object.has_data_file(checksum):
-                        return self.document.entity("data:" + checksum)
+                        entity = self.document.entity("data:" + checksum)
 
-                if 'location' in value:
-                    # FIXME: cope with file literals.
+                if not entity and 'location' in value:
                     location = str(value['location'])
                     # If we made it here, we'll have to add it to the RO
                     assert self.research_object.make_fs_access
                     fsaccess = self.research_object.make_fs_access("")
                     with fsaccess.open(location, "rb") as fhandle:
                         relative_path = self.research_object.add_data_file(fhandle)
+                        # FIXME: This naively relies on add_data_file setting hash as filename
                         checksum = posixpath.basename(relative_path)
-                        return self.document.entity("data:" + checksum,
+                        entity = self.document.entity("data:" + checksum,
                                                     {provM.PROV_TYPE: WFPROV["Artifact"]})
+                        if "checksum" not in value:
+                            value["checksum"] = "%s$%s" % (SHA1, checksum)
 
-                if 'content' in value:
+                if not entity and 'content' in value:
                     # Anonymous file, add content as string
-                    return self.declare_artefact(value["content"])
+                    entity = self.declare_artefact(value["content"])
+
+                # By here one of them should have worked!
+                if not entity:
+                    raise ValueError("class:File but missing checksum/location/content: %r" % value)
+
+
+                # Track filename and extension, this is generally useful only for
+                # secondaryFiles. Note that multiple uses of a file might thus record
+                # different names for the same entity, so we'll
+                # make/track a specialized entity by UUID
+                file_id = value.setdefault("@id", uuid.uuid4().urn)
+                # A specialized entity that has just these names
+                file_entity = self.document.entity(file_id,
+                                                   [(provM.PROV_TYPE, WFPROV["Artifact"]),
+                                                    (provM.PROV_TYPE, WF4EVER["File"])
+                                                   ])
+
+                if "basename" in value:
+                    file_entity.add_attributes({CWLPROV["basename"]: value["basename"]})
+                if "nameroot" in value:
+                    file_entity.add_attributes({CWLPROV["nameroot"]: value["nameroot"]})
+                if "nameext" in value:
+                    file_entity.add_attributes({CWLPROV["nameext"]: value["nameext"]})
+                self.document.specializationOf(file_entity, entity)
+
+                # Check for secondaries
+                for sec in value.get("secondaryFiles", ()):
+                    # TODO: Record these in a specializationOf entity with UUID?
+                    sec_entity = self.declare_artefact(sec)
+                    # We don't know how/when/where the secondary file was generated,
+                    # but CWL convention is a kind of summary/index derived
+                    # from the original file. As it's generally in a different format,
+                    # prov:Quotation is not appropriate.
+                    self.document.derivation(sec_entity, file_entity,
+                                             other_attributes={PROV["type"]: CWLPROV["SecondaryFile"]})
+                    # TODO: Add to self.secondaries so it can later
+                    # be augmented into primary-job.json
+
+                # Return the UUID file_entity so that we
+                # know which filenames were used/generated in this activity
+                return file_entity
 
             elif value.get("class") == "Directory":
                 # Register any nested files/directories
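For orientation: the hunk above records each File twice — a content-addressed data:... entity per unique checksum, plus a UUID-named entity carrying basename/nameroot/nameext, linked by specializationOf; secondaryFiles hang off the named entity via a derivation. A minimal standalone sketch of that pattern with the prov package (the ex: namespace, identifiers, and file names are invented for illustration):

    from prov.model import ProvDocument

    doc = ProvDocument()
    doc.add_namespace("data", "urn:hash::sha1:")
    doc.add_namespace("ex", "http://example.com/#")

    # One content-addressed entity per unique checksum...
    content = doc.entity("data:47a013e660d408619d894b20806b1d5086aab03b")
    # ...and one name-carrying entity per use of that content.
    named = doc.entity("ex:file1", {"ex:basename": "reads.bam",
                                    "ex:nameroot": "reads",
                                    "ex:nameext": ".bam"})
    doc.specializationOf(named, content)

    # A secondary file (e.g. an index) is derived from the primary file.
    index = doc.entity("ex:file2", {"ex:basename": "reads.bam.bai"})
    doc.wasDerivedFrom(index, named)

    print(doc.get_provn())  # render the small graph as PROV-N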
@@ -600,7 +658,7 @@ def declare_artefact(self, value):
             # in a different location.
             # For now, mint a new UUID to identify this directory, but
             # attempt to keep it inside the value dictionary
-            dir_id = value.setdefault("id",
+            dir_id = value.setdefault("@id",
                                       uuid.uuid4().urn)
 
             # New annotation file to keep the ORE Folder listing
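The setdefault idiom used here mints an identifier only on first sight of a dictionary and returns the stored one on every later visit. A tiny illustration with made-up data:

    import uuid

    value = {"class": "Directory", "basename": "out"}

    # First call stores a fresh urn:uuid:... under "@id";
    # repeated calls return that same stored value.
    dir_id = value.setdefault("@id", uuid.uuid4().urn)
    assert value.setdefault("@id", uuid.uuid4().urn) == dir_id

Note that uuid.uuid4().urn is still evaluated on every call; only the stored value wins once the key exists.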
@@ -692,13 +750,16 @@ def declare_artefact(self, value):
                 self.research_object.add_uri(coll.identifier.uri)
                 return coll
             else:
+                coll_id = value.setdefault("@id",
+                                           uuid.uuid4().urn)
                 # some other kind of dictionary?
                 # TODO: also Save as JSON
-                coll = self.document.entity(uuid.uuid4().urn,
+                coll = self.document.entity(coll_id,
                                             [ (provM.PROV_TYPE, WFPROV["Artifact"]),
                                               (provM.PROV_TYPE, PROV["Collection"]),
                                               (provM.PROV_TYPE, PROV["Dictionary"]),
                                             ])
+                value
 
                 if value.get("class"):
                     _logger.warn("Unknown data class " + value["class"])
@@ -749,6 +810,7 @@ def declare_artefact(self, value):
                 # with numeric keys
                 self.document.membership(coll, e)
             self.research_object.add_uri(coll.identifier.uri)
+            # FIXME: list value does not support adding "@id"
             return coll
         except TypeError:
             _logger.warning("Unrecognized type %s of %r" %
@@ -786,70 +848,26 @@ def generate_output_prov(self,
         '''
         create wasGeneratedBy() for each output and copy each output file in the RO
         '''
-        # A bit too late, but we don't know the "inner" when
-        def array_output(key, current_l):
-            # type: (Any, List) -> List
-            '''
-            helper function for generate_output_prov()
-            for the case when we have an array of files as output
-            '''
-            new_l = []
-            for out_file in current_l:
-                if isinstance(out_file, dict):
-                    new_l.append((key, out_file['checksum'], out_file['location']))
-
-            return new_l
-
-        def dict_output(key, current_dict):
-            # type: (Any, Dict) -> List
-            '''
-            helper function for generate_output_prov()
-            for the case when the output is key:value where value is a file item
-            '''
-            new_d = []
-            if current_dict.get("class") == "File":
-                new_d.append((key, current_dict['checksum'], current_dict['location']))
-            return new_d
-
+        # Record "when" as early as possible
         when = datetime.datetime.now()
-        key_files = []  # type: List[List[Any]]
-        if final_output:
-            for key, value in final_output.items():
-
-                if isinstance(value, list):
-                    key_files.append(array_output(key, value))
-                elif isinstance(value, dict):
-                    key_files.append(dict_output(key, value))
-
-        merged_total = list(itertools.chain.from_iterable(key_files))
-        #generate data artefacts at workflow level
-        for tuple_entry in merged_total:
-            # FIXME: What are these magic array[][] positions???
-            output_checksum = "data:"+str(tuple_entry[1][5:])
-
-            if process_run_id and name:
-                name = urllib.parse.quote(str(name), safe=":/,#")
-                step_prov = self.wf_ns["main/"+name+"/"+str(tuple_entry[0])]
 
-                self.document.entity(output_checksum,
-                                     {provM.PROV_TYPE: WFPROV["Artifact"]})
-                self.document.wasGeneratedBy(
-                    output_checksum, process_run_id, when, None,
-                    {"prov:role": step_prov})
+        # For each output, find/register the corresponding
+        # entity (UUID) and document it as generated in
+        # a role corresponding to the output
+        for output, value in final_output.items():
+            entity = self.declare_artefact(value)
+            if name:
+                name = urllib.parse.quote(str(name), safe=":/,#")
+                # FIXME: Probably not "main" in nested workflows
+                role = self.wf_ns["main/%s/%s" % (name, output)]
             else:
-                output_prov_role = self.wf_ns["main/"+str(tuple_entry[0])]
-                self.document.entity(output_checksum,
-                                     {provM.PROV_TYPE: WFPROV["Artifact"]})
-                self.document.wasGeneratedBy(
-                    output_checksum, self.workflow_run_uri, when, None,
-                    {"prov:role": output_prov_role})
-                # FIXME: What are these magic array positions???
-                path = tuple_entry[2]
-                if path.startswith("file://"):
-                    path = path[7:]
-                with open(path, "rb") as cwl_output_file:
-                    rel_path = self.research_object.add_data_file(cwl_output_file, when)
-                    _logger.info(u"[provenance] Adding output file %s to RO", rel_path)
+                role = self.wf_ns["main/%s" % output]
+
+            if not process_run_id:
+                process_run_id = self.workflow_run_uri
+
+            self.document.wasGeneratedBy(entity, process_run_id,
+                                         when, None, {"prov:role": role})
 
     def prospective_prov(self, job):
         # type: (Any) -> None
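A rough standalone sketch of what the rewritten loop asserts for each output, again with the prov package (the identifiers and role string are invented):

    import datetime
    from prov.model import ProvDocument

    doc = ProvDocument()
    doc.add_namespace("ex", "http://example.com/#")

    entity = doc.entity("ex:output-file")
    run = doc.activity("ex:workflow-run")

    # Link the output entity to the run that produced it; prov:role
    # names the workflow output port the entity filled.
    doc.wasGeneratedBy(entity, run, datetime.datetime.now(), None,
                       {"prov:role": "main/report"})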
@@ -1081,15 +1099,15 @@ def add_tagfile(self, path, when=None):
             # Below probably OK for now as metadata files
             # are not too large..?
 
-            checksums["sha1"] = checksum_copy(tag_file, hasher=hashlib.sha1)
+            checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)
             tag_file.seek(0)
             # Older Pythons might not have all checksums
             if sha256:
                 tag_file.seek(0)
-                checksums["sha256"] = checksum_copy(tag_file, hasher=sha256)
+                checksums[SHA256] = checksum_copy(tag_file, hasher=sha256)
             if sha512:
                 tag_file.seek(0)
-                checksums["sha512"] = checksum_copy(tag_file, hasher=sha512)
+                checksums[SHA512] = checksum_copy(tag_file, hasher=sha512)
         assert self.folder
         rel_path = _posix_path(os.path.relpath(path, self.folder))
         self.tagfiles.add(rel_path)
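The sha256/sha512 truthiness guards above exist because very old interpreters may lack those constructors. One way such a guard could be set up at module level (a sketch, not necessarily this module's exact imports):

    import hashlib

    # Fall back to None where a hash constructor is unavailable, so
    # callers can test truthiness (as add_tagfile does) before hashing.
    sha256 = getattr(hashlib, "sha256", None)
    sha512 = getattr(hashlib, "sha512", None)

    if sha256:
        print(sha256(b"example").hexdigest())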
@@ -1495,12 +1513,12 @@ def _add_to_bagit(self, rel_path, **checksums):
             return
         self.bagged_size[rel_path] = os.path.getsize(local_path)
 
-        if "sha1" not in checksums:
+        if SHA1 not in checksums:
            # ensure we always have sha1
            checksums = dict(checksums)
            with open(local_path, "rb") as file_path:
                # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile?
-                checksums["sha1"] = checksum_copy(file_path, hasher=hashlib.sha1)
+                checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1)
 
         self.add_to_manifest(rel_path, checksums)
 
@@ -1554,12 +1572,14 @@ def _relativise_files(self, structure):
                 with fsaccess.open(structure["location"], "rb") as relative_file:
                     relative_path = self.add_data_file(relative_file)
                     ref_location = structure["location"]
-                    structure["location"] = "../"+relative_path
+                    ## FIXME: This might break something else
+                    ##structure["location"] = "../"+relative_path
                     if "path" in structure:
                         del structure["path"]
                     if "checksum" not in structure:
                         # FIXME: This naively relies on add_data_file setting hash as filename
-                        structure["checksum"] = "sha1$%s" % posixpath.basename(relative_path)
+                        checksum = posixpath.basename(relative_path)
+                        structure["checksum"] = "%s$%s" % (SHA1, checksum)
                 # TODO: Calculate secondaryFiles if needed but missing
 
             if structure.get("class") == "Directory":
