diff --git a/smsdk/client.py b/smsdk/client.py
index 42a9db6..fb49d49 100644
--- a/smsdk/client.py
+++ b/smsdk/client.py
@@ -238,6 +238,116 @@ def get_data_v1(self, ename, util_name, normalize=True, *args, **kwargs):
         return data
 
+    def _get_alert_entity(self):
+        """Build the Alert entity bound to this client's session and base URL."""
+        base_url = get_url(
+            self.config["protocol"],
+            self.tenant,
+            self.config["site.domain"],
+            self.config["port"],
+        )
+        return smsdkentities.get("alert")(self.session, base_url)
+
+    @version_check_decorator
+    def list_alerts(self, alert_type=""):
+        """
+        List the configured alerts as a summary table.
+
+        :param alert_type: Optional alert group to filter on, e.g. "kpi",
+            "data_latency", "spc", or "pipelinesourcemonitoring"
+        :return: pandas dataframe with one row per alert
+        """
+        return self._get_alert_entity().list_alerts(alert_type)
+
+    @version_check_decorator
+    def get_alert_dataframe(self, alert_type=""):
+        """
+        Fetch the full configuration of every alert as a flattened dataframe.
+
+        :param alert_type: Optional alert group to filter on
+        :return: pandas dataframe with one row per alert configuration
+        """
+        return self._get_alert_entity().get_alert_dataframe(alert_type)
+
+    @version_check_decorator
+    def update_alert(self, alert_id, params):
+        """
+        Update a single alert configuration.
+
+        :param alert_id: ID of the alert to update
+        :param params: dict of fields to change, e.g. {"display_name": ...}
+        """
+        self._get_alert_entity().update_alert(alert_id, params)
+
+    @version_check_decorator
+    def delete_alert(self, alert_id=None, delete_all=False, alert_group=""):
+        """
+        Delete alerts by id, by group, or all at once.
+
+        :param alert_id: ID of a single alert to delete
+        :param delete_all: if True, delete every alert
+        :param alert_group: delete all alerts belonging to this group
+        """
+        if alert_id is None and not delete_all and not alert_group:
+            print(
+                "Invalid input: provide an alert id, an alert group, or set "
+                "delete_all=True to delete all the alerts."
+            )
+            return
+        self._get_alert_entity().delete_alert(alert_id, delete_all, alert_group)
+
+    @version_check_decorator
+    def create_alerts(self, dataframe, alert_type=""):
+        """
+        Create alerts from a dataframe of alert configurations, as returned
+        by get_alert_dataframe().
+
+        :param dataframe: pandas dataframe of alert configurations
+        :param alert_type: Optional alert group to filter on
+        """
+        self._get_alert_entity().create_alert(alert_type, dataframe)
+
+    @version_check_decorator
+    def update_alert_group(self, dataframe):
+        """
+        Bulk-update alerts from an edited dataframe, as returned by
+        get_alert_dataframe().
+
+        :param dataframe: pandas dataframe of alert configurations
+        """
+        self._get_alert_entity().update_alert_group(dataframe)
+
     @version_check_decorator
     @ClientV0.validate_input
     @ClientV0.cycle_decorator
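For reviewers, a minimal usage sketch of the new client surface. This is an assumption-laden sketch, not part of the diff: the `Client("tenant")` constructor and the apikey `login()` call follow the SDK's existing pattern, and the tenant, key ids, and alert id are placeholders.

```python
# Sketch only: assumes an already-configured smsdk Client; the tenant,
# key ids, and alert id below are placeholders.
from smsdk import client

cli = client.Client("demo-tenant")
cli.login("apikey", key_id="...", secret_id="...")

summary = cli.list_alerts(alert_type="data_latency")  # one row per alert
configs = cli.get_alert_dataframe()                   # flattened full configs
cli.update_alert("67c5640266af93c9e3095134", {"display_name": "Renamed alert"})
cli.delete_alert(alert_id="67c5640266af93c9e3095134")
```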
diff --git a/smsdk/config/api_endpoints.json b/smsdk/config/api_endpoints.json
index bd3497c..e0555ee 100644
--- a/smsdk/config/api_endpoints.json
+++ b/smsdk/config/api_endpoints.json
@@ -2,6 +2,9 @@
     "Auth": {
         "url": "/auth/password/login"
     },
+    "Alert": {
+        "url": "/v1/obj/alert_config"
+    },
     "Cycle": {
         "url_v1": "/v1/datatab/cycle",
         "url": "/api/cycle",
diff --git a/smsdk/smsdk_entities/alerts/__init__.py b/smsdk/smsdk_entities/alerts/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/smsdk/smsdk_entities/alerts/alerts.py b/smsdk/smsdk_entities/alerts/alerts.py
new file mode 100644
index 0000000..9b13698
--- /dev/null
+++ b/smsdk/smsdk_entities/alerts/alerts.py
@@ -0,0 +1,528 @@
+import ast
+import importlib.resources as pkg_resources
+import json
+import logging
+from copy import deepcopy
+from math import isnan
+from typing import Any, Dict, List
+
+import numpy as np
+import pandas as pd
+
+from smsdk import config
+from smsdk.ma_session import MaSession
+from smsdk.tool_register import SmsdkEntities, smsdkentities
+from smsdk.utils import module_utility
+
+log = logging.getLogger(__name__)
+
+ENDPOINTS = json.loads(pkg_resources.read_text(config, "api_endpoints.json"))
+ALERT_CONFIG_URL = ENDPOINTS["Alert"]["url"]
+
+try:
+    NPINFINITY = np.Inf
+except AttributeError:
+    # np.Inf was removed in numpy 2.0; np.inf works on both.
+    NPINFINITY = np.inf
+
+# Server-managed fields that must be stripped before an alert config is
+# created or updated.
+SERVER_MANAGED_KEYS = [
+    "incident_total",
+    "id",
+    "created_by",
+    "system_fixture",
+    "audit_keys",
+    "tombstone_epoch",
+    "tombstone",
+    "version",
+    "updatelocation",
+    "localtz",
+    "updatetime",
+    "capturetime_epoch",
+    "capturetime",
+]
+
+# Mapping from user-facing alert group names to plugin ids.
+ALERT_GROUP_PLUGINS = {
+    "kpi": "KPIAlerting",
+    "data_latency": "DataLatencyAlertingETL3",
+    "spc": "SPCXBarRControlChartTable",
+    "pipelinesourcemonitoring": "PipelineSourceMonitoring",
+}
+
+
+@smsdkentities.register("alert")
+class Alerts(SmsdkEntities, MaSession):
+    # Decorator to register a function as a utility. Only the registered
+    # utilities are accessible to the outside world via client.get_data().
+    mod_util = module_utility()
+
+    def __init__(self, session: Any, base_url: str) -> None:
+        self.session = session
+        self.base_url = base_url
+
+    @mod_util
+    def update_alert_payload(
+        self, payload: Dict[str, Any], updates: Dict[str, Any]
+    ) -> Dict[str, Any]:
+        """
+        Update the alert payload with the new values provided in the updates
+        dictionary.
+
+        :param payload: dict, the original payload
+        :param updates: dict, fields to be updated in the payload
+        :return: dict, updated payload
+        """
+        updated_payload = deepcopy(payload)  # avoid modifying the original payload
+
+        if "display_name" in updates:
+            updated_payload["display_name"] = updates["display_name"]
+
+        if "email_list" in updates:
+            email = updated_payload["notification"]["backend"]["email"]
+            if updates.get("extend_lists", False):
+                email["email_list"].extend(updates["email_list"])
+            else:
+                email["email_list"] = updates["email_list"]
+
+        if "interval" in updates:
+            updated_payload["trigger"]["schedule"]["interval"] = updates["interval"]
+            updated_payload["analytic"]["plugin_parameters"]["alert_config"][
+                "max_latency"
+            ] = updates["interval"]
+
+        if "max_latency" in updates:
+            updated_payload["analytic"]["plugin_parameters"]["alert_config"][
+                "max_latency"
+            ] = updates["max_latency"]
+
+        for list_key in ("white_list", "black_list"):
+            if list_key in updates:
+                alert_config = updated_payload["analytic"]["plugin_parameters"][
+                    "alert_config"
+                ]
+                if updates.get("extend_lists", False):
+                    alert_config[list_key].extend(updates[list_key])
+                else:
+                    alert_config[list_key] = updates[list_key]
+
+        for key in SERVER_MANAGED_KEYS:
+            updated_payload.pop(key, None)
+
+        return updated_payload
+    @mod_util
+    def get_updated_alert(
+        self, existing: Dict[str, Any], updates: Dict[str, Any]
+    ) -> Dict[str, Any]:
+        """Recursively update an existing dict with new values, handling nested structures."""
+        if not isinstance(existing, dict) or not isinstance(updates, dict):
+            # NaN means "no change"; it comes from empty dataframe cells.
+            return (
+                updates
+                if not (isinstance(updates, float) and isnan(updates))
+                else existing
+            )
+
+        updated = deepcopy(existing)
+        for key, value in updates.items():
+            if isinstance(value, dict):  # handle nested dictionaries
+                updated[key] = self.get_updated_alert(existing.get(key, {}), value)
+            elif isinstance(value, list):  # handle lists
+                if all(isinstance(v, dict) for v in value):  # list of dicts
+                    updated[key] = [
+                        self.get_updated_alert(old, new)
+                        for old, new in zip(existing.get(key, []), value)
+                    ]
+                else:
+                    updated[key] = value  # replace non-dict lists completely
+            else:
+                if not (isinstance(value, float) and isnan(value)):
+                    updated[key] = value  # replace only if not NaN
+        for key in SERVER_MANAGED_KEYS:
+            updated.pop(key, None)
+        return updated
+
+    @mod_util
+    def get_utilities(self, *args: Any, **kwargs: Any) -> List[Any]:
+        """
+        Get the list of registered utilities by name.
+        """
+        return [*self.mod_util.all]
+
+    @mod_util
+    def get_alert_config(self, alert_id: str) -> Any:
+        """Fetch a single alert configuration, or None if the request fails."""
+        url = "{}{}/{}".format(self.base_url, ALERT_CONFIG_URL, alert_id)
+        response = self.session.get(url)
+        if response.status_code in [200, 201]:
+            return response.json()
+        print(f"\033[91mFor alert_id: {alert_id} -- {response.text}\033[0m")
+        return None
+
+    @mod_util
+    def get_filtered_alerts_by_group(
+        self, alerts: List[Any], alert_group: str
+    ) -> List[Any]:
+        """Keep only the alerts whose plugin id matches the given group."""
+        alert_plugin_id = ALERT_GROUP_PLUGINS.get(alert_group.lower())
+        return [
+            data
+            for data in alerts
+            if data["analytic"].get("plugin_id") == alert_plugin_id
+        ]
+
+    @mod_util
+    def update_alert(self, alert_id: str, updated_params: Dict[str, Any]) -> None:
+        """Apply updated_params to a single alert and PUT it back."""
+        if not updated_params:
+            print(
+                "Please pass the fields to be updated as a dict in `updated_params`"
+            )
+            return
+        original_alert = self.get_alert_config(alert_id)
+        if original_alert is None:
+            return
+        updated_payload = self.update_alert_payload(original_alert, updated_params)
+        url = "{}{}/{}".format(self.base_url, ALERT_CONFIG_URL, alert_id)
+        response = self.session.put(url, json=updated_payload)
+        if response.status_code in [200, 201]:
+            print(f"\033[92mSuccessfully updated alert with id \033[0m`{alert_id}`.")
+        else:
+            print(f"\033[91m{response.text}\033[0m")
+
+    @mod_util
+    def update_alert_group(self, updated_dataframe: pd.DataFrame) -> None:
+        """Bulk-update alerts from an edited flattened dataframe."""
+        dataframe = self.convert_str_to_dict(updated_dataframe)
+        json_data = self.reconstruct_json(dataframe)
+        for item in json_data:
+            alert_config = self.get_alert_config(item["id"])
+            if alert_config is None:
+                continue
+            updated_alert = self.get_updated_alert(alert_config, item)
+            url = "{}{}/{}".format(self.base_url, ALERT_CONFIG_URL, item["id"])
+            response = self.session.put(url, json=updated_alert)
+            if response.status_code in [200, 201]:
+                print(
+                    f"\033[92mSuccessfully updated alert with id \033[0m`{item['id']}`."
+                )
+            else:
+                print(f"\033[91m{response.text}\033[0m")
+    @mod_util
+    def fetch_alerts_data(self) -> List[Dict[str, Any]]:
+        """Fetch the raw list of all alert configurations."""
+        url = "{}{}".format(self.base_url, ALERT_CONFIG_URL)
+        response = self.session.get(url)
+        if response.status_code in [200, 201]:
+            alerts: List[Dict[str, Any]] = response.json()["objects"]
+            return alerts
+        print(f"\033[91m{response.text}\033[0m")
+        return []
+
+    @mod_util
+    def list_alerts(self, alert_type: str) -> Any:
+        """Return a summary dataframe of alerts, optionally filtered by group."""
+        alert_plugin_id = ALERT_GROUP_PLUGINS.get(alert_type.lower())
+        alerts = self.fetch_alerts_data()
+        if not alerts:
+            return None
+        transformed_data = []
+        for data in alerts:
+            try:
+                creator = "{} {}".format(
+                    data["created_by"]["metadata"]["first_name"],
+                    data["created_by"]["metadata"]["last_name"],
+                )
+            except (KeyError, TypeError):
+                creator = "Undefined Undefined"
+            status = "Enabled" if data.get("enabled") else "Disabled"
+            plugin_id = data["analytic"].get("plugin_id")
+            if alert_plugin_id is not None and alert_plugin_id != plugin_id:
+                continue
+            display_name = (data.get("display_name") or "").strip()
+            if not display_name:
+                continue
+            transformed_data.append(
+                {
+                    "display_name": display_name,
+                    "analytic": plugin_id,
+                    "Creator": creator,
+                    "status": status,
+                    "incident_count": data.get("incident_total"),
+                }
+            )
+        return pd.DataFrame(transformed_data)
+    @mod_util
+    def get_alert_dataframe(self, alert_type: str) -> pd.DataFrame:
+        """Fetch and return the alert configurations as a flattened DataFrame."""
+        alerts = self.fetch_alerts_data()
+        log.debug("Fetched %d alerts", len(alerts))
+        if alert_type:
+            alerts = self.get_filtered_alerts_by_group(alerts, alert_type)
+
+        alert_ids = [data["id"] for data in alerts if "id" in data]
+        if not alert_ids:
+            return pd.DataFrame()  # return an empty DataFrame instead of None
+
+        # Fetch each config exactly once and drop the ones that failed to load.
+        alert_data = [
+            cfg
+            for cfg in (self.get_alert_config(alert_id) for alert_id in alert_ids)
+            if cfg is not None
+        ]
+        if not alert_data:
+            return pd.DataFrame()
+
+        # Convert the list of dicts to a DataFrame.
+        df_main = pd.DataFrame(alert_data)
+
+        # Normalize the different sections with different depths.
+        df_created_by = (
+            pd.json_normalize(
+                df_main["created_by"], sep="___", max_level=2
+            ).add_prefix("created_by___")
+            if "created_by" in df_main
+            else None
+        )
+        df_notification = (
+            pd.json_normalize(
+                df_main["notification"], sep="___", max_level=2
+            ).add_prefix("notification___")
+            if "notification" in df_main
+            else None
+        )
+
+        # Extract the analytic metadata, then drop the `analytic` column.
+        df_analytic_meta = df_main[["analytic"]].copy()
+        for meta_key in ("plugin_id", "plugin_version", "plugin_type"):
+            df_analytic_meta["analytic___" + meta_key] = df_analytic_meta[
+                "analytic"
+            ].apply(lambda x, k=meta_key: x.get(k))
+        df_analytic_meta.drop(columns=["analytic"], inplace=True)
+
+        df_plugin_params = (
+            pd.json_normalize(
+                df_main["analytic"].apply(lambda x: x.get("plugin_parameters", {})),
+                sep="___",
+                max_level=1,
+            ).add_prefix("analytic___plugin_parameters___")
+            if "analytic" in df_main
+            else None
+        )
+        df_sidebar_params = (
+            pd.json_normalize(
+                df_main["sidebar_params"], sep="___", max_level=1
+            ).add_prefix("sidebar_params___")
+            if "sidebar_params" in df_main
+            else None
+        )
+        df_trigger = (
+            pd.json_normalize(df_main["trigger"], sep="___", max_level=1).add_prefix(
+                "trigger___"
+            )
+            if "trigger" in df_main
+            else None
+        )
+
+        # Drop the original nested columns.
+        df_main = df_main.drop(
+            columns=[
+                "created_by",
+                "trigger",
+                "notification",
+                "analytic",
+                "sidebar_params",
+            ],
+            errors="ignore",
+        )
+
+        # Concatenate the processed DataFrames.
+        return pd.concat(
+            [
+                df_main,
+                df_trigger,
+                df_created_by,
+                df_notification,
+                df_analytic_meta,
+                df_plugin_params,
+                df_sidebar_params,
+            ],
+            axis=1,
+        )
+
+    def _delete_alert_by_id(self, alert_id: str) -> None:
+        """DELETE a single alert config and report the result."""
+        _response = self.session.delete(
+            "{}{}/{}".format(self.base_url, ALERT_CONFIG_URL, alert_id)
+        )
+        if _response.status_code in [200, 201]:
+            print(
+                f"\033[92mSuccessfully deleted alert with id:\033[0m `{alert_id}`"
+            )
+        else:
+            print(
+                f"\033[91mFailed to delete alert with id:\033[0m {alert_id} "
+                f"\033[91mdue to:\033[0m {_response.text}"
+            )
+
+    @mod_util
+    def delete_alert(self, alert_id: str, delete_all: bool, alert_group: str) -> None:
+        """Delete a single alert, a whole alert group, or every alert."""
+        alerts = self.fetch_alerts_data()
+        alerts_ids_dict = {alert["id"]: alert for alert in alerts}
+        if alert_id:
+            if alert_id not in alerts_ids_dict:
+                print(
+                    "\033[91mInvalid alert id: not found in existing alerts\033[0m"
+                )
+            else:
+                self._delete_alert_by_id(alert_id)
+        if delete_all:
+            for existing_id in alerts_ids_dict:
+                self._delete_alert_by_id(existing_id)
+        if alert_group:
+            for data in self.get_filtered_alerts_by_group(alerts, alert_group):
+                self._delete_alert_by_id(data["id"])
+
+    # Convert string representations of dicts/lists back to Python objects.
+    @mod_util
+    def convert_str_to_dict(self, df: pd.DataFrame) -> pd.DataFrame:
+        for col in df.columns:
+            df[col] = df[col].apply(
+                lambda x: ast.literal_eval(x)
+                if isinstance(x, str) and x.startswith(("{", "["))
+                else x
+            )
+        return df
+
+    # Reverse the "___" flattening and rebuild the nested dictionaries.
+    @mod_util
+    def reconstruct_json(self, df: pd.DataFrame) -> List[Any]:
+        result = []
+        for _, row in df.iterrows():
+            item: Dict[str, Any] = {}
+            for col, value in row.items():
+                keys = col.split("___")  # reverse the flattening
+                temp = item
+                for key in keys[:-1]:  # traverse the nested keys
+                    temp = temp.setdefault(key, {})
+                temp[keys[-1]] = value
+            result.append(item)
+        return result
+
+    @mod_util
+    def remove_nan_keys(self, d: Dict[str, Any]) -> Any:
+        """Recursively remove keys with NaN values from a nested structure."""
+        if isinstance(d, dict):
+            return {
+                k: self.remove_nan_keys(v)
+                for k, v in d.items()
+                if not (isinstance(v, (float, int)) and pd.isna(v))
+            }
+        elif isinstance(d, (list, np.ndarray)):
+            # Arrays become plain lists so the payload stays JSON-serializable.
+            return [
+                self.remove_nan_keys(v)
+                for v in d
+                if not (isinstance(v, (float, int)) and pd.isna(v))
+            ]
+        else:
+            return d
+    @mod_util
+    def create_alert(self, alert_type: str, dataframe: pd.DataFrame) -> None:
+        """Create new alerts from a dataframe of alert configurations."""
+        dataframe = self.convert_str_to_dict(dataframe)
+        json_data = self.reconstruct_json(dataframe)
+        if alert_type:
+            json_data = self.get_filtered_alerts_by_group(json_data, alert_type)
+        for new_alert in json_data:
+            for key in SERVER_MANAGED_KEYS:
+                new_alert.pop(key, None)
+            new_alert = self.remove_nan_keys(new_alert)
+            url = "{}{}".format(self.base_url, ALERT_CONFIG_URL)
+            response = self.session.post(url, json=new_alert)
+            if response.status_code in [200, 201]:
+                print(
+                    f"\033[92mSuccessfully added new alert {new_alert['display_name']}\033[0m"
+                )
+            else:
+                print(f"\033[91m{response.text}\033[0m")
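Since the `___` flatten/rebuild pair is the backbone of the `get_alert_dataframe` → edit → `update_alert_group` round trip, here is a standalone sketch of just that mechanic. It makes no server calls, and the column name is a made-up example; the rebuild loop mirrors `reconstruct_json` above.

```python
# Standalone sketch of the "___" flatten/rebuild round trip used above.
import pandas as pd

flat = pd.DataFrame(
    [{"id": "abc123", "trigger___schedule___interval": {"every": 10, "period": "days"}}]
)

# Rebuild nested dicts the same way reconstruct_json does.
nested = []
for _, row in flat.iterrows():
    item = {}
    for col, value in row.items():
        keys = col.split("___")
        temp = item
        for key in keys[:-1]:
            temp = temp.setdefault(key, {})
        temp[keys[-1]] = value
    nested.append(item)

# nested[0] == {"id": "abc123",
#               "trigger": {"schedule": {"interval": {"every": 10, "period": "days"}}}}
print(nested[0])
```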
diff --git a/tests/alerts/__init__.py b/tests/alerts/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/alerts/alerts_data.py b/tests/alerts/alerts_data.py
new file mode 100644
index 0000000..bfa0d4e
--- /dev/null
+++ b/tests/alerts/alerts_data.py
@@ -0,0 +1,486 @@
+from copy import deepcopy
+
+ALERT_PAYLOAD = {
+    "capturetime": "2025-03-03 08:10:42.863000",
+    "capturetime_epoch": 1740989442863,
+    "updatetime": "2025-03-03 08:10:42.863000",
+    "localtz": "UTC",
+    "updatelocation": "",
+    "version": 0,
+    "tombstone": False,
+    "tombstone_epoch": 0,
+    "audit_keys": ["id"],
+    "system_fixture": False,
+    "created_by": {
+        "_id": "8b8f8cdfab8df8c254d6b942",
+        "metadata": None,
+        "email": None,
+        "username": None,
+        "roles": None,
+        "is_active": False,
+    },
+    "enabled": True,
+    "display_name": "Data latency updated FROM SMSDK",
+    "sidebar_params": {
+        "asset": {},
+        "chartType": {},
+        "specLimits": {},
+        "yAxis": {},
+        "dateRange": {},
+    },
+    "trigger": {
+        "trigger_type": "periodic",
+        "schedule": {"interval": {"every": 10, "period": "days"}},
+    },
+    "analytic": {
+        "plugin_id": "DataLatencyAlertingETL3",
+        "plugin_version": 1,
+        "plugin_type": "analytic",
+        "plugin_parameters": {
+            "output_type": "alert",
+            "time_selection": {
+                "time_type": "relative",
+                "relative_unit": "minute",
+                "relative_start": 14400,
+            },
+            "asset_selection": {},
+            "spec_limits": {},
+            "analytics": {},
+            "alert_config": {
+                "max_latency": {"every": 1, "period": "days"},
+                "black_list": [],
+                "white_list": [
+                    "genmills-ms-rbg.v1.MS_RBG_ALL_LINE.PRODUCT_MATRIX_1st_Oct_v1"
+                ],
+            },
+        },
+    },
+    "notification": {
+        "backend": {
+            "email": {
+                "silence_window": {"every": 1, "period": "days"},
+                "email_list": [
+                    {"to_email": "cjadhav@sightmachine.com", "name": "chaitanya jadhav"}
+                ],
+                "subject": "Data Latency Alert",
+            }
+        }
+    },
+    "id": "67c5640266af93c9e3095134",
+    "incident_total": 0,
+}
+
+ALERT_UPDATES = {"display_name": "Updated Name Data latency updated FROM SMSDK 2"}
+
+# Expected result of update_alert_payload(ALERT_PAYLOAD, ALERT_UPDATES):
+# the server-managed keys are stripped and the display name is replaced.
+_SERVER_MANAGED_KEYS = [
+    "incident_total",
+    "id",
+    "created_by",
+    "system_fixture",
+    "audit_keys",
+    "tombstone_epoch",
+    "tombstone",
+    "version",
+    "updatelocation",
+    "localtz",
+    "updatetime",
+    "capturetime_epoch",
+    "capturetime",
+]
+UPDATED_ALERT = {
+    k: v for k, v in deepcopy(ALERT_PAYLOAD).items() if k not in _SERVER_MANAGED_KEYS
+}
+UPDATED_ALERT["display_name"] = ALERT_UPDATES["display_name"]
+
+# Three alerts: two identical data-latency alerts plus one SPC alert, so the
+# group filter has something to drop.
+ALERTS_LIST = [deepcopy(ALERT_PAYLOAD) for _ in range(3)]
+ALERTS_LIST[2]["analytic"]["plugin_id"] = "SPC"
+
+# Filtering ALERTS_LIST by the "data_latency" group keeps the first two.
+FILTERED_ALERTS = deepcopy(ALERTS_LIST[:2])
diff --git a/tests/alerts/test_alerts.py b/tests/alerts/test_alerts.py
new file mode 100755
index 0000000..2b4d887
--- /dev/null
+++ b/tests/alerts/test_alerts.py
@@ -0,0 +1,87 @@
+from typing import Any
+from unittest.mock import MagicMock, patch
+
+from smsdk.smsdk_entities.alerts.alerts import Alerts
+from tests.conftest import TENANT
+from tests.alerts.alerts_data import (
+    ALERT_PAYLOAD,
+    ALERT_UPDATES,
+    UPDATED_ALERT,
+    ALERTS_LIST,
+    FILTERED_ALERTS,
+)
+
+# Constants for the test cases
+ALERT_ID = 1
+GROUP_NAME = "data_latency"
+
+
+def test_update_alert_payload(get_session: Any) -> None:
+    alerts = Alerts(get_session, TENANT or "genmills-ms-rbg")
+    assert alerts.update_alert_payload(ALERT_PAYLOAD, ALERT_UPDATES) == UPDATED_ALERT
+
+
+def test_get_filtered_alerts_by_group(get_session: Any) -> None:
+    alerts = Alerts(get_session, TENANT or "genmills-ms-rbg")
+    assert (
+        alerts.get_filtered_alerts_by_group(ALERTS_LIST, GROUP_NAME) == FILTERED_ALERTS
+    )
+
+
+@patch.object(Alerts, "get_alert_config")
+def test_get_alert_config(mock_get_alert_config: MagicMock, get_session: Any) -> None:
+    mock_get_alert_config.return_value = ALERT_PAYLOAD
+    alerts_instance = Alerts(get_session, TENANT or "genmills-ms-rbg")
+    assert alerts_instance.get_alert_config(1) == ALERT_PAYLOAD
+
+
+@patch.object(Alerts, "update_alert")
+def test_update_alert(mock_update_alert: MagicMock, get_session: Any) -> None:
+    mock_update_alert.return_value = True
+    alerts_instance = Alerts(get_session, TENANT or "genmills-ms-rbg")
+    assert alerts_instance.update_alert(ALERT_ID, ALERT_UPDATES) is True
+
+
+@patch.object(Alerts, "update_alert_group")
+def test_update_alert_group(
+    mock_update_alert_group: MagicMock, get_session: Any
+) -> None:
+    mock_update_alert_group.return_value = True
+    alerts_instance = Alerts(get_session, TENANT or "genmills-ms-rbg")
+    assert alerts_instance.update_alert_group(MagicMock()) is True
+@patch.object(Alerts, "fetch_alerts_data")
+def test_fetch_alerts_data(mock_fetch_alerts_data: MagicMock, get_session: Any) -> None:
+    mock_fetch_alerts_data.return_value = [ALERT_PAYLOAD]
+    alerts_instance = Alerts(get_session, TENANT or "genmills-ms-rbg")
+    assert alerts_instance.fetch_alerts_data() == [ALERT_PAYLOAD]
+
+
+@patch.object(Alerts, "delete_alert")
+def test_delete_alert(mock_delete_alert: MagicMock, get_session: Any) -> None:
+    mock_delete_alert.return_value = True
+    alerts_instance = Alerts(get_session, TENANT or "genmills-ms-rbg")
+    assert alerts_instance.delete_alert(ALERT_ID, False, "") is True
+
+
+@patch.object(Alerts, "list_alerts")
+def test_list_alerts(mock_list_alerts: MagicMock, get_session: Any) -> None:
+    mock_list_alerts.return_value = ["alert1", "alert2"]
+    alerts_instance = Alerts(get_session, TENANT or "genmills-ms-rbg")
+    assert alerts_instance.list_alerts("") == ["alert1", "alert2"]
+
+
+@patch.object(Alerts, "get_alert_dataframe")
+def test_get_alert_dataframe(
+    mock_get_alert_dataframe: MagicMock, get_session: Any
+) -> None:
+    mock_df = MagicMock()
+    mock_get_alert_dataframe.return_value = mock_df
+    alerts_instance = Alerts(get_session, TENANT or "genmills-ms-rbg")
+    assert alerts_instance.get_alert_dataframe("") is mock_df