Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion api-service/src/controllers/DatasetMetrics/DatasetMetrics.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,37 @@
},
"minItems": 1
},
"query_time_period":{
"query_time_period": {
"type": "integer",
"minimum": 1
},
"start_date": {
"type": "string"
},
"end_date": {
"type": "string"
}
},
"required": [
"category",
"dataset_id"
],
"oneOf": [
{
"required": ["query_time_period"],
"not": {
"anyOf": [
{ "required": ["start_date"] },
{ "required": ["end_date"] }
]
}
},
{
"required": ["start_date", "end_date"],
"not": {
"required": ["query_time_period"]
}
}
]
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,52 @@ import { schemaValidation } from "../../services/ValidationService";
import validationSchema from "./DatasetMetrics.json";
import { config } from "../../configs/Config";
import { datasetService } from "../../services/DatasetService";
import { getConnectors, getDataFreshness, getDataLineage, getDataObservability, getDataQuality, getDataVolume } from "../../services/DatasetMetricsService";
import { getConnectors, getDataFreshness, getDataLineage, getDataObservability, getDataVolume } from "../../services/DatasetMetricsService";

const apiId = "api.dataset.metrics";
const datasetMetrics = async (req: Request, res: Response) => {
const msgid = _.get(req, "body.params.msgid");
const requestBody = req.body;
const dataset_id = _.get(req, "body.request.dataset_id");
const timePeriod = _.get(req, "body.request.query_time_period") || config?.data_observability?.default_query_time_period;
const input_start_date = _.get(req, "body.request.start_date");
const input_end_date = _.get(req, "body.request.end_date");
const formattedStartDate = dayjs(input_start_date, 'YYYY/M/D')
.startOf('day')
.format('YYYY-MM-DDTHH:mm:ss');

const formattedEndDate = dayjs(input_end_date, 'YYYY/M/D')
.endOf('day')
.format('YYYY-MM-DDTHH:mm:ss');

if (input_start_date && input_end_date) {
if (dayjs(formattedStartDate).isAfter(dayjs(formattedEndDate))) {
logger.error({ apiId, datasetId: dataset_id, msgid, message: "Start date cannot be greater than end date", code: "INVALID_DATE_RANGE" });
return ResponseHandler.errorResponse({
message: "Invalid date range",
statusCode: 400,
errCode: "BAD_REQUEST",
code: "INVALID_DATE_RANGE"
}, req, res);
}
}

if (input_end_date && input_start_date && dayjs(formattedEndDate).isAfter(dayjs())) {
logger.error({ apiId, datasetId: dataset_id, msgid, message: "End date cannot be greater than the current date", code: "INVALID_DATE_RANGE" });
return ResponseHandler.errorResponse({
message: "Invalid date range: End date cannot be in the future",
statusCode: 400,
errCode: "BAD_REQUEST",
code: "INVALID_DATE_RANGE"
}, req, res);
}

const { category }: any = req.body.request;
const defaultThreshold = (typeof config?.data_observability?.default_freshness_threshold === 'number' ? config?.data_observability?.default_freshness_threshold : 5) * 60 * 1000; // 5 minutes in milliseconds
const dateFormat = 'YYYY-MM-DDTHH:mm:ss';
const endDate = dayjs().add(1, 'day').format(dateFormat);
const endDate = dayjs().format(dateFormat);
const startDate = dayjs(endDate).subtract(timePeriod, 'day').format(dateFormat);
const intervals = `${startDate}/${endDate}`;
const intervals = (input_start_date && input_end_date) ? `${formattedStartDate}/${formattedEndDate}` : `${startDate}/${endDate}`;
const isValidSchema = schemaValidation(requestBody, validationSchema);
const results = [];

Expand All @@ -38,22 +69,22 @@ const datasetMetrics = async (req: Request, res: Response) => {

try {
if (!category || category.includes("data_freshness")) {
const dataFreshnessResult = await getDataFreshness(dataset_id, intervals, defaultThreshold);
const dataFreshnessResult = await getDataFreshness(dataset_id, intervals, defaultThreshold, timePeriod);
results.push(dataFreshnessResult);
}

if (!category || category.includes("data_observability")) {
const dataObservabilityResult = await getDataObservability(dataset_id, intervals);
const dataObservabilityResult = await getDataObservability(dataset_id, intervals, timePeriod);
results.push(dataObservabilityResult);
}

if (!category || category.includes("data_volume")) {
const dataVolumeResult = await getDataVolume(dataset_id, timePeriod, dateFormat);
const dataVolumeResult = await getDataVolume(dataset_id, intervals, dateFormat, timePeriod);
results.push(dataVolumeResult);
}

if (!category || category.includes("data_lineage")) {
const dataLineageResult = await getDataLineage(dataset_id, intervals);
const dataLineageResult = await getDataLineage(dataset_id, intervals, timePeriod);
results.push(dataLineageResult);
}

Expand All @@ -62,11 +93,6 @@ const datasetMetrics = async (req: Request, res: Response) => {
results.push(connectorsResult);
}

if (!category || category.includes("data_quality")) {
const connectorsResult = await getDataQuality(dataset_id, intervals);
results.push(connectorsResult);
}

logger.info({ apiId, msgid, requestBody, datasetId: dataset_id, message: "Metrics fetched successfully" })
return ResponseHandler.successResponse(req, res, { status: 200, data: results });

Expand Down
121 changes: 51 additions & 70 deletions api-service/src/controllers/DatasetMetrics/queries.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import dayjs from "dayjs";

export const processingTimeQuery = (intervals: string, dataset_id: string) => ({
export const processingTimeQuery = (intervals: string, dataset_id: string, time_period: any) => ({
query: {
queryType: "groupBy",
dataSource: "system-events",
intervals: intervals,
granularity: {
type: "all",
timeZone: "UTC"
},
granularity: time_period === 1 ? "hour" : "day",
filter: {
type: "and",
fields: [
Expand All @@ -32,7 +29,7 @@ export const processingTimeQuery = (intervals: string, dataset_id: string) => ({
}
});

export const totalEventsQuery = (intervals: string, dataset_id: string) => ({
export const totalEventsQuery = (intervals: string, dataset_id: string, time_period: any) => ({
queryType: "timeseries",
dataSource: {
type: "table",
Expand All @@ -48,10 +45,7 @@ export const totalEventsQuery = (intervals: string, dataset_id: string) => ({
matchValueType: "STRING",
matchValue: dataset_id
},
granularity: {
type: "all",
timeZone: "UTC"
},
granularity: time_period === 1 ? "hour" : "day",
aggregations: [
{
type: "longSum",
Expand All @@ -61,7 +55,7 @@ export const totalEventsQuery = (intervals: string, dataset_id: string) => ({
]
});

export const totalFailedEventsQuery = (intervals: string, dataset_id: string) => ({
export const totalFailedEventsQuery = (intervals: string, dataset_id: string, time_period: any) => ({
queryType: "timeseries",
dataSource: {
type: "table",
Expand All @@ -77,10 +71,7 @@ export const totalFailedEventsQuery = (intervals: string, dataset_id: string) =>
matchValueType: "STRING",
matchValue: dataset_id
},
granularity: {
type: "all",
timeZone: "UTC"
},
granularity: time_period === 1 ? "hour" : "day",
aggregations: [
{
type: "filtered",
Expand Down Expand Up @@ -111,7 +102,26 @@ export const totalFailedEventsQuery = (intervals: string, dataset_id: string) =>
]
});

export const generateTimeseriesQuery = (intervals: string, dataset_id: string) => ({
/**
 * Builds a Druid native timeseries query counting router-stage processing
 * events (those with no error_code) for one dataset.
 * Buckets hourly when the window is a single day, daily otherwise.
 */
export const generateTimeseriesQuery = (intervals: string, dataset_id: string, time_period: number) => {
    // A 1-day window gets hourly resolution; anything longer gets daily.
    const bucketGranularity = time_period === 1 ? "hour" : "day";
    // Druid selector with value null matches rows where error_code is absent.
    const routerSuccessFilter = {
        type: "and",
        fields: [
            { type: "selector", dimension: "ctx_module", value: "processing" },
            { type: "selector", dimension: "ctx_dataset", value: dataset_id },
            { type: "selector", dimension: "ctx_pdata_pid", value: "router" },
            { type: "selector", dimension: "error_code", value: null }
        ]
    };
    return {
        queryType: "timeseries",
        dataSource: "system-events",
        intervals,
        granularity: bucketGranularity,
        filter: routerSuccessFilter,
        aggregations: [{ type: "longSum", name: "count", fieldName: "count" }]
    };
};

export const timeseriesQueryForVolumePercentage = (intervals: string, dataset_id: string) => ({
queryType: "timeseries",
dataSource: "system-events",
intervals: intervals,
Expand Down Expand Up @@ -245,56 +255,6 @@ export const generateTransformationFailedQuery = (intervals: string, dataset_id:
]
});

/**
 * Builds a Druid native timeseries query that sums the event count for
 * dedup-stage failures (ctx_pdata_pid = "dedup" AND error_type = "DedupFailed")
 * for one dataset, aggregated over the whole interval (granularity "all").
 */
export const generateDedupFailedQuery = (intervals: string, dataset_id: string) => {
    // Inner filter applied to the aggregator: only dedup failures are counted.
    const dedupFailureFilter = {
        type: "and",
        fields: [
            {
                type: "equals",
                column: "ctx_pdata_pid",
                matchValueType: "STRING",
                matchValue: "dedup"
            },
            {
                type: "equals",
                column: "error_type",
                matchValueType: "STRING",
                matchValue: "DedupFailed"
            }
        ]
    };
    return {
        queryType: "timeseries",
        dataSource: { type: "table", name: "system-events" },
        intervals: { type: "intervals", intervals: [intervals] },
        // Outer filter scopes the scan to the requested dataset.
        filter: {
            type: "equals",
            column: "ctx_dataset",
            matchValueType: "STRING",
            matchValue: dataset_id
        },
        granularity: { type: "all", timeZone: "UTC" },
        aggregations: [
            {
                type: "filtered",
                aggregator: { type: "longSum", name: "count", fieldName: "count" },
                filter: dedupFailureFilter,
                name: "count"
            }
        ]
    };
};

export const generateDenormFailedQuery = (intervals: string, dataset_id: string) => ({
queryType: "timeseries",
dataSource: {
Expand Down Expand Up @@ -388,15 +348,36 @@ export const generateConnectorQuery = (intervals: string, dataset_id: string) =>
});

export const generateTotalQueryCallsQuery = (time_period: string) => ({
end: dayjs().unix(),
query: `sum(sum_over_time(node_total_api_calls{entity="data-out"}[${time_period}]))`,
step: `${time_period}`,
start: dayjs().subtract(1, 'day').unix()
end: dayjs().unix(),
query: `sum(sum_over_time(node_total_api_calls{entity="data-out"}[${time_period}]))`,
step: `${time_period}`,
start: dayjs().subtract(1, 'day').unix()
});

/**
 * Prometheus range-query parameters: data-out API calls for a single dataset
 * over the trailing one-day window. `time_period` drives both the
 * sum_over_time range selector and the step.
 */
export const generateDatasetQueryCallsQuery = (dataset: string, time_period: string) => {
    const metricSelector = `node_total_api_calls{dataset_id="${dataset}",entity="data-out"}`;
    return {
        end: dayjs().unix(),
        step: `${time_period}`,
        query: `sum(sum_over_time(${metricSelector}[${time_period}]))`,
        start: dayjs().subtract(1, 'day').unix(),
    };
};

/**
 * Prometheus range-query parameters: dedup-failure count emitted by the
 * PipelinePreprocessor Flink job for one dataset, summed over `time_period`,
 * sampled at a fixed 1-day step across the trailing one-day window.
 */
export const generateDedupFailedQuery = (dataset: string, time_period: string) => {
    const metric = `flink_taskmanager_job_task_operator_PipelinePreprocessorJob_${dataset}_dedup_failed_count`;
    return {
        end: dayjs().unix(),
        step: `1d`,
        query: `sum(sum_over_time(${metric}[${time_period}]))`,
        start: dayjs().subtract(1, 'day').unix(),
    };
};

/**
 * Prometheus range-query parameters: extractor success count emitted by the
 * Extractor Flink job for one dataset, summed over `time_period`, sampled at
 * a fixed 1-day step across the trailing one-day window.
 */
export const extractorSuccessCountQuery = (dataset: string, time_period: string) => {
    const metric = `flink_taskmanager_job_task_operator_ExtractorJob_${dataset}_extractor_success_count`;
    return {
        end: dayjs().unix(),
        step: `1d`,
        query: `sum(sum_over_time(${metric}[${time_period}]))`,
        start: dayjs().subtract(1, 'day').unix(),
    };
};

/**
 * Prometheus range-query parameters: duplicate-batch count emitted by the
 * Extractor Flink job for one dataset, summed over `time_period`, sampled at
 * a fixed 1-day step across the trailing one-day window.
 */
export const extractorBatchDuplicateCountQuery = (dataset: string, time_period: string) => {
    const metric = `flink_taskmanager_job_task_operator_ExtractorJob_${dataset}_extractor_duplicate_count`;
    return {
        end: dayjs().unix(),
        step: `1d`,
        query: `sum(sum_over_time(${metric}[${time_period}]))`,
        start: dayjs().subtract(1, 'day').unix(),
    };
};
Loading
Loading