Skip to content

Commit d31e98c

Browse files
authored
Separate jobs from schema load (Azure#27019)
Signed-off-by: Brynn Yin <[email protected]> Signed-off-by: Brynn Yin <[email protected]>
1 parent e4ab9f3 commit d31e98c

File tree

8 files changed

+161
-9
lines changed

8 files changed

+161
-9
lines changed

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_component/component_factory.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,11 @@ def load_from_rest(self, *, obj: ComponentVersionData, _type: str = None) -> Com
183183
if distribution:
184184
distribution = DistributionConfiguration._from_rest_object(distribution)
185185

186+
# Note: we need to refine the logic here if more type-specific handling is required.
187+
jobs = rest_component_version.component_spec.pop("jobs", None)
188+
if _type == NodeType.PIPELINE and jobs:
189+
jobs = PipelineComponent._resolve_sub_nodes(jobs)
190+
186191
new_instance = create_instance_func()
187192
init_kwargs = dict(
188193
id=obj.id,
@@ -191,6 +196,7 @@ def load_from_rest(self, *, obj: ComponentVersionData, _type: str = None) -> Com
191196
inputs=inputs,
192197
outputs=outputs,
193198
distribution=distribution,
199+
jobs=jobs,
194200
**(
195201
create_schema_func({BASE_PATH_CONTEXT_KEY: "./"}).load(
196202
rest_component_version.component_spec, unknown=INCLUDE

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_component/pipeline_component.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from azure.ai.ml.constants._component import ComponentSource, NodeType
2222
from azure.ai.ml.constants._job.pipeline import ValidationErrorCode
2323
from azure.ai.ml.entities._builders import BaseNode, Command
24-
from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode
24+
from azure.ai.ml.entities._builders.control_flow_node import ControlFlowNode, LoopNode
2525
from azure.ai.ml.entities._component.component import Component
2626
from azure.ai.ml.entities._inputs_outputs import GroupInput, Input, Output
2727
from azure.ai.ml.entities._job.automl.automl_job import AutoMLJob
@@ -323,6 +323,16 @@ def _load_from_rest_pipeline_job(cls, data: Dict):
323323
_source=ComponentSource.REMOTE_WORKSPACE_JOB,
324324
)
325325

326+
@classmethod
327+
def _resolve_sub_nodes(cls, rest_jobs):
328+
sub_nodes = {}
329+
for node_name, node in rest_jobs.items():
330+
if LoopNode._is_loop_node_dict(node):
331+
sub_nodes[node_name] = LoopNode._from_rest_object(node, reference_node_list=sub_nodes)
332+
else:
333+
sub_nodes[node_name] = BaseNode._from_rest_object(node)
334+
return sub_nodes
335+
326336
@classmethod
327337
def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema]:
328338
return PipelineComponentSchema(context=context)

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
}
6565

6666

67+
# TODO: Remove this as both rest type and sdk type are snake case now.
6768
def get_output_type_mapping_from_rest():
6869
"""Get output type mapping."""
6970
return {
@@ -255,6 +256,7 @@ def from_rest_inputs_to_dataset_literal(
255256
if input_value is None:
256257
continue
257258

259+
# TODO: Remove this as both rest type and sdk type are snake case now.
258260
type_transfer_dict = get_output_type_mapping_from_rest()
259261
# deal with invalid input type submitted by feb api
260262
# todo: backend help convert node level input/output type

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_pipeline_job_helpers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ def from_dict_to_rest_io(
117117
rest_obj = rest_object_class.from_dict(val)
118118
rest_io_objects[key] = rest_obj
119119
else:
120-
msg = "Got unsupported type of output: {}:" + f"{type(val)}"
120+
msg = "Got unsupported type of input/output: {}:" + f"{type(val)}"
121121
raise ValidationException(
122122
message=msg.format(val),
123123
no_personal_data_message=msg.format("[val]"),

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/pipeline_job.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -497,13 +497,7 @@ def _load_from_rest(cls, obj: JobBase) -> "PipelineJob":
497497
from_rest_inputs = from_rest_inputs_to_dataset_literal(properties.inputs) or {}
498498
from_rest_outputs = from_rest_data_outputs(properties.outputs) or {}
499499
# Unpack the component jobs
500-
sub_nodes = {}
501-
if properties.jobs:
502-
for node_name, node in properties.jobs.items():
503-
if LoopNode._is_loop_node_dict(node):
504-
sub_nodes[node_name] = LoopNode._from_rest_object(node, reference_node_list=sub_nodes)
505-
else:
506-
sub_nodes[node_name] = BaseNode._from_rest_object(node)
500+
sub_nodes = PipelineComponent._resolve_sub_nodes(properties.jobs) if properties.jobs else {}
507501
# backend may still store Camel settings, eg: DefaultDatastore, translate them to snake when load back
508502
settings_dict = transform_dict_keys(properties.settings, camel_to_snake) if properties.settings else None
509503
settings_sdk = PipelineJobSettings(**settings_dict) if settings_dict else PipelineJobSettings()

sdk/ml/azure-ai-ml/tests/component/unittests/test_pipeline_component_entity.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from pathlib import Path
33

44
import pytest
5+
import yaml
56

67
from azure.ai.ml import Input, load_component, load_job
78
from azure.ai.ml.entities import PipelineComponent, PipelineJob
@@ -416,3 +417,29 @@ def test_invalid_nested_pipeline_component_with_group(self) -> None:
416417
"'group' is defined as a parameter group but got input '${{parent.inputs.top_group}}' with type '<class 'str'>'"
417418
in str(e.value)
418419
)
420+
421+
def test_simple_jobs_from_rest(self) -> None:
422+
test_path = "./tests/test_configs/components/pipeline_component_jobs_rest_data.json"
423+
with open(test_path, "r") as f:
424+
json_in_file = yaml.safe_load(f)
425+
json_in_file = json_in_file['properties']['component_spec']['jobs']
426+
jobs = PipelineComponent._resolve_sub_nodes(json_in_file)
427+
node_dict = {key: node._to_rest_object() for key, node in jobs.items()}['component_a_job']
428+
assert node_dict['computeId'] == '${{parent.inputs.node_compute}}'
429+
assert node_dict['outputs'] == {
430+
'output_binding': {'type': 'literal', 'value': '${{parent.outputs.output}}'},
431+
'output_binding2': {'type': 'literal', 'value': '${{parent.outputs.output}}'},
432+
'output_data': {'job_output_type': 'uri_folder', 'mode': 'Upload'},
433+
'output_data_legacy': {'job_output_type': 'uri_folder', 'mode': 'Upload'}}
434+
assert node_dict['inputs'] == {
435+
'binding_input': {'job_input_type': 'literal', 'value': '${{parent.inputs.component_in_path}}'},
436+
'data_input': {'job_input_type': 'uri_file',
437+
'mode': 'Download',
438+
'uri': 'https://my-blob/path/to/data'},
439+
'data_input_legacy': {'job_input_type': 'uri_file',
440+
'mode': 'Download',
441+
'uri': 'https://my-blob/path/to/data'},
442+
'literal_input': {'job_input_type': 'literal', 'value': '11'},
443+
'literal_input2': {'job_input_type': 'literal', 'value': '12'}}
444+
assert node_dict['resources'] == {'instance_count': 1, 'properties': {
445+
'target_selector': {'my_resource_only': 'false', 'allow_spot_vm': 'true'}}, 'shm_size': '2g'}

sdk/ml/azure-ai-ml/tests/internal/e2etests/test_pipeline_job.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@ def pipeline_func(pipeline_input):
181181
except HttpResponseError as ex:
182182
assert "CancelPipelineRunInTerminalStatus" in str(ex)
183183

184+
# TODO: Enable this when the type issue is fixed on master.
184185
@pytest.mark.skip(reason="marshmallow.exceptions.ValidationError: miss required jobs.node.component")
185186
@pytest.mark.parametrize(
186187
"test_case_i,test_case_name",
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
{
2+
"id": "mock_id",
3+
"name": "1",
4+
"type": "Microsoft.MachineLearningServices/workspaces/components/versions",
5+
"system_data": {
6+
"created_by": "Brynn Yin",
7+
"created_by_type": "User",
8+
"created_at": "2022-10-25T03:39:27.465966Z",
9+
"last_modified_by": "Brynn Yin",
10+
"last_modified_by_type": "User",
11+
"last_modified_at": "2022-10-25T03:39:28.104555Z"
12+
},
13+
"properties": {
14+
"properties": {},
15+
"tags": {
16+
"tag": "tagvalue",
17+
"owner": "sdkteam"
18+
},
19+
"is_anonymous": false,
20+
"is_archived": false,
21+
"component_spec": {
22+
"name": "test_392226085584",
23+
"version": "1",
24+
"display_name": "Hello World Pipeline Component",
25+
"is_deterministic": "False",
26+
"type": "pipeline",
27+
"description": "This is the basic pipeline component",
28+
"tags": {
29+
"tag": "tagvalue",
30+
"owner": "sdkteam"
31+
},
32+
"inputs": {
33+
"component_in_path": {
34+
"type": "uri_folder",
35+
"optional": "False",
36+
"description": "A path"
37+
},
38+
"component_in_number": {
39+
"type": "number",
40+
"optional": "True",
41+
"default": "10.99",
42+
"description": "A number"
43+
},
44+
"node_compute": {
45+
"type": "string",
46+
"optional": "False",
47+
"default": "cpu-cluster"
48+
}
49+
},
50+
"jobs": {
51+
"component_a_job": {
52+
"componentId": "mock_id",
53+
"type": "command",
54+
"computeId": "${{parent.inputs.node_compute}}",
55+
"resources": {
56+
"instance_count": "1",
57+
"shm_size": "2g",
58+
"properties": {
59+
"target_selector": {
60+
"my_resource_only": "false",
61+
"allow_spot_vm": "true"
62+
}
63+
}
64+
},
65+
"inputs": {
66+
"binding_input": {
67+
"job_input_type": "literal",
68+
"value": "${{parent.inputs.component_in_path}}"
69+
},
70+
"literal_input": {
71+
"job_input_type": "literal",
72+
"value": "11"
73+
},
74+
"literal_input2": {
75+
"job_input_type": "Literal",
76+
"value": "12"
77+
},
78+
"data_input": {
79+
"job_input_type": "uri_file",
80+
"mode": "Download",
81+
"uri": "https://my-blob/path/to/data"
82+
},
83+
"data_input_legacy": {
84+
"job_input_type": "UriFile",
85+
"mode": "Download",
86+
"uri": "https://my-blob/path/to/data"
87+
}
88+
},
89+
"outputs": {
90+
"output_data": {
91+
"mode": "Upload",
92+
"job_output_type": "uri_folder"
93+
},
94+
"output_data_legacy": {
95+
"mode": "Upload",
96+
"job_output_type": "UriFolder"
97+
},
98+
"output_binding": {
99+
"value": "${{parent.outputs.output}}",
100+
"job_output_type": "literal"
101+
},
102+
"output_binding2": {
103+
"value": "${{parent.outputs.output}}",
104+
"job_output_type": "Literal"
105+
}
106+
}
107+
}
108+
},
109+
"$schema": "https://azuremlschemas.azureedge.net/development/pipelineComponent.schema.json"
110+
}
111+
}
112+
}

0 commit comments

Comments
 (0)