-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathcustom_evidently_app_v2.py
More file actions
97 lines (81 loc) · 3.09 KB
/
custom_evidently_app_v2.py
File metadata and controls
97 lines (81 loc) · 3.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from typing import Optional
import pandas as pd
from sklearn.datasets import load_iris
import mlrun.model_monitoring.applications.context as mm_context
from mlrun.common.schemas.model_monitoring.constants import (
ResultKindApp,
ResultStatusApp,
)
from mlrun.feature_store.api import norm_column_name
from mlrun.model_monitoring.applications import ModelMonitoringApplicationResult
from mlrun.model_monitoring.applications.evidently import (
_HAS_EVIDENTLY,
EvidentlyModelMonitoringApplicationBase,
)
if _HAS_EVIDENTLY:
from evidently.core.report import Report, Snapshot
from evidently.metrics import DatasetMissingValueCount, ValueDrift
from evidently.presets import DataDriftPreset, DataSummaryPreset
from evidently.ui.workspace import (
STR_UUID,
OrgID,
)
# Display name and description strings for an Evidently project.
# NOTE(review): neither constant is referenced in the visible part of this
# module — presumably consumed when an Evidently project is created; confirm
# against callers before removing.
_PROJECT_NAME = "Iris Monitoring"
_PROJECT_DESCRIPTION = "Test project using iris dataset"
class CustomEvidentlyMonitoringApp(EvidentlyModelMonitoringApplicationBase):
    """Test monitoring application that compares inference samples to the iris dataset.

    The iris feature matrix (loaded with ``sklearn.datasets.load_iris``) is kept
    as the reference data set, and each monitoring window is evaluated against
    it with an Evidently ``Report``.
    """

    NAME = "evidently-app-test-v2"

    def __init__(
        self,
        evidently_project_id: Optional["STR_UUID"] = None,
        evidently_workspace_path: Optional[str] = None,
        cloud_workspace: bool = False,
        evidently_organization_id: Optional["OrgID"] = None,
    ) -> None:
        """Store the organization ID, load the reference data and initialize the base app.

        :param evidently_project_id:      Forwarded to the base class initializer.
        :param evidently_workspace_path:  Forwarded to the base class initializer.
        :param cloud_workspace:           Forwarded to the base class initializer.
        :param evidently_organization_id: Kept on ``self.org_id`` only; it is not
                                          passed to the base class initializer.
        """
        self.org_id = evidently_organization_id
        # The reference data is prepared before the base class is initialized.
        self._init_iris_data()
        base_kwargs = {
            "evidently_project_id": evidently_project_id,
            "evidently_workspace_path": evidently_workspace_path,
            "cloud_workspace": cloud_workspace,
        }
        super().__init__(**base_kwargs)

    def _init_iris_data(self) -> None:
        """Load the iris dataset and set ``self.columns`` and ``self.train_set``.

        ``self.columns`` holds the iris feature names after each one is passed
        through ``norm_column_name``; ``self.train_set`` is a data frame of the
        iris feature values labelled with those columns.
        """
        bunch = load_iris()
        feature_columns = list(map(norm_column_name, bunch.feature_names))
        self.columns = feature_columns
        self.train_set = pd.DataFrame(data=bunch.data, columns=feature_columns)

    def do_tracking(
        self, monitoring_context: mm_context.MonitoringApplicationContext
    ) -> ModelMonitoringApplicationResult:
        """Build an Evidently report for the current window and return a fixed result.

        :param monitoring_context: The monitoring context; its ``logger``,
                                   ``sample_df`` and ``end_infer_time`` are read.

        :returns: A result named ``data_drift_test`` with the constant value
                  ``0.5``, kind ``data_drift`` and status ``potential_detection``.
                  The values do not depend on the generated report.
        """
        log = monitoring_context.logger
        log.info("Running evidently app")

        # Restrict the sample to the reference columns.
        current_window = monitoring_context.sample_df[self.columns]
        # NOTE(review): the snapshot is created but never stored or attached to
        # the result, although the message below says it was logged — confirm
        # whether a logging call is missing here.
        snapshot = self.create_report_run(
            current_window, monitoring_context.end_infer_time
        )
        log.info("Logged evidently object")

        result_fields = {
            "name": "data_drift_test",
            "value": 0.5,
            "kind": ResultKindApp.data_drift,
            "status": ResultStatusApp.potential_detection,
        }
        return ModelMonitoringApplicationResult(**result_fields)

    def create_report_run(
        self, sample_df: pd.DataFrame, schedule_time: pd.Timestamp
    ) -> "Snapshot":
        """Run an Evidently report of ``sample_df`` against the iris reference data.

        The report contains the data drift preset, the dataset missing value
        count, the data summary preset, and one ``ValueDrift`` metric (method
        ``"wasserstein"``) for every column in ``self.columns``.

        :param sample_df:     The current data to evaluate.
        :param schedule_time: Stored, as a string, under the ``timestamp`` key of
                              the report metadata.

        :returns: The object returned by ``Report.run``.
        """
        report = Report(
            metrics=[
                DataDriftPreset(),
                DatasetMissingValueCount(),
                DataSummaryPreset(),
                # Per-column drift metrics follow the dataset-level ones.
                *(
                    ValueDrift(column=column, method="wasserstein")
                    for column in self.columns
                ),
            ],
            metadata={"timestamp": str(schedule_time)},
            include_tests=True,
        )
        return report.run(current_data=sample_df, reference_data=self.train_set)