Skip to content

Commit 14dad53

Browse files
Sushanth Sathish Kumarsushanthkumar2004
authored andcommitted
feature: Added automatic dashboard feature for DefaultModelMonitor create_monitoring_schedule with error handling, TODO: modify update_monitoring_schedule accordingly and add unit tests
1 parent da348e8 commit 14dad53

File tree

2 files changed

+266
-0
lines changed

2 files changed

+266
-0
lines changed
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
import json
2+
3+
class Variable:
    """One entry of a dashboard's ``variables`` list.

    Every constructor argument is stored unchanged. When the variable is
    serialized, arguments that are ``None`` are left out of the output.

    Args:
        variable_type: Value emitted under the ``"type"`` key.
        variable_property: Value emitted under the ``"property"`` key.
        inputType: Value emitted under the ``"inputType"`` key.
        id: Value emitted under the ``"id"`` key.
        label: Value emitted under the ``"label"`` key.
        search: Value emitted under the ``"search"`` key.
        populateFrom: Value emitted under the ``"populateFrom"`` key.
    """

    def __init__(self, variable_type, variable_property, inputType, id, label, search, populateFrom):
        self.variable_type = variable_type
        self.variable_property = variable_property
        self.inputType = inputType
        self.id = id
        self.label = label
        self.search = search
        self.populateFrom = populateFrom

    def to_dict(self):
        """Return the variable as a dict, skipping attributes that are ``None``.

        Returns:
            dict: Keys appear in the fixed order ``type``, ``property``,
            ``inputType``, ``id``, ``label``, ``search``, ``populateFrom``.
        """
        candidates = (
            ("type", self.variable_type),
            ("property", self.variable_property),
            ("inputType", self.inputType),
            ("id", self.id),
            ("label", self.label),
            ("search", self.search),
            ("populateFrom", self.populateFrom),
        )
        # Identity comparison: falsy-but-set values ("" / 0 / False) are kept.
        return {key: value for key, value in candidates if value is not None}

    def to_json(self):
        """Return ``to_dict()`` rendered as a JSON string indented by 4 spaces.

        Returns:
            str: The JSON document.
        """
        # The serialized text must be returned; previously it was computed and discarded.
        return json.dumps(self.to_dict(), indent=4)
33+
34+
class WidgetProperties:
    """The ``properties`` payload of a dashboard widget.

    All arguments are optional and stored unchanged. When the object is
    serialized, arguments that are ``None`` are left out of the output.

    Args:
        view: Value emitted under the ``"view"`` key.
        stacked: Value emitted under the ``"stacked"`` key.
        metrics: Value emitted under the ``"metrics"`` key.
        region: Value emitted under the ``"region"`` key.
        period: Value emitted under the ``"period"`` key.
        title: Value emitted under the ``"title"`` key.
        markdown: Value emitted under the ``"markdown"`` key.
    """

    def __init__(self, view=None, stacked=None, metrics=None, region=None, period=None, title=None, markdown=None):
        self.view = view
        self.stacked = stacked
        self.metrics = metrics
        self.region = region
        self.period = period
        self.title = title
        self.markdown = markdown

    def to_dict(self):
        """Return the properties as a dict, skipping attributes that are ``None``.

        Returns:
            dict: Keys appear in the fixed order ``view``, ``period``,
            ``markdown``, ``stacked``, ``region``, ``metrics``, ``title``.
        """
        candidates = (
            ("view", self.view),
            ("period", self.period),
            ("markdown", self.markdown),
            ("stacked", self.stacked),
            ("region", self.region),
            ("metrics", self.metrics),
            ("title", self.title),
        )
        # Identity comparison so that ``stacked=False`` or ``metrics=[]`` are still emitted.
        return {key: value for key, value in candidates if value is not None}

    def to_json(self):
        """Return ``to_dict()`` rendered as a JSON string indented by 4 spaces.

        Returns:
            str: The JSON document.
        """
        # The serialized text must be returned; previously it was computed and discarded.
        return json.dumps(self.to_dict(), indent=4)
64+
65+
class Widget:
    """A dashboard widget: a size, a type, and a properties object.

    Args:
        height: Value emitted under the ``"height"`` key.
        width: Value emitted under the ``"width"`` key.
        widget_type: Value emitted under the ``"type"`` key.
        properties: Object exposing ``to_dict()``. When it is falsy
            (including the default ``None``), a ``WidgetProperties`` with
            ``stacked=False`` and an empty ``metrics`` list is used instead.
    """

    def __init__(self, height, width, widget_type, properties=None):
        if not properties:
            # Fall back to a minimal properties object so to_dict() always has something to call.
            properties = WidgetProperties(None, False, [], None, None, None)
        self.properties = properties
        self.type = widget_type
        self.width = width
        self.height = height

    def to_dict(self):
        """Return the widget as a dict with its properties serialized via ``to_dict()``.

        Returns:
            dict: Keys ``height``, ``width``, ``type`` and ``properties``, in that order.
        """
        serialized = {}
        serialized["height"] = self.height
        serialized["width"] = self.width
        serialized["type"] = self.type
        serialized["properties"] = self.properties.to_dict()
        return serialized

    def to_json(self):
        """Return ``to_dict()`` rendered as a JSON string indented by 4 spaces.

        Returns:
            str: The JSON document.
        """
        payload = self.to_dict()
        return json.dumps(payload, indent=4)
82+
83+
class AutomaticDataQualityDashboard:
    """Builds a dashboard body of data-quality metric widgets for one monitoring schedule.

    The dashboard consists of a single ``Feature`` variable and five metric
    widgets (type counts, missing data counts, estimated unique values,
    completeness, baseline drift). Each widget plots one ``SEARCH(...)``
    expression. When ``batch_transform_input`` is truthy the batch namespace is
    searched and no ``Endpoint`` dimension is included; otherwise the endpoint
    namespace is searched and filtered by ``endpoint_name``.

    Args:
        endpoint_name: Value interpolated into the ``Endpoint="..."`` filter
            of the search expressions (endpoint mode only).
        monitoring_schedule_name: Value interpolated into the
            ``MonitoringSchedule="..."`` filter of every search expression.
        batch_transform_input: Only its truthiness is used, to choose between
            batch mode and endpoint mode.
        region_name: Value emitted as each widget's ``region`` property.

    Attributes:
        dashboard: Dict with a ``"variables"`` list of ``Variable`` objects and
            a ``"widgets"`` list of ``Widget`` objects.
    """

    DATA_QUALITY_METRICS_ENDPOINT_NAMESPACE = (
        "{aws/sagemaker/Endpoints/data-metrics,Endpoint,Feature,MonitoringSchedule}"
    )
    DATA_QUALITY_METRICS_BATCH_NAMESPACE = (
        "{aws/sagemaker/ModelMonitoring/data-metrics,Feature,MonitoringSchedule}"
    )

    def __init__(self, endpoint_name, monitoring_schedule_name, batch_transform_input, region_name):
        self.endpoint = endpoint_name
        self.monitoring_schedule = monitoring_schedule_name
        self.batch_transform = batch_transform_input
        self.region = region_name

        self.dashboard = {
            "variables": self._generate_variables(),
            "widgets": [
                self._generate_type_counts_widget(),
                self._generate_null_counts_widget(),
                self._generate_estimated_unique_values_widget(),
                self._generate_completeness_widget(),
                self._generate_baseline_drift_widget(),
            ],
        }

    def _metrics_namespace(self):
        """Return the batch namespace in batch mode, else the endpoint namespace."""
        if self.batch_transform:
            return AutomaticDataQualityDashboard.DATA_QUALITY_METRICS_BATCH_NAMESPACE
        return AutomaticDataQualityDashboard.DATA_QUALITY_METRICS_ENDPOINT_NAMESPACE

    def _generate_metric_widget(self, metric_search_term, title):
        """Build an 8x12 time-series metric widget around one SEARCH expression.

        Args:
            metric_search_term (str): Metric-name portion of the search
                expression, inserted right after the namespace.
            title (str): Widget title.

        Returns:
            Widget: A ``"metric"`` widget whose single metric is the expression.
        """
        # NOTE(review): Feature="_" looks like a placeholder meant to be replaced by the
        # dashboard's "Feature" variable -- confirm against CloudWatch variable semantics.
        dimensions = f'Feature="_" MonitoringSchedule="{self.monitoring_schedule}"'
        if not self.batch_transform:
            # Endpoint mode additionally filters on the endpoint dimension.
            dimensions = f'Endpoint="{self.endpoint}" {dimensions}'

        expression = (
            f"SEARCH( '{self._metrics_namespace()} {metric_search_term} {dimensions} ', 'Average')"
        )
        properties = WidgetProperties(
            view="timeSeries",
            stacked=False,
            metrics=[[{"expression": expression}]],
            region=self.region,
            title=title,
        )
        return Widget(height=8, width=12, widget_type="metric", properties=properties)

    def _generate_variables(self):
        """Return the list holding the single ``Feature`` dashboard variable."""
        return [
            Variable(
                variable_type="property",
                variable_property="Feature",
                inputType="select",
                id="Feature",
                label="Feature",
                search=self._metrics_namespace(),
                populateFrom="Feature",
            )
        ]

    def _generate_type_counts_widget(self):
        """Return the "Type Counts" widget."""
        return self._generate_metric_widget(
            "feature_(fractional OR boolean OR integral OR string OR unknown)_counts_",
            "Type Counts",
        )

    def _generate_null_counts_widget(self):
        """Return the "Missing Data Counts" widget."""
        return self._generate_metric_widget(
            "feature_null_ OR feature_non_null_",
            "Missing Data Counts",
        )

    def _generate_estimated_unique_values_widget(self):
        """Return the "Estimated Unique Values" widget."""
        return self._generate_metric_widget(
            "feature_estimated_unique_values_",
            "Estimated Unique Values",
        )

    def _generate_completeness_widget(self):
        """Return the "Completeness" widget."""
        return self._generate_metric_widget("feature_completeness_", "Completeness")

    def _generate_baseline_drift_widget(self):
        """Return the "Baseline Drift" widget."""
        return self._generate_metric_widget("feature_baseline_drift_", "Baseline Drift")

    def to_dict(self):
        """Return the dashboard with every variable and widget converted via ``to_dict()``.

        Returns:
            dict: ``{"variables": [...], "widgets": [...]}`` containing plain dicts.
        """
        return {
            "variables": [var.to_dict() for var in self.dashboard["variables"]],
            "widgets": [widget.to_dict() for widget in self.dashboard["widgets"]],
        }

    def to_json(self):
        """Return ``to_dict()`` rendered as a JSON string indented by 4 spaces.

        Returns:
            str: The JSON document.
        """
        return json.dumps(self.to_dict(), indent=4)
201+
202+
# NOTE: the commented-out snippet below is an example of using the boto3 library to publish a
203+
# dashboard body generated by AutomaticDataQualityDashboard to AWS CloudWatch and to read one back.
204+
import boto3
205+
# dashboard_body = AutomaticDataQualityDashboard("DEMO-xgb-churn-pred-model-monitor-2024-07-09-17-54-38", "DEMO-xgb-churn-pred-model-monitor-schedule-2024-07-09-18-30-14", None, "us-west-2").to_json()
206+
# print(dashboard_body)
207+
# response = boto3.client('cloudwatch').put_dashboard(
208+
# DashboardName="New",
209+
# DashboardBody=dashboard_body
210+
# )
211+
# print(response)
212+
213+
214+
# resp = boto3.client('cloudwatch').get_dashboard(DashboardName="Old")
215+
# print(resp)

src/sagemaker/model_monitor/model_monitoring.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import uuid
2626
from typing import Union, Optional, Dict, List
2727
import attr
28+
import re
2829

2930
from six import string_types
3031
from six.moves.urllib.parse import urlparse
@@ -67,6 +68,8 @@
6768
from sagemaker.lineage._utils import get_resource_name_from_arn
6869
from sagemaker.model_monitor.cron_expression_generator import CronExpressionGenerator
6970

71+
from sagemaker.model_monitor.dashboards import AutomaticDataQualityDashboard
72+
7073
DEFAULT_REPOSITORY_NAME = "sagemaker-model-monitor-analyzer"
7174

7275
STATISTICS_JSON_DEFAULT_FILE_NAME = "statistics.json"
@@ -1945,6 +1948,8 @@ def create_monitoring_schedule(
19451948
monitor_schedule_name=None,
19461949
schedule_cron_expression=None,
19471950
enable_cloudwatch_metrics=True,
1951+
enable_automatic_dashboard=False,
1952+
dashboard_name=None,
19481953
batch_transform_input=None,
19491954
data_analysis_start_time=None,
19501955
data_analysis_end_time=None,
@@ -1988,6 +1993,8 @@ def create_monitoring_schedule(
19881993
data_analysis_end_time (str): End time for the data analysis window
19891994
for the one time monitoring schedule (NOW), e.g. "-PT1H" (default: None)
19901995
"""
1996+
cw_client = self.sagemaker_session.boto_session.client('cloudwatch')
1997+
19911998
if self.job_definition_name is not None or self.monitoring_schedule_name is not None:
19921999
message = (
19932000
"It seems that this object was already used to create an Amazon Model "
@@ -2005,6 +2012,45 @@ def create_monitoring_schedule(
20052012
)
20062013
logger.error(message)
20072014
raise ValueError(message)
2015+
2016+
# error checking and validation logic for dashboard name
2017+
if (enable_cloudwatch_metrics == False and enable_automatic_dashboard == True):
2018+
message = (
2019+
"Could not create automatic dashboard. Please set enable_cloudwatch_metrics to True."
2020+
)
2021+
logger.error(message)
2022+
raise ValueError(message)
2023+
2024+
if (enable_automatic_dashboard == True):
2025+
# verify that the provided dashboard name is not taken
2026+
dashboard_name = monitor_schedule_name if dashboard_name is None else dashboard_name
2027+
2028+
dashboard_name_validation = bool(re.match(r'^[0-9A-Za-z\-_]{1,255}$', dashboard_name))
2029+
if dashboard_name_validation == False:
2030+
message = (
2031+
f"Dashboard name {dashboard_name} is not a valid dashboard name. "
2032+
"Dashboard name can be at most 255 characters long "
2033+
"and valid characters in dashboard names include '0-9A-Za-z-_'."
2034+
)
2035+
logger.error(message)
2036+
raise ValueError(message)
2037+
2038+
try:
2039+
# try to access dashboard name to see if it exists
2040+
cw_client.get_dashboard(DashboardName=dashboard_name)
2041+
message = (
2042+
f"Dashboard name {dashboard_name} is already in use. "
2043+
"Please provide a different dashboard name, or delete the already "
2044+
"existing dashboard."
2045+
)
2046+
logger.error(message)
2047+
raise ValueError(message)
2048+
except Exception as e:
2049+
# in this case, the dashboard name is not in use
2050+
# and we are free to write to it without overwriting any
2051+
# customer data.
2052+
pass
2053+
20082054

20092055
self._check_monitoring_schedule_cron_validity(
20102056
schedule_cron_expression=schedule_cron_expression,
@@ -2068,6 +2114,11 @@ def create_monitoring_schedule(
20682114
message = "Failed to delete job definition {}.".format(new_job_definition_name)
20692115
logger.exception(message)
20702116
raise
2117+
2118+
cw_client.put_dashboard(
2119+
DashboardName=dashboard_name,
2120+
DashboardBody=AutomaticDataQualityDashboard(endpoint_name=endpoint_input, monitoring_schedule_name=monitor_schedule_name, batch_transform_input=batch_transform_input, region_name=self.sagemaker_session.boto_region_name)
2121+
)
20712122

20722123
def update_monitoring_schedule(
20732124
self,

0 commit comments

Comments
 (0)