
Commit 8bc6dd0

[ML] Integrate MonitorSchedule with ScheduleOperations (Azure#30049)
1 parent 4a318f7 commit 8bc6dd0

File tree

9 files changed: +725 -14 lines changed


sdk/ml/azure-ai-ml/azure/ai/ml/_schema/monitoring/input_data.py

Lines changed: 3 additions & 3 deletions
@@ -8,12 +8,12 @@

 from azure.ai.ml.constants._monitoring import MonitorDatasetContext
 from azure.ai.ml._schema.core.schema import PatchedSchemaMeta
-from azure.ai.ml._schema.core.fields import NestedField, StringTransformedEnum
-from azure.ai.ml._schema.job.input_output_entry import MLTableInputSchema
+from azure.ai.ml._schema.core.fields import NestedField, StringTransformedEnum, UnionField
+from azure.ai.ml._schema.job.input_output_entry import MLTableInputSchema, DataInputSchema


 class MonitorInputDataSchema(metaclass=PatchedSchemaMeta):
-    input_dataset = NestedField(MLTableInputSchema)
+    input_dataset = UnionField(union_fields=[NestedField(DataInputSchema), NestedField(MLTableInputSchema)])
     dataset_context = StringTransformedEnum(allowed_values=[o.value for o in MonitorDatasetContext])
     target_column_name = fields.Str()
     pre_processing_component = fields.Str()
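With the UnionField above, a monitor's input_dataset may now be declared either as a plain data-asset input or as an MLTable input. A minimal sketch of the two shapes the union accepts (the asset names below are placeholders):

from azure.ai.ml import Input

# DataInputSchema branch: a plain data-asset reference
uri_folder_input = Input(type="uri_folder", path="azureml:my-data-asset:1")

# MLTableInputSchema branch: an MLTable reference (the only shape accepted before this change)
mltable_input = Input(type="mltable", path="azureml:my-mltable-asset:1")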

sdk/ml/azure-ai-ml/azure/ai/ml/_utils/_arm_id_utils.py

Lines changed: 15 additions & 0 deletions
@@ -22,6 +22,7 @@
     SINGULARITY_FULL_NAME_REGEX_FORMAT,
     SINGULARITY_ID_REGEX_FORMAT,
     SINGULARITY_SHORT_NAME_REGEX_FORMAT,
+    NAMED_RESOURCE_ID_FORMAT_WITH_PARENT,
 )
 from azure.ai.ml.exceptions import ErrorCategory, ErrorTarget, ValidationErrorType, ValidationException

@@ -328,6 +329,20 @@ def is_ARM_id_for_resource(name: Any, resource_type: str = ".*", sub_workspace_r
     return False


+def is_ARM_id_for_parented_resource(name: str, parent_resource_type: str, child_resource_type: str) -> bool:
+    resource_regex = NAMED_RESOURCE_ID_FORMAT_WITH_PARENT.format(
+        ".*",
+        ".*",
+        AZUREML_RESOURCE_PROVIDER,
+        ".*",
+        parent_resource_type,
+        ".*",
+        child_resource_type,
+        "*",
+    )
+    return re.match(resource_regex, name, re.IGNORECASE) is not None
+
+
 def is_registry_id_for_resource(name: Any) -> bool:
     if isinstance(name, str) and re.match(REGISTRY_URI_REGEX_FORMAT, name, re.IGNORECASE):
         return True
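A minimal sketch of how the new helper behaves (the subscription, resource group, workspace, endpoint, and deployment names below are placeholders): it matches only a fully qualified parent/child ARM ID, so the short "<endpoint-name>:<deployment-name>" form used later in this change is reported as not being an ARM ID.

from azure.ai.ml._utils._arm_id_utils import is_ARM_id_for_parented_resource

full_id = (
    "/subscriptions/<sub-id>/resourceGroups/<rg>/providers/Microsoft.MachineLearningServices"
    "/workspaces/<ws>/onlineEndpoints/<endpoint-name>/deployments/<deployment-name>"
)

assert is_ARM_id_for_parented_resource(full_id, "onlineEndpoints", "deployments")  # fully qualified -> True
assert not is_ARM_id_for_parented_resource(
    "<endpoint-name>:<deployment-name>", "onlineEndpoints", "deployments"
)  # short form -> False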

sdk/ml/azure-ai-ml/azure/ai/ml/constants/_common.py

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@
 AZUREML_RESOURCE_PROVIDER = "Microsoft.MachineLearningServices"
 RESOURCE_ID_FORMAT = "/subscriptions/{}/resourceGroups/{}/providers/{}/workspaces/{}"
 NAMED_RESOURCE_ID_FORMAT = "/subscriptions/{}/resourceGroups/{}/providers/{}/workspaces/{}/{}/{}"
+NAMED_RESOURCE_ID_FORMAT_WITH_PARENT = "/subscriptions/{}/resourceGroups/{}/providers/{}/workspaces/{}/{}/{}/{}/{}"
 LEVEL_ONE_NAMED_RESOURCE_ID_FORMAT = "/subscriptions/{}/resourceGroups/{}/providers/{}/{}/{}"
 VERSIONED_RESOURCE_ID_FORMAT = "/subscriptions/{}/resourceGroups/{}/providers/{}/workspaces/{}/{}/{}/versions/{}"
 LABELLED_RESOURCE_ID_FORMAT = "/subscriptions/{}/resourceGroups/{}/providers/{}/workspaces/{}/{}/{}/labels/{}"
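For reference, a short sketch of how the new parented-resource format expands when all eight slots are filled (subscription, resource group, workspace, endpoint, and deployment values are placeholders):

from azure.ai.ml.constants._common import AZUREML_RESOURCE_PROVIDER, NAMED_RESOURCE_ID_FORMAT_WITH_PARENT

arm_id = NAMED_RESOURCE_ID_FORMAT_WITH_PARENT.format(
    "<subscription-id>",
    "<resource-group>",
    AZUREML_RESOURCE_PROVIDER,
    "<workspace>",
    "onlineEndpoints",   # parent resource type
    "<endpoint-name>",
    "deployments",       # child resource type
    "<deployment-name>",
)
# arm_id == "/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.MachineLearningServices"
#           "/workspaces/<workspace>/onlineEndpoints/<endpoint-name>/deployments/<deployment-name>"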

sdk/ml/azure-ai-ml/azure/ai/ml/entities/_monitoring/input_data.py

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ def __init__(
         target_column_name: str = None,
         pre_processing_component: str = None,
     ):
-        self.input_dataset = input_dataset
+        self.input_dataset = input_dataset if isinstance(input_dataset, Input) else Input(**input_dataset)
         self.dataset_context = dataset_context
         self.target_column_name = target_column_name
         self.pre_processing_component = pre_processing_component
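A minimal sketch of the coercion added above, assuming the entity class in this module is named MonitorInputData and that input_dataset and dataset_context are accepted as keyword arguments (the dict values are placeholders): a plain dict, e.g. one deserialized from YAML, is now wrapped into an Input, while an Input instance is kept as-is.

from azure.ai.ml import Input
from azure.ai.ml.entities._monitoring.input_data import MonitorInputData

raw = {"type": "uri_folder", "path": "azureml:my-data-asset:1"}  # e.g. deserialized from YAML

from_dict = MonitorInputData(input_dataset=raw, dataset_context=None)
from_input = MonitorInputData(input_dataset=Input(**raw), dataset_context=None)

# Both forms end up holding an Input entity.
assert isinstance(from_dict.input_dataset, Input)
assert isinstance(from_input.input_dataset, Input)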

sdk/ml/azure-ai-ml/azure/ai/ml/operations/_schedule_operations.py

Lines changed: 81 additions & 6 deletions
@@ -14,14 +14,24 @@

 from azure.ai.ml._telemetry import ActivityType, monitor_with_activity, monitor_with_telemetry_mixin
 from azure.ai.ml._utils._logger_utils import OpsLogger
-from azure.ai.ml.entities import Job, Schedule
+from azure.ai.ml.entities import Job, JobSchedule, Schedule
+from azure.ai.ml.entities._monitoring.schedule import MonitorSchedule
 from azure.core.credentials import TokenCredential
 from azure.core.polling import LROPoller
 from azure.core.tracing.decorator import distributed_trace

 from .._restclient.v2022_10_01.models import ScheduleListViewType
+from .._utils._arm_id_utils import is_ARM_id_for_parented_resource
+from .._utils.utils import snake_to_camel
 from .._utils._azureml_polling import AzureMLPolling
-from ..constants._common import AzureMLResourceType, LROConfigurations
+from ..constants._common import (
+    ARM_ID_PREFIX,
+    AzureMLResourceType,
+    LROConfigurations,
+    NAMED_RESOURCE_ID_FORMAT_WITH_PARENT,
+    AZUREML_RESOURCE_PROVIDER,
+)
+from ..constants._monitoring import MonitorSignalType
 from . import JobOperations
 from ._job_ops_helper import stream_logs_until_completion
 from ._operation_orchestrator import OperationOrchestrator

@@ -171,10 +181,14 @@ def begin_create_or_update(
         :rtype: Union[LROPoller, ~azure.ai.ml.entities.Schedule]
         """

-        schedule._validate(raise_error=True)
-        if isinstance(schedule.create_job, Job):
-            # Create all dependent resources for job inside schedule
-            self._job_operations._resolve_arm_id_or_upload_dependencies(schedule.create_job)
+        if isinstance(schedule, JobSchedule):
+            schedule._validate(raise_error=True)
+            if isinstance(schedule.create_job, Job):
+                # Create all dependent resources for job inside schedule
+                self._job_operations._resolve_arm_id_or_upload_dependencies(schedule.create_job)
+        elif isinstance(schedule, MonitorSchedule):
+            # resolve ARM id for target, compute, and input datasets for each signal
+            self._resolve_monitor_schedule_arm_id(schedule)
         # Create schedule
         schedule_data = schedule._to_rest_object()
         poller = self.service_client.begin_create_or_update(

@@ -221,3 +235,64 @@ def begin_disable(
         schedule = self.get(name=name)
         schedule._is_enabled = False
         return self.begin_create_or_update(schedule)
+
+    def _resolve_monitor_schedule_arm_id(self, schedule: MonitorSchedule) -> None:
+        # resolve compute ID
+        schedule.create_monitor.compute = self._orchestrators.get_asset_arm_id(
+            schedule.create_monitor.compute, AzureMLResourceType.COMPUTE, register_asset=False
+        )
+
+        # resolve target ARM ID
+        target = schedule.create_monitor.monitoring_target
+        if target and target.endpoint_deployment_id:
+            target.endpoint_deployment_id = (
+                target.endpoint_deployment_id[len(ARM_ID_PREFIX) :]
+                if target.endpoint_deployment_id.startswith(ARM_ID_PREFIX)
+                else target.endpoint_deployment_id
+            )
+
+            # if it is an ARM ID, don't process it
+            if not is_ARM_id_for_parented_resource(
+                target.endpoint_deployment_id,
+                snake_to_camel(AzureMLResourceType.ONLINE_ENDPOINT),
+                AzureMLResourceType.DEPLOYMENT,
+            ):
+                endpoint_name, deployment_name = target.endpoint_deployment_id.split(":")
+                target.endpoint_deployment_id = NAMED_RESOURCE_ID_FORMAT_WITH_PARENT.format(
+                    self._subscription_id,
+                    self._resource_group_name,
+                    AZUREML_RESOURCE_PROVIDER,
+                    self._workspace_name,
+                    snake_to_camel(AzureMLResourceType.ONLINE_ENDPOINT),
+                    endpoint_name,
+                    AzureMLResourceType.DEPLOYMENT,
+                    deployment_name,
+                )
+
+        elif target and target.model_id:
+            target.model_id = self._orchestrators.get_asset_arm_id(
+                target.model_id,
+                AzureMLResourceType.MODEL,
+                register_asset=False,
+            )
+
+        # resolve input paths and preprocessing component ids
+        for signal in schedule.create_monitor.monitoring_signals.values():
+            if signal.type == MonitorSignalType.CUSTOM:
+                for input_value in signal.input_datasets.values():
+                    self._job_operations._resolve_job_input(input_value.input_dataset, schedule._base_path)
+                    input_value.pre_processing_component = self._orchestrators.get_asset_arm_id(
+                        asset=input_value.pre_processing_component, azureml_type=AzureMLResourceType.COMPONENT
+                    )
+            else:
+                self._job_operations._resolve_job_inputs(
+                    [signal.target_dataset.dataset.input_dataset, signal.baseline_dataset.input_dataset],
+                    schedule._base_path,
+                )
+                signal.target_dataset.dataset.pre_processing_component = self._orchestrators.get_asset_arm_id(
+                    asset=signal.target_dataset.dataset.pre_processing_component,
+                    azureml_type=AzureMLResourceType.COMPONENT,
+                )
+                signal.baseline_dataset.pre_processing_component = self._orchestrators.get_asset_arm_id(
+                    asset=signal.baseline_dataset.pre_processing_component, azureml_type=AzureMLResourceType.COMPONENT
+                )
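From the client side, the new branch means a MonitorSchedule is routed through _resolve_monitor_schedule_arm_id before submission, so short-form references (a "<endpoint-name>:<deployment-name>" monitoring target, an "azureml:<data-asset>:<version>" input path, a compute name) are expanded to full ARM IDs. A minimal usage sketch, with connection details and the YAML path as placeholders:

from azure.ai.ml import MLClient
from azure.ai.ml.entities._load_functions import load_schedule
from azure.identity import DefaultAzureCredential

ml_client = MLClient(
    DefaultAzureCredential(),
    subscription_id="<subscription-id>",
    resource_group_name="<resource-group>",
    workspace_name="<workspace>",
)

# Load a MonitorSchedule definition from YAML (hypothetical file) and create it;
# ARM ID resolution happens inside begin_create_or_update before the REST call.
schedule = load_schedule("monitor_schedule.yaml")
created_schedule = ml_client.schedules.begin_create_or_update(schedule).result()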
Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
+from typing import Callable
+
+from devtools_testutils import AzureRecordedTestCase
+import pytest
+
+from azure.ai.ml import MLClient
+from azure.ai.ml.constants._common import AzureMLResourceType
+from azure.ai.ml.entities._load_functions import load_schedule
+from azure.ai.ml.entities._monitoring.schedule import MonitorSchedule
+from azure.ai.ml._utils._arm_id_utils import is_ARM_id_for_resource
+
+
+@pytest.mark.timeout(600)
+@pytest.mark.usefixtures("recorded_test")
+@pytest.mark.core_sdk_test
+class TestMonitorSchedule(AzureRecordedTestCase):
+    def test_data_drift_schedule_create(
+        self, client: MLClient, data_with_2_versions: str, randstr: Callable[[str], str]
+    ):
+        test_path = "tests/test_configs/monitoring/yaml_configs/data_drift.yaml"
+
+        schedule_name = randstr("schedule_name")
+
+        params_override = [
+            {"name": schedule_name},
+            {
+                "create_monitor.monitoring_signals.testSignal.target_dataset.dataset.input_dataset.path": f"azureml:{data_with_2_versions}:1"
+            },
+            {"create_monitor.monitoring_signals.testSignal.target_dataset.dataset.input_dataset.type": "uri_folder"},
+            {
+                "create_monitor.monitoring_signals.testSignal.baseline_dataset.input_dataset.path": f"azureml:{data_with_2_versions}:2"
+            },
+            {"create_monitor.monitoring_signals.testSignal.baseline_dataset.input_dataset.type": "uri_folder"},
+        ]
+
+        schedule = load_schedule(test_path, params_override=params_override)
+        # not testing monitoring target expansion right now
+        schedule.create_monitor.monitoring_target = None
+        # bug in service when deserializing lookback_period is not supported yet
+        schedule.create_monitor.monitoring_signals["testSignal"].target_dataset.lookback_period = None
+
+        created_schedule = client.schedules.begin_create_or_update(schedule).result()
+
+        # test ARM id resolution
+        assert isinstance(created_schedule, MonitorSchedule)
+        assert is_ARM_id_for_resource(created_schedule.create_monitor.compute, AzureMLResourceType.COMPUTE)
+
+        data_drift_signal = created_schedule.create_monitor.monitoring_signals["testSignal"]
+        assert data_drift_signal.target_dataset
+        assert is_ARM_id_for_resource(
+            data_drift_signal.target_dataset.dataset.input_dataset.path, AzureMLResourceType.DATA
+        )
+        assert data_drift_signal.baseline_dataset
+        assert is_ARM_id_for_resource(data_drift_signal.baseline_dataset.input_dataset.path, AzureMLResourceType.DATA)
