|
| 1 | +from typing import Dict, List, Tuple, Any |
| 2 | + |
| 3 | + |
# Type alias for a JSON object: a dict with string keys and values of any type.
Json = Dict[str, Any]
| 5 | + |
| 6 | + |
def parse_step_name_str(step_name: str) -> Tuple[str, int, str]:
    """The inverse function to step_name_str()

    Splits step_name on double underscores and expects exactly four fields:
    yaml_stem, the literal marker 'step', a one-based integer index, and step_key.

    Args:
        step_name (str): A string of the same form as returned by step_name_str()

    Raises:
        ValueError: If the argument is not of the same form as returned by
            step_name_str(), i.e. it does not have exactly four fields, the
            second field is not 'step', or the third field is not an integer.
            (ValueError is a subclass of Exception, so callers that catch
            Exception are unaffected.)

    Returns:
        Tuple[str, int, str]: The parameters used to create step_name, i.e.
        (yaml_stem, zero-based step index, step_key).
    """
    vals = step_name.split('__')  # double underscore
    if len(vals) != 4:
        raise ValueError(f"Error! {step_name} is not of the format \n"
                         + '{yaml_stem}__step__{i+1}__{step_key}\n'
                         + 'yaml_stem and step_key should not contain any double underscores.')
    yaml_stem, marker, index_str, step_key = vals
    if marker != 'step':
        # The documented format requires the literal 'step' in the second field.
        raise ValueError(f"Error! {step_name} is not of the format \n"
                         + '{yaml_stem}__step__{i+1}__{step_key}')
    try:
        i = int(index_str)
    except ValueError as ex:  # int() on a str only fails with ValueError
        raise ValueError(f"Error! {step_name} is not of the format \n"
                         + '{yaml_stem}__step__{i+1}__{step_key}') from ex
    # The embedded index is one-based; return the zero-based index.
    return (yaml_stem, i - 1, step_key)
| 30 | + |
| 31 | + |
| 32 | +# Copied from wic to avoid wic dependency |
def shorten_namespaced_output_name(namespaced_output_name: str, sep: str = ' ') -> Tuple[str, str]:
    """Strips the redundant yaml_stem prefix from every step_name_str embedded
    in namespaced_output_name. The prefixes are what make each step_name_str
    context-free and unique, so dropping them is potentially dangerous; it is
    done only to slightly shorten the output filenames.

    Args:
        namespaced_output_name (str): A string of the form:
        '___'.join(namespaces + [step_name_i, out_key])
        sep (str): The separator used to construct the shortened step name strings.

    Returns:
        Tuple[str, str]: The yaml_stem of the first namespace (or '' if there
        are no namespaces), so this function can be inverted, and
        namespaced_output_name with each embedded step name rewritten as
        'step{sep}{i+1}{sep}{step_key}'.
    """
    # Everything before the final '___' is a step name; the last piece is the output name.
    *step_names, output_name = namespaced_output_name.split('___')
    parsed = [parse_step_name_str(name) for name in step_names]
    first_yaml_stem = parsed[0][0] if parsed else ''
    pieces = [f'step{sep}{index+1}{sep}{step_key}' for _, index, step_key in parsed]
    pieces.append(output_name)
    return (first_yaml_stem, '___'.join(pieces))
| 61 | + |
| 62 | + |
def recursively_insert_into_dict_tree(tree: Dict, keys: List[str], val: Any) -> Dict:
    """Recursively inserts a value into a nested tree of Dicts, creating new Dicts as necessary.

    Leaves of the tree are Lists of values: inserting at a path that already
    holds a List appends to it. If a List is encountered before the path is
    exhausted, val is appended to that List and the remaining keys are ignored.

    Args:
        tree (Dict): A nested tree of Dicts. It is modified in place. (During
            recursion this may also be a List that was stored as a leaf.)
        keys (List[str]): The path through the tree to the value.
        val (Any): The value to be inserted.

    Returns:
        Dict: The updated tree with val inserted as per the path specified by keys.
        If keys is empty, tree is returned unchanged.
    """
    if not keys:
        return tree
    if isinstance(tree, list):
        # TODO: Output Directories cause problems with uniqueness of names,
        # so for now we have to terminate the recursion.
        # This is checked at every depth (not only for the final key) so that a
        # path running through an existing leaf List does not fail on tree.get().
        tree.append(val)
        return tree
    key = keys[0]
    if len(keys) == 1:
        if isinstance(tree, dict):
            # Create the leaf List on first use, then append.
            tree.setdefault(key, []).append(val)
        return tree
    tree[key] = recursively_insert_into_dict_tree(tree.get(key, {}), keys[1:], val)
    return tree
| 91 | + |
| 92 | + |
| 93 | +# Copied from wic to avoid wic dependency |
def parse_provenance_output_files(output_json: Json) -> List[Tuple[str, str, str]]:
    """Collects every output file described by the primary workflow provenance JSON object.

    Each top-level key is used as the initial parentdirs value for the files
    found under its value.

    Args:
        output_json (Json): The JSON results object, containing the metadata for all output files.

    Returns:
        List[Tuple[str, str, str]]: A flat List of (location, parentdirs, basename) for each output file.
    """
    return [
        entry
        for output_name, obj in output_json.items()
        for entry in parse_provenance_output_files_(obj, output_name)
    ]
| 107 | + |
| 108 | + |
| 109 | +# Copied from wic to avoid wic dependency |
def parse_provenance_output_files_(obj: Any, parentdirs: str) -> List[Tuple[str, str, str]]:
    """Recursively parses one (sub-)object of the workflow provenance JSON.

    Dicts with class 'File' yield one entry; Dicts with class 'Directory' are
    descended into via their 'listing' (if any), with the directory basename
    appended to parentdirs; Lists are traversed element by element. Anything
    else contributes no entries.

    Args:
        obj (Any): The provenance object or one of its recursive sub-objects.
        parentdirs (str): The directory associated with obj.

    Raises:
        KeyError: If a File object lacks 'location' or 'basename', or a
            Directory object lacks 'basename'.

    Returns:
        List[Tuple[str, str, str]]: A List of (location, parentdirs, basename) for each output file.
    """
    if isinstance(obj, dict):
        obj_class = obj.get('class', '')
        if obj_class == 'File':
            return [(str(obj['location']), parentdirs, str(obj['basename']))]  # This basename is a file name
        if obj_class == 'Directory':
            subdir = parentdirs + '/' + obj['basename']  # This basename is a directory name
            # 'listing' is optional on a Directory object; treat a missing
            # listing as an empty directory rather than raising KeyError.
            return parse_provenance_output_files_(obj.get('listing', []), subdir)
        return []
    if isinstance(obj, list):
        # Should we flatten?? This will lose the structure of 2D (and higher) array outputs.
        return [entry for o in obj for entry in parse_provenance_output_files_(o, parentdirs)]
    return []
| 133 | + |
| 134 | + |
def provenance_list_to_tree(files: List[Tuple[str, str, str]]) -> Dict:
    """Builds a nested tree of Dicts (one level per subworkflow namespace) from the flat list of output files.

    Args:
        files (List[Tuple[str, str, str]]): This should be the output of parse_provenance_output_files(...)

    Returns:
        Dict: A nested tree of Dicts corresponding to subworkflows. Each entry
        is inserted at the path obtained by splitting its second element on '___'.
    """
    tree: Dict = {}
    for location, output_name, basename in files:
        record = (location, output_name, basename)
        # Triple underscores separate the namespaces, giving the path into the tree.
        path = output_name.split('___')
        tree = recursively_insert_into_dict_tree(tree, path, record)
    return tree
0 commit comments