Skip to content

Commit 505d410

Browse files
authored
[ML][Pipelines] refactor for reference from other packages (Azure#31929)
* fix: update message when a user uses an internal spark when not enabled * refactor: divide validation-related code for further vendoring * fix: support loading run.yaml of a flow with additional_includes
1 parent 29c8d48 commit 505d410

38 files changed

+1168
-648
lines changed

sdk/ml/azure-ai-ml/azure/ai/ml/_internal/_setup.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
# ---------------------------------------------------------
22
# Copyright (c) Microsoft Corporation. All rights reserved.
33
# ---------------------------------------------------------
4+
from typing import NoReturn
45

56
# pylint: disable=protected-access
67
from marshmallow import INCLUDE
78

89
from .._schema import NestedField
10+
from ..entities._builders.control_flow_node import LoopNode
911
from ..entities._component.component_factory import component_factory
1012
from ..entities._job.pipeline._load_component import pipeline_node_factory
1113
from ._schema.command import CommandSchema, DistributedSchema, ParallelSchema
@@ -20,6 +22,7 @@
2022
InternalBaseNode,
2123
InternalComponent,
2224
Parallel,
25+
Pipeline,
2326
Scope,
2427
Starlite,
2528
)
@@ -57,7 +60,14 @@ def _register_node(_type, node_cls, schema_cls):
5760
)
5861

5962

60-
def enable_internal_components_in_pipeline(*, force=False):
63+
def enable_internal_components_in_pipeline(*, force=False) -> NoReturn:
64+
"""Enable internal components in pipeline.
65+
66+
:keyword force: Whether to force re-enable internal components. Defaults to False.
67+
:type force: bool
68+
:return: No return value.
69+
:rtype: None
70+
"""
6171
if _registered and not force:
6272
return # already registered
6373

@@ -83,4 +93,8 @@ def enable_internal_components_in_pipeline(*, force=False):
8393
_register_node(NodeType.SCOPE_V2, Scope, ScopeSchema)
8494
_register_node(NodeType.HDI_V2, HDInsight, HDInsightSchema)
8595
# Ae365exepool and AetherBridge have been registered to InternalBaseNode
96+
97+
# allow using internal nodes in do-while loop
98+
LoopNode._extra_body_types = (Command, Pipeline)
99+
86100
_set_registered(True)

sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/environment.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
from typing import Dict, Optional, Union
88

99
from ..._utils.utils import load_yaml
10-
from ...constants._common import DefaultOpenEncoding, FILE_PREFIX
11-
from ...entities._validation import MutableValidationResult, _ValidationResultBuilder
10+
from ...constants._common import FILE_PREFIX, DefaultOpenEncoding
11+
from ...entities._validation import MutableValidationResult, ValidationResultBuilder
1212

1313

1414
class InternalEnvironment:
@@ -45,7 +45,7 @@ def _parse_file_path(value: str) -> str:
4545
def _validate_conda_section(
4646
self, base_path: Union[str, PathLike], skip_path_validation: bool
4747
) -> MutableValidationResult:
48-
validation_result = _ValidationResultBuilder.success()
48+
validation_result = ValidationResultBuilder.success()
4949
if not self.conda:
5050
return validation_result
5151
dependencies_field_names = {self.CONDA_DEPENDENCIES, self.CONDA_DEPENDENCIES_FILE, self.PIP_REQUIREMENTS_FILE}
@@ -74,7 +74,7 @@ def _validate_conda_section(
7474
def _validate_docker_section(
7575
self, base_path: Union[str, PathLike], skip_path_validation: bool
7676
) -> MutableValidationResult:
77-
validation_result = _ValidationResultBuilder.success()
77+
validation_result = ValidationResultBuilder.success()
7878
if not self.docker:
7979
return validation_result
8080
if not self.docker.get(self.BUILD) or not self.docker[self.BUILD].get(self.DOCKERFILE):
@@ -104,7 +104,7 @@ def validate(self, base_path: Union[str, PathLike], skip_path_validation: bool =
104104
:return: The validation result
105105
:rtype: MutableValidationResult
106106
"""
107-
validation_result = _ValidationResultBuilder.success()
107+
validation_result = ValidationResultBuilder.success()
108108
if self.os is not None and self.os not in {"Linux", "Windows", "linux", "windows"}:
109109
validation_result.append_error(
110110
yaml_path="os",

sdk/ml/azure-ai-ml/azure/ai/ml/_internal/entities/spark.py

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
class InternalSparkComponent(
1919
InternalComponent, ParameterizedSpark, SparkJobEntryMixin
20-
): # pylint: disable=too-many-instance-attributes
20+
): # pylint: disable=too-many-instance-attributes, too-many-ancestors
2121
"""Internal Spark Component
2222
This class is used to handle internal spark component.
2323
It can be loaded from internal spark component yaml or from rest object of an internal spark component.
@@ -91,13 +91,25 @@ def _create_schema_for_validation(cls, context) -> Union[PathAwareSchema, Schema
9191
return InternalSparkComponentSchema(context=context)
9292

9393
@property
94-
def environment(self):
94+
def environment(self) -> Union[str, Environment, dict]:
95+
"""Get the environment of the component.
96+
97+
:return: The environment of the component.
98+
:rtype: Union[str, Environment, dict]
99+
"""
95100
if isinstance(self._environment, Environment) and self._environment.image is None:
96101
return Environment(conda_file=self._environment.conda_file, image=DUMMY_IMAGE)
97102
return self._environment
98103

99104
@environment.setter
100105
def environment(self, value):
106+
"""Set the environment of the component.
107+
108+
:param value: The environment of the component.
109+
:type value: Union[str, Environment, dict]
110+
:return: No return
111+
:rtype: None
112+
"""
101113
if value is None or isinstance(value, (str, Environment)):
102114
self._environment = value
103115
elif isinstance(value, dict):
@@ -119,21 +131,45 @@ def environment(self, value):
119131
raise ValueError(f"Unsupported environment type: {type(value)}")
120132

121133
@property
122-
def jars(self):
134+
def jars(self) -> List[str]:
135+
"""Get the jars of the component.
136+
137+
:return: The jars of the component.
138+
:rtype: List[str]
139+
"""
123140
return self._jars
124141

125142
@jars.setter
126-
def jars(self, value):
143+
def jars(self, value: Union[str, List[str]]):
144+
"""Set the jars of the component.
145+
146+
:param value: The jars of the component.
147+
:type value: Union[str, List[str]]
148+
:return: No return
149+
:rtype: None
150+
"""
127151
if isinstance(value, str):
128152
value = [value]
129153
self._jars = value
130154

131155
@property
132-
def py_files(self):
156+
def py_files(self) -> List[str]:
157+
"""Get the py_files of the component.
158+
159+
:return: The py_files of the component.
160+
:rtype: List[str]
161+
"""
133162
return self._py_files
134163

135164
@py_files.setter
136165
def py_files(self, value):
166+
"""Set the py_files of the component.
167+
168+
:param value: The py_files of the component.
169+
:type value: Union[str, List[str]]
170+
:return: No return
171+
:rtype: None
172+
"""
137173
if isinstance(value, str):
138174
value = [value]
139175
self._py_files = value

0 commit comments

Comments
 (0)