|
1 | 1 | from abc import ABC, abstractmethod |
2 | | -from typing import Dict, List |
| 2 | +from dataclasses import dataclass |
| 3 | +from functools import cached_property |
| 4 | +from typing import Any, List, Optional |
3 | 5 |
|
4 | | -from .task import Task |
5 | 6 | from .util import WdlBase |
6 | 7 |
|
7 | 8 |
|
| 9 | + |
| 10 | +@dataclass |
| 11 | +class StepValueLine: |
| 12 | + tag: str |
| 13 | + value: Optional[str] |
| 14 | + special_label: Optional[str] |
| 15 | + prefix_label: Optional[str] |
| 16 | + default_label: Optional[str] |
| 17 | + #default_label: bool |
| 18 | + datatype_label: str |
| 19 | + |
| 20 | + @cached_property |
| 21 | + def length(self) -> int: |
| 22 | + return len(f'{self.tag_and_value},') |
| 23 | + |
| 24 | + @property |
| 25 | + def tag_and_value(self) -> str: |
| 26 | + return f'{self.tag}={self.value},' |
| 27 | + |
| 28 | + @property |
| 29 | + def info_comment(self) -> str: |
| 30 | + prefix = f' {self.prefix_label}' if self.prefix_label is not None else '' |
| 31 | + datatype = f' [{self.datatype_label}]' if self.datatype_label is not None else '' |
| 32 | + default = f' ({self.default_label})' if self.default_label is not None else '' |
| 33 | + special = f' **{self.special_label}' if self.special_label is not None else '' |
| 34 | + #default = f' (default)' if self.default_label is True else '' |
| 35 | + return f'#{special}{prefix}{datatype}{default}' |
| 36 | + |
| 37 | + |
| 38 | + |
8 | 39 | class WorkflowCallBase(WdlBase, ABC): |
9 | 40 | @abstractmethod |
10 | | - def get_string(self, indent=1): |
| 41 | + def get_string(self, indent: int=1): |
11 | 42 | raise Exception("Must override 'get_string(indent:int)'") |
12 | 43 |
|
13 | 44 |
|
14 | 45 | class WorkflowCall(WorkflowCallBase): |
| 46 | + |
15 | 47 | def __init__( |
16 | 48 | self, |
17 | | - namespaced_identifier: str = None, |
18 | | - alias: str = None, |
19 | | - inputs_map: Dict[str, str] = None, |
| 49 | + namespaced_identifier: str, |
| 50 | + alias: str, |
| 51 | + inputs_details: dict[str, dict[str, Any]], |
| 52 | + messages: list[str], |
| 53 | + render_comments: bool = True |
20 | 54 | ): |
21 | 55 | """ |
22 | | -
|
23 | 56 | :param task: |
24 | 57 | :param namespaced_identifier: Required if task is imported. The workflow might take care of this later? |
25 | 58 | :param alias: |
26 | | - :param inputs_map: |
| 59 | + :param inputs_details: |
27 | 60 | """ |
28 | 61 | self.namespaced_identifier = namespaced_identifier |
29 | 62 | self.alias = alias |
30 | | - self.inputs_map = inputs_map |
31 | | - |
32 | | - self.call_format = """{ind}call {name}{alias} {body}""" |
33 | | - self.body_format = """{{\n{ind}{tb}input:\n{input_map}\n{ind}}}""" |
34 | | - |
35 | | - def get_string(self, indent=1): |
36 | | - tb = " " |
37 | | - |
38 | | - body = "" |
39 | | - if self.inputs_map: |
40 | | - inpmap = ",\n".join( |
41 | | - ((indent + 2) * tb + str(k) + "=" + str(v)) |
42 | | - for k, v in self.inputs_map.items() |
| 63 | + self.inputs_details = inputs_details |
| 64 | + self.messages = messages |
| 65 | + self.render_comments = render_comments |
| 66 | + |
| 67 | + def get_string(self, indent: int=1): |
| 68 | + self.tb: str = " " |
| 69 | + self.indent: int = 1 |
| 70 | + ind = self.indent * self.tb |
| 71 | + name = self.namespaced_identifier |
| 72 | + alias = " as " + self.alias if self.alias else "" # alias is always being supplied, but this implies its not? |
| 73 | + body = self.get_body() |
| 74 | + return f"{ind}call {name}{alias} {body}\n{ind}}}" |
| 75 | + |
| 76 | + def get_body(self) -> str: |
| 77 | + lines = self.init_known_input_lines() |
| 78 | + return self.input_section_to_string(lines) |
| 79 | + |
| 80 | + def init_known_input_lines(self) -> list[StepValueLine]: |
| 81 | + out: list[StepValueLine] = [] |
| 82 | + for tag, d in self.inputs_details.items(): |
| 83 | + line = StepValueLine( |
| 84 | + tag=tag, |
| 85 | + value=d['value'], |
| 86 | + special_label=d['special_label'], |
| 87 | + prefix_label=d['prefix'], |
| 88 | + default_label=d['default'], |
| 89 | + datatype_label=d['datatype'] |
43 | 90 | ) |
44 | | - body = self.body_format.format(ind=indent * tb, input_map=inpmap, tb=tb) |
45 | | - |
46 | | - return self.call_format.format( |
47 | | - ind=indent * tb, |
48 | | - tb=tb, |
49 | | - name=self.namespaced_identifier |
50 | | - if self.namespaced_identifier |
51 | | - else self.task.name, |
52 | | - alias=(" as " + self.alias) if self.alias else "", |
53 | | - body=body, |
54 | | - ) |
| 91 | + out.append(line) |
| 92 | + return out |
| 93 | + |
| 94 | + def input_section_to_string(self, lines: list[StepValueLine]) -> str: |
| 95 | + str_lines: list[str] = [] |
| 96 | + ind = (self.indent + 1) * self.tb |
| 97 | + |
| 98 | + # render 'unknown' input messages |
| 99 | + if self.render_comments: |
| 100 | + str_lines += [f'{ind}{self.tb}#{m},' for m in self.messages] |
| 101 | + |
| 102 | + # calc longest line so we know how to justify info comments |
| 103 | + max_line_len = max([ln.length for ln in lines]) |
| 104 | + |
| 105 | + # generate string representaiton of each line |
| 106 | + for ln in lines: |
| 107 | + if self.render_comments: |
| 108 | + str_line = f'{ind}{self.tb}{ln.tag_and_value:<{max_line_len+2}}{ln.info_comment}' |
| 109 | + else: |
| 110 | + str_line = f'{ind}{self.tb}{ln.tag_and_value}' |
| 111 | + str_lines.append(str_line) |
| 112 | + |
| 113 | + # join lines and return body segment |
| 114 | + inputs = '\n'.join(str_lines) |
| 115 | + return f"{{\n{ind}input:\n{inputs}" |
| 116 | + |
55 | 117 |
|
56 | 118 |
|
57 | 119 | class WorkflowConditional(WorkflowCallBase): |
|
0 commit comments