Skip to content

Commit 11705e1

Browse files
ICT spec updates and WFB payload fix (PolusAI#282)
* Remove error when looking for output in inputs
* Fix bug when parsing outputs
* Remove incorrect keys from ui in ICT dict
* Add function to add missing inputs to nodes in wfb
* Make UI optional in ict spec and remove ui if found
* Add test for updating nodes in wfb payload
* Update ui validation
* Update wfb type to Json
* Update wfb nodes before raw to lean
* Remove update to ui before clt conversion
* Move is_inlet inside of wfb fixing function
* Check that plugins are in wfb data
* Move schema validation to payload update function
* Check if plugins are present in WFB data
1 parent a908c88 commit 11705e1

File tree

10 files changed

+1757
-30
lines changed

10 files changed

+1757
-30
lines changed

.github/workflows/run_workflows.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,11 @@ jobs:
191191
# NOTE: Do NOT add coverage to PYPY CI runs https://github.com/tox-dev/tox/issues/2252
192192
run: cd workflow-inference-compiler/ && pytest tests/test_ict_to_clt_conversion.py -k test_ict_to_clt
193193

194+
- name: PyTest Run update wfb payload Tests
195+
if: always()
196+
# NOTE: Do NOT add coverage to PYPY CI runs https://github.com/tox-dev/tox/issues/2252
197+
run: cd workflow-inference-compiler/ && pytest tests/test_fix_payload.py -k test_fix
198+
194199
# NOTE: The steps below are for repository_dispatch only. For all other steps, please insert above
195200
# this comment.
196201

src/sophios/api/http/restapi.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ async def compile_wf(request: Request) -> Json:
9595
req: Json = await request.json()
9696
# clean up and convert the incoming object
9797
# schema preserving
98+
req = converter.update_payload_missing_inputs_outputs(req)
9899
wfb_payload = converter.raw_wfb_to_lean_wfb(req)
99100
# schema non-preserving
100101
workflow_temp = converter.wfb_to_wic(wfb_payload)

src/sophios/api/utils/converter.py

Lines changed: 64 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,7 @@ def extract_state(inp: Json) -> Json:
5555
inp_inter['state']['links'] = step_edges
5656
# massage the plugins
5757
plugins = inp_inter['plugins']
58-
# drop incorrect/superfluous UI fields from plugins
59-
# 'required' and 'format'
60-
for ict_plugin in plugins:
61-
for ui_elem in ict_plugin['ui']:
62-
_, _ = ui_elem.pop('required', None), ui_elem.pop('format', None)
63-
for out in ict_plugin['outputs']:
64-
if out['name'] == 'outDir':
65-
ict_plugin['inputs'].append(out)
58+
6659
# Here goes the ICT to CLT extraction logic
6760
for node in inp_inter['state']['nodes']:
6861
node_pid = node["pluginId"]
@@ -78,8 +71,6 @@ def extract_state(inp: Json) -> Json:
7871

7972
def raw_wfb_to_lean_wfb(inp: Json) -> Json:
8073
"""Drop all the unnecessary info from incoming wfb object"""
81-
if validate_schema_and_object(SCHEMA, inp):
82-
print('incoming object is valid against input object schema')
8374
inp_restrict = extract_state(inp)
8475
keys = list(inp_restrict.keys())
8576
# To avoid deserialization
@@ -193,3 +184,66 @@ def ict_to_clt(ict: Union[ICT, Path, str, dict], network_access: bool = False) -
193184
ict_local = ict if isinstance(ict, ICT) else cast_to_ict(ict)
194185

195186
return ict_local.to_clt(network_access=network_access)
187+
188+
189+
def update_payload_missing_inputs_outputs(wfb_data: Json) -> Json:
    """Update payload with missing inputs and outputs using links.

    For every link in ``state.links``, the value stored under the linked
    output of the source node is copied into the matching input of the
    target node, so each connected node carries an explicit input value.

    Args:
        wfb_data: Raw WFB payload. It is checked with
            ``validate_schema_and_object(SCHEMA, ...)`` and is never mutated;
            what happens on an invalid payload is decided by that helper.

    Returns:
        ``wfb_data`` itself when it has no plugins (key absent or empty),
        otherwise an updated deep copy of it.

    Raises:
        KeyError: If a link references an unknown node id, a node references
            an unknown plugin id, or the source node has no value for the
            linked output.
        IndexError: If a link's ``inletIndex`` / ``outletIndex`` is out of
            range for the inlet bindings of the corresponding plugin.
    """

    # ensure the incoming wfb data is valid
    if validate_schema_and_object(SCHEMA, wfb_data):
        print('incoming object is valid against input object schema')

    # return early if no plugins are found in data; `.get` also covers a
    # payload where the 'plugins' key is absent altogether
    if not wfb_data.get('plugins'):
        return wfb_data

    inlet_types = ('directory', 'file', 'path', 'collection', 'csvCollection')

    def is_inlet(binding: Json) -> bool:
        """Check if a wfb binding is an inlet (directory-like)"""
        if binding['type'] in inlet_types:
            return True
        # the 'dir' suffix also matches the conventional 'inpDir' name
        return binding['name'].lower().endswith(('path', 'dir'))

    # work on a copy so the caller's payload is left untouched
    wfb_data_copy = copy.deepcopy(wfb_data)

    links = wfb_data_copy["state"]["links"]
    nodes = wfb_data_copy["state"]["nodes"]
    plugins = wfb_data_copy["plugins"]

    # hashmap of node id to node for fast node lookup
    nodes_dict = {node['id']: node for node in nodes}

    # hashmap of plugin id to plugin for fast plugin lookup
    plugins_dict = {plugin['pid']: plugin for plugin in plugins}

    # propagate the source output value to the target input for each link
    for link in links:

        # nodes at both ends of the link
        target_node = nodes_dict[link["targetId"]]
        source_node = nodes_dict[link["sourceId"]]

        # plugins corresponding to the nodes
        target_plugin = plugins_dict[target_node["pluginId"]]
        source_plugin = plugins_dict[source_node["pluginId"]]

        # keep only the inlet-like bindings; the link's inletIndex and
        # outletIndex are positions within these filtered lists
        input_directories = [binding for binding in target_plugin["inputs"] if is_inlet(binding)]
        output_directories = [binding for binding in source_plugin["outputs"] if is_inlet(binding)]

        missing_input_key = input_directories[link["inletIndex"]]["name"]
        missing_output_key = output_directories[link["outletIndex"]]["name"]

        # set the target input to the value of the linked source output
        target_node["settings"]["inputs"][missing_input_key] = source_node["settings"]["outputs"][missing_output_key]

    if validate_schema_and_object(SCHEMA, wfb_data_copy):
        print('Updated object is valid against input object schema')

    return wfb_data_copy

src/sophios/api/utils/ict/ict_spec/cast.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ def cast_to_ict(ict: Union[Path, str, dict]) -> ICT:
1313
ict = Path(ict)
1414

1515
if isinstance(ict, Path):
16+
1617
if str(ict).endswith(".yaml") or str(ict).endswith(".yml"):
1718
with open(ict, "r", encoding="utf-8") as f_o:
1819
data = safe_load(f_o)
@@ -22,6 +23,10 @@ def cast_to_ict(ict: Union[Path, str, dict]) -> ICT:
2223
else:
2324
raise ValueError(f"File extension not supported: {ict}")
2425

26+
data.pop("ui", None)
27+
2528
return ICT(**data)
2629

30+
ict.pop("ui", None)
31+
2732
return ICT(**ict)

src/sophios/api/utils/ict/ict_spec/io/objects.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,5 +144,4 @@ def _output_to_cwl(self, inputs: Any) -> dict:
144144
cwl_dict_["format"] = self.convert_uri_format(self.io_format["uri"])
145145
return cwl_dict_
146146

147-
raise ValueError(f"Output {self.name} not found in inputs")
148147
raise NotImplementedError(f"Output not supported {self.name}")

src/sophios/api/utils/ict/ict_spec/model.py

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,31 @@ class ICT(Metadata):
2424

2525
inputs: list[IO]
2626
outputs: list[IO]
27-
ui: list[UIItem]
27+
ui: Optional[list[UIItem]] = None
2828
hardware: Optional[HardwareRequirements] = None
2929

3030
@model_validator(mode="after")
3131
def validate_ui(self) -> "ICT":
3232
"""Validate that the ui matches the inputs and outputs."""
33-
io_dict = {"inputs": [], "outputs": []} # type: ignore
34-
ui_keys = [ui.key.root.split(".") for ui in self.ui]
35-
for ui_ in ui_keys:
36-
io_dict[ui_[0]].append(ui_[1])
37-
input_names = [io.name for io in self.inputs]
38-
output_names = [io.name for io in self.outputs]
39-
inp_bool = [x in input_names for x in io_dict["inputs"]]
40-
out_bool = [x in output_names for x in io_dict["outputs"]]
41-
42-
# if not all(inp_bool):
43-
# raise ValueError(
44-
# f"The ui keys must match the inputs and outputs keys. Unmatched: inputs.{set(io_dict['inputs'])-set(input_names)}"
45-
# )
46-
# if not all(out_bool):
47-
# raise ValueError(
48-
# f"The ui keys must match the inputs and outputs keys. Unmatched: outputs.{set(io_dict['outputs'])-set(output_names)}"
49-
# )
33+
if self.ui is not None:
34+
io_dict = {"inputs": [], "outputs": []} # type: ignore
35+
ui_keys = [ui.key.root.split(".") for ui in self.ui]
36+
for ui_ in ui_keys:
37+
io_dict[ui_[0]].append(ui_[1])
38+
input_names = [io.name for io in self.inputs]
39+
output_names = [io.name for io in self.outputs]
40+
inp_bool = [x in input_names for x in io_dict["inputs"]]
41+
out_bool = [x in output_names for x in io_dict["outputs"]]
42+
43+
# if not all(inp_bool):
44+
# raise ValueError(
45+
# f"The ui keys must match the inputs and outputs keys. Unmatched: inputs.{set(io_dict['inputs'])-set(input_names)}"
46+
# )
47+
# if not all(out_bool):
48+
# raise ValueError(
49+
# f"The ui keys must match the inputs and outputs keys. Unmatched: outputs.{set(io_dict['outputs'])-set(output_names)}"
50+
# )
51+
5052
return self
5153

5254
def to_clt(self, network_access: bool = False) -> dict:

src/sophios/api/utils/ict/ict_spec/tools/cwl_ict.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def clt_dict(ict_: "ICT", network_access: bool) -> dict:
3232
},
3333
"outputs": {
3434
io.name: io._output_to_cwl(
35-
[io.name for io in ict_.inputs]
35+
[io.name for io in ict_.outputs]
3636
) # pylint: disable=W0212
3737
for io in ict_.outputs
3838
},

0 commit comments

Comments
 (0)