Skip to content

Commit 4f25f52

Browse files
authored
Merge pull request #353 from Sanketika-Obsrv/release-1.8.0
Release 1.8.0
2 parents 7ad2455 + 92249e2 commit 4f25f52

File tree

17 files changed

+428
-85
lines changed

17 files changed

+428
-85
lines changed

api-service/src/configs/Config.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ export const config = {
131131
},
132132
"user_token_public_key": process.env.user_token_public_key || "",
133133
"is_RBAC_enabled": process.env.is_rbac_enabled || "true",
134+
"telemetry_log": process.env.telemetry_log || '{"enable":true,"response_data":false}',
134135
"otel": {
135136
"enable": process.env.otel_enable || "false",
136137
"collector_endpoint": process.env.otel_collector_endpoint || "http://localhost:4318"

api-service/src/configs/alertsConfig.json

Lines changed: 140 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,54 +3,176 @@
33
"alerts": {
44
"dataset_metrics_flink": [
55
{
6-
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_ExtractorJob_dataset_id_extractor_failed_count[5m]))",
7-
"alias": "Number of Failed Extraction Events",
8-
"description": "This alert tracks how many events failed the extraction stage",
6+
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_ExtractorJob_dataset_id_extractor_failed_count[5m])) + sum(sum_over_time(flink_taskmanager_job_task_operator_ExtractorJob_dataset_id_extractor_duplicate_count[5m])) + sum(sum_over_time(flink_taskmanager_job_task_operator_PipelinePreprocessorJob_dataset_id_validator_failed_count[5m]))",
7+
"alias": "[DATASET]: Detected high rate of invalid data than expected",
8+
"category": "Processing",
9+
"severity": "critical",
10+
"code": "ALERT_1203",
11+
"description": "The dataset is unhealthy, and the query results may be incorrect",
12+
"summary": "Invalid data has been ingested in the system, preventing it from being processed. Henceforth, queries on this dataset may not return accurate data.",
913
"frequency": "5m",
1014
"interval": "5m",
1115
"operator": "gt",
1216
"threshold": 0
1317
},
1418
{
15-
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_ExtractorJob_dataset_id_extractor_duplicate_count[5m]))",
16-
"alias": "Number of Duplicate Extraction Events",
17-
"description": "This alert tracks how many duplicate events were found during extraction stage",
19+
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_PipelinePreprocessorJob_dataset_id_dedup_failed_count[5m]))",
20+
"alias": "[DATASET]: Detected higher rate of duplicate data than expected",
21+
"category": "Processing",
22+
"severity": "warning",
23+
"code": "ALERT_1204",
24+
"description": "The dataset is unhealthy, and the query results may be incorrect",
25+
"summary": "Duplicate data has been ingested in the system, preventing it from being processed. Henceforth, queries on this dataset may not return accurate data.",
1826
"frequency": "5m",
1927
"interval": "5m",
2028
"operator": "gt",
2129
"threshold": 0
2230
},
2331
{
24-
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_PipelinePreprocessorJob_dataset_id_dedup_failed_count[5m]))",
25-
"alias": "Number of Duplicate Preprocessing Events",
26-
"description": "This alert tracks how many duplicate events were found during preprocessing stage",
32+
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_DenormalizerJob_dataset_id_denorm_failed[5m])) + sum(sum_over_time(flink_taskmanager_job_task_operator_DenormalizerJob_dataset_id_denorm_partial_success[5m]))",
33+
"alias": "[DATASET]: Detected higher incidence of failures during data enrichment.",
34+
"category": "Processing",
35+
"severity": "warning",
36+
"code": "ALERT_1205",
37+
"description": "The dataset is unhealthy, and the query results may be incorrect",
38+
"summary": "The data ingested into the system is failing the enrichment process, which may cause queries on this dataset to return inaccurate data.",
2739
"frequency": "5m",
2840
"interval": "5m",
2941
"operator": "gt",
3042
"threshold": 0
3143
},
3244
{
33-
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_PipelinePreprocessorJob_dataset_id_validator_failed_count[5m]))",
34-
"alias": "Number of Failed Validation Events",
35-
"description": "This alert tracks how many events failed the validation stage",
45+
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_TransformerJob_dataset_id_transform_failed_count[5m])) + sum(sum_over_time(flink_taskmanager_job_task_operator_TransformerJob_dataset_id_transform_partial_count[5m]))",
46+
"alias": "[DATASET]: Detected higher incidence of failures during data transformations.",
47+
"category": "Processing",
48+
"severity": "warning",
49+
"code": "ALERT_1206",
50+
"description": "The dataset is unhealthy, and the query results may be incorrect",
51+
"summary": "The data ingested into the system is failing the data transformation process, which may cause queries on this dataset to return inaccurate data.",
3652
"frequency": "5m",
3753
"interval": "5m",
3854
"operator": "gt",
3955
"threshold": 0
4056
},
4157
{
42-
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_DenormalizerJob_dataset_id_denorm_failed[5m])) + sum(sum_over_time(flink_taskmanager_job_task_operator_DenormalizerJob_dataset_id_denorm_partial_success[5m]))",
43-
"alias": "Number of Failed Denorm Events",
44-
"description": "This alert tracks how many events failed the denorm stage",
58+
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_PipelinePreprocessorJob_dataset_id_validator_total_count[$__range]))",
59+
"alias": "[DATASET]: No data has been received for the past hour.",
60+
"category": "Processing",
61+
"severity": "warning",
62+
"code": "ALERT_1209",
63+
"description": "The dataset hasn’t received any new data for the past hour, which may affect the querying of the new data.",
64+
"summary": "The dataset has not received any new data, which will impact real-time data processing",
65+
"frequency": "5m",
66+
"interval": "60m",
67+
"operator": "lt",
68+
"threshold": 1
69+
}
70+
],
71+
"dataset_metrics_druid": [
72+
{
73+
"metric": "max(druid_supervisors{supervisor_name=\"dataset_id\", state=\"RUNNING\"} or (0 * absent(druid_supervisors{supervisor_name=\"dataset_id\", state=\"RUNNING\"})))",
74+
"alias": "[DATASET]: Druid supervisor is in an unhealthy state",
75+
"category": "Querying",
76+
"severity": "critical",
77+
"code": "ALERT_1309",
78+
"description": "The dataset is unhealthy, and no new data has been available for querying since the system encountered the issue.",
79+
"summary": "The associated Druid Supervisor is in an unhealthy state, preventing druid ingestion tasks from running. As a result, real-time data cannot be queried.",
80+
"frequency": "5m",
81+
"interval": "5m",
82+
"operator": "lt",
83+
"threshold": 1
84+
},
85+
{
86+
"metric": "druid_ingest_events_unparseable_total{dataSource=\"dataset_id\"}",
87+
"alias": "[DATASET]: Detected higher amount of unparseable data.",
88+
"flattened": true,
89+
"category": "Querying",
90+
"severity": "critical",
91+
"code": "ALERT_1308",
92+
"description": "The dataset is unhealthy, and the query results may be incorrect",
93+
"summary": "Unparseable data has been detected in the system, preventing it from being processed. Henceforth, queries on this dataset may not return accurate data until the issue is resolved.",
4594
"frequency": "5m",
4695
"interval": "5m",
4796
"operator": "gt",
4897
"threshold": 0
4998
},
5099
{
51-
"metric": "sum(sum_over_time(flink_taskmanager_job_task_operator_TransformerJob_dataset_id_transform_failed_count[5m])) + sum(sum_over_time(flink_taskmanager_job_task_operator_TransformerJob_dataset_id_transform_partial_count[5m]))",
52-
"alias": "Number of Failed Transformer Events",
53-
"description": "This alert tracks how many events failed the transformation stage",
100+
"metric": "druid_ingest_kafka_lag{dataSource=\"dataset_id\"}",
101+
"alias": "[DATASET]: Detected higher amount of query lag than expected.",
102+
"category": "Querying",
103+
"flattened": true,
104+
"severity": "critical",
105+
"code": "ALERT_1307",
106+
"description": "A large amount of data is still waiting to be processed. This may cause delays in querying the most recent data",
107+
"summary": "High indexer lag in the dataset indicates processing of new data is delayed. Because of this delay, new data isn’t available when querying the dataset.",
108+
"frequency": "5m",
109+
"interval": "60m",
110+
"operator": "gt",
111+
"threshold": 5000000
112+
},
113+
{
114+
"metric": "druid_ingest_kafka_lag{dataSource=\"dataset_id\"}",
115+
"alias": "[DATASET]: Druid Supervisor Ingestion Failure Due to Offsets.",
116+
"category": "Querying",
117+
"flattened": true,
118+
"severity": "critical",
119+
"code": "ALERT_1312",
120+
"description": "The dataset is unhealthy, and no new data has been available for querying since the issue occurred",
121+
"summary": "The supervisor is experiencing a negative offset, preventing it from ingesting new data. As a result, real-time data is unavailable for querying.",
122+
"frequency": "5m",
123+
"interval": "5m",
124+
"operator": "lt",
125+
"threshold": 0
126+
},
127+
{
128+
"metric": "count(druid_tasks_duration{task_status='FAILED', datasource='dataset_id'}) OR on() vector(0)",
129+
"alias": "[DATASET]: Druid tasks are in an unhealthy state",
130+
"category": "Querying",
131+
"severity": "critical",
132+
"code": "ALERT_1310",
133+
"description": "The dataset is unhealthy, and no new data has been available for querying since the system encountered the issue.",
134+
"summary": "The Druid ingestion tasks are in an unhealthy state, causing data ingestion delays and failures. As a result, real-time data may not be available for querying.",
135+
"frequency": "5m",
136+
"interval": "5m",
137+
"operator": "gt",
138+
"threshold": 0
139+
}
140+
],
141+
"api_metric": [
142+
{
143+
"metric": "sum(sum_over_time(node_failed_api_calls{dataset_id='<dataset_id>', id='api.data.out'}[$__range])) or vector(0)",
144+
"alias": "[DATASET]: The Data Query API is encountering more failures to retrieve the data",
145+
"category": "Querying",
146+
"severity": "warning",
147+
"code": "ALERT_1305",
148+
"description": "The dataset has been unavailable for querying data for an extended period",
149+
"summary": "Query failures are preventing access to the dataset, resulting in an inability to retrieve data as expected.",
150+
"frequency": "5m",
151+
"interval": "5m",
152+
"operator": "gt",
153+
"threshold": 0
154+
},
155+
{
156+
"metric": "avg(avg_over_time(node_query_response_time{dataset_id='<dataset_id>', id='api.data.out'}[$__range])) or vector(0)",
157+
"alias": "[DATASET]: The Data Query API is facing delays in retrieving data",
158+
"category": "Querying",
159+
"severity": "warning",
160+
"code": "ALERT_1306",
161+
"description": "There is a delay in querying the dataset for an extended period.",
162+
"summary": "Delays in queries are affecting access to the dataset, leading to delayed data retrieval.",
163+
"frequency": "5m",
164+
"interval": "5m",
165+
"operator": "gt",
166+
"threshold": 1000
167+
},
168+
{
169+
"metric": "sum(sum_over_time(node_failed_api_calls{dataset_id='<dataset_id>', id='api.data.in'}[$__range])) or vector(0)",
170+
"alias": "[DATASET]: Failed to ingest data into the system",
171+
"category": "Ingestion",
172+
"severity": "warning",
173+
"code": "ALERT_1101",
174+
"description": "Detected failures while adding new data to the dataset.",
175+
"summary": "Failed to add new data to the dataset, impacting real-time data availability.",
54176
"frequency": "5m",
55177
"interval": "5m",
56178
"operator": "gt",

api-service/src/controllers/Alerts/Metric.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ const telemetryObject = { type: "metric", ver: "1.0.0" };
1111
const createMetricHandler = async (req: Request, res: Response, next: NextFunction) => {
1212
try {
1313
const { component } = req.body;
14-
const transformComponent = _.toLower(component);
15-
const metricsBody = await Metrics.create({ ...(req.body), component: transformComponent });
14+
const metricsBody = await Metrics.create({ ...(req.body), component: component });
1615
updateTelemetryAuditEvent({ request: req, object: { id: metricsBody?.dataValues?.id, ...telemetryObject } });
1716
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: { id: metricsBody.dataValues.id } });
1817
} catch (error: any) {

api-service/src/controllers/DataIngestion/DataIngestionController.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const requestValidation = async (req: Request) => {
2121
if (_.isEmpty(dataset)) {
2222
throw obsrvError(datasetKey, "DATASET_NOT_FOUND", `Dataset with id/alias name '${datasetKey}' not found`, "NOT_FOUND", 404)
2323
}
24+
_.set(req, "body.request.dataset_id", dataset.dataset_id);
2425
return dataset
2526
}
2627

api-service/src/controllers/DataOut/DataOutController.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { datasetService } from "../../services/DatasetService";
1010
import { obsrvError } from "../../types/ObsrvError";
1111

1212
export const apiId = "api.data.out";
13+
export const query_data = {"data": {}};
1314

1415
const requestValidation = async (req: Request) => {
1516
const datasourceKey = req.params?.dataset_id;
@@ -21,6 +22,7 @@ const requestValidation = async (req: Request) => {
2122
if (_.isEmpty(datasource)) {
2223
throw obsrvError(datasourceKey, "DATASET_NOT_FOUND", `Dataset with id/alias name '${datasourceKey}' not found`, "NOT_FOUND", 404)
2324
}
25+
_.set(req, "body.request.dataset_id", datasource.dataset_id);
2426
return datasource
2527
}
2628

@@ -34,6 +36,7 @@ const dataOut = async (req: Request, res: Response) => {
3436

3537
if (isValidQuery === true && _.isObject(query)) {
3638
const result = await executeNativeQuery(query);
39+
_.set(query_data, "data", result.data);
3740
logger.info({ apiId, msgid, requestBody, datasetId, message: "Native query executed successfully" })
3841
return ResponseHandler.successResponse(req, res, {
3942
status: 200, data: result?.data
@@ -42,6 +45,7 @@ const dataOut = async (req: Request, res: Response) => {
4245

4346
if (isValidQuery === true && _.isString(query)) {
4447
const result = await executeSqlQuery({ query })
48+
_.set(query_data, "data", result.data);
4549
logger.info({ apiId, msgid, requestBody, datasetId, message: "SQL query executed successfully" })
4650
return ResponseHandler.successResponse(req, res, {
4751
status: 200, data: result?.data

api-service/src/controllers/QueryWrapper/SqlQueryWrapper.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { AxiosResponse } from "axios";
1010

1111
const apiId = "api.obsrv.data.sql-query";
1212
const errorCode = "SQL_QUERY_FAILURE"
13+
export const result_data = {"data": {}};
1314

1415
export const sqlQuery = async (req: Request, res: Response) => {
1516
const resmsgid = _.get(res, "resmsgid");
@@ -26,7 +27,6 @@ export const sqlQuery = async (req: Request, res: Response) => {
2627
errCode: "BAD_REQUEST"
2728
} as ErrorObject, req, res);
2829
}
29-
3030
const query = req.body.query as string;
3131
let result: AxiosResponse;
3232
if (isTableSchemaQuery(query)) {
@@ -37,6 +37,7 @@ export const sqlQuery = async (req: Request, res: Response) => {
3737
headers: { Authorization: authorization },
3838
});
3939
}
40+
_.set(result_data, "data", result.data);
4041
logger.info({ messsge: "Successfully fetched data using sql query", apiId, resmsgid })
4142
ResponseHandler.flatResponse(req, res, result)
4243
} catch (error: any) {

api-service/src/metrics/prometheus/helpers.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,21 +44,20 @@ export const onGone = (req: any, res: Response) => {
4444
}
4545

4646
export const onObsrvFailure = (req: any, res: Response,error: ObsrvError) => {
47-
const { duration = 0, metricLabels }: Metric = getMetricLabels(req, res)
48-
metricLabels.dataset_id = error.datasetId
47+
const { duration = 0, metricLabels }: Metric = getMetricLabels(req, res, error)
4948
const { statusCode = 404 } = res
5049
const labels = { ...metricLabels, status: statusCode }
5150
duration && setQueryResponseTime({ duration, labels })
5251
incrementApiCalls({ labels })
5352
incrementFailedApiCalls({ labels });
5453
}
5554

56-
const getMetricLabels = (req: any, res: Response) => {
55+
const getMetricLabels = (req: any, res: Response, errorBody?: ObsrvError) => {
5756
const { id, entity, originalUrl, startTime } = req;
5857
const { statusCode = 200 } = res
5958
const request_size = req.socket.bytesRead
6059
const response_size = res.getHeader("content-length");
61-
const dataset_id = _.get(req, ["body", "request", "dataset_id"]) || _.get(req, ["params", "dataset_id"]) || null
60+
const dataset_id = _.get(req, ["body", "request", "dataset_id"]) || _.get(req, ["params", "dataset_id"]) || _.get(errorBody, "datasetId") || null
6261
const duration = getDuration(startTime);
6362
const metricLabels = { entity, id, endpoint: originalUrl, dataset_id, status: statusCode, request_size, response_size }
6463
return { duration, metricLabels }

0 commit comments

Comments
 (0)