Skip to content

Commit b14bf6a

Browse files
authored
Merge pull request #358 from Sanketika-Obsrv/release-1.9.0
Release 1.9.0 to main
2 parents 4f25f52 + 4ed82a1 commit b14bf6a

File tree

7 files changed

+135
-14
lines changed

7 files changed

+135
-14
lines changed

api-service/src/configs/Config.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,5 +144,8 @@ export const config = {
144144
},
145145
"alerts_rules": {
146146
"config_path": process.env.alerts_config_path
147-
}
147+
},
148+
"dataset_filter_config": {
149+
"status_filter_limit": process.env.status_filter_limit ? parseInt(process.env.status_filter_limit) : 10 // Maximum number of filters allowed in a dataset
150+
}
148151
}

api-service/src/controllers/DatasetList/DatasetList.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,16 @@ import logger from "../../logger";
55
import { schemaValidation } from "../../services/ValidationService";
66
import DatasetCreate from "./DatasetListValidationSchema.json";
77
import { ResponseHandler } from "../../helpers/ResponseHandler";
8-
import { datasetService } from "../../services/DatasetService";
8+
import { attachDraftConnectors, attachLiveConnectors, datasetService } from "../../services/DatasetService";
99
import { obsrvError } from "../../types/ObsrvError";
10-
import { Dataset } from "../../models/Dataset";
11-
import { Datasource } from "../../models/Datasource";
10+
import { config } from "../../configs/Config";
1211

1312
export const apiId = "api.datasets.list"
1413
export const errorCode = "DATASET_LIST_FAILURE"
1514
const liveDatasetStatus = ["Live", "Retired", "Purged"]
1615
const draftDatasetStatus = ["Draft", "ReadyToPublish"]
1716
const defaultFields = ["dataset_id", "name", "type", "status", "tags", "version", "api_version", "dataset_config", "created_date", "updated_date"]
17+
const MAX_STATUS_ARRAY_SIZE = config.dataset_filter_config.status_filter_limit || 10;
1818

1919
const datasetList = async (req: Request, res: Response) => {
2020

@@ -35,11 +35,19 @@ const listDatasets = async (request: Record<string, any>): Promise<Record<string
3535

3636
const { filters = {} } = request || {};
3737
const datasetStatus = _.get(filters, "status");
38+
const connectorFilter = _.get(filters, "connectors");
3839
const status = _.isArray(datasetStatus) ? datasetStatus : _.compact([datasetStatus])
39-
const draftFilters = _.set(_.cloneDeep(filters), "status", _.isEmpty(status) ? draftDatasetStatus : _.intersection(status, draftDatasetStatus));
40-
const liveFilters = _.set(_.cloneDeep(filters), "status", _.isEmpty(status) ? liveDatasetStatus : _.intersection(status, liveDatasetStatus));
41-
const liveDatasetList = await datasetService.getLiveDatasets(liveFilters, defaultFields)
42-
const draftDatasetList = await datasetService.findDraftDatasets(draftFilters, [...defaultFields, "data_schema", "validation_config", "dedup_config", "denorm_config", "connectors_config", "version_key"], [["updated_date", "DESC"]]);
40+
if (status.length > MAX_STATUS_ARRAY_SIZE) {
41+
throw obsrvError("", "DATASET_LIST_INPUT_INVALID", "Status filter array length exceeds the allowed limit", "BAD_REQUEST", 400);
42+
}
43+
const draftFilters = _.omit(_.set(_.cloneDeep(filters), "status", _.isEmpty(status) ? draftDatasetStatus : _.intersection(status, draftDatasetStatus)), "connectors");
44+
const liveFilters = _.omit(_.set(_.cloneDeep(filters), "status", _.isEmpty(status) ? liveDatasetStatus : _.intersection(status, liveDatasetStatus)), "connectors");
45+
let liveDatasetList = await datasetService.getLiveDatasets(liveFilters, defaultFields)
46+
let draftDatasetList = await datasetService.findDraftDatasets(draftFilters, [...defaultFields, "data_schema", "validation_config", "dedup_config", "denorm_config", "connectors_config", "version_key"], [["updated_date", "DESC"]]);
47+
if(connectorFilter && !_.isEmpty(connectorFilter)) {
48+
liveDatasetList = await attachLiveConnectors(liveDatasetList, connectorFilter);
49+
draftDatasetList = await attachDraftConnectors(draftDatasetList, connectorFilter);
50+
}
4351
return _.compact(_.concat(liveDatasetList, draftDatasetList));
4452
}
4553

api-service/src/controllers/DatasetList/DatasetListValidationSchema.json

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,20 @@
4545
"type": {
4646
"type": "string",
4747
"enum": ["event", "transaction", "master"]
48+
},
49+
"connectors": {
50+
"anyOf": [
51+
{
52+
"type": "array",
53+
"items": {
54+
"type": "string"
55+
}
56+
},
57+
{
58+
"type": "string",
59+
"default": "all"
60+
}
61+
]
4862
}
4963
},
5064
"additionalProperties": false

api-service/src/controllers/DatasetRead/DatasetRead.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ const readDataset = async (datasetId: string, attributes: string[]): Promise<any
8282
const api_version = _.get(dataset, "api_version")
8383
const datasetConfigs: any = {}
8484
const transformations_config = await datasetService.getTransformations(datasetId, ["field_key", "transformation_function", "mode", "metadata"])
85-
const datasourceConfig = await Datasource.findOne({ where: { dataset_id: datasetId, is_primary: true }, attributes: ["datasource"], raw: true })
85+
const datasourceConfig = await Datasource.findOne({ where: { dataset_id: datasetId, is_primary: true, type: "druid" }, attributes: ["datasource"], raw: true })
8686
datasetConfigs["alias"] = _.get(datasourceConfig, "datasource")
8787
if (api_version !== "v2") {
8888
datasetConfigs["transformations_config"] = _.map(transformations_config, (config) => {

api-service/src/models/ConnectorInstances.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ export const ConnectorInstances = sequelize.define("connector_instances", {
1212
connector_id: {
1313
type: DataTypes.STRING
1414
},
15-
data_format: {
16-
type: DataTypes.STRING,
17-
defaultValue: "json"
18-
},
1915
connector_config: {
2016
type: DataTypes.STRING
2117
},

api-service/src/services/DatasetService.ts

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { config } from "../configs/Config";
2222
import { Op } from "sequelize";
2323
import TableDraft from "../models/Table";
2424
import { alertService } from "./AlertManagerService";
25+
import { ConnectorRegistry } from "../models/ConnectorRegistry";
2526

2627
class DatasetService {
2728

@@ -144,7 +145,7 @@ class DatasetService {
144145
{
145146
model: Datasource,
146147
attributes: ['datasource'],
147-
where: { is_primary: true },
148+
where: { is_primary: true, type: "druid" },
148149
required: false
149150
},
150151
], raw: true, where: filters, attributes, order: [["updated_date", "DESC"]]
@@ -510,4 +511,102 @@ export const validateStorageSupport = (dataset: Record<string, any>) => {
510511
}
511512
}
512513

514+
export const attachDraftConnectors = async (
515+
draftDatasetList: Record<string, any>[],
516+
connectorFilter: string | string[]
517+
): Promise<Record<string, any>[]> => {
518+
if (_.isEmpty(draftDatasetList)) {
519+
return [];
520+
}
521+
522+
const connectorIds = _.uniq(
523+
_.flatMap(draftDatasetList, dataset =>
524+
_.map(dataset.connectors_config, 'connector_id')
525+
)
526+
);
527+
528+
if (_.isEmpty(connectorIds)) {
529+
return draftDatasetList.map(dataset => ({
530+
...dataset,
531+
connectors_config: []
532+
}));
533+
}
534+
535+
const connectorRegistry: any = await ConnectorRegistry.findAll({
536+
where: { id: connectorIds },
537+
raw: true,
538+
attributes: ['id', 'name', 'category', 'type']
539+
});
540+
541+
return draftDatasetList.map(dataset => {
542+
let filteredConnectors = dataset.connectors_config;
543+
if (connectorFilter !== 'all') {
544+
const filterArray = _.castArray(connectorFilter); // Ensure it's an array
545+
filteredConnectors = _.filter(filteredConnectors, connector =>
546+
filterArray.includes(String(connector.connector_id))
547+
);
548+
}
549+
550+
const enrichedConnectors = filteredConnectors.map((connector: any) => {
551+
const registryDetails = _.find(connectorRegistry, {
552+
id: connector.connector_id
553+
});
554+
return {
555+
...connector,
556+
name: registryDetails?.name || null,
557+
category: registryDetails?.category || null,
558+
source: registryDetails?.type || null
559+
};
560+
});
561+
562+
return {
563+
...dataset,
564+
connectors_config: enrichedConnectors
565+
};
566+
});
567+
};
568+
569+
export const attachLiveConnectors = async (
570+
liveDatasetList: Record<string, any>,
571+
connectorFilter: string | string[]
572+
): Promise<Record<string, any>[]> => {
573+
if (_.isEmpty(liveDatasetList)) {
574+
return [];
575+
}
576+
577+
ConnectorRegistry.hasMany(ConnectorInstances, { foreignKey: 'connector_id' });
578+
const connectorRegistry = await ConnectorRegistry.findAll({
579+
include: [{
580+
model: ConnectorInstances,
581+
attributes: ['dataset_id', 'connector_id'],
582+
required: true
583+
}],
584+
raw: true,
585+
attributes: ['id', 'name', 'category', 'type']
586+
});
587+
588+
const filterArray = connectorFilter === 'all' ? null : _.castArray(connectorFilter);
589+
590+
return liveDatasetList.map((dataset: Record<string, any>) => {
591+
const datasetId = dataset.dataset_id;
592+
593+
const filteredConnectors = connectorRegistry.filter((connector: any) =>
594+
connector['connector_instances.dataset_id'] === datasetId &&
595+
(!filterArray || filterArray.includes(String(connector.id)))
596+
);
597+
598+
const enrichedConnectors = filteredConnectors.map((connector: any) => ({
599+
connector_id: connector.id,
600+
name: connector.name,
601+
category: connector.category,
602+
type: connector.type
603+
}));
604+
605+
return {
606+
...dataset,
607+
connectors_config: enrichedConnectors
608+
};
609+
});
610+
};
611+
513612
export const datasetService = new DatasetService();

api-service/src/services/TableGenerator.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ class TableGenerator extends BaseTableGenerator {
171171
case "boolean": return "string";
172172
case "array": return "json";
173173
case "string": return "string";
174+
case "double": return "double";
174175
default: return "auto";
175176
}
176177
}

0 commit comments

Comments (0)