Skip to content

Commit b0108dc

Browse files
authored
Merge pull request #360 from Sanketika-Obsrv/druid-auth-OBS-#I637
feat #OBS-I736 : Enable druid authentication
2 parents b2c4c61 + 78a8c0e commit b0108dc

File tree

6 files changed

+82
-34
lines changed

6 files changed

+82
-34
lines changed

api-service/src/configs/Config.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ export const config = {
2929
"sql_query_path": "/druid/v2/sql/",
3030
"native_query_path": "/druid/v2",
3131
"list_datasources_path": "/druid/v2/datasources",
32-
"submit_ingestion": "druid/indexer/v1/supervisor"
32+
"submit_ingestion": "druid/indexer/v1/supervisor",
33+
"username": process.env.druid_username || "admin",
34+
"password": process.env.druid_password || "admin123"
3335
},
3436
"prometheus": {
3537
"url": process.env.prometheus_url || "http://localhost:9090"

api-service/src/connections/druidConnection.ts

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,50 @@ import * as _ from "lodash";
33
import { config } from "../configs/Config";
44
const druidPort = _.get(config, "query_api.druid.port");
55
const druidHost = _.get(config, "query_api.druid.host");
6+
const druidUsername = _.get(config, "query_api.druid.username");
7+
const druidPassword = _.get(config, "query_api.druid.password");
68
const nativeQueryEndpoint = `${druidHost}:${druidPort}${config.query_api.druid.native_query_path}`;
79
const sqlQueryEndpoint = `${druidHost}:${druidPort}${config.query_api.druid.sql_query_path}`;
810

911
export const executeNativeQuery = async (payload: any) => {
10-
const queryResult = await axios.post(nativeQueryEndpoint, payload)
12+
const queryResult = await axios.post(nativeQueryEndpoint, payload,
13+
{
14+
auth: {
15+
username: druidUsername,
16+
password: druidPassword,
17+
},
18+
}
19+
)
1120
return queryResult;
1221
}
1322

1423
export const executeSqlQuery = async (payload: any) => {
15-
const queryResult = await axios.post(sqlQueryEndpoint, payload)
24+
const queryResult = await axios.post(sqlQueryEndpoint, payload,
25+
{
26+
auth: {
27+
username: druidUsername,
28+
password: druidPassword,
29+
},
30+
}
31+
)
1632
return queryResult;
1733
}
1834

1935
export const getDatasourceListFromDruid = async () => {
20-
const existingDatasources = await axios.get(`${config?.query_api?.druid?.host}:${config?.query_api?.druid?.port}${config.query_api.druid.list_datasources_path}`, {})
36+
const existingDatasources = await axios.get(`${config?.query_api?.druid?.host}:${config?.query_api?.druid?.port}${config.query_api.druid.list_datasources_path}`, {
37+
auth: {
38+
username: druidUsername,
39+
password: druidPassword,
40+
}
41+
})
2142
return existingDatasources;
2243
}
2344

24-
export const druidHttpService = axios.create({ baseURL: `${config.query_api.druid.host}:${config.query_api.druid.port}`, headers: { "Content-Type": "application/json" } });
45+
export const druidHttpService = axios.create({
46+
baseURL: `${config.query_api.druid.host}:${config.query_api.druid.port}`,
47+
headers: { "Content-Type": "application/json" },
48+
auth: {
49+
username: druidUsername,
50+
password: druidPassword,
51+
}
52+
});

api-service/src/services/DatasetMetricsService.ts

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ import dayjs from "dayjs";
33
import _ from "lodash";
44
import { config } from "../configs/Config";
55
import { dataLineageSuccessQuery, generateConnectorQuery, generateDatasetQueryCallsQuery, generateDedupFailedQuery, generateDenormFailedQuery, generateTimeseriesQuery, generateTimeseriesQueryEventsPerHour, generateTotalQueryCallsQuery, generateTransformationFailedQuery, processingTimeQuery, totalEventsQuery, totalFailedEventsQuery } from "../controllers/DatasetMetrics/queries";
6+
import { druidHttpService } from "../connections/druidConnection";
67
const druidPort = _.get(config, "query_api.druid.port");
78
const druidHost = _.get(config, "query_api.druid.host");
89
const nativeQueryEndpoint = `${druidHost}:${druidPort}${config.query_api.druid.native_query_path}`;
910
const prometheusEndpoint = `${config.query_api.prometheus.url}/api/v1/query_range`;
1011

1112
export const getDataFreshness = async (dataset_id: string, intervals: string, defaultThreshold: number) => {
1213
const queryPayload = processingTimeQuery(intervals, dataset_id);
13-
const druidResponse = await axios.post(nativeQueryEndpoint, queryPayload?.query);
14+
const druidResponse = await druidHttpService.post(nativeQueryEndpoint, queryPayload?.query);
1415
const avgProcessingTime = _.get(druidResponse, "data[0].average_processing_time", 0);
1516
const freshnessStatus = avgProcessingTime < defaultThreshold ? "Healthy" : "Unhealthy";
1617

@@ -41,8 +42,8 @@ export const getDataObservability = async (dataset_id: string, intervals: string
4142
const totalQueryCallsAtDatasetLevel = generateDatasetQueryCallsQuery(dataset_id, config?.data_observability?.data_out_query_time_period);
4243

4344
const [totalEventsResponse, totalFailedEventsResponse, totalApiCallsResponse, totalCallsAtDatasetLevelResponse] = await Promise.all([
44-
axios.post(nativeQueryEndpoint, totalEventsPayload),
45-
axios.post(nativeQueryEndpoint, totalFailedEventsPayload),
45+
druidHttpService.post(nativeQueryEndpoint, totalEventsPayload),
46+
druidHttpService.post(nativeQueryEndpoint, totalFailedEventsPayload),
4647
axios.request({ url: prometheusEndpoint, method: "GET", params: totalQueryCalls }),
4748
axios.request({ url: prometheusEndpoint, method: "GET", params: totalQueryCallsAtDatasetLevel })
4849
]);
@@ -110,13 +111,13 @@ export const getDataVolume = async (dataset_id: string, volume_by_days: number,
110111
previousHourResponse, previousDayResponse, previousWeekResponse,
111112
nDaysResponse
112113
] = await Promise.all([
113-
axios.post(nativeQueryEndpoint, currentHourPayload),
114-
axios.post(nativeQueryEndpoint, currentDayPayload),
115-
axios.post(nativeQueryEndpoint, currentWeekPayload),
116-
axios.post(nativeQueryEndpoint, previousHourPayload),
117-
axios.post(nativeQueryEndpoint, previousDayPayload),
118-
axios.post(nativeQueryEndpoint, previousWeekPayload),
119-
axios.post(nativeQueryEndpoint, nDaysPayload)
114+
druidHttpService.post(nativeQueryEndpoint, currentHourPayload),
115+
druidHttpService.post(nativeQueryEndpoint, currentDayPayload),
116+
druidHttpService.post(nativeQueryEndpoint, currentWeekPayload),
117+
druidHttpService.post(nativeQueryEndpoint, previousHourPayload),
118+
druidHttpService.post(nativeQueryEndpoint, previousDayPayload),
119+
druidHttpService.post(nativeQueryEndpoint, previousWeekPayload),
120+
druidHttpService.post(nativeQueryEndpoint, nDaysPayload)
120121
]);
121122
const currentHourCount = _.get(currentHourResponse, "data[0].result.count") || 0;
122123
const currentDayCount = _.get(currentDayResponse, "data[0].result.count") || 0;
@@ -158,14 +159,14 @@ export const getDataLineage = async (dataset_id: string, intervals: string) => {
158159
transformationSuccessResponse, dedupSuccessResponse, denormSuccessResponse,
159160
totalValidationResponse, totalValidationFailedResponse, transformationFailedResponse, dedupFailedResponse, denormFailedResponse
160161
] = await Promise.all([
161-
axios.post(nativeQueryEndpoint, transformationSuccessPayload),
162-
axios.post(nativeQueryEndpoint, dedupSuccessPayload),
163-
axios.post(nativeQueryEndpoint, denormSuccessPayload),
164-
axios.post(nativeQueryEndpoint, totalValidationPayload),
165-
axios.post(nativeQueryEndpoint, totalValidationFailedPayload),
166-
axios.post(nativeQueryEndpoint, transformationFailedPayload),
167-
axios.post(nativeQueryEndpoint, dedupFailedPayload),
168-
axios.post(nativeQueryEndpoint, denormFailedPayload)
162+
druidHttpService.post(nativeQueryEndpoint, transformationSuccessPayload),
163+
druidHttpService.post(nativeQueryEndpoint, dedupSuccessPayload),
164+
druidHttpService.post(nativeQueryEndpoint, denormSuccessPayload),
165+
druidHttpService.post(nativeQueryEndpoint, totalValidationPayload),
166+
druidHttpService.post(nativeQueryEndpoint, totalValidationFailedPayload),
167+
druidHttpService.post(nativeQueryEndpoint, transformationFailedPayload),
168+
druidHttpService.post(nativeQueryEndpoint, dedupFailedPayload),
169+
druidHttpService.post(nativeQueryEndpoint, denormFailedPayload)
169170
]);
170171

171172
// success at each level
@@ -198,7 +199,7 @@ export const getDataLineage = async (dataset_id: string, intervals: string) => {
198199

199200
export const getConnectors = async (dataset_id: string, intervals: string) => {
200201
const connectorQueryPayload = generateConnectorQuery(intervals, dataset_id);
201-
const connectorResponse = await axios.post(nativeQueryEndpoint, connectorQueryPayload);
202+
const connectorResponse = await druidHttpService.post(nativeQueryEndpoint, connectorQueryPayload);
202203
const connectorsData = _.get(connectorResponse, "data[0].result", []);
203204
const result = {
204205
category: "connectors",
@@ -217,8 +218,8 @@ export const getDataQuality = async (dataset_id: string, intervals: string) => {
217218
const totalValidationFailedPayload = dataLineageSuccessQuery(intervals, dataset_id, "error_pdata_status", "failed");
218219
const [totalValidationResponse, totalValidationFailedResponse,
219220
] = await Promise.all([
220-
axios.post(nativeQueryEndpoint, totalValidationPayload),
221-
axios.post(nativeQueryEndpoint, totalValidationFailedPayload),
221+
druidHttpService.post(nativeQueryEndpoint, totalValidationPayload),
222+
druidHttpService.post(nativeQueryEndpoint, totalValidationFailedPayload),
222223
]);
223224
const totalValidationCount = _.get(totalValidationResponse, "data[0].result.count") || 0;
224225
const totalValidationFailedCount = _.get(totalValidationFailedResponse, "data[0].result.count") || 0;

api-service/src/services/WrapperService.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { NextFunction, Request, Response } from "express";
33
import { config } from "../configs/Config";
44
import { ResponseHandler } from "../helpers/ResponseHandler";
55
import { ErrorResponseHandler } from "../helpers/ErrorResponseHandler";
6+
import { druidHttpService } from "../connections/druidConnection";
67

78
class WrapperService {
89

@@ -19,7 +20,7 @@ class WrapperService {
1920
try {
2021
// console.log("SQL Request to druid - \n" + JSON.stringify({"ts": Date.now(), body: req.body, headers: req.headers}));
2122
const authorization = req?.headers?.authorization;
22-
const result = await axios.post(
23+
const result = await druidHttpService.post(
2324
`${config.query_api.druid.host}:${config.query_api.druid.port}${config.query_api.druid.sql_query_path}`,
2425
req.body, {
2526
headers: { Authorization: authorization },
@@ -38,7 +39,7 @@ class WrapperService {
3839
// console.log("Native POST Request to druid - \n" + JSON.stringify({"ts": Date.now(), body: req.body, headers: req.headers, url: req.url}));
3940
const headers = req?.headers;
4041
const url = req?.url;
41-
const result = await axios.post(
42+
const result = await druidHttpService.post(
4243
`${config.query_api.druid.host}:${config.query_api.druid.port}${url}`,
4344
req.body, { headers, }
4445
);
@@ -55,7 +56,7 @@ class WrapperService {
5556
// console.log("Native DEL Request to druid - \n" + JSON.stringify({"ts": Date.now(), body: req.body, headers: req.headers, url: req.url}));
5657
const headers = req?.headers;
5758
const url = req?.url;
58-
const result = await axios.delete(
59+
const result = await druidHttpService.delete(
5960
`${config.query_api.druid.host}:${config.query_api.druid.port}${url}`,
6061
{
6162
headers,
@@ -74,7 +75,7 @@ class WrapperService {
7475
// console.log("Native GET Request to druid - \n" + JSON.stringify({"ts": Date.now(), body: req.body, headers: req.headers, url: req.url}));
7576
const headers = req?.headers;
7677
const url = req?.url;
77-
const result = await axios.get(
78+
const result = await druidHttpService.get(
7879
`${config.query_api.druid.host}:${config.query_api.druid.port}${url}`,
7980
{
8081
headers,
@@ -93,7 +94,7 @@ class WrapperService {
9394
try {
9495
const headers = req?.headers;
9596
const url = req?.url;
96-
const result = await axios.get(
97+
const result = await druidHttpService.get(
9798
`${config.query_api.druid.host}:${config.query_api.druid.port}${url}`,
9899
{ headers }
99100
);
@@ -108,15 +109,15 @@ class WrapperService {
108109
) => {
109110
try {
110111
// console.log("Native STATUS Request to druid - \n" + JSON.stringify({"ts": Date.now(), body: req.body, headers: req.headers, url: req.url}));
111-
const result = await axios.get(
112+
const result = await druidHttpService.get(
112113
`${config.query_api.druid.host}:${config.query_api.druid.port}/status`
113114
);
114115
ResponseHandler.flatResponse(req, res, result);
115116
} catch (error: any) { this.errorHandler.handleError(req, res, next, error, false); }
116117
};
117118

118119
public submitIngestion = async (ingestionSpec: object) => {
119-
return await axios.post(`${config.query_api.druid.host}:${config.query_api.druid.port}/${config.query_api.druid.submit_ingestion}`, ingestionSpec)
120+
return await druidHttpService.post(`${config.query_api.druid.host}:${config.query_api.druid.port}/${config.query_api.druid.submit_ingestion}`, ingestionSpec)
120121
}
121122

122123
}

command-service/src/command/druid_command.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from model.data_models import Action, ActionResponse, CommandPayload
66
from service.db_service import DatabaseService
77
from service.http_service import HttpService
8+
import base64
89

910

1011
class DruidCommand(ICommand):
@@ -19,6 +20,16 @@ def __init__(
1920
router_post = self.config.find("druid.router_port")
2021
self.supervisor_endpoint = self.config.find("druid.supervisor_endpoint")
2122
self.router_url = f"{router_host}:{router_post}/druid"
23+
self._auth = None
24+
25+
@property
26+
def auth(self):
27+
"""Lazy initialization of auth header."""
28+
if self._auth is None:
29+
username = self.config.find("druid.username")
30+
password = self.config.find("druid.password")
31+
self._auth = base64.b64encode(f"{username}:{password}".encode()).decode()
32+
return self._auth
2233

2334
def execute(self, command_payload: CommandPayload, action: Action):
2435
if action == Action.SUBMIT_INGESTION_TASKS.name:
@@ -42,7 +53,10 @@ def _submit_ingestion_task(self, dataset_id):
4253
response = self.http_service.post(
4354
url=f"{self.router_url}/{self.supervisor_endpoint}",
4455
body=ingestion_spec,
45-
headers={"Content-Type": "application/json"}
56+
headers={
57+
"Content-Type": "application/json",
58+
"Authorization": f"Basic {self.auth}"
59+
}
4660
)
4761
if response.status != 200:
4862
task_submitted = 0

command-service/src/config/service_config.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ druid:
3939
router_host: http://localhost
4040
router_port: 8888
4141
supervisor_endpoint: indexer/v1/supervisor
42+
username: admin
43+
password: admin123
4244

4345
helm_charts_base_dir: ../helm-charts
4446

0 commit comments

Comments
 (0)