Commit 6ccc3c9

[ML-Pipelines] Fix data transfer review feedback (Azure#29384)

* review feedback: update external_data
* remove unnecessary task type
* expose builder classes
* expose component classes
* fix pylint
* fix tests
* fix tests
* update
* fix remaining tests
* update
* fix black

Co-authored-by: Clement Wang <[email protected]>

1 parent: 7957864

File tree: 13 files changed (+681 −225 lines)


sdk/ml/azure-ai-ml/azure/ai/ml/_schema/job/input_output_entry.py

Lines changed: 2 additions & 0 deletions
@@ -205,6 +205,7 @@ class DatabaseSchema(metaclass=PatchedSchemaMeta):
     def make(self, data, **kwargs):
         from azure.ai.ml.data_transfer import Database

+        data.pop("type", None)
         return Database(**data)

     @pre_dump
@@ -231,6 +232,7 @@ class FileSystemSchema(metaclass=PatchedSchemaMeta):
     def make(self, data, **kwargs):
         from azure.ai.ml.data_transfer import FileSystem

+        data.pop("type", None)
         return FileSystem(**data)

     @pre_dump
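Context for the fix: make() receives the deserialized dict with its "type" discriminator still present, and the added pop implies the Database/FileSystem constructors do not accept a type keyword. A minimal sketch of the scenario this guards against (the query string is a placeholder):

```python
from azure.ai.ml.data_transfer import Database

# A deserialized source/sink dict still carries its "type" discriminator.
data = {"type": "database", "query": "SELECT * FROM my_table"}

# Without the fix, Database(**data) would receive an unexpected "type" kwarg.
data.pop("type", None)  # mirror the schema's make(): drop the discriminator
source = Database(**data)
```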

sdk/ml/azure-ai-ml/azure/ai/ml/data_transfer/__init__.py

Lines changed: 28 additions & 2 deletions
@@ -2,6 +2,32 @@
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # ---------------------------------------------------------
 from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem
-from azure.ai.ml.entities._builders.data_transfer_func import copy_data, import_data, export_data
+from azure.ai.ml.entities._builders.data_transfer_func import (
+    copy_data,
+    import_data,
+    export_data,
+)
+from azure.ai.ml.entities._builders.data_transfer import (
+    DataTransferCopy,
+    DataTransferImport,
+    DataTransferExport,
+)
+from azure.ai.ml.entities._component.datatransfer_component import (
+    DataTransferCopyComponent,
+    DataTransferImportComponent,
+    DataTransferExportComponent,
+)

-__all__ = ["import_data", "export_data", "copy_data", "Database", "FileSystem"]
+__all__ = [
+    "import_data",
+    "export_data",
+    "copy_data",
+    "Database",
+    "FileSystem",
+    "DataTransferCopy",
+    "DataTransferImport",
+    "DataTransferExport",
+    "DataTransferCopyComponent",
+    "DataTransferImportComponent",
+    "DataTransferExportComponent",
+]
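With the builder and component classes re-exported, user code can import the whole data transfer surface from the public namespace instead of reaching into private modules. Everything in the new __all__ above is importable from one place:

```python
from azure.ai.ml.data_transfer import (
    copy_data, import_data, export_data,                        # builder functions
    DataTransferCopy, DataTransferImport, DataTransferExport,   # node classes
    DataTransferCopyComponent, DataTransferImportComponent,
    DataTransferExportComponent,                                # component classes
    Database, FileSystem,                                       # external sources/sinks
)
```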

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_builders/data_transfer.py

Lines changed: 27 additions & 21 deletions
@@ -15,7 +15,11 @@
     DataTransferImportJobSchema,
     DataTransferExportJobSchema,
 )
-from azure.ai.ml.constants._component import NodeType, ExternalDataType, DataTransferTaskType
+from azure.ai.ml.constants._component import (
+    NodeType,
+    ExternalDataType,
+    DataTransferTaskType,
+)
 from azure.ai.ml.entities._component.datatransfer_component import (
     DataTransferCopyComponent,
     DataTransferImportComponent,
@@ -30,7 +34,12 @@
     DataTransferExportJob,
 )
 from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, AssetTypes
-from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException
+from azure.ai.ml.exceptions import (
+    ErrorCategory,
+    ErrorTarget,
+    ValidationErrorType,
+    ValidationException,
+)
 from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem


@@ -53,17 +62,23 @@ def _build_source_sink(io_dict: Union[Dict, Database, FileSystem]):
         component_io = io_dict
     else:
         if isinstance(io_dict, dict):
-            data_type = io_dict.get("type", None)
+            data_type = io_dict.pop("type", None)
             if data_type == ExternalDataType.DATABASE:
                 component_io = Database(**io_dict)
             elif data_type == ExternalDataType.FILE_SYSTEM:
                 component_io = FileSystem(**io_dict)
             else:
                 msg = "Type in source or sink only support {} and {}, currently got {}."
                 raise ValidationException(
-                    message=msg.format(ExternalDataType.DATABASE, ExternalDataType.FILE_SYSTEM, data_type),
+                    message=msg.format(
+                        ExternalDataType.DATABASE,
+                        ExternalDataType.FILE_SYSTEM,
+                        data_type,
+                    ),
                     no_personal_data_message=msg.format(
-                        ExternalDataType.DATABASE, ExternalDataType.FILE_SYSTEM, "data_type"
+                        ExternalDataType.DATABASE,
+                        ExternalDataType.FILE_SYSTEM,
+                        "data_type",
                     ),
                     target=ErrorTarget.DATA_TRANSFER_JOB,
                     error_category=ErrorCategory.USER_ERROR,
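Switching get() to pop() here is a behavior fix, not a style change: the remaining dict keys are splatted straight into the Database or FileSystem constructor, so leaving "type" in io_dict would forward it as an unexpected keyword. A sketch of the difference (placeholder values):

```python
io_dict = {"type": "database", "query": "SELECT 1"}

data_type = io_dict.get("type", None)  # old: "type" stays in io_dict, so
                                       # Database(**io_dict) gets a stray kwarg
data_type = io_dict.pop("type", None)  # new: io_dict now holds only constructor
                                       # kwargs, e.g. {"query": "SELECT 1"}
```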
@@ -157,8 +172,6 @@ class DataTransferCopy(DataTransfer):
     :type experiment_name: str
     :param compute: The compute target the job runs on.
     :type compute: str
-    :param task: task type in data transfer component, possible value is "copy_data".
-    :type task: str
     :param data_copy_mode: data copy mode in copy task, possible value is "merge_with_overwrite", "fail_if_conflict".
     :type data_copy_mode: str
     :raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataTransferCopy cannot be successfully validated.
@@ -173,7 +186,6 @@ def __init__(
         compute: Optional[str] = None,
         inputs: Optional[Dict[str, Union[NodeOutput, Input, str]]] = None,
         outputs: Optional[Dict[str, Union[str, Output]]] = None,
-        task: Optional[str] = DataTransferTaskType.COPY_DATA,
         data_copy_mode: Optional[str] = None,
         **kwargs,
     ):
@@ -188,7 +200,7 @@
         )
         # init mark for _AttrDict
         self._init = True
-        self.task = task
+        self.task = DataTransferTaskType.COPY_DATA
         self.data_copy_mode = data_copy_mode
         is_component = isinstance(component, DataTransferCopyComponent)
         if is_component:
@@ -214,7 +226,10 @@ def _picked_fields_from_dict_to_rest_object(cls) -> List[str]:

     def _to_rest_object(self, **kwargs) -> dict:
         rest_obj = super()._to_rest_object(**kwargs)
-        for key, value in {"componentId": self._get_component_id(), "data_copy_mode": self.data_copy_mode}.items():
+        for key, value in {
+            "componentId": self._get_component_id(),
+            "data_copy_mode": self.data_copy_mode,
+        }.items():
             if value is not None:
                 rest_obj[key] = value
         return convert_ordered_dict_to_dict(rest_obj)
@@ -240,7 +255,6 @@ def _to_job(self) -> DataTransferCopyJob:
             outputs=self._job_outputs,
             services=self.services,
             compute=self.compute,
-            task=self.task,
             data_copy_mode=self.data_copy_mode,
         )
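With task gone from the public signature, DataTransferCopy pins DataTransferTaskType.COPY_DATA itself. A usage sketch via the copy_data builder (compute name and asset paths are placeholders; assumes azure-ai-ml at this commit):

```python
from azure.ai.ml import Input, Output
from azure.ai.ml.data_transfer import copy_data

copy_node = copy_data(
    name="merge_files",
    compute="my_adf_compute",  # placeholder compute target
    inputs={"folder1": Input(type="uri_folder", path="azureml:my_folder:1")},
    outputs={"output_folder": Output(type="uri_folder")},
    data_copy_mode="merge_with_overwrite",
)
assert copy_node.task == "copy_data"  # pinned; no task= argument accepted
```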

@@ -301,8 +315,6 @@ class DataTransferImport(DataTransfer):
     :type experiment_name: str
     :param compute: The compute target the job runs on.
     :type compute: str
-    :param task: task type in data transfer component, possible value is "import_data".
-    :type task: str
     :raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataTransferImport cannot be successfully validated.
         Details will be provided in the error message.
     """
@@ -315,7 +327,6 @@ def __init__(
         compute: Optional[str] = None,
         source: Optional[Union[Dict, Database, FileSystem]] = None,
         outputs: Optional[Dict[str, Union[str, Output]]] = None,
-        task: Optional[str] = DataTransferTaskType.IMPORT_DATA,
         **kwargs,
     ):
         # validate init params are valid type
@@ -328,7 +339,7 @@
         )
         # init mark for _AttrDict
         self._init = True
-        self.task = task
+        self.task = DataTransferTaskType.IMPORT_DATA
         is_component = isinstance(component, DataTransferImportComponent)
         if is_component:
             self.task = component.task or self.task
@@ -409,7 +420,6 @@ def _to_job(self) -> DataTransferImportJob:
             outputs=self._job_outputs,
             services=self.services,
             compute=self.compute,
-            task=self.task,
         )

@@ -438,8 +448,6 @@ class DataTransferExport(DataTransfer):
     :type experiment_name: str
     :param compute: The compute target the job runs on.
     :type compute: str
-    :param task: task type in data transfer component, possible value is "export_data".
-    :type task: str
     :raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataTransferExport cannot be successfully validated.
         Details will be provided in the error message.
     """
@@ -452,7 +460,6 @@ def __init__(
         compute: Optional[str] = None,
         sink: Optional[Union[Dict, Database, FileSystem]] = None,
         inputs: Optional[Dict[str, Union[NodeOutput, Input, str]]] = None,
-        task: Optional[str] = DataTransferTaskType.EXPORT_DATA,
         **kwargs,
     ):
         # validate init params are valid type
@@ -465,7 +472,7 @@
         )
         # init mark for _AttrDict
         self._init = True
-        self.task = task
+        self.task = DataTransferTaskType.EXPORT_DATA
         is_component = isinstance(component, DataTransferExportComponent)
         if is_component:
             self.task = component.task or self.task
@@ -560,5 +567,4 @@ def _to_job(self) -> DataTransferExportJob:
             inputs=self._job_inputs,
             services=self.services,
             compute=self.compute,
-            task=self.task,
         )
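The same pinning applies to the import and export nodes; only the constant differs, and the docstrings above give the literal values. A quick check (constants module path taken from this diff):

```python
from azure.ai.ml.constants._component import DataTransferTaskType

assert DataTransferTaskType.COPY_DATA == "copy_data"
assert DataTransferTaskType.IMPORT_DATA == "import_data"
assert DataTransferTaskType.EXPORT_DATA == "export_data"
```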

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_builders/data_transfer_func.py

Lines changed: 9 additions & 16 deletions
@@ -13,15 +13,21 @@
     ComponentSource,
     ExternalDataType,
     DataTransferBuiltinComponentUri,
-    DataTransferTaskType,
 )
 from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem
 from azure.ai.ml.entities._inputs_outputs import Output, Input
 from azure.ai.ml.entities._job.pipeline._io import PipelineInput, NodeOutput
 from azure.ai.ml.entities._builders.base_node import pipeline_node_decorator
-from azure.ai.ml.entities._job.pipeline._component_translatable import ComponentTranslatableMixin
+from azure.ai.ml.entities._job.pipeline._component_translatable import (
+    ComponentTranslatableMixin,
+)
 from azure.ai.ml.exceptions import ErrorTarget, ValidationErrorType, ValidationException
-from .data_transfer import DataTransferCopy, DataTransferImport, DataTransferExport, _build_source_sink
+from .data_transfer import (
+    DataTransferCopy,
+    DataTransferImport,
+    DataTransferExport,
+    _build_source_sink,
+)


 SUPPORTED_INPUTS = [
@@ -122,7 +128,6 @@ def copy_data(
     inputs: Optional[Dict] = None,
     outputs: Optional[Dict] = None,
     is_deterministic: bool = True,
-    task: Optional[str] = DataTransferTaskType.COPY_DATA,
     data_copy_mode: Optional[str] = None,
     **kwargs,
 ) -> DataTransferCopy:
@@ -150,8 +155,6 @@
         In this case, this step will not use any compute resource.
         Default to be True, specify is_deterministic=False if you would like to avoid such reuse behavior.
     :type is_deterministic: bool
-    :param task: task type in data transfer component, possible value is "copy_data".
-    :type task: str
     :param data_copy_mode: data copy mode in copy task, possible value is "merge_with_overwrite", "fail_if_conflict".
     :type data_copy_mode: str
     """
@@ -170,7 +173,6 @@ def copy_data(
         description=description,
         inputs=component_inputs,
         outputs=component_outputs,
-        task=task,
         data_copy_mode=data_copy_mode,
         _source=ComponentSource.BUILDER,
         is_deterministic=is_deterministic,
@@ -186,7 +188,6 @@ def copy_data(
         compute=compute,
         inputs=job_inputs,
         outputs=job_outputs,
-        task=task,
         data_copy_mode=data_copy_mode,
         **kwargs,
     )
@@ -204,7 +205,6 @@
     compute: Optional[str] = None,
     source: Optional[Union[Dict, Database, FileSystem]] = None,
     outputs: Optional[Dict] = None,
-    task: Optional[str] = DataTransferTaskType.IMPORT_DATA,
     **kwargs,
 ) -> DataTransferImport:
     """Create a DataTransferImport object which can be used inside dsl.pipeline.
@@ -226,8 +226,6 @@ def import_data(
     :param outputs: Mapping of outputs data bindings used in the job, default will be an output port with key "sink"
         and type "mltable".
     :type outputs: dict
-    :param task: task type in data transfer component, possible value is "copy_data".
-    :type task: str
     """
     source = _build_source_sink(source)
     outputs = outputs or {"sink": Output(type=AssetTypes.MLTABLE)}
@@ -253,7 +251,6 @@
         compute=compute,
         source=source,
         outputs=job_outputs,
-        task=task,
         **kwargs,
     )
     if update_source:
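A sketch of import_data after this change: no task argument, and outputs default to an MLTable port named "sink", as the snippet above shows (connection and query are placeholders):

```python
from azure.ai.ml.data_transfer import import_data, Database

source = Database(
    connection="azureml:my_sql_connection",  # placeholder workspace connection
    query="SELECT * FROM my_table",          # placeholder query
)
import_node = import_data(source=source)  # outputs default to {"sink": Output(type="mltable")}
```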
@@ -273,7 +270,6 @@
     compute: Optional[str] = None,
     sink: Optional[Union[Dict, Database, FileSystem]] = None,
     inputs: Optional[Dict] = None,
-    task: Optional[str] = DataTransferTaskType.EXPORT_DATA,
     **kwargs,
 ) -> DataTransferExport:
     """Create a DataTransferExport object which can be used inside dsl.pipeline.
@@ -294,8 +290,6 @@
     :type sink: Union[Dict, Database, FileSystem]
     :param inputs: Mapping of inputs data bindings used in the job.
     :type inputs: dict
-    :param task: task type in data transfer component, possible value is "copy_data".
-    :type task: str
     """
     sink = _build_source_sink(sink)
     _, job_inputs = _parse_inputs_outputs(inputs, parse_func=_parse_input)
@@ -326,7 +320,6 @@
         compute=compute,
         sink=sink,
         inputs=job_inputs,
-        task=task,
         **kwargs,
     )
     if update_source:
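And the matching export_data sketch, again without task (sink fields and input path are placeholders):

```python
from azure.ai.ml import Input
from azure.ai.ml.data_transfer import export_data, Database

sink = Database(
    connection="azureml:my_sql_connection",  # placeholder workspace connection
    table_name="dbo.my_table",               # placeholder destination table
)
export_node = export_data(
    sink=sink,
    inputs={"source": Input(type="mltable", path="azureml:my_asset:1")},
)
```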
