Commit c7fee36

Renyix/data import sdkv2 (Azure#29014)
* Add azureml data import verb sdk support
* Create data import pipeline job
* Update doc string
* data import - add unit tests
* data import - skip serverless compute validation
* data import - fix unit tests
* data import - fix DataImportSchema import build error
* data import - remove unused JobOperations to fix pylint build check
* add __init__ to fix build checks
* data import - fix line too long build error
* data import - generate recording files
* Fix Run Black build check error
* Refine data import job submission
* Add back some data import parameters
* Mark data import classes as experimental
* Resolve code review comments
* Resolve code review comments
* Remove unused import to fix pylint
* data import - update e2e test recording file
* Add back @experimental for DataImport class and disable asset_name filter
* Update e2e test recording file
* Fix unused 'name' pylint build check
1 parent 4b6a811 commit c7fee36

File tree

19 files changed: +497 −1 lines changed

sdk/ml/azure-ai-ml/azure/ai/ml/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -22,6 +22,7 @@
     load_component,
     load_compute,
     load_data,
+    load_data_import,
     load_datastore,
     load_environment,
     load_job,
@@ -54,6 +55,7 @@
     "load_component",
     "load_compute",
     "load_data",
+    "load_data_import",
     "load_datastore",
     "load_model",
     "load_environment",

sdk/ml/azure-ai-ml/azure/ai/ml/_ml_client.py

Lines changed: 1 addition & 0 deletions
@@ -472,6 +472,7 @@ def __init__(
             requests_pipeline=self._requests_pipeline,
             **ops_kwargs,
         )
+        self._data._job_operation = self._jobs
         self._operation_container.add(AzureMLResourceType.JOB, self._jobs)
         self._schedules = ScheduleOperations(
             self._operation_scope,

sdk/ml/azure-ai-ml/azure/ai/ml/_schema/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -5,6 +5,7 @@
 __path__ = __import__("pkgutil").extend_path(__path__, __name__)
 
 from ._sweep import SweepJobSchema
+from ._data_import import DataImportSchema
 from .assets.code_asset import AnonymousCodeAssetSchema, CodeAssetSchema
 from .assets.data import DataSchema
 from .assets.environment import AnonymousEnvironmentSchema, EnvironmentSchema
@@ -53,4 +54,5 @@
     "ExperimentalField",
     "RegistryStr",
     "WorkspaceAssetReferenceSchema",
+    "DataImportSchema",
 ]
sdk/ml/azure-ai-ml/azure/ai/ml/_schema/_data_import/__init__.py

Lines changed: 9 additions & 0 deletions

@@ -0,0 +1,9 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+__path__ = __import__("pkgutil").extend_path(__path__, __name__)
+
+from .data_import import DataImportSchema
+
+__all__ = ["DataImportSchema"]
sdk/ml/azure-ai-ml/azure/ai/ml/_schema/_data_import/data_import.py

Lines changed: 16 additions & 0 deletions

@@ -0,0 +1,16 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=unused-argument,no-self-use
+
+from azure.ai.ml._schema.core.fields import NestedField
+from azure.ai.ml._schema.job.input_output_entry import DatabaseSchema, FileSystemSchema
+from azure.ai.ml._utils._experimental import experimental
+from ..core.fields import UnionField
+from ..assets.data import DataSchema
+
+
+@experimental
+class DataImportSchema(DataSchema):
+    source = UnionField([NestedField(DatabaseSchema), NestedField(FileSystemSchema)], required=True, allow_none=False)

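The schema's `source` field is a union over two nested schemas, so a data import definition can describe either a database or a file-system source. A minimal sketch of the two entity shapes behind that union; the connection names, query, and path are hypothetical:

from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem

# Database source: serialized by DatabaseSchema; per import_data below,
# query results are materialized as an mltable asset (names hypothetical).
db_source = Database(connection="azureml:my_sql_connection", query="SELECT * FROM my_table")

# File-system source: serialized by FileSystemSchema; files are materialized
# as a uri_folder asset (names hypothetical).
fs_source = FileSystem(connection="azureml:my_s3_connection", path="my-bucket/raw-data/")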
sdk/ml/azure-ai-ml/azure/ai/ml/entities/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -51,6 +51,7 @@
 )
 from ._datastore.adls_gen1 import AzureDataLakeGen1Datastore
 from ._datastore.azure_storage import AzureBlobDatastore, AzureDataLakeGen2Datastore, AzureFileDatastore
+from ._data_import.data_import import DataImport
 from ._datastore.datastore import Datastore
 from ._deployment.batch_deployment import BatchDeployment
 from ._deployment.batch_job import BatchJob
@@ -209,6 +210,7 @@
     "PrivateEndpointDestination",
     "EndpointConnection",
     "CustomerManagedKey",
+    "DataImport",
     "Datastore",
     "AzureDataLakeGen1Datastore",
     "AzureBlobDatastore",
sdk/ml/azure-ai-ml/azure/ai/ml/entities/_data_import/__init__.py

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+__path__ = __import__("pkgutil").extend_path(__path__, __name__)  # type: ignore
sdk/ml/azure-ai-ml/azure/ai/ml/entities/_data_import/data_import.py

Lines changed: 83 additions & 0 deletions

@@ -0,0 +1,83 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+# pylint: disable=no-member
+
+from os import PathLike
+from pathlib import Path
+from typing import Dict, Optional, Union
+from azure.ai.ml._schema import DataImportSchema
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.constants._common import BASE_PATH_CONTEXT_KEY, PARAMS_OVERRIDE_KEY
+from azure.ai.ml.entities._assets import Data
+from azure.ai.ml.entities._inputs_outputs.external_data import Database, FileSystem
+from azure.ai.ml.entities._util import load_from_dict
+
+
+@experimental
+class DataImport(Data):
+    """Data asset created by a data import job.
+
+    :param name: Name of the asset.
+    :type name: str
+    :param path: The path to the asset being created by the data import job.
+    :type path: str
+    :param source: The source the asset data is copied from.
+    :type source: Union[Database, FileSystem]
+    :param version: Version of the resource.
+    :type version: str
+    :param description: Description of the resource.
+    :type description: str
+    :param tags: Tag dictionary. Tags can be added, removed, and updated.
+    :type tags: dict[str, str]
+    :param properties: The asset property dictionary.
+    :type properties: dict[str, str]
+    :param kwargs: A dictionary of additional configuration parameters.
+    :type kwargs: dict
+    """
+
+    def __init__(
+        self,
+        *,
+        name: str,
+        path: str,
+        source: Union[Database, FileSystem],
+        version: Optional[str] = None,
+        description: Optional[str] = None,
+        tags: Optional[Dict] = None,
+        properties: Optional[Dict] = None,
+        **kwargs,
+    ):
+        super().__init__(
+            name=name,
+            version=version,
+            description=description,
+            tags=tags,
+            properties=properties,
+            path=path,
+            **kwargs,
+        )
+        self.source = source
+
+    @classmethod
+    def _load(
+        cls,
+        data: Optional[Dict] = None,
+        yaml_path: Optional[Union[PathLike, str]] = None,
+        params_override: Optional[list] = None,
+        **kwargs,
+    ) -> "DataImport":
+        data = data or {}
+        params_override = params_override or []
+        context = {
+            BASE_PATH_CONTEXT_KEY: Path(yaml_path).parent if yaml_path else Path("./"),
+            PARAMS_OVERRIDE_KEY: params_override,
+        }
+        data_import = DataImport._load_from_dict(yaml_data=data, context=context, **kwargs)
+
+        return data_import
+
+    @classmethod
+    def _load_from_dict(cls, yaml_data: Dict, context: Dict, **kwargs) -> "DataImport":
+        return DataImport(**load_from_dict(DataImportSchema, yaml_data, context, **kwargs))

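Constructing the entity directly mirrors the YAML shape: `name` and `path` describe the asset to create, `source` describes what to copy. A minimal sketch, assuming a workspace connection named my_sql_connection already exists; all names and the datastore path are hypothetical:

from azure.ai.ml.entities import DataImport
from azure.ai.ml.entities._inputs_outputs.external_data import Database

# If "{name}" is missing from the path, DataOperations.import_data (below)
# appends it before submitting the job.
data_import = DataImport(
    name="my_imported_table",
    path="azureml://datastores/workspaceblobstore/paths/imports/",
    source=Database(connection="azureml:my_sql_connection", query="SELECT * FROM my_table"),
)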
sdk/ml/azure-ai-ml/azure/ai/ml/entities/_load_functions.py

Lines changed: 32 additions & 0 deletions
@@ -19,6 +19,7 @@
 from azure.ai.ml.entities._component.parallel_component import ParallelComponent
 from azure.ai.ml.entities._component.pipeline_component import PipelineComponent
 from azure.ai.ml.entities._compute.compute import Compute
+from azure.ai.ml.entities._data_import.data_import import DataImport
 from azure.ai.ml.entities._datastore.datastore import Datastore
 from azure.ai.ml.entities._deployment.batch_deployment import BatchDeployment
 from azure.ai.ml.entities._deployment.online_deployment import OnlineDeployment
@@ -452,6 +453,37 @@ def load_data(
     return load_common(Data, source, relative_origin, **kwargs)
 
 
+def load_data_import(
+    source: Union[str, PathLike, IO[AnyStr]],
+    *,
+    relative_origin: Optional[str] = None,
+    **kwargs,
+) -> DataImport:
+    """Construct a DataImport object from a yaml file.
+
+    :param source: The local yaml source of a data import object. Must be either a
+        path to a local file, or an already-open file.
+        If the source is a path, it will be opened and read.
+        An exception is raised if the file does not exist.
+        If the source is an open file, the file will be read directly,
+        and an exception is raised if the file is not readable.
+    :type source: Union[PathLike, str, io.TextIOWrapper]
+    :param relative_origin: The origin to be used when deducing
+        the relative locations of files referenced in the parsed yaml.
+        Defaults to the inputted source's directory if it is a file or file path input.
+        Defaults to "./" if the source is a stream input with no name value.
+    :type relative_origin: str
+    :param params_override: Fields to overwrite on top of the yaml file.
+        Format is [{"field1": "value1"}, {"field2": "value2"}]
+    :type params_override: List[Dict]
+    :raises ~azure.ai.ml.exceptions.ValidationException: Raised if DataImport cannot be successfully validated.
+        Details will be provided in the error message.
+    :return: Constructed DataImport object.
+    :rtype: DataImport
+    """
+    return load_common(DataImport, source, relative_origin, **kwargs)
+
+
 def load_environment(
     source: Union[str, PathLike, IO[AnyStr]],
     *,

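As with the other load_* helpers, the YAML fields must validate against DataImportSchema. A minimal sketch; the file name and its contents are hypothetical:

from azure.ai.ml import load_data_import

# ./data_import.yaml (hypothetical) might look like:
#   name: my_imported_table
#   path: azureml://datastores/workspaceblobstore/paths/imports/
#   source:
#     type: database
#     connection: azureml:my_sql_connection
#     query: SELECT * FROM my_table
data_import = load_data_import(source="./data_import.yaml")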
sdk/ml/azure-ai-ml/azure/ai/ml/operations/_data_operations.py

Lines changed: 65 additions & 1 deletion
@@ -6,10 +6,15 @@
 
 import os
 from pathlib import Path
-from typing import Dict, List, Optional, Union
+from typing import Dict, List, Optional, Union, Iterable
 
 from marshmallow.exceptions import ValidationError as SchemaValidationError
 
+from azure.ai.ml._utils._experimental import experimental
+from azure.ai.ml.entities import Job, PipelineJob, PipelineJobSettings
+from azure.ai.ml.data_transfer import import_data as import_data_func
+from azure.ai.ml.entities._inputs_outputs import Output
+from azure.ai.ml.entities._inputs_outputs.external_data import Database
 from azure.ai.ml._artifacts._artifact_utilities import _check_and_upload_path
 from azure.ai.ml._artifacts._constants import (
     ASSET_PATH_ERROR,
@@ -52,6 +57,7 @@
     AzureMLResourceType,
 )
 from azure.ai.ml.entities._assets import Data, WorkspaceAssetReference
+from azure.ai.ml.entities._data_import.data_import import DataImport
 from azure.ai.ml.entities._data.mltable_metadata import MLTableMetadata
 from azure.ai.ml.exceptions import (
     AssetPathException,
@@ -347,6 +353,64 @@ def create_or_update(self, data: Data) -> Data:
             )
             raise ex
 
+    @experimental
+    def import_data(self, data_import: DataImport) -> Job:
+        """Submits the data import job that creates the data asset and returns the job.
+
+        :param data_import: DataImport object.
+        :type data_import: azure.ai.ml.entities.DataImport
+        :return: data import job object.
+        :rtype: ~azure.ai.ml.entities.Job
+        """
+
+        experiment_name = "data_import_" + data_import.name
+        data_import.type = AssetTypes.MLTABLE if isinstance(data_import.source, Database) else AssetTypes.URI_FOLDER
+        if "{name}" not in data_import.path:
+            data_import.path = data_import.path.rstrip("/") + "/{name}"
+        import_job = import_data_func(
+            description=data_import.description or experiment_name,
+            display_name=experiment_name,
+            experiment_name=experiment_name,
+            compute="serverless",
+            source=data_import.source,
+            outputs={
+                "sink": Output(
+                    type=data_import.type, path=data_import.path, name=data_import.name, version=data_import.version
+                )
+            },
+        )
+        import_pipeline = PipelineJob(
+            description=data_import.description or experiment_name,
+            tags=data_import.tags,
+            display_name=experiment_name,
+            experiment_name=experiment_name,
+            properties=data_import.properties or {},
+            settings=PipelineJobSettings(force_rerun=True),
+            jobs={experiment_name: import_job},
+        )
+        import_pipeline.properties["azureml.materializationAssetName"] = data_import.name
+        return self._job_operation.create_or_update(job=import_pipeline, skip_validation=True)
+
+    @experimental
+    def show_materialization_status(
+        self,
+        name: str,
+        *,
+        list_view_type: ListViewType = ListViewType.ACTIVE_ONLY,
+    ) -> Iterable[Job]:
+        """List materialization jobs of the asset.
+
+        :param name: Name of the asset being created by the materialization jobs.
+        :type name: str
+        :param list_view_type: View type for including/excluding (for example) archived jobs. Default: ACTIVE_ONLY.
+        :type list_view_type: Optional[ListViewType]
+        :return: An iterator like instance of Job objects.
+        :rtype: ~azure.core.paging.ItemPaged[Job]
+        """
+
+        # TODO: Add back 'asset_name=name' filter once client switches to mfe 2023-02-01-preview and above
+        return self._job_operation.list(job_type="Pipeline", tag=name, list_view_type=list_view_type)
+
     # @monitor_with_activity(logger, "Data.Validate", ActivityType.INTERNALCALL)
     def _validate(self, data: Data) -> Union[List[str], None]:
         if not data.path:

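Taken together, the new surface works end to end like this: load or construct a DataImport, submit it with DataOperations.import_data (which wraps the copy in a serverless pipeline job), then poll show_materialization_status. A minimal sketch, assuming valid workspace coordinates and the hypothetical YAML above:

from azure.ai.ml import MLClient, load_data_import
from azure.identity import DefaultAzureCredential

ml_client = MLClient(
    DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace>",
)

# Submit the import; returns the pipeline Job created by create_or_update.
data_import = load_data_import(source="./data_import.yaml")
import_job = ml_client.data.import_data(data_import=data_import)

# List pipeline jobs materializing the asset (filtered by tag until the
# asset_name filter is re-enabled, per the TODO in the diff).
for job in ml_client.data.show_materialization_status(name=data_import.name):
    print(job.name, job.status)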