Skip to content

Commit cd63c7d

Browse files
authored
Merge pull request #361 from Sanketika-Obsrv/release-1.10.0
Release 1.10.0
2 parents b14bf6a + be1e842 commit cd63c7d

File tree

13 files changed

+287
-154
lines changed

13 files changed

+287
-154
lines changed

api-service/src/configs/Config.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ export const config = {
2929
"sql_query_path": "/druid/v2/sql/",
3030
"native_query_path": "/druid/v2",
3131
"list_datasources_path": "/druid/v2/datasources",
32-
"submit_ingestion": "druid/indexer/v1/supervisor"
32+
"submit_ingestion": "druid/indexer/v1/supervisor",
33+
"username": process.env.druid_username || "admin",
34+
"password": process.env.druid_password || "admin123"
3335
},
3436
"prometheus": {
3537
"url": process.env.prometheus_url || "http://localhost:9090"

api-service/src/connections/druidConnection.ts

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,50 @@ import * as _ from "lodash";
33
import { config } from "../configs/Config";
44
const druidPort = _.get(config, "query_api.druid.port");
55
const druidHost = _.get(config, "query_api.druid.host");
6+
const druidUsername = _.get(config, "query_api.druid.username");
7+
const druidPassword = _.get(config, "query_api.druid.password");
68
const nativeQueryEndpoint = `${druidHost}:${druidPort}${config.query_api.druid.native_query_path}`;
79
const sqlQueryEndpoint = `${druidHost}:${druidPort}${config.query_api.druid.sql_query_path}`;
810

911
export const executeNativeQuery = async (payload: any) => {
10-
const queryResult = await axios.post(nativeQueryEndpoint, payload)
12+
const queryResult = await axios.post(nativeQueryEndpoint, payload,
13+
{
14+
auth: {
15+
username: druidUsername,
16+
password: druidPassword,
17+
},
18+
}
19+
)
1120
return queryResult;
1221
}
1322

1423
export const executeSqlQuery = async (payload: any) => {
15-
const queryResult = await axios.post(sqlQueryEndpoint, payload)
24+
const queryResult = await axios.post(sqlQueryEndpoint, payload,
25+
{
26+
auth: {
27+
username: druidUsername,
28+
password: druidPassword,
29+
},
30+
}
31+
)
1632
return queryResult;
1733
}
1834

1935
export const getDatasourceListFromDruid = async () => {
20-
const existingDatasources = await axios.get(`${config?.query_api?.druid?.host}:${config?.query_api?.druid?.port}${config.query_api.druid.list_datasources_path}`, {})
36+
const existingDatasources = await axios.get(`${config?.query_api?.druid?.host}:${config?.query_api?.druid?.port}${config.query_api.druid.list_datasources_path}`, {
37+
auth: {
38+
username: druidUsername,
39+
password: druidPassword,
40+
}
41+
})
2142
return existingDatasources;
2243
}
2344

24-
export const druidHttpService = axios.create({ baseURL: `${config.query_api.druid.host}:${config.query_api.druid.port}`, headers: { "Content-Type": "application/json" } });
45+
export const druidHttpService = axios.create({
46+
baseURL: `${config.query_api.druid.host}:${config.query_api.druid.port}`,
47+
headers: { "Content-Type": "application/json" },
48+
auth: {
49+
username: druidUsername,
50+
password: druidPassword,
51+
}
52+
});

api-service/src/services/DatasetMetricsService.ts

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ import dayjs from "dayjs";
33
import _ from "lodash";
44
import { config } from "../configs/Config";
55
import { dataLineageSuccessQuery, generateConnectorQuery, generateDatasetQueryCallsQuery, generateDedupFailedQuery, generateDenormFailedQuery, generateTimeseriesQuery, generateTimeseriesQueryEventsPerHour, generateTotalQueryCallsQuery, generateTransformationFailedQuery, processingTimeQuery, totalEventsQuery, totalFailedEventsQuery } from "../controllers/DatasetMetrics/queries";
6+
import { druidHttpService } from "../connections/druidConnection";
67
const druidPort = _.get(config, "query_api.druid.port");
78
const druidHost = _.get(config, "query_api.druid.host");
89
const nativeQueryEndpoint = `${druidHost}:${druidPort}${config.query_api.druid.native_query_path}`;
910
const prometheusEndpoint = `${config.query_api.prometheus.url}/api/v1/query_range`;
1011

1112
export const getDataFreshness = async (dataset_id: string, intervals: string, defaultThreshold: number) => {
1213
const queryPayload = processingTimeQuery(intervals, dataset_id);
13-
const druidResponse = await axios.post(nativeQueryEndpoint, queryPayload?.query);
14+
const druidResponse = await druidHttpService.post(nativeQueryEndpoint, queryPayload?.query);
1415
const avgProcessingTime = _.get(druidResponse, "data[0].average_processing_time", 0);
1516
const freshnessStatus = avgProcessingTime < defaultThreshold ? "Healthy" : "Unhealthy";
1617

@@ -41,8 +42,8 @@ export const getDataObservability = async (dataset_id: string, intervals: string
4142
const totalQueryCallsAtDatasetLevel = generateDatasetQueryCallsQuery(dataset_id, config?.data_observability?.data_out_query_time_period);
4243

4344
const [totalEventsResponse, totalFailedEventsResponse, totalApiCallsResponse, totalCallsAtDatasetLevelResponse] = await Promise.all([
44-
axios.post(nativeQueryEndpoint, totalEventsPayload),
45-
axios.post(nativeQueryEndpoint, totalFailedEventsPayload),
45+
druidHttpService.post(nativeQueryEndpoint, totalEventsPayload),
46+
druidHttpService.post(nativeQueryEndpoint, totalFailedEventsPayload),
4647
axios.request({ url: prometheusEndpoint, method: "GET", params: totalQueryCalls }),
4748
axios.request({ url: prometheusEndpoint, method: "GET", params: totalQueryCallsAtDatasetLevel })
4849
]);
@@ -110,13 +111,13 @@ export const getDataVolume = async (dataset_id: string, volume_by_days: number,
110111
previousHourResponse, previousDayResponse, previousWeekResponse,
111112
nDaysResponse
112113
] = await Promise.all([
113-
axios.post(nativeQueryEndpoint, currentHourPayload),
114-
axios.post(nativeQueryEndpoint, currentDayPayload),
115-
axios.post(nativeQueryEndpoint, currentWeekPayload),
116-
axios.post(nativeQueryEndpoint, previousHourPayload),
117-
axios.post(nativeQueryEndpoint, previousDayPayload),
118-
axios.post(nativeQueryEndpoint, previousWeekPayload),
119-
axios.post(nativeQueryEndpoint, nDaysPayload)
114+
druidHttpService.post(nativeQueryEndpoint, currentHourPayload),
115+
druidHttpService.post(nativeQueryEndpoint, currentDayPayload),
116+
druidHttpService.post(nativeQueryEndpoint, currentWeekPayload),
117+
druidHttpService.post(nativeQueryEndpoint, previousHourPayload),
118+
druidHttpService.post(nativeQueryEndpoint, previousDayPayload),
119+
druidHttpService.post(nativeQueryEndpoint, previousWeekPayload),
120+
druidHttpService.post(nativeQueryEndpoint, nDaysPayload)
120121
]);
121122
const currentHourCount = _.get(currentHourResponse, "data[0].result.count") || 0;
122123
const currentDayCount = _.get(currentDayResponse, "data[0].result.count") || 0;
@@ -158,14 +159,14 @@ export const getDataLineage = async (dataset_id: string, intervals: string) => {
158159
transformationSuccessResponse, dedupSuccessResponse, denormSuccessResponse,
159160
totalValidationResponse, totalValidationFailedResponse, transformationFailedResponse, dedupFailedResponse, denormFailedResponse
160161
] = await Promise.all([
161-
axios.post(nativeQueryEndpoint, transformationSuccessPayload),
162-
axios.post(nativeQueryEndpoint, dedupSuccessPayload),
163-
axios.post(nativeQueryEndpoint, denormSuccessPayload),
164-
axios.post(nativeQueryEndpoint, totalValidationPayload),
165-
axios.post(nativeQueryEndpoint, totalValidationFailedPayload),
166-
axios.post(nativeQueryEndpoint, transformationFailedPayload),
167-
axios.post(nativeQueryEndpoint, dedupFailedPayload),
168-
axios.post(nativeQueryEndpoint, denormFailedPayload)
162+
druidHttpService.post(nativeQueryEndpoint, transformationSuccessPayload),
163+
druidHttpService.post(nativeQueryEndpoint, dedupSuccessPayload),
164+
druidHttpService.post(nativeQueryEndpoint, denormSuccessPayload),
165+
druidHttpService.post(nativeQueryEndpoint, totalValidationPayload),
166+
druidHttpService.post(nativeQueryEndpoint, totalValidationFailedPayload),
167+
druidHttpService.post(nativeQueryEndpoint, transformationFailedPayload),
168+
druidHttpService.post(nativeQueryEndpoint, dedupFailedPayload),
169+
druidHttpService.post(nativeQueryEndpoint, denormFailedPayload)
169170
]);
170171

171172
// success at each level
@@ -198,7 +199,7 @@ export const getDataLineage = async (dataset_id: string, intervals: string) => {
198199

199200
export const getConnectors = async (dataset_id: string, intervals: string) => {
200201
const connectorQueryPayload = generateConnectorQuery(intervals, dataset_id);
201-
const connectorResponse = await axios.post(nativeQueryEndpoint, connectorQueryPayload);
202+
const connectorResponse = await druidHttpService.post(nativeQueryEndpoint, connectorQueryPayload);
202203
const connectorsData = _.get(connectorResponse, "data[0].result", []);
203204
const result = {
204205
category: "connectors",
@@ -217,8 +218,8 @@ export const getDataQuality = async (dataset_id: string, intervals: string) => {
217218
const totalValidationFailedPayload = dataLineageSuccessQuery(intervals, dataset_id, "error_pdata_status", "failed");
218219
const [totalValidationResponse, totalValidationFailedResponse,
219220
] = await Promise.all([
220-
axios.post(nativeQueryEndpoint, totalValidationPayload),
221-
axios.post(nativeQueryEndpoint, totalValidationFailedPayload),
221+
druidHttpService.post(nativeQueryEndpoint, totalValidationPayload),
222+
druidHttpService.post(nativeQueryEndpoint, totalValidationFailedPayload),
222223
]);
223224
const totalValidationCount = _.get(totalValidationResponse, "data[0].result.count") || 0;
224225
const totalValidationFailedCount = _.get(totalValidationFailedResponse, "data[0].result.count") || 0;

api-service/src/services/DatasetService.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,13 @@ class DatasetService {
458458
const draftDatasource = this.createDraftDatasource(draftDataset, "datalake");
459459
const dsId = _.join([draftDataset.dataset_id, "events", "datalake"], "_")
460460
const liveDatasource = await Datasource.findOne({ where: { id: dsId }, attributes: ["ingestion_spec"], raw: true }) as unknown as Record<string, any>
461-
const ingestionSpec = tableGenerator.getHudiIngestionSpecForUpdate(draftDataset, liveDatasource?.ingestion_spec, allFields, draftDatasource?.datasource_ref);
461+
let ingestionSpec = _.get(liveDatasource, "ingestion_spec");
462+
if (_.isEmpty(ingestionSpec)) {
463+
ingestionSpec = tableGenerator.getHudiIngestionSpecForCreate(draftDataset, allFields, draftDatasource.datasource_ref);
464+
}
465+
else {
466+
ingestionSpec = tableGenerator.getHudiIngestionSpecForUpdate(draftDataset, liveDatasource?.ingestion_spec, allFields, draftDatasource?.datasource_ref);
467+
}
462468
_.set(draftDatasource, "ingestion_spec", ingestionSpec)
463469
_.set(draftDatasource, "created_by", created_by);
464470
_.set(draftDatasource, "updated_by", updated_by);

api-service/src/services/TableGenerator.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ class TableGenerator extends BaseTableGenerator {
225225
oldColumnSpec.push({
226226
"type": col.type,
227227
"name": col.name,
228-
"index": currIndex++
228+
"index": ++currIndex
229229
})
230230
})
231231
}

api-service/src/services/WrapperService.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { NextFunction, Request, Response } from "express";
33
import { config } from "../configs/Config";
44
import { ResponseHandler } from "../helpers/ResponseHandler";
55
import { ErrorResponseHandler } from "../helpers/ErrorResponseHandler";
6+
import { druidHttpService } from "../connections/druidConnection";
67

78
class WrapperService {
89

@@ -19,7 +20,7 @@ class WrapperService {
1920
try {
2021
// console.log("SQL Request to druid - \n" + JSON.stringify({"ts": Date.now(), body: req.body, headers: req.headers}));
2122
const authorization = req?.headers?.authorization;
22-
const result = await axios.post(
23+
const result = await druidHttpService.post(
2324
`${config.query_api.druid.host}:${config.query_api.druid.port}${config.query_api.druid.sql_query_path}`,
2425
req.body, {
2526
headers: { Authorization: authorization },
@@ -38,7 +39,7 @@ class WrapperService {
3839
// console.log("Native POST Request to druid - \n" + JSON.stringify({"ts": Date.now(), body: req.body, headers: req.headers, url: req.url}));
3940
const headers = req?.headers;
4041
const url = req?.url;
41-
const result = await axios.post(
42+
const result = await druidHttpService.post(
4243
`${config.query_api.druid.host}:${config.query_api.druid.port}${url}`,
4344
req.body, { headers, }
4445
);
@@ -55,7 +56,7 @@ class WrapperService {
5556
// console.log("Native DEL Request to druid - \n" + JSON.stringify({"ts": Date.now(), body: req.body, headers: req.headers, url: req.url}));
5657
const headers = req?.headers;
5758
const url = req?.url;
58-
const result = await axios.delete(
59+
const result = await druidHttpService.delete(
5960
`${config.query_api.druid.host}:${config.query_api.druid.port}${url}`,
6061
{
6162
headers,
@@ -74,7 +75,7 @@ class WrapperService {
7475
// console.log("Native GET Request to druid - \n" + JSON.stringify({"ts": Date.now(), body: req.body, headers: req.headers, url: req.url}));
7576
const headers = req?.headers;
7677
const url = req?.url;
77-
const result = await axios.get(
78+
const result = await druidHttpService.get(
7879
`${config.query_api.druid.host}:${config.query_api.druid.port}${url}`,
7980
{
8081
headers,
@@ -93,7 +94,7 @@ class WrapperService {
9394
try {
9495
const headers = req?.headers;
9596
const url = req?.url;
96-
const result = await axios.get(
97+
const result = await druidHttpService.get(
9798
`${config.query_api.druid.host}:${config.query_api.druid.port}${url}`,
9899
{ headers }
99100
);
@@ -108,15 +109,15 @@ class WrapperService {
108109
) => {
109110
try {
110111
// console.log("Native STATUS Request to druid - \n" + JSON.stringify({"ts": Date.now(), body: req.body, headers: req.headers, url: req.url}));
111-
const result = await axios.get(
112+
const result = await druidHttpService.get(
112113
`${config.query_api.druid.host}:${config.query_api.druid.port}/status`
113114
);
114115
ResponseHandler.flatResponse(req, res, result);
115116
} catch (error: any) { this.errorHandler.handleError(req, res, next, error, false); }
116117
};
117118

118119
public submitIngestion = async (ingestionSpec: object) => {
119-
return await axios.post(`${config.query_api.druid.host}:${config.query_api.druid.port}/${config.query_api.druid.submit_ingestion}`, ingestionSpec)
120+
return await druidHttpService.post(`${config.query_api.druid.host}:${config.query_api.druid.port}/${config.query_api.druid.submit_ingestion}`, ingestionSpec)
120121
}
121122

122123
}

api-service/src/services/managers/grafana/alert/helpers/index.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,11 +244,27 @@ const getMatchingLabels = async (channels: string[]) => {
244244
}
245245
}
246246

247+
const getNotificationChannel = async (channels: string[]) => {
248+
const fetchChannel = (id: string) => {
249+
return Notification.findOne({ where: { id } })
250+
.then(response => response?.toJSON())
251+
.then(channelMetadata => {
252+
const { name, type } = channelMetadata;
253+
return name;
254+
})
255+
.catch(() => null);
256+
}
257+
258+
const [name] = await Promise.all(channels.map(fetchChannel));
259+
return name;
260+
}
261+
247262
const transformRule = async ({ value, condition, metadata, isGroup }: any) => {
248263
const { name, id, interval, category, frequency, labels = {}, annotations = {}, severity, description, notification = {} } = value;
249264
const annotationObj = { ...annotations, description: description };
250265
const channels = _.get(notification, "channels") || [];
251266
const matchingLabelsForNotification = await getMatchingLabels(channels);
267+
const channel = await getNotificationChannel(channels);
252268

253269
const payload = {
254270
grafana_alert: {
@@ -258,6 +274,7 @@ const transformRule = async ({ value, condition, metadata, isGroup }: any) => {
258274
exec_err_state: _.get(metadata, "exec_err_state", "Error"),
259275
data: metadata,
260276
is_paused: false,
277+
...(channel && { notification_settings: { receiver: channel } })
261278
},
262279
for: interval,
263280
annotations: annotationObj,

0 commit comments

Comments
 (0)