Skip to content

Commit 5e0d888

Browse files
Authored by YingChen1996 (Ying Chen), with co-authors
Keep node-level original source when updating pipeline job. (#34379)
* keep node original source
* update test and recording
* revert
* update recording
* format
* recording failed test
* update
* revert recording files

Co-authored-by: Ying Chen <[email protected]>
1 parent 3a27963 commit 5e0d888

File tree

5 files changed

+37
-37
lines changed

5 files changed

+37
-37
lines changed

sdk/ml/azure-ai-ml/assets.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@
22
"AssetsRepo": "Azure/azure-sdk-assets",
33
"AssetsRepoPrefixPath": "python",
44
"TagPrefix": "python/ml/azure-ai-ml",
5-
"Tag": "python/ml/azure-ai-ml_1af7fe2f76"
5+
"Tag": "python/ml/azure-ai-ml_2ff0425fbf"
66
}

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_builders/base_node.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ def __init__(
139139
) -> None:
140140
self._init = True
141141
# property _source can't be set
142-
kwargs.pop("_source", None)
142+
source = kwargs.pop("_source", None)
143143
_from_component_func = kwargs.pop("_from_component_func", False)
144144
self._name: Optional[str] = None
145145
super(BaseNode, self).__init__(
@@ -186,11 +186,12 @@ def __init__(
186186
# add current component in pipeline stack for dsl scenario
187187
self._register_in_current_pipeline_component_builder()
188188

189-
self._source = (
190-
self._component._source
191-
if isinstance(self._component, Component)
192-
else Component._resolve_component_source_from_id(id=self._component)
193-
)
189+
if source is None:
190+
if isinstance(component, Component):
191+
source = self._component._source
192+
else:
193+
source = Component._resolve_component_source_from_id(id=self._component)
194+
self._source = source
194195
self._validate_required_input_not_provided = True
195196
self._init = False
196197

sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_controlflow_pipeline.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -83,19 +83,19 @@ def condition_pipeline():
8383
"type": "if_else",
8484
},
8585
"node1": {
86-
"_source": "REMOTE.WORKSPACE.COMPONENT",
86+
"_source": "YAML.COMPONENT",
8787
"inputs": {"component_in_number": {"job_input_type": "literal", "value": "1"}},
8888
"name": "node1",
8989
"type": "command",
9090
},
9191
"node2": {
92-
"_source": "REMOTE.WORKSPACE.COMPONENT",
92+
"_source": "YAML.COMPONENT",
9393
"inputs": {"component_in_number": {"job_input_type": "literal", "value": "2"}},
9494
"name": "node2",
9595
"type": "command",
9696
},
9797
"result": {
98-
"_source": "REMOTE.WORKSPACE.COMPONENT",
98+
"_source": "YAML.COMPONENT",
9999
"name": "result",
100100
"type": "command",
101101
},
@@ -129,13 +129,13 @@ def condition_pipeline():
129129
"type": "if_else",
130130
},
131131
"node1": {
132-
"_source": "REMOTE.WORKSPACE.COMPONENT",
132+
"_source": "YAML.COMPONENT",
133133
"inputs": {"component_in_number": {"job_input_type": "literal", "value": "1"}},
134134
"name": "node1",
135135
"type": "command",
136136
},
137137
"node2": {
138-
"_source": "REMOTE.WORKSPACE.COMPONENT",
138+
"_source": "YAML.COMPONENT",
139139
"inputs": {"component_in_number": {"job_input_type": "literal", "value": "2"}},
140140
"name": "node2",
141141
"type": "command",
@@ -167,7 +167,7 @@ def condition_pipeline():
167167
"type": "if_else",
168168
},
169169
"node1": {
170-
"_source": "REMOTE.WORKSPACE.COMPONENT",
170+
"_source": "YAML.COMPONENT",
171171
"inputs": {"component_in_number": {"job_input_type": "literal", "value": "1"}},
172172
"name": "node1",
173173
"type": "command",
@@ -270,7 +270,7 @@ def test_pipeline(input_data, int_param, bool_param, float_param, str_param):
270270
"type": "if_else",
271271
},
272272
"do_while_body_func": {
273-
"_source": "REMOTE.WORKSPACE.COMPONENT",
273+
"_source": "YAML.COMPONENT",
274274
"inputs": {
275275
"bool_param": {"job_input_type": "literal", "value": "${{parent.inputs.bool_param}}"},
276276
"float_param": {"job_input_type": "literal", "value": "${{parent.inputs.float_param}}"},
@@ -298,7 +298,7 @@ def test_pipeline(input_data, int_param, bool_param, float_param, str_param):
298298
"type": "do_while",
299299
},
300300
"primitive_output_component_true": {
301-
"_source": "REMOTE.WORKSPACE.COMPONENT",
301+
"_source": "YAML.COMPONENT",
302302
"inputs": {
303303
"input_data": {
304304
"job_input_type": "literal",
@@ -581,7 +581,7 @@ def parallel_for_pipeline():
581581
dsl_pipeline_job_dict = omit_with_wildcard(pipeline_job._to_rest_object().as_dict(), *omit_fields)
582582
assert dsl_pipeline_job_dict["properties"]["jobs"] == {
583583
"after_node": {
584-
"_source": "REMOTE.WORKSPACE.COMPONENT",
584+
"_source": "YAML.COMPONENT",
585585
"computeId": "cpu-cluster",
586586
"inputs": {
587587
"component_in_path": {
@@ -593,7 +593,7 @@ def parallel_for_pipeline():
593593
"type": "command",
594594
},
595595
"parallel_body": {
596-
"_source": "REMOTE.WORKSPACE.COMPONENT",
596+
"_source": "YAML.COMPONENT",
597597
"inputs": {
598598
"component_in_path": {
599599
"job_input_type": "uri_file",
@@ -637,7 +637,7 @@ def parallel_for_pipeline():
637637
dsl_pipeline_job_dict = omit_with_wildcard(pipeline_job._to_rest_object().as_dict(), *omit_fields)
638638
assert dsl_pipeline_job_dict["properties"]["jobs"] == {
639639
"after_node": {
640-
"_source": "REMOTE.WORKSPACE.COMPONENT",
640+
"_source": "YAML.COMPONENT",
641641
"computeId": "cpu-cluster",
642642
"inputs": {
643643
"component_in_number": {"job_input_type": "literal", "value": "1"},
@@ -650,7 +650,7 @@ def parallel_for_pipeline():
650650
"type": "command",
651651
},
652652
"parallel_body": {
653-
"_source": "REMOTE.WORKSPACE.COMPONENT",
653+
"_source": "YAML.COMPONENT",
654654
"inputs": {
655655
"component_in_path": {
656656
"job_input_type": "uri_file",
@@ -700,7 +700,7 @@ def parallel_for_pipeline():
700700
dsl_pipeline_job_dict = omit_with_wildcard(pipeline_job._to_rest_object().as_dict(), *omit_fields)
701701
assert dsl_pipeline_job_dict["properties"]["jobs"] == {
702702
"after_node": {
703-
"_source": "REMOTE.WORKSPACE.COMPONENT",
703+
"_source": "YAML.COMPONENT",
704704
"computeId": "cpu-cluster",
705705
"inputs": {
706706
"component_in_path": {
@@ -711,7 +711,7 @@ def parallel_for_pipeline():
711711
"name": "after_node",
712712
"type": "command",
713713
},
714-
"parallel_body": {"_source": "REMOTE.WORKSPACE.COMPONENT", "name": "parallel_body", "type": "pipeline"},
714+
"parallel_body": {"_source": "DSL", "name": "parallel_body", "type": "pipeline"},
715715
"parallel_node": {
716716
"_source": "DSL",
717717
"body": "${{parent.jobs.parallel_body}}",
@@ -752,7 +752,7 @@ def parallel_for_pipeline():
752752
dsl_pipeline_job_dict = omit_with_wildcard(pipeline_job._to_rest_object().as_dict(), *omit_fields)
753753
assert dsl_pipeline_job_dict["properties"]["jobs"] == {
754754
"after_node": {
755-
"_source": "REMOTE.WORKSPACE.COMPONENT",
755+
"_source": "YAML.COMPONENT",
756756
"computeId": "cpu-cluster",
757757
"inputs": {
758758
"component_in_path": {
@@ -763,7 +763,7 @@ def parallel_for_pipeline():
763763
"name": "after_node",
764764
"type": "command",
765765
},
766-
"parallel_body": {"_source": "REMOTE.WORKSPACE.COMPONENT", "name": "parallel_body", "type": "pipeline"},
766+
"parallel_body": {"_source": "DSL", "name": "parallel_body", "type": "pipeline"},
767767
"parallel_node": {
768768
"_source": "DSL",
769769
"body": "${{parent.jobs.parallel_body}}",
@@ -811,7 +811,7 @@ def parallel_for_pipeline():
811811
dsl_pipeline_job_dict = omit_with_wildcard(pipeline_job._to_rest_object().as_dict(), *omit_fields)
812812
assert dsl_pipeline_job_dict["properties"]["jobs"] == {
813813
"parallel_body": {
814-
"_source": "REMOTE.WORKSPACE.COMPONENT",
814+
"_source": "YAML.COMPONENT",
815815
"inputs": {
816816
"component_in_path": {
817817
"job_input_type": "uri_file",

sdk/ml/azure-ai-ml/tests/dsl/unittests/test_dsl_pipeline.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -921,7 +921,6 @@ def mock_add_to_builder(component):
921921
omit_fields = ["componentId", "properties"]
922922
assert pydash.omit(component_from_dsl._to_rest_object(), *omit_fields) == expected_component
923923
assert pydash.omit(component_from_sdk._to_rest_object(), *omit_fields) == expected_component
924-
expected_component.update({"_source": "REMOTE.WORKSPACE.COMPONENT"})
925924
assert pydash.omit(component_from_rest._to_rest_object(), *omit_fields) == expected_component
926925
expected_component.update({"_source": "YAML.JOB"})
927926
assert pydash.omit(component_from_yaml._to_rest_object(), *omit_fields) == expected_component

sdk/ml/azure-ai-ml/tests/pipeline_job/e2etests/test_pipeline_job.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -385,12 +385,12 @@ def test_pipeline_job_with_command_job(
385385
},
386386
"literal_input": {"job_input_type": "literal", "value": "2"},
387387
},
388-
"_source": "REMOTE.WORKSPACE.COMPONENT",
388+
"_source": "YAML.JOB",
389389
},
390390
"hello_world_inline_commandjob_2": {
391391
"type": "command",
392392
"name": "hello_world_inline_commandjob_2",
393-
"_source": "REMOTE.WORKSPACE.COMPONENT",
393+
"_source": "YAML.JOB",
394394
},
395395
},
396396
"outputs": {"job_out_path_1": {"mode": "ReadWriteMount", "job_output_type": "uri_folder"}},
@@ -474,7 +474,7 @@ def test_pipeline_job_with_command_job(
474474
"value": "${{parent.inputs.pipeline_job_test_input}}",
475475
},
476476
},
477-
"_source": "REMOTE.WORKSPACE.COMPONENT",
477+
"_source": "YAML.JOB",
478478
},
479479
},
480480
"outputs": {
@@ -1507,7 +1507,7 @@ def test_pipeline_job_with_data_transfer_copy_urifolder(self, client: MLClient,
15071507
actual_dict = pydash.omit(pipeline_dict["properties"]["jobs"]["copy_files"], fields_to_omit)
15081508

15091509
assert actual_dict == {
1510-
"_source": "REMOTE.WORKSPACE.COMPONENT",
1510+
"_source": "YAML.COMPONENT",
15111511
"data_copy_mode": "merge_with_overwrite",
15121512
"inputs": {"folder1": {"job_input_type": "literal", "value": "${{parent.inputs.cosmos_folder}}"}},
15131513
"outputs": {"output_folder": {"type": "literal", "value": "${{parent.outputs.merged_blob}}"}},
@@ -1525,7 +1525,7 @@ def test_pipeline_job_with_data_transfer_copy_urifile(self, client: MLClient, ra
15251525
actual_dict = pydash.omit(pipeline_dict["properties"]["jobs"]["copy_files"], fields_to_omit)
15261526

15271527
assert actual_dict == {
1528-
"_source": "REMOTE.WORKSPACE.COMPONENT",
1528+
"_source": "YAML.COMPONENT",
15291529
"data_copy_mode": "fail_if_conflict",
15301530
"inputs": {"folder1": {"job_input_type": "literal", "value": "${{parent.inputs.cosmos_folder}}"}},
15311531
"outputs": {"output_folder": {"type": "literal", "value": "${{parent.outputs.merged_blob}}"}},
@@ -1543,7 +1543,7 @@ def test_pipeline_job_with_data_transfer_copy_2urifolder(self, client: MLClient,
15431543
actual_dict = pydash.omit(pipeline_dict["properties"]["jobs"]["merge_files"], fields_to_omit)
15441544

15451545
assert actual_dict == {
1546-
"_source": "REMOTE.WORKSPACE.COMPONENT",
1546+
"_source": "YAML.COMPONENT",
15471547
"data_copy_mode": "merge_with_overwrite",
15481548
"inputs": {
15491549
"folder1": {"job_input_type": "literal", "value": "${{parent.inputs.cosmos_folder}}"},
@@ -1566,7 +1566,7 @@ def test_pipeline_job_with_inline_data_transfer_copy_2urifolder(
15661566
actual_dict = pydash.omit(pipeline_dict["properties"]["jobs"]["merge_files_job"], fields_to_omit)
15671567

15681568
assert actual_dict == {
1569-
"_source": "REMOTE.WORKSPACE.COMPONENT",
1569+
"_source": "YAML.JOB",
15701570
"data_copy_mode": "merge_with_overwrite",
15711571
"inputs": {
15721572
"folder1": {"job_input_type": "literal", "value": "${{parent.inputs.cosmos_folder}}"},
@@ -1589,7 +1589,7 @@ def test_pipeline_job_with_inline_data_transfer_copy_mixtype_file(
15891589
actual_dict = pydash.omit(pipeline_dict["properties"]["jobs"]["merge_files"], fields_to_omit)
15901590

15911591
assert actual_dict == {
1592-
"_source": "REMOTE.WORKSPACE.COMPONENT",
1592+
"_source": "YAML.COMPONENT",
15931593
"data_copy_mode": "merge_with_overwrite",
15941594
"inputs": {
15951595
"input1": {"job_input_type": "literal", "value": "${{parent.inputs.input1}}"},
@@ -1613,7 +1613,7 @@ def test_pipeline_job_with_data_transfer_import_filesystem(self, client: MLClien
16131613
# load from rest will get source from component, which will be REMOTE.REGISTRY since component now is
16141614
# registry component
16151615
assert actual_dict == {
1616-
"_source": "REMOTE.REGISTRY",
1616+
"_source": "BUILTIN",
16171617
"outputs": {
16181618
"sink": {
16191619
"job_output_type": "uri_folder",
@@ -1671,7 +1671,7 @@ def test_pipeline_job_with_data_transfer_import_sql_database(self, client: MLCli
16711671
actual_dict = pydash.omit(pipeline_dict["properties"]["jobs"]["snowflake_blob"], fields_to_omit)
16721672

16731673
assert actual_dict == {
1674-
"_source": "REMOTE.REGISTRY",
1674+
"_source": "BUILTIN",
16751675
"computeId": "serverless",
16761676
"outputs": {"sink": {"job_output_type": "mltable"}},
16771677
"source": {
@@ -1695,7 +1695,7 @@ def test_pipeline_job_with_data_transfer_import_snowflake_database(
16951695
actual_dict = pydash.omit(pipeline_dict["properties"]["jobs"]["snowflake_blob"], fields_to_omit)
16961696

16971697
assert actual_dict == {
1698-
"_source": "REMOTE.REGISTRY",
1698+
"_source": "BUILTIN",
16991699
"computeId": "serverless",
17001700
"outputs": {
17011701
"sink": {
@@ -1722,7 +1722,7 @@ def test_pipeline_job_with_data_transfer_export_sql_database(self, client: MLCli
17221722
actual_dict = pydash.omit(pipeline_dict["properties"]["jobs"]["blob_azuresql"], fields_to_omit)
17231723

17241724
assert actual_dict == {
1725-
"_source": "REMOTE.REGISTRY",
1725+
"_source": "BUILTIN",
17261726
"inputs": {"source": {"job_input_type": "literal", "value": "${{parent.inputs.cosmos_folder}}"}},
17271727
"sink": {
17281728
"connection": "${{parent.inputs.connection_target_azuresql}}",

0 commit comments

Comments (0)