Skip to content

Commit 7d5f7c1

Browse files
authored
Merge pull request #349 from Sanketika-Obsrv/alerts-rules
feat #OBS-I623 : changes to create infra and dataset level alerts v2
2 parents 7ad2455 + b244792 commit 7d5f7c1

File tree

10 files changed

+223
-58
lines changed

10 files changed

+223
-58
lines changed

api-service/src/configs/alertsConfig.json

Lines changed: 140 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,54 +3,176 @@
33
"alerts": {
44
"dataset_metrics_flink": [
55
{
6-
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_ExtractorJob_dataset_id_extractor_failed_count[5m]))",
7-
"alias": "Number of Failed Extraction Events",
8-
"description": "This alert tracks how many events failed the extraction stage",
6+
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_ExtractorJob_dataset_id_extractor_failed_count[5m])) + sum(sum_over_time(flink_taskmanager_job_task_operator_ExtractorJob_dataset_id_extractor_duplicate_count[5m])) + sum(sum_over_time(flink_taskmanager_job_task_operator_PipelinePreprocessorJob_dataset_id_validator_failed_count[5m]))",
7+
"alias": "[DATASET]: Detected a higher rate of invalid data than expected",
8+
"category": "Processing",
9+
"severity": "critical",
10+
"code": "ALERT_1203",
11+
"description": "The dataset is unhealthy, and the query results may be incorrect",
12+
"summary": "Invalid data has been ingested in the system, preventing it from being processed. Henceforth, queries on this dataset may not return accurate data.",
913
"frequency": "5m",
1014
"interval": "5m",
1115
"operator": "gt",
1216
"threshold": 0
1317
},
1418
{
15-
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_ExtractorJob_dataset_id_extractor_duplicate_count[5m]))",
16-
"alias": "Number of Duplicate Extraction Events",
17-
"description": "This alert tracks how many duplicate events were found during extraction stage",
19+
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_PipelinePreprocessorJob_dataset_id_dedup_failed_count[5m]))",
20+
"alias": "[DATASET]: Detected higher rate of duplicate data than expected",
21+
"category": "Processing",
22+
"severity": "warning",
23+
"code": "ALERT_1204",
24+
"description": "The dataset is unhealthy, and the query results may be incorrect",
25+
"summary": "Duplicate data has been ingested in the system, preventing it from being processed. Henceforth, queries on this dataset may not return accurate data.",
1826
"frequency": "5m",
1927
"interval": "5m",
2028
"operator": "gt",
2129
"threshold": 0
2230
},
2331
{
24-
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_PipelinePreprocessorJob_dataset_id_dedup_failed_count[5m]))",
25-
"alias": "Number of Duplicate Preprocessing Events",
26-
"description": "This alert tracks how many duplicate events were found during preprocessing stage",
32+
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_DenormalizerJob_dataset_id_denorm_failed[5m])) + sum(sum_over_time(flink_taskmanager_job_task_operator_DenormalizerJob_dataset_id_denorm_partial_success[5m]))",
33+
"alias": "[DATASET]: Detected higher incidence of failures during data enrichment.",
34+
"category": "Processing",
35+
"severity": "warning",
36+
"code": "ALERT_1205",
37+
"description": "The dataset is unhealthy, and the query results may be incorrect",
38+
"summary": "The data ingested into the system is failing the enrichment process, which may cause queries on this dataset to return inaccurate data.",
2739
"frequency": "5m",
2840
"interval": "5m",
2941
"operator": "gt",
3042
"threshold": 0
3143
},
3244
{
33-
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_PipelinePreprocessorJob_dataset_id_validator_failed_count[5m]))",
34-
"alias": "Number of Failed Validation Events",
35-
"description": "This alert tracks how many events failed the validation stage",
45+
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_TransformerJob_dataset_id_transform_failed_count[5m])) + sum(sum_over_time(flink_taskmanager_job_task_operator_TransformerJob_dataset_id_transform_partial_count[5m]))",
46+
"alias": "[DATASET]: Detected higher incidence of failures during data transformations.",
47+
"category": "Processing",
48+
"severity": "warning",
49+
"code": "ALERT_1206",
50+
"description": "The dataset is unhealthy, and the query results may be incorrect",
51+
"summary": "The data ingested into the system is failing the data transformation process, which may cause queries on this dataset to return inaccurate data.",
3652
"frequency": "5m",
3753
"interval": "5m",
3854
"operator": "gt",
3955
"threshold": 0
4056
},
4157
{
42-
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_DenormalizerJob_dataset_id_denorm_failed[5m])) + sum(sum_over_time(flink_taskmanager_job_task_operator_DenormalizerJob_dataset_id_denorm_partial_success[5m]))",
43-
"alias": "Number of Failed Denorm Events",
44-
"description": "This alert tracks how many events failed the denorm stage",
58+
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_PipelinePreprocessorJob_dataset_id_validator_total_count[$__range]))",
59+
"alias": "[DATASET]: No data has been received for the past hour.",
60+
"category": "Processing",
61+
"severity": "warning",
62+
"code": "ALERT_1209",
63+
"description": "The dataset hasn’t received any new data for the past hour, which may affect the querying of the new data.",
64+
"summary": "The dataset has not received any new data, which will impact real-time data processing",
65+
"frequency": "5m",
66+
"interval": "60m",
67+
"operator": "lt",
68+
"threshold": 1
69+
}
70+
],
71+
"dataset_metrics_druid": [
72+
{
73+
"metric": "max(druid_supervisors{supervisor_name=\"dataset_id\", state=\"RUNNING\"} or (0 * absent(druid_supervisors{supervisor_name=\"dataset_id\", state=\"RUNNING\"})))",
74+
"alias": "[DATASET]: Druid supervisor is in an unhealthy state",
75+
"category": "Querying",
76+
"severity": "critical",
77+
"code": "ALERT_1309",
78+
"description": "The dataset is unhealthy, and no new data has been available for querying since the system encountered the issue.",
79+
"summary": "The associated Druid Supervisor is in an unhealthy state, preventing druid ingestion tasks from running. As a result, real-time data cannot be queried.",
80+
"frequency": "5m",
81+
"interval": "5m",
82+
"operator": "lt",
83+
"threshold": 1
84+
},
85+
{
86+
"metric": "druid_ingest_events_unparseable_total{dataSource=\"dataset_id\"}",
87+
"alias": "[DATASET]: Detected higher amount of unparseable data.",
88+
"flattened": true,
89+
"category": "Querying",
90+
"severity": "critical",
91+
"code": "ALERT_1308",
92+
"description": "The dataset is unhealthy, and the query results may be incorrect",
93+
"summary": "Unparseable data has been detected in the system, preventing it from being processed. Henceforth, queries on this dataset may not return accurate data until the issue is resolved.",
4594
"frequency": "5m",
4695
"interval": "5m",
4796
"operator": "gt",
4897
"threshold": 0
4998
},
5099
{
51-
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_TransformerJob_dataset_id_transform_failed_count[5m])) + sum(sum_over_time(flink_taskmanager_job_task_operator_TransformerJob_dataset_id_transform_partial_count[5m]))",
52-
"alias": "Number of Failed Transformer Events",
53-
"description": "This alert tracks how many events failed the transformation stage",
100+
"metric": "druid_ingest_kafka_lag{dataSource=\"dataset_id\"}",
101+
"alias": "[DATASET]: Detected higher amount of query lag than expected.",
102+
"category": "Querying",
103+
"flattened": true,
104+
"severity": "critical",
105+
"code": "ALERT_1307",
106+
"description": "A large amount of data is still waiting to be processed. This may cause delays in querying the most recent data",
107+
"summary": "High indexer lag in the dataset indicates processing of new data is delayed. Because of this delay, new data isn’t available when querying the dataset.",
108+
"frequency": "5m",
109+
"interval": "60m",
110+
"operator": "gt",
111+
"threshold": 5000000
112+
},
113+
{
114+
"metric": "druid_ingest_kafka_lag{dataSource=\"dataset_id\"}",
115+
"alias": "[DATASET]: Druid Supervisor Ingestion Failure Due to Offsets.",
116+
"category": "Querying",
117+
"flattened": true,
118+
"severity": "critical",
119+
"code": "ALERT_1312",
120+
"description": "The dataset is unhealthy, and no new data has been available for querying since the issue occurred",
121+
"summary": "The supervisor is experiencing a negative offset, preventing it from ingesting new data. As a result, real-time data is unavailable for querying.",
122+
"frequency": "5m",
123+
"interval": "5m",
124+
"operator": "lt",
125+
"threshold": 0
126+
},
127+
{
128+
"metric": "count(druid_tasks_duration{task_status='FAILED', datasource='dataset_id'}) OR on() vector(0)",
129+
"alias": "[DATASET]: Druid tasks are in an unhealthy state",
130+
"category": "Querying",
131+
"severity": "critical",
132+
"code": "ALERT_1310",
133+
"description": "The dataset is unhealthy, and no new data has been available for querying since the system encountered the issue.",
134+
"summary": "The Druid ingestion tasks are in an unhealthy state, causing data ingestion delays and failures. As a result, real-time data may not be available for querying.",
135+
"frequency": "5m",
136+
"interval": "5m",
137+
"operator": "gt",
138+
"threshold": 0
139+
}
140+
],
141+
"api_metric": [
142+
{
143+
"metric": "sum(sum_over_time(node_failed_api_calls{dataset_id='<dataset_id>', id='api.data.out'}[$__range]))",
144+
"alias": "[DATASET]: The Data Query API is encountering more failures to retrieve the data",
145+
"category": "Querying",
146+
"severity": "warning",
147+
"code": "ALERT_1305",
148+
"description": "The dataset has been unavailable for querying data for an extended period",
149+
"summary": "Query failures are preventing access to the dataset, resulting in an inability to retrieve data as expected.",
150+
"frequency": "5m",
151+
"interval": "5m",
152+
"operator": "gt",
153+
"threshold": 0
154+
},
155+
{
156+
"metric": "avg(avg_over_time(node_query_response_time{dataset_id='<dataset_id>', id='api.data.out'}[$__range]))",
157+
"alias": "[DATASET]: The Data Query API is facing delays in retrieving data",
158+
"category": "Querying",
159+
"severity": "warning",
160+
"code": "ALERT_1306",
161+
"description": "There is a delay in querying the dataset for an extended period.",
162+
"summary": "Delays in queries are affecting access to the dataset, leading to delayed data retrieval.",
163+
"frequency": "5m",
164+
"interval": "5m",
165+
"operator": "gt",
166+
"threshold": 1000
167+
},
168+
{
169+
"metric": "sum(sum_over_time(node_failed_api_calls{dataset_id='<dataset_id>', id='api.data.in'}[$__range]))",
170+
"alias": "[DATASET]: Failed to ingest data into the system",
171+
"category": "Ingestion",
172+
"severity": "warning",
173+
"code": "ALERT_1101",
174+
"description": "Detected failures while adding new data to the dataset.",
175+
"summary": "Failed to add new data to the dataset, impacting real-time data availability.",
54176
"frequency": "5m",
55177
"interval": "5m",
56178
"operator": "gt",

api-service/src/controllers/Alerts/Metric.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ const telemetryObject = { type: "metric", ver: "1.0.0" };
1111
const createMetricHandler = async (req: Request, res: Response, next: NextFunction) => {
1212
try {
1313
const { component } = req.body;
14-
const transformComponent = _.toLower(component);
15-
const metricsBody = await Metrics.create({ ...(req.body), component: transformComponent });
14+
const metricsBody = await Metrics.create({ ...(req.body), component: component });
1615
updateTelemetryAuditEvent({ request: req, object: { id: metricsBody?.dataValues?.id, ...telemetryObject } });
1716
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { id: metricsBody.dataValues.id } });
1817
} catch (error: any) {

api-service/src/services/AlertManagerService.ts

Lines changed: 61 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,20 @@ import { Alert } from '../models/Alert';
66
import { alertConfig } from './AlertsConfigSevice';
77
import Transaction from "sequelize/types/transaction";
88

9+
interface MetricConfig {
10+
metric: string;
11+
alias: string;
12+
category: string;
13+
description: string;
14+
frequency: string;
15+
interval: string;
16+
code: string;
17+
severity: string;
18+
summary?: string;
19+
operator: string;
20+
threshold: number;
21+
flattened?: boolean;
22+
}
923

1024
class AlertManagerService {
1125
private config: any;
@@ -14,54 +28,72 @@ class AlertManagerService {
1428
this.config = alertConfig.find('configs.alerts');
1529
}
1630

17-
private getModifiedMetric = (service: string, metric: any, datasetId: string): any => {
31+
private getModifiedMetric = (service: string, metric: any, datasetId: string, datasource_ref?: string): any => {
1832
const metricData = _.cloneDeep(metric);
1933
if (service === 'flink') {
2034
const modifiedSubstring = datasetId.replace(/-/g, '_');
21-
metricData.metric = metricData.metric.replace('dataset_id', modifiedSubstring);
22-
} else {
35+
metricData.metric = metricData.metric.replaceAll('dataset_id', modifiedSubstring);
36+
}
37+
else if (service === 'druid') {
38+
metricData.metric = metricData.metric.replaceAll('dataset_id',
39+
metricData.flattened ? (datasource_ref || '').replace(/-/g, '_') : datasource_ref
40+
);
41+
}
42+
else if (service === 'api') {
43+
metricData.metric = metricData.metric.replaceAll('<dataset_id>', datasetId);
44+
}
45+
else {
2346
metricData.metric = metricData.metric.replace('dataset_id', datasetId);
2447
}
2548
return metricData;
2649
}
2750

28-
private createAlerts = async (params: { datasetId: string; service: string; metric: any; transaction: Transaction }): Promise<void> => {
29-
const { datasetId, service, metric, transaction } = params;
30-
const metricData = this.getModifiedMetric(service, metric, datasetId);
51+
private createAlerts = async (params: { datasetId: string; service: string; metric: any; transaction: Transaction, datasource_ref?: string }): Promise<void> => {
52+
const { datasetId, service, metric, transaction, datasource_ref } = params;
53+
const metricData = this.getModifiedMetric(service, metric, datasetId, datasource_ref);
3154

55+
const dataset_id = datasource_ref ? datasource_ref : datasetId
3256
const metricPayload = {
33-
alias: `${metricData.alias} (${datasetId})`,
34-
component: 'datasets',
57+
alias: `${metricData.alias} (${dataset_id})`,
58+
component: metricData.category,
3559
subComponent: datasetId,
3660
metric: metricData.metric,
3761
context: {
3862
datasetId: datasetId,
3963
},
4064
};
4165

42-
await this.createMetric(metricPayload, transaction);
43-
await this.createAlertRule({ datasetId, metricData, transaction });
66+
const response = await this.createMetric(metricPayload, transaction);
67+
await this.createAlertRule({ datasetId, datasource_ref, metricData, transaction, metricId: response.dataValues.id });
4468
}
4569

4670
private createAlertRule = async (params: {
4771
datasetId: string;
4872
metricData: any;
49-
transaction: Transaction
73+
transaction: Transaction;
74+
metricId: string;
75+
datasource_ref?: string | null;
5076
}): Promise<void> => {
51-
const { datasetId, metricData, transaction } = params;
52-
const datasetName = datasetId.replace(/[-.]/g, ' ').replace(/\b\w/g, c => _.toUpper(c));
77+
const { datasetId, metricData, transaction, metricId, datasource_ref = null } = params;
78+
const dataset = datasource_ref ? datasource_ref : datasetId
79+
const datasetName = dataset.replace(/[-.]/g, ' ').replace(/\b\w/g, c => _.toUpper(c));
5380
const alertPayload = {
54-
name: `${metricData.alias} (${datasetName})`,
81+
name: metricData.alias.replace('[DATASET]', `[DATASET][${datasetName}]`),
5582
manager: 'grafana',
5683
description: metricData.description,
57-
category: 'datasets',
84+
category: metricData.category,
5885
frequency: metricData.frequency,
5986
interval: metricData.interval,
6087
context: { alertType: 'SYSTEM' },
61-
labels: { component: 'obsrv' },
88+
labels: { alert_code: metricData.code, component: 'obsrv', dataset: datasetId, table: datasource_ref },
89+
severity: metricData.severity,
90+
annotations: {
91+
summary: _.get(metricData, 'summary', ''),
92+
},
6293
metadata: {
6394
queryBuilderContext: {
64-
category: 'datasets',
95+
category: metricData.category,
96+
id: metricId,
6597
subComponent: datasetId,
6698
metric: metricData.metric,
6799
operator: metricData.operator,
@@ -97,13 +129,20 @@ class AlertManagerService {
97129
return Alert.create(alertData, { transaction });
98130
}
99131

100-
public createDatasetAlertsDraft = async (dataset: Record<string, any>, transaction: Transaction): Promise<void> => {
101-
for (const metric of this.config.dataset_metrics_flink) {
132+
public createDatasetAlertsDraft = async (dataset: Record<string, any>, transaction: Transaction, datasource_ref: string): Promise<void> => {
133+
const allMetrics = [
134+
...this.config.dataset_metrics_flink.map((metric: MetricConfig) => ({ service: 'flink', metric })),
135+
...this.config.dataset_metrics_druid.map((metric: MetricConfig) => ({ service: 'druid', metric })),
136+
...this.config.api_metric.map((metric: MetricConfig) => ({ service: 'api', metric }))
137+
];
138+
139+
for (const { service, metric } of allMetrics) {
102140
await this.createAlerts({
103141
datasetId: dataset.dataset_id,
104-
service: "flink",
105-
metric: metric,
106-
transaction
142+
service,
143+
metric,
144+
transaction,
145+
...(service === 'druid' ? { datasource_ref } : {})
107146
});
108147
}
109148
}

0 commit comments

Comments
 (0)