
Commit 7cfcd65

[ML][Pipelines] Support parallel_for output mapping (Azure#27778)

* add type mapping
* fix pipeline output
* fix local failed tests
* add ut
* add e2e test
* fix tests
* fix comment
* fix test
1 parent 52332fd commit 7cfcd65

11 files changed: +2229 −317 lines
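
In short, this change makes a parallel_for node re-type the outputs it inherits from its loop body: port outputs (uri_file, uri_folder, mltable) aggregate to mltable, primitive outputs (number, string, boolean, integer) aggregate to string, and any other type (for example mlflow_model) is rejected with a UserErrorException. Below is a minimal usage sketch assembled from the tests in this commit; the component YAML path is a placeholder, and the private-preview control-flow features are assumed to be enabled.

```python
from azure.ai.ml import Input, load_component
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.dsl._parallel_for import parallel_for

# Placeholder component; its component_out_path output is declared as uri_folder.
basic_component = load_component(source="./helloworld_component.yml")

@pipeline
def my_pipeline():
    body = basic_component(component_in_path=Input(path="test_path1"))
    foreach_node = parallel_for(
        body=body,
        items=[
            {"component_in_number": 1},
            {"component_in_number": 2},
        ],
    )
    # The body's uri_folder output surfaces as mltable on the loop node,
    # since every iteration contributes one item to the aggregated output.
    return {"output": foreach_node.outputs.component_out_path}
```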

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_builders/parallel_for.py

Lines changed: 40 additions & 5 deletions

```diff
@@ -8,13 +8,15 @@
 from azure.ai.ml._schema import PathAwareSchema
 from azure.ai.ml._schema.pipeline.control_flow_job import ParallelForSchema
 from azure.ai.ml._utils.utils import is_data_binding_expression
-from azure.ai.ml.constants._component import ControlFlowType
+from azure.ai.ml.constants import AssetTypes
+from azure.ai.ml.constants._component import ControlFlowType, ComponentParameterTypes
 from azure.ai.ml.entities import Component
 from azure.ai.ml.entities._builders import BaseNode
 from azure.ai.ml.entities._builders.control_flow_node import LoopNode
 from azure.ai.ml.entities._job.pipeline._io import NodeOutput, PipelineInput
 from azure.ai.ml.entities._job.pipeline._io.mixin import NodeIOMixin
 from azure.ai.ml.entities._util import validate_attribute_type
+from azure.ai.ml.exceptions import UserErrorException


 class ParallelFor(LoopNode, NodeIOMixin):
@@ -32,6 +34,16 @@ class ParallelFor(LoopNode, NodeIOMixin):
     :type max_concurrency: int
     """

+    OUT_TYPE_MAPPING = {
+        AssetTypes.URI_FILE: AssetTypes.MLTABLE,
+        AssetTypes.URI_FOLDER: AssetTypes.MLTABLE,
+        AssetTypes.MLTABLE: AssetTypes.MLTABLE,
+        ComponentParameterTypes.NUMBER: ComponentParameterTypes.STRING,
+        ComponentParameterTypes.STRING: ComponentParameterTypes.STRING,
+        ComponentParameterTypes.BOOLEAN: ComponentParameterTypes.STRING,
+        ComponentParameterTypes.INTEGER: ComponentParameterTypes.STRING,
+    }
+
     def __init__(
         self,
         *,
@@ -55,11 +67,12 @@ def __init__(
         # parallel for node shares output meta with body
         try:
             outputs = self.body._component.outputs
+            # transform body outputs to aggregate types when available
+            self._outputs = self._build_outputs_dict(output_definition_dict=self._convert_output_meta(outputs),
+                                                     outputs={})
         except AttributeError:
-            outputs = {}
-
-        # TODO: handle when body don't have component or component.outputs
-        self._outputs = self._build_outputs_dict_without_meta(outputs, none_data=True)
+            # when body output not available, create default output builder without meta
+            self._outputs = self._build_outputs_dict_without_meta(outputs={}, none_data=True)

         self._items = items
         self._validate_items(raise_error=True)
@@ -108,6 +121,28 @@ def _create_instance_from_schema_dict(cls, pipeline_jobs, loaded_data):
             **loaded_data
         )

+    def _convert_output_meta(self, outputs):
+        """Convert output meta to aggregate types."""
+        # pylint: disable=protected-access
+        aggregate_outputs = {}
+        for name, output in outputs.items():
+            if output.type in self.OUT_TYPE_MAPPING:
+                new_type = self.OUT_TYPE_MAPPING[output.type]
+            else:
+                raise UserErrorException(
+                    "Referencing output with type {} is not supported in parallel_for node.".format(output.type)
+                )
+            if isinstance(output, NodeOutput):
+                output = output._to_job_output()
+            if isinstance(output, Output):
+                out_dict = output._to_dict()
+                out_dict["type"] = new_type
+                resolved_output = Output(**out_dict)
+            else:
+                resolved_output = Output(type=new_type)
+            aggregate_outputs[name] = resolved_output
+        return aggregate_outputs
+
     def _validate_items(self, raise_error=True):
         validation_result = self._create_empty_validation_result()
         if self.items is not None:
```
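
OUT_TYPE_MAPPING above is the heart of the change: each output type the loop body may expose has to collapse to a type that makes sense for the aggregate across iterations, so ports collapse to mltable and primitives to string. A standalone sketch of the rule follows, with plain strings standing in for the AssetTypes and ComponentParameterTypes constants (the string values match the SDK constants, but the helper function itself is hypothetical):

```python
# Plain-string mirror of ParallelFor.OUT_TYPE_MAPPING from the diff above.
OUT_TYPE_MAPPING = {
    "uri_file": "mltable",
    "uri_folder": "mltable",
    "mltable": "mltable",
    "number": "string",
    "string": "string",
    "boolean": "string",
    "integer": "string",
}

def aggregate_type(output_type: str) -> str:
    """Map a body output type to the type exposed on the parallel_for node."""
    if output_type not in OUT_TYPE_MAPPING:
        # Model types (mlflow_model, triton_model, custom_model) end up here,
        # mirroring the UserErrorException raised by _convert_output_meta.
        raise ValueError(
            "Referencing output with type {} is not supported in parallel_for node.".format(output_type)
        )
    return OUT_TYPE_MAPPING[output_type]

assert aggregate_type("uri_folder") == "mltable"
assert aggregate_type("boolean") == "string"
```

Note that _convert_output_meta also preserves the rest of the output definition (for example is_control) by round-tripping through Output._to_dict() and overwriting only the type field.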

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/pipeline/_io/base.py

Lines changed: 5 additions & 0 deletions

```diff
@@ -537,6 +537,11 @@ def _to_job_output(self):
         if isinstance(self._data, Output):
             # For pipeline output with type Output, always pass to backend.
            return self._data
+        if self._data is None and self._meta and self._meta.type:
+            # For un-configured pipeline output with meta, we need to return Output with accurate type,
+            # so it won't default to uri_folder.
+            return Output(type=self._meta.type)
+
         return super(PipelineOutput, self)._to_job_output()

     def _data_binding(self):
```
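
This guard fixes the serialized type of un-configured pipeline outputs: a PipelineOutput that has metadata but no configured data used to fall through to the base implementation and default to uri_folder; now the declared meta type survives. A minimal sketch of the patched branch, with a hypothetical free function standing in for PipelineOutput._to_job_output:

```python
from azure.ai.ml import Output

def to_job_output(data, meta):
    # `data` is the user-configured value (if any); `meta` is the output
    # definition inherited from the component. Both names are illustrative.
    if isinstance(data, Output):
        # Explicitly configured output: always pass through to the backend.
        return data
    if data is None and meta is not None and meta.type:
        # Un-configured output with metadata: keep the declared type so the
        # backend does not fall back to uri_folder.
        return Output(type=meta.type)
    # The real method defers to the base-class implementation at this point.
    return Output()
```

This is also why the expected job_output_type in the test_dsl_pipeline.py diff below flips from uri_folder to mlflow_model.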

sdk/ml/azure-ai-ml/tests/conftest.py

Lines changed: 1 addition & 0 deletions

```diff
@@ -617,6 +617,7 @@ def enable_pipeline_private_preview_features(mocker: MockFixture):
     mocker.patch("azure.ai.ml.dsl._pipeline_component_builder.is_private_preview_enabled", return_value=True)
     mocker.patch("azure.ai.ml._schema.pipeline.pipeline_component.is_private_preview_enabled", return_value=True)
     mocker.patch("azure.ai.ml.entities._schedule.schedule.is_private_preview_enabled", return_value=True)
+    mocker.patch("azure.ai.ml.dsl._pipeline_decorator.is_private_preview_enabled", return_value=True)


 @pytest.fixture()
```

sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_controlflow_pipeline.py

Lines changed: 121 additions & 1 deletion

```diff
@@ -25,6 +25,7 @@

 @pytest.mark.usefixtures(
+    "enable_private_preview_schema_features",
     "enable_environment_id_arm_expansion",
     "enable_pipeline_private_preview_features",
     "mock_code_hash",
@@ -335,7 +336,6 @@ def parallel_for_pipeline():

         with include_private_preview_nodes_in_pipeline():
             pipeline_job = assert_job_cancel(pipeline_job, client)
-
         dsl_pipeline_job_dict = omit_with_wildcard(pipeline_job._to_rest_object().as_dict(), *omit_fields)
         assert dsl_pipeline_job_dict["properties"]["jobs"] == {
             'after_node': {
@@ -460,3 +460,123 @@
                      '{"component_in_number": 2}]',
                      'type': 'parallel_for'}
         }
+
+    def test_parallel_for_pipeline_with_port_outputs(self, client: MLClient):
+        hello_world_component = load_component(
+            source="./tests/test_configs/components/helloworld_component.yml",
+            params_override=[
+                {"outputs": {
+                    "component_out_path": {"type": "uri_folder"},
+                    "component_out_file": {"type": "uri_file"},
+                    "component_out_table": {"type": "mltable"},
+                }}
+            ]
+        )
+
+        @pipeline
+        def parallel_for_pipeline():
+            parallel_body = hello_world_component(component_in_path=test_input)
+            parallel_node = parallel_for(
+                body=parallel_body,
+                items=[
+                    {"component_in_number": 1},
+                    {"component_in_number": 2},
+                ]
+            )
+            return {
+                "component_out_path": parallel_node.outputs.component_out_path,
+                "component_out_file": parallel_node.outputs.component_out_file,
+                "component_out_table": parallel_node.outputs.component_out_table,
+            }
+
+        pipeline_job = parallel_for_pipeline()
+        pipeline_job.settings.default_compute = "cpu-cluster"
+
+        with include_private_preview_nodes_in_pipeline():
+            pipeline_job = assert_job_cancel(pipeline_job, client)
+
+        dsl_pipeline_job_dict = omit_with_wildcard(pipeline_job._to_rest_object().as_dict(), *omit_fields)
+        assert dsl_pipeline_job_dict["properties"]["jobs"] == {
+            'parallel_body': {'_source': 'REMOTE.WORKSPACE.COMPONENT',
+                              'inputs': {'component_in_path': {
+                                  'job_input_type': 'uri_file',
+                                  'uri': 'https://dprepdata.blob.core.windows.net/demo/Titanic.csv'}},
+                              'name': 'parallel_body',
+                              'type': 'command'},
+            'parallel_node': {'body': '${{parent.jobs.parallel_body}}',
+                              'items': '[{"component_in_number": 1}, '
+                                       '{"component_in_number": 2}]',
+                              'type': 'parallel_for'}
+        }
+        assert dsl_pipeline_job_dict["properties"]["outputs"] == {
+            'component_out_file': {'job_output_type': 'mltable',
+                                   'mode': 'ReadWriteMount'},
+            'component_out_path': {'job_output_type': 'mltable',
+                                   'mode': 'ReadWriteMount'},
+            'component_out_table': {'job_output_type': 'mltable',
+                                    'mode': 'ReadWriteMount'}
+        }
+
+        # parallel for pipeline component is correctly generated
+        @pipeline
+        def parent_pipeline():
+            parallel_for_pipeline()
+
+        pipeline_job = parent_pipeline()
+        pipeline_job.settings.default_compute = "cpu-cluster"
+
+        rest_pipeline_component = pipeline_job.jobs["parallel_for_pipeline"].component._to_rest_object().as_dict()
+        assert rest_pipeline_component["properties"]["component_spec"]["outputs"] == {
+            'component_out_file': {'type': 'mltable'},
+            'component_out_path': {'type': 'mltable'},
+            'component_out_table': {'type': 'mltable'}
+        }
+
+        with include_private_preview_nodes_in_pipeline():
+            assert_job_cancel(pipeline_job, client)
+
+    def test_parallel_for_pipeline_with_primitive_outputs(self, client: MLClient):
+        hello_world_component = load_component(
+            source="./tests/test_configs/components/helloworld_component.yml",
+            params_override=[
+                {"outputs": {
+                    "component_out_path": {"type": "uri_folder"},
+                    "component_out_number": {"type": "number"},
+                    "component_out_boolean": {"type": "boolean", "is_control": True},
+                }}
+            ]
+        )
+
+        @pipeline
+        def parallel_for_pipeline():
+            parallel_body = hello_world_component(component_in_path=test_input)
+            parallel_node = parallel_for(
+                body=parallel_body,
+                items=[
+                    {"component_in_number": 1},
+                    {"component_in_number": 2},
+                ]
+            )
+            return {
+                "component_out_path": parallel_node.outputs.component_out_path,
+                "component_out_number": parallel_node.outputs.component_out_number,
+                "component_out_boolean": parallel_node.outputs.component_out_boolean,
+            }
+
+        @pipeline
+        def parent_pipeline():
+            parallel_for_pipeline()
+
+        pipeline_job = parent_pipeline()
+        pipeline_job.settings.default_compute = "cpu-cluster"
+
+        rest_pipeline_component = pipeline_job.jobs["parallel_for_pipeline"].component._to_rest_object().as_dict()
+        assert rest_pipeline_component["properties"]["component_spec"]["outputs"] == {
+            'component_out_boolean': {'is_control': True, 'type': 'string'},
+            'component_out_number': {'type': 'string'},
+            'component_out_path': {'type': 'mltable'}
+        }
+
+        # parallel for pipeline component is correctly generated
+        with include_private_preview_nodes_in_pipeline():
+            assert_job_cancel(pipeline_job, client)
```

sdk/ml/azure-ai-ml/tests/dsl/e2etests/test_dsl_pipeline.py

Lines changed: 1 addition & 1 deletion

```diff
@@ -474,7 +474,7 @@ def mixed_pipeline(job_in_number, job_in_path):
             "outputs": {
                 "pipeline_job_out": {
                     "mode": "ReadWriteMount",
-                    "job_output_type": "uri_folder",
+                    "job_output_type": "mlflow_model",
                 }
             },
             "settings": {},
```

sdk/ml/azure-ai-ml/tests/dsl/unittests/test_controlflow_pipeline.py

Lines changed: 75 additions & 1 deletion

```diff
@@ -4,12 +4,13 @@
 from azure.ai.ml.dsl import pipeline
 from azure.ai.ml.dsl._parallel_for import parallel_for
 from azure.ai.ml.entities import Command
-from azure.ai.ml.exceptions import ValidationException
+from azure.ai.ml.exceptions import ValidationException, UserErrorException
 from .._util import _DSL_TIMEOUT_SECOND, include_private_preview_nodes_in_pipeline


 @pytest.mark.usefixtures(
     "enable_pipeline_private_preview_features",
+    "enable_private_preview_schema_features"
 )
 @pytest.mark.timeout(_DSL_TIMEOUT_SECOND)
 @pytest.mark.unittest
@@ -234,3 +235,76 @@ def my_pipeline():
         rest_job = my_job._to_rest_object().as_dict()
         rest_items = rest_job["properties"]["jobs"]["parallelfor"]["items"]
         assert rest_items == '[{"component_in_number": 1}, {"component_in_number": 2}]'
+
+    @pytest.mark.parametrize(
+        "output_dict, pipeline_out_dict, component_out_dict, check_pipeline_job",
+        [
+            ({"type": "uri_file"}, {'job_output_type': 'mltable'}, {'type': 'mltable'}, True),
+            ({"type": "uri_folder"}, {'job_output_type': 'mltable'}, {'type': 'mltable'}, True),
+            ({"type": "mltable"}, {'job_output_type': 'mltable'}, {'type': 'mltable'}, True),
+            ({"type": "number"}, {}, {'type': 'string'}, False),
+            ({"type": "string", "is_control": True}, {}, {'type': 'string', "is_control": True}, False),
+            ({"type": "boolean", "is_control": True}, {}, {'type': 'string', "is_control": True}, False),
+            ({"type": "integer"}, {}, {'type': 'string'}, False),
+        ]
+    )
+    def test_parallel_for_outputs(self, output_dict, pipeline_out_dict, component_out_dict, check_pipeline_job):
+        basic_component = load_component(
+            source="./tests/test_configs/components/helloworld_component.yml",
+            params_override=[
+                {"outputs.component_out_path": output_dict}
+            ]
+        )
+
+        @pipeline
+        def my_pipeline():
+            body = basic_component(component_in_path=Input(path="test_path1"))
+
+            foreach_node = parallel_for(
+                body=body,
+                items={
+                    "iter1": {"component_in_number": 1},
+                    "iter2": {"component_in_number": 2}
+                }
+            )
+            return {
+                "output": foreach_node.outputs.component_out_path
+            }
+
+        my_job = my_pipeline()
+
+        if check_pipeline_job:
+            rest_job = my_job._to_rest_object().as_dict()
+            rest_outputs = rest_job["properties"]["outputs"]
+            assert rest_outputs == {'output': pipeline_out_dict}
+
+        pipeline_component = my_job.component
+        rest_component = pipeline_component._to_rest_object().as_dict()
+        assert rest_component["properties"]["component_spec"]["outputs"] == {'output': component_out_dict}
+
+    @pytest.mark.parametrize(
+        "out_type", ["mlflow_model", "triton_model", "custom_model"]
+    )
+    def test_parallel_for_output_unsupported_case(self, out_type):
+        basic_component = load_component(
+            source="./tests/test_configs/components/helloworld_component.yml",
+            params_override=[
+                {"outputs.component_out_path": {"type": out_type}}
+            ]
+        )
+
+        @pipeline
+        def my_pipeline():
+            body = basic_component(component_in_path=Input(path="test_path1"))
+
+            parallel_for(
+                body=body,
+                items={
+                    "iter1": {"component_in_number": 1},
+                    "iter2": {"component_in_number": 2}
+                }
+            )
+
+        with pytest.raises(UserErrorException) as e:
+            my_pipeline()
+        assert f"Referencing output with type {out_type} is not supported" in str(e.value)
```
