
Commit 23d56ff

YingChen1996 and Ying Chen authored
[V2 Datatransfer] Disable referencing anonymous components for import/export and update built-in component id for Datatransfer job V2 in Pipeline (Azure#28932)
* add test
* add more invalid test
* update
* update
* update schema
* update builtin component id and test
* format
* format
* delete unnecessary test
* update test
* disable refer to local component yaml
* rm unused yaml file
* update
* update according to comment and black format
* typo
* pylint
* black
* update recording

---------

Co-authored-by: Ying Chen <[email protected]>
1 parent 5f550cb commit 23d56ff

File tree

79 files changed: +6929 additions, -2710 deletions

Large commits have some content hidden by default; only a subset of the 79 changed files is shown below.

sdk/ml/azure-ai-ml/azure/ai/ml/_schema/component/data_transfer_component.py

Lines changed: 9 additions & 0 deletions
@@ -38,6 +38,15 @@ class DataTransferCopyComponentSchema(DataTransferComponentSchemaMixin):
         values=NestedField(InputPortSchema),
     )
 
+    @validates("outputs")
+    def outputs_key(self, value):
+        outputs_count = len(value)
+        if outputs_count != 1:
+            msg = "Only support single output in {}, but there're {} outputs."
+            raise ValidationError(
+                message=msg.format(DataTransferTaskType.COPY_DATA, outputs_count), field_name="outputs"
+            )
+
 
 class SinkSourceSchema(metaclass=PatchedSchemaMeta):
     type = StringTransformedEnum(
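
The new hook is a standard marshmallow field validator. Below is a minimal standalone sketch of the same check in plain marshmallow (names like CopyComponentSketch and the COPY_DATA constant are made up here; this is not the SDK's schema class), showing how a @validates("outputs") hook rejects anything other than exactly one output:

from marshmallow import Schema, ValidationError, fields, validates

COPY_DATA = "copy_data"  # stands in for DataTransferTaskType.COPY_DATA


class CopyComponentSketch(Schema):
    outputs = fields.Dict(keys=fields.Str(), values=fields.Str())

    @validates("outputs")
    def outputs_key(self, value):
        # Mirror of the added check: copy_data components must declare exactly one output.
        outputs_count = len(value)
        if outputs_count != 1:
            msg = "Only support single output in {}, but there're {} outputs."
            raise ValidationError(msg.format(COPY_DATA, outputs_count), field_name="outputs")


print(CopyComponentSketch().validate({"outputs": {"a": "x", "b": "y"}}))
# {'outputs': ["Only support single output in copy_data, but there're 2 outputs."]}
print(CopyComponentSketch().validate({"outputs": {"sink": "x"}}))
# {}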

sdk/ml/azure-ai-ml/azure/ai/ml/_schema/pipeline/component_job.py

Lines changed: 4 additions & 24 deletions
@@ -15,15 +15,11 @@
     AnonymousParallelComponentSchema,
     AnonymousSparkComponentSchema,
     AnonymousDataTransferCopyComponentSchema,
-    AnonymousDataTransferImportComponentSchema,
-    AnonymousDataTransferExportComponentSchema,
     ComponentFileRefField,
     ImportComponentFileRefField,
     ParallelComponentFileRefField,
     SparkComponentFileRefField,
     DataTransferCopyComponentFileRefField,
-    DataTransferImportComponentFileRefField,
-    DataTransferExportComponentFileRefField,
 )
 from azure.ai.ml._schema.core.fields import ArmVersionedStr, NestedField, RegistryStr, UnionField
 from azure.ai.ml._schema.core.schema import PathAwareSchema
@@ -418,16 +414,8 @@ def resolve_inputs_outputs(self, job, **kwargs):
 
 class DataTransferImportSchema(BaseNodeSchema):
     # pylint: disable=unused-argument
-    component = TypeSensitiveUnionField(
-        {
-            NodeType.DATA_TRANSFER: [
-                # inline component or component file reference starting with FILE prefix
-                NestedField(AnonymousDataTransferImportComponentSchema, unknown=INCLUDE),
-                # component file reference
-                DataTransferImportComponentFileRefField(),
-            ],
-        },
-        plain_union_fields=[
+    component = UnionField(
+        [
             # for registry type assets
             RegistryStr(),
             # existing component
@@ -477,16 +465,8 @@ def resolve_inputs_outputs(self, job, **kwargs):
 
 class DataTransferExportSchema(BaseNodeSchema):
     # pylint: disable=unused-argument
-    component = TypeSensitiveUnionField(
-        {
-            NodeType.DATA_TRANSFER: [
-                # inline component or component file reference starting with FILE prefix
-                NestedField(AnonymousDataTransferExportComponentSchema, unknown=INCLUDE),
-                # component file reference
-                DataTransferExportComponentFileRefField(),
-            ],
-        },
-        plain_union_fields=[
+    component = UnionField(
+        [
             # for registry type assets
             RegistryStr(),
             # existing component
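
In practice, the component field of an import/export data-transfer node now resolves only through string references (a registry URI or an existing component id), while inline anonymous component definitions remain possible only for copy nodes. A hedged, standalone illustration of that contract (the function and error text below are invented for this sketch and are not SDK code):

from typing import Union


def check_import_export_component(component: Union[str, dict]) -> None:
    # String references such as
    # "azureml://registries/azureml-dev/components/import_data_database/versions/0.0.1"
    # or an existing component's id are accepted.
    if isinstance(component, str):
        return
    # Anything inline (an anonymous component dict or local YAML) is rejected
    # for import/export nodes after this change.
    raise ValueError(
        "Anonymous (inline) components are not accepted for data-transfer "
        "import/export nodes; pass a registry or existing-component reference instead."
    )


check_import_export_component(
    "azureml://registries/azureml-dev/components/export_data_database/versions/0.0.1"
)  # accepted
try:
    check_import_export_component({"type": "data_transfer", "task": "import_data"})
except ValueError as exc:
    print(exc)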

sdk/ml/azure-ai-ml/azure/ai/ml/_schema/pipeline/pipeline_component.py

Lines changed: 27 additions & 12 deletions
@@ -84,14 +84,6 @@ def PipelineJobsField():
             NestedField(SparkSchema, unknown=INCLUDE),
             NestedField(PipelineSparkJobSchema),
         ],
-        NodeType.DATA_TRANSFER: [
-            NestedField(DataTransferCopySchema, unknown=INCLUDE),
-            NestedField(DataTransferImportSchema, unknown=INCLUDE),
-            NestedField(DataTransferExportSchema, unknown=INCLUDE),
-            NestedField(PipelineDataTransferCopyJobSchema),
-            NestedField(PipelineDataTransferImportJobSchema),
-            NestedField(PipelineDataTransferExportJobSchema),
-        ],
     }
 
     # Note: the private node types only available when private preview flag opened before init of pipeline job
@@ -101,6 +93,29 @@ def PipelineJobsField():
     pipeline_enable_job_type[ControlFlowType.IF_ELSE] = [NestedField(ConditionNodeSchema, unknown=INCLUDE)]
     pipeline_enable_job_type[ControlFlowType.PARALLEL_FOR] = [NestedField(ParallelForSchema, unknown=INCLUDE)]
 
+    # Todo: Put data_transfer logic to the last to avoid error message conflict, open a item to track:
+    # https://msdata.visualstudio.com/Vienna/_workitems/edit/2244262/
+    pipeline_enable_job_type[NodeType.DATA_TRANSFER] = [
+        TypeSensitiveUnionField(
+            {
+                DataTransferTaskType.COPY_DATA: [
+                    NestedField(DataTransferCopySchema, unknown=INCLUDE),
+                    NestedField(PipelineDataTransferCopyJobSchema),
+                ],
+                DataTransferTaskType.IMPORT_DATA: [
+                    NestedField(DataTransferImportSchema, unknown=INCLUDE),
+                    NestedField(PipelineDataTransferImportJobSchema),
+                ],
+                DataTransferTaskType.EXPORT_DATA: [
+                    NestedField(DataTransferExportSchema, unknown=INCLUDE),
+                    NestedField(PipelineDataTransferExportJobSchema),
+                ],
+            },
+            type_field_name="task",
+            unknown=INCLUDE,
+        )
+    ]
+
     pipeline_job_field = fields.Dict(
         keys=NodeNameStr(),
         values=TypeSensitiveUnionField(pipeline_enable_job_type),
@@ -157,11 +172,11 @@ def _post_load_pipeline_jobs(context, data: dict) -> dict:
                 context=context,
                 pipeline_job_dict=data,
             )
-            if not (
-                job_instance.type == NodeType.DATA_TRANSFER and job_instance.task != DataTransferTaskType.COPY_DATA
-            ):
+            if job_instance.type == NodeType.DATA_TRANSFER and job_instance.task != DataTransferTaskType.COPY_DATA:
+                job_instance._source = ComponentSource.BUILTIN
+            else:
                 job_instance.component._source = ComponentSource.YAML_JOB
-                job_instance._source = job_instance.component._source
+                job_instance._source = job_instance.component._source
             jobs[key] = job_instance
             # update job instance name to key
             job_instance.name = key
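
Two behaviors are worth calling out here: data_transfer pipeline jobs are now dispatched to candidate schemas by their task value rather than all sharing one list, and import/export jobs loaded in a pipeline are marked ComponentSource.BUILTIN instead of YAML_JOB. A minimal sketch of the task-keyed dispatch, with plain strings standing in for the SDK's field classes and enum members (these stand-ins are assumptions for illustration):

COPY_DATA, IMPORT_DATA, EXPORT_DATA = "copy_data", "import_data", "export_data"

# Mirrors the mapping passed to TypeSensitiveUnionField(..., type_field_name="task"),
# using schema names only since the real NestedField objects are not needed here.
CANDIDATE_SCHEMAS = {
    COPY_DATA: ["DataTransferCopySchema", "PipelineDataTransferCopyJobSchema"],
    IMPORT_DATA: ["DataTransferImportSchema", "PipelineDataTransferImportJobSchema"],
    EXPORT_DATA: ["DataTransferExportSchema", "PipelineDataTransferExportJobSchema"],
}


def candidates_for(job_dict: dict) -> list:
    # type_field_name="task" means the union reads the "task" key of a
    # data_transfer job dict; unknown tasks get no candidates and would
    # surface as a validation error.
    return CANDIDATE_SCHEMAS.get(job_dict.get("task"), [])


print(candidates_for({"type": "data_transfer", "task": "import_data"}))
# ['DataTransferImportSchema', 'PipelineDataTransferImportJobSchema']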

sdk/ml/azure-ai-ml/azure/ai/ml/_schema/pipeline/pipeline_datatransfer_job.py

Lines changed: 3 additions & 2 deletions
@@ -11,6 +11,7 @@
 
 from azure.ai.ml._schema.core.fields import NestedField, UnionField
 from azure.ai.ml._schema.job.input_output_entry import OutputSchema
+from azure.ai.ml._schema.pipeline.pipeline_job_io import OutputBindingStr
 from azure.ai.ml._schema.job.data_transfer_job import (
     DataTransferCopyJobSchema,
     DataTransferImportJobSchema,
@@ -23,7 +24,7 @@
 class PipelineDataTransferCopyJobSchema(DataTransferCopyJobSchema):
     outputs = fields.Dict(
         keys=fields.Str(),
-        values=UnionField([NestedField(OutputSchema), fields.Str()], allow_none=True),
+        values=UnionField([NestedField(OutputSchema), OutputBindingStr], allow_none=True),
     )
 
     @post_load
@@ -36,7 +37,7 @@ def make(self, data: Any, **kwargs: Any):
 class PipelineDataTransferImportJobSchema(DataTransferImportJobSchema):
     outputs = fields.Dict(
         keys=fields.Str(),
-        values=UnionField([NestedField(OutputSchema)], allow_none=True),
+        values=UnionField([NestedField(OutputSchema), OutputBindingStr], allow_none=True),
     )
 
     @post_load
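
With this change, the outputs of pipeline-level copy and import jobs also accept output binding strings in addition to full Output entries. A rough, illustrative check of the binding form "${{parent.outputs.<name>}}" (the authoritative grammar lives in the SDK's OutputBindingStr field; this regex is only an approximation):

import re

_BINDING = re.compile(r"^\$\{\{\s*parent\.outputs\.\w+\s*\}\}$")


def looks_like_output_binding(value: str) -> bool:
    # True for "${{parent.outputs.sink}}"-style bindings, False for plain values.
    return bool(_BINDING.match(value))


print(looks_like_output_binding("${{parent.outputs.sink}}"))  # True
print(looks_like_output_binding("azureml:my_data:1"))         # False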

sdk/ml/azure-ai-ml/azure/ai/ml/constants/_component.py

Lines changed: 3 additions & 5 deletions
@@ -54,11 +54,9 @@ class ExternalDataType(object):
 
 
 class DataTransferBuiltinComponentUri(object):
-    # todo: need update
-    IMPORT_DATABASE = "azureml://registries/import_database"
-    IMPORT_FILE_SYSTEM = "azureml://registries/import_file_system"
-    EXPORT_DATABASE = "azureml://registries/export_database"
-    EXPORT_FILE_SYSTEM = "azureml://registries/export_file_system"
+    IMPORT_DATABASE = "azureml://registries/azureml-dev/components/import_data_database/versions/0.0.1"
+    IMPORT_FILE_SYSTEM = "azureml://registries/azureml-dev/components/import_data_file_system/versions/0.0.1"
+    EXPORT_DATABASE = "azureml://registries/azureml-dev/components/export_data_database/versions/0.0.1"
 
 
 class ComponentSource:
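
The placeholder URIs are replaced with fully qualified registry component ids (and the EXPORT_FILE_SYSTEM entry is dropped). For illustration only, such an id encodes a registry name, component name and version, which a simple parse can pull apart:

import re

IMPORT_DATABASE = "azureml://registries/azureml-dev/components/import_data_database/versions/0.0.1"

_URI = re.compile(
    r"^azureml://registries/(?P<registry>[^/]+)/components/(?P<name>[^/]+)/versions/(?P<version>.+)$"
)
print(_URI.match(IMPORT_DATABASE).groupdict())
# {'registry': 'azureml-dev', 'name': 'import_data_database', 'version': '0.0.1'}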

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_builders/data_transfer.py

Lines changed: 52 additions & 14 deletions
@@ -29,7 +29,7 @@
     DataTransferImportJob,
     DataTransferExportJob,
 )
-from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, AssetTypes
 from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException
 from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem
 
@@ -47,25 +47,34 @@
 
 
 def _build_source_sink(io_dict: Union[Dict, Database, FileSystem]):
-    if not io_dict:
+    if io_dict is None:
         return io_dict
-    io_dict = io_dict or {}
     if isinstance(io_dict, (Database, FileSystem)):
         component_io = io_dict
     else:
-        data_type = io_dict.get("type", None)
-        if data_type == ExternalDataType.DATABASE:
-            component_io = Database(**io_dict)
-        elif data_type == ExternalDataType.FILE_SYSTEM:
-            component_io = FileSystem(**io_dict)
+        if isinstance(io_dict, dict):
+            data_type = io_dict.get("type", None)
+            if data_type == ExternalDataType.DATABASE:
+                component_io = Database(**io_dict)
+            elif data_type == ExternalDataType.FILE_SYSTEM:
+                component_io = FileSystem(**io_dict)
+            else:
+                msg = "Type in source or sink only support {} and {}, currently got {}."
+                raise ValidationException(
+                    message=msg.format(ExternalDataType.DATABASE, ExternalDataType.FILE_SYSTEM, data_type),
+                    no_personal_data_message=msg.format(
+                        ExternalDataType.DATABASE, ExternalDataType.FILE_SYSTEM, "data_type"
+                    ),
+                    target=ErrorTarget.DATA_TRANSFER_JOB,
+                    error_category=ErrorCategory.USER_ERROR,
+                    error_type=ValidationErrorType.INVALID_VALUE,
+                )
         else:
-            msg = "Source or sink only support type {} and {}, currently got {}."
+            msg = "Source or sink only support dict, Database and FileSystem"
             raise ValidationException(
-                message=msg.format(ExternalDataType.DATABASE, ExternalDataType.FILE_SYSTEM, data_type),
-                no_personal_data_message=msg.format(
-                    ExternalDataType.DATABASE, ExternalDataType.FILE_SYSTEM, "data_type"
-                ),
-                target=ErrorTarget.COMPONENT,
+                message=msg,
+                no_personal_data_message=msg,
+                target=ErrorTarget.DATA_TRANSFER_JOB,
                 error_category=ErrorCategory.USER_ERROR,
                 error_type=ValidationErrorType.INVALID_VALUE,
             )
@@ -356,6 +365,20 @@ def _customized_validate(self):
                 yaml_path="outputs.sink",
                 message="Outputs field only support one output called sink in import task",
             )
+        if "sink" in self.outputs and isinstance(self.outputs["sink"]._data, Output):
+            sink_output = self.outputs["sink"]._data
+            if (self.source.type == ExternalDataType.DATABASE and sink_output.type != AssetTypes.MLTABLE) or (
+                self.source.type == ExternalDataType.FILE_SYSTEM and sink_output.type != AssetTypes.URI_FOLDER
+            ):
+                result.append_error(
+                    yaml_path="outputs.sink.type",
+                    message="Outputs field only support type {} for {} and {} for {}".format(
+                        AssetTypes.MLTABLE,
+                        ExternalDataType.DATABASE,
+                        AssetTypes.URI_FOLDER,
+                        ExternalDataType.FILE_SYSTEM,
+                    ),
+                )
         return result
 
     def _to_rest_object(self, **kwargs) -> dict:
@@ -493,6 +516,21 @@ def _customized_validate(self):
                 yaml_path="inputs.source",
                 message="Inputs field only support one input called source in export task",
             )
+        if "source" in self.inputs and isinstance(self.inputs["source"]._data, Input):
+            source_input = self.inputs["source"]._data
+            if (self.sink.type == ExternalDataType.DATABASE and source_input.type != AssetTypes.URI_FILE) or (
+                self.sink.type == ExternalDataType.FILE_SYSTEM and source_input.type != AssetTypes.URI_FOLDER
+            ):
+                result.append_error(
+                    yaml_path="inputs.source.type",
+                    message="Inputs field only support type {} for {} and {} for {}".format(
+                        AssetTypes.URI_FILE,
+                        ExternalDataType.DATABASE,
+                        AssetTypes.URI_FOLDER,
+                        ExternalDataType.FILE_SYSTEM,
+                    ),
+                )
+
         return result
 
     def _to_rest_object(self, **kwargs) -> dict:
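
The added _customized_validate checks pin the asset type at each end of a transfer: an import from a database must write an mltable sink and an import from a file_system a uri_folder sink, while an export to a database must read a uri_file source and an export to a file_system a uri_folder source. A standalone sketch of that pairing, with plain string constants standing in for the SDK's AssetTypes and ExternalDataType members (an assumption made for illustration):

DATABASE, FILE_SYSTEM = "database", "file_system"
MLTABLE, URI_FOLDER, URI_FILE = "mltable", "uri_folder", "uri_file"

# source/sink external-data type -> required asset type on the other end
IMPORT_SINK_TYPES = {DATABASE: MLTABLE, FILE_SYSTEM: URI_FOLDER}
EXPORT_SOURCE_TYPES = {DATABASE: URI_FILE, FILE_SYSTEM: URI_FOLDER}


def import_sink_error(source_type: str, sink_type: str):
    # Returns an error message when an import node's sink type does not match
    # the type required by its source, mirroring the new validation rule.
    expected = IMPORT_SINK_TYPES.get(source_type)
    if expected is not None and sink_type != expected:
        return "Outputs field only support type {} for {} and {} for {}".format(
            MLTABLE, DATABASE, URI_FOLDER, FILE_SYSTEM
        )
    return None


print(import_sink_error(DATABASE, URI_FOLDER))     # flagged: database imports need an mltable sink
print(import_sink_error(FILE_SYSTEM, URI_FOLDER))  # None: valid pairing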
