Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion api-service/src/configs/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ export const config = {
"sql_query_path": "/druid/v2/sql/",
"native_query_path": "/druid/v2",
"list_datasources_path": "/druid/v2/datasources",
"submit_ingestion": "druid/indexer/v1/supervisor"
"submit_ingestion": "druid/indexer/v1/supervisor",
"username": process.env.druid_username || "admin",
"password": process.env.druid_password || "admin123"
},
"prometheus": {
"url": process.env.prometheus_url || "http://localhost:9090"
Expand Down
36 changes: 32 additions & 4 deletions api-service/src/connections/druidConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,50 @@ import * as _ from "lodash";
import { config } from "../configs/Config";
const druidPort = _.get(config, "query_api.druid.port");
const druidHost = _.get(config, "query_api.druid.host");
const druidUsername = _.get(config, "query_api.druid.username");
const druidPassword = _.get(config, "query_api.druid.password");
const nativeQueryEndpoint = `${druidHost}:${druidPort}${config.query_api.druid.native_query_path}`;
const sqlQueryEndpoint = `${druidHost}:${druidPort}${config.query_api.druid.sql_query_path}`;

export const executeNativeQuery = async (payload: any) => {
const queryResult = await axios.post(nativeQueryEndpoint, payload)
const queryResult = await axios.post(nativeQueryEndpoint, payload,
{
auth: {
username: druidUsername,
password: druidPassword,
},
}
)
return queryResult;
}

export const executeSqlQuery = async (payload: any) => {
const queryResult = await axios.post(sqlQueryEndpoint, payload)
const queryResult = await axios.post(sqlQueryEndpoint, payload,
{
auth: {
username: druidUsername,
password: druidPassword,
},
}
)
return queryResult;
}

export const getDatasourceListFromDruid = async () => {
const existingDatasources = await axios.get(`${config?.query_api?.druid?.host}:${config?.query_api?.druid?.port}${config.query_api.druid.list_datasources_path}`, {})
const existingDatasources = await axios.get(`${config?.query_api?.druid?.host}:${config?.query_api?.druid?.port}${config.query_api.druid.list_datasources_path}`, {
auth: {
username: druidUsername,
password: druidPassword,
}
})
return existingDatasources;
}

export const druidHttpService = axios.create({ baseURL: `${config.query_api.druid.host}:${config.query_api.druid.port}`, headers: { "Content-Type": "application/json" } });
export const druidHttpService = axios.create({
baseURL: `${config.query_api.druid.host}:${config.query_api.druid.port}`,
headers: { "Content-Type": "application/json" },
auth: {
username: druidUsername,
password: druidPassword,
}
});
43 changes: 22 additions & 21 deletions api-service/src/services/DatasetMetricsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ import dayjs from "dayjs";
import _ from "lodash";
import { config } from "../configs/Config";
import { dataLineageSuccessQuery, generateConnectorQuery, generateDatasetQueryCallsQuery, generateDedupFailedQuery, generateDenormFailedQuery, generateTimeseriesQuery, generateTimeseriesQueryEventsPerHour, generateTotalQueryCallsQuery, generateTransformationFailedQuery, processingTimeQuery, totalEventsQuery, totalFailedEventsQuery } from "../controllers/DatasetMetrics/queries";
import { druidHttpService } from "../connections/druidConnection";
const druidPort = _.get(config, "query_api.druid.port");
const druidHost = _.get(config, "query_api.druid.host");
const nativeQueryEndpoint = `${druidHost}:${druidPort}${config.query_api.druid.native_query_path}`;
const prometheusEndpoint = `${config.query_api.prometheus.url}/api/v1/query_range`;

export const getDataFreshness = async (dataset_id: string, intervals: string, defaultThreshold: number) => {
const queryPayload = processingTimeQuery(intervals, dataset_id);
const druidResponse = await axios.post(nativeQueryEndpoint, queryPayload?.query);
const druidResponse = await druidHttpService.post(nativeQueryEndpoint, queryPayload?.query);
const avgProcessingTime = _.get(druidResponse, "data[0].average_processing_time", 0);
const freshnessStatus = avgProcessingTime < defaultThreshold ? "Healthy" : "Unhealthy";

Expand Down Expand Up @@ -41,8 +42,8 @@ export const getDataObservability = async (dataset_id: string, intervals: string
const totalQueryCallsAtDatasetLevel = generateDatasetQueryCallsQuery(dataset_id, config?.data_observability?.data_out_query_time_period);

const [totalEventsResponse, totalFailedEventsResponse, totalApiCallsResponse, totalCallsAtDatasetLevelResponse] = await Promise.all([
axios.post(nativeQueryEndpoint, totalEventsPayload),
axios.post(nativeQueryEndpoint, totalFailedEventsPayload),
druidHttpService.post(nativeQueryEndpoint, totalEventsPayload),
druidHttpService.post(nativeQueryEndpoint, totalFailedEventsPayload),
axios.request({ url: prometheusEndpoint, method: "GET", params: totalQueryCalls }),
axios.request({ url: prometheusEndpoint, method: "GET", params: totalQueryCallsAtDatasetLevel })
]);
Expand Down Expand Up @@ -110,13 +111,13 @@ export const getDataVolume = async (dataset_id: string, volume_by_days: number,
previousHourResponse, previousDayResponse, previousWeekResponse,
nDaysResponse
] = await Promise.all([
axios.post(nativeQueryEndpoint, currentHourPayload),
axios.post(nativeQueryEndpoint, currentDayPayload),
axios.post(nativeQueryEndpoint, currentWeekPayload),
axios.post(nativeQueryEndpoint, previousHourPayload),
axios.post(nativeQueryEndpoint, previousDayPayload),
axios.post(nativeQueryEndpoint, previousWeekPayload),
axios.post(nativeQueryEndpoint, nDaysPayload)
druidHttpService.post(nativeQueryEndpoint, currentHourPayload),
druidHttpService.post(nativeQueryEndpoint, currentDayPayload),
druidHttpService.post(nativeQueryEndpoint, currentWeekPayload),
druidHttpService.post(nativeQueryEndpoint, previousHourPayload),
druidHttpService.post(nativeQueryEndpoint, previousDayPayload),
druidHttpService.post(nativeQueryEndpoint, previousWeekPayload),
druidHttpService.post(nativeQueryEndpoint, nDaysPayload)
]);
const currentHourCount = _.get(currentHourResponse, "data[0].result.count") || 0;
const currentDayCount = _.get(currentDayResponse, "data[0].result.count") || 0;
Expand Down Expand Up @@ -158,14 +159,14 @@ export const getDataLineage = async (dataset_id: string, intervals: string) => {
transformationSuccessResponse, dedupSuccessResponse, denormSuccessResponse,
totalValidationResponse, totalValidationFailedResponse, transformationFailedResponse, dedupFailedResponse, denormFailedResponse
] = await Promise.all([
axios.post(nativeQueryEndpoint, transformationSuccessPayload),
axios.post(nativeQueryEndpoint, dedupSuccessPayload),
axios.post(nativeQueryEndpoint, denormSuccessPayload),
axios.post(nativeQueryEndpoint, totalValidationPayload),
axios.post(nativeQueryEndpoint, totalValidationFailedPayload),
axios.post(nativeQueryEndpoint, transformationFailedPayload),
axios.post(nativeQueryEndpoint, dedupFailedPayload),
axios.post(nativeQueryEndpoint, denormFailedPayload)
druidHttpService.post(nativeQueryEndpoint, transformationSuccessPayload),
druidHttpService.post(nativeQueryEndpoint, dedupSuccessPayload),
druidHttpService.post(nativeQueryEndpoint, denormSuccessPayload),
druidHttpService.post(nativeQueryEndpoint, totalValidationPayload),
druidHttpService.post(nativeQueryEndpoint, totalValidationFailedPayload),
druidHttpService.post(nativeQueryEndpoint, transformationFailedPayload),
druidHttpService.post(nativeQueryEndpoint, dedupFailedPayload),
druidHttpService.post(nativeQueryEndpoint, denormFailedPayload)
]);

// success at each level
Expand Down Expand Up @@ -198,7 +199,7 @@ export const getDataLineage = async (dataset_id: string, intervals: string) => {

export const getConnectors = async (dataset_id: string, intervals: string) => {
const connectorQueryPayload = generateConnectorQuery(intervals, dataset_id);
const connectorResponse = await axios.post(nativeQueryEndpoint, connectorQueryPayload);
const connectorResponse = await druidHttpService.post(nativeQueryEndpoint, connectorQueryPayload);
const connectorsData = _.get(connectorResponse, "data[0].result", []);
const result = {
category: "connectors",
Expand All @@ -217,8 +218,8 @@ export const getDataQuality = async (dataset_id: string, intervals: string) => {
const totalValidationFailedPayload = dataLineageSuccessQuery(intervals, dataset_id, "error_pdata_status", "failed");
const [totalValidationResponse, totalValidationFailedResponse,
] = await Promise.all([
axios.post(nativeQueryEndpoint, totalValidationPayload),
axios.post(nativeQueryEndpoint, totalValidationFailedPayload),
druidHttpService.post(nativeQueryEndpoint, totalValidationPayload),
druidHttpService.post(nativeQueryEndpoint, totalValidationFailedPayload),
]);
const totalValidationCount = _.get(totalValidationResponse, "data[0].result.count") || 0;
const totalValidationFailedCount = _.get(totalValidationFailedResponse, "data[0].result.count") || 0;
Expand Down
8 changes: 7 additions & 1 deletion api-service/src/services/DatasetService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,13 @@ class DatasetService {
const draftDatasource = this.createDraftDatasource(draftDataset, "datalake");
const dsId = _.join([draftDataset.dataset_id, "events", "datalake"], "_")
const liveDatasource = await Datasource.findOne({ where: { id: dsId }, attributes: ["ingestion_spec"], raw: true }) as unknown as Record<string, any>
const ingestionSpec = tableGenerator.getHudiIngestionSpecForUpdate(draftDataset, liveDatasource?.ingestion_spec, allFields, draftDatasource?.datasource_ref);
let ingestionSpec = _.get(liveDatasource, "ingestion_spec");
if (_.isEmpty(ingestionSpec)) {
ingestionSpec = tableGenerator.getHudiIngestionSpecForCreate(draftDataset, allFields, draftDatasource.datasource_ref);
}
else {
ingestionSpec = tableGenerator.getHudiIngestionSpecForUpdate(draftDataset, liveDatasource?.ingestion_spec, allFields, draftDatasource?.datasource_ref);
}
_.set(draftDatasource, "ingestion_spec", ingestionSpec)
_.set(draftDatasource, "created_by", created_by);
_.set(draftDatasource, "updated_by", updated_by);
Expand Down
2 changes: 1 addition & 1 deletion api-service/src/services/TableGenerator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class TableGenerator extends BaseTableGenerator {
oldColumnSpec.push({
"type": col.type,
"name": col.name,
"index": currIndex++
"index": ++currIndex
})
})
}
Expand Down
15 changes: 8 additions & 7 deletions api-service/src/services/WrapperService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { NextFunction, Request, Response } from "express";
import { config } from "../configs/Config";
import { ResponseHandler } from "../helpers/ResponseHandler";
import { ErrorResponseHandler } from "../helpers/ErrorResponseHandler";
import { druidHttpService } from "../connections/druidConnection";

class WrapperService {

Expand All @@ -19,7 +20,7 @@ class WrapperService {
try {
// console.log("SQL Request to druid - \n" + JSON.stringify({"ts": Date.now(), body: req.body, headers: req.headers}));
const authorization = req?.headers?.authorization;
const result = await axios.post(
const result = await druidHttpService.post(
`${config.query_api.druid.host}:${config.query_api.druid.port}${config.query_api.druid.sql_query_path}`,
req.body, {
headers: { Authorization: authorization },
Expand All @@ -38,7 +39,7 @@ class WrapperService {
// console.log("Native POST Request to druid - \n" + JSON.stringify({"ts": Date.now(), body: req.body, headers: req.headers, url: req.url}));
const headers = req?.headers;
const url = req?.url;
const result = await axios.post(
const result = await druidHttpService.post(
`${config.query_api.druid.host}:${config.query_api.druid.port}${url}`,
req.body, { headers, }
);
Expand All @@ -55,7 +56,7 @@ class WrapperService {
// console.log("Native DEL Request to druid - \n" + JSON.stringify({"ts": Date.now(), body: req.body, headers: req.headers, url: req.url}));
const headers = req?.headers;
const url = req?.url;
const result = await axios.delete(
const result = await druidHttpService.delete(
`${config.query_api.druid.host}:${config.query_api.druid.port}${url}`,
{
headers,
Expand All @@ -74,7 +75,7 @@ class WrapperService {
// console.log("Native GET Request to druid - \n" + JSON.stringify({"ts": Date.now(), body: req.body, headers: req.headers, url: req.url}));
const headers = req?.headers;
const url = req?.url;
const result = await axios.get(
const result = await druidHttpService.get(
`${config.query_api.druid.host}:${config.query_api.druid.port}${url}`,
{
headers,
Expand All @@ -93,7 +94,7 @@ class WrapperService {
try {
const headers = req?.headers;
const url = req?.url;
const result = await axios.get(
const result = await druidHttpService.get(
`${config.query_api.druid.host}:${config.query_api.druid.port}${url}`,
{ headers }
);
Expand All @@ -108,15 +109,15 @@ class WrapperService {
) => {
try {
// console.log("Native STATUS Request to druid - \n" + JSON.stringify({"ts": Date.now(), body: req.body, headers: req.headers, url: req.url}));
const result = await axios.get(
const result = await druidHttpService.get(
`${config.query_api.druid.host}:${config.query_api.druid.port}/status`
);
ResponseHandler.flatResponse(req, res, result);
} catch (error: any) { this.errorHandler.handleError(req, res, next, error, false); }
};

public submitIngestion = async (ingestionSpec: object) => {
return await axios.post(`${config.query_api.druid.host}:${config.query_api.druid.port}/${config.query_api.druid.submit_ingestion}`, ingestionSpec)
return await druidHttpService.post(`${config.query_api.druid.host}:${config.query_api.druid.port}/${config.query_api.druid.submit_ingestion}`, ingestionSpec)
}

}
Expand Down
17 changes: 17 additions & 0 deletions api-service/src/services/managers/grafana/alert/helpers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,27 @@ const getMatchingLabels = async (channels: string[]) => {
}
}

const getNotificationChannel = async (channels: string[]) => {
const fetchChannel = (id: string) => {
return Notification.findOne({ where: { id } })
.then(response => response?.toJSON())
.then(channelMetadata => {
const { name, type } = channelMetadata;
return name;
})
.catch(() => null);
}

const [name] = await Promise.all(channels.map(fetchChannel));
return name;
}

const transformRule = async ({ value, condition, metadata, isGroup }: any) => {
const { name, id, interval, category, frequency, labels = {}, annotations = {}, severity, description, notification = {} } = value;
const annotationObj = { ...annotations, description: description };
const channels = _.get(notification, "channels") || [];
const matchingLabelsForNotification = await getMatchingLabels(channels);
const channel = await getNotificationChannel(channels);

const payload = {
grafana_alert: {
Expand All @@ -258,6 +274,7 @@ const transformRule = async ({ value, condition, metadata, isGroup }: any) => {
exec_err_state: _.get(metadata, "exec_err_state", "Error"),
data: metadata,
is_paused: false,
...(channel && { notification_settings: { receiver: channel } })
},
for: interval,
annotations: annotationObj,
Expand Down
Loading
Loading