Skip to content

Commit 8e71dbb

Browse files
Merging Next Into ---> Main (#1073)
2 parents 73624b5 + 81ac118 commit 8e71dbb

File tree

6 files changed

+290
-14
lines changed

6 files changed

+290
-14
lines changed

apps/queue-manager/src/consumers/get-import-job-data.consumer.ts

Lines changed: 270 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,25 @@
11
import { RSSXMLService } from '@impler/services';
2-
import { ImportJobHistoryStatusEnum, SendImportJobCachedData } from '@impler/shared';
3-
import { JobMappingRepository, CommonRepository } from '@impler/dal';
2+
import {
3+
ImportJobHistoryStatusEnum,
4+
SendImportJobCachedData,
5+
ColumnTypesEnum,
6+
ITemplateSchemaItem,
7+
ColumnDelimiterEnum,
8+
} from '@impler/shared';
9+
410
import { SendImportJobDataConsumer } from './send-import-job-data.consumer';
11+
import { CommonRepository, JobMappingRepository, ColumnRepository } from '@impler/dal';
512

613
export class GetImportJobDataConsumer extends SendImportJobDataConsumer {
714
private commonRepository: CommonRepository = new CommonRepository();
815
private jobMappingRepository: JobMappingRepository = new JobMappingRepository();
16+
private columnRepo: ColumnRepository = new ColumnRepository();
917
private rssXmlService: RSSXMLService = new RSSXMLService();
1018

1119
async message(message: { content: string }) {
1220
const data = JSON.parse(message.content) as { _jobId: string };
1321
const importJobHistoryId = this.commonRepository.generateMongoId().toString();
14-
const importedData = await this.getJobImportedData(data._jobId);
22+
const validationResult = await this.getJobImportedData(data._jobId);
1523

1624
// Create history entry
1725
await this.importJobHistoryRepository.create({
@@ -26,13 +34,26 @@ export class GetImportJobDataConsumer extends SendImportJobDataConsumer {
2634
});
2735

2836
if (webhookDestination?.callbackUrl) {
29-
await this.sendDataImportData(data._jobId, importedData);
37+
if (validationResult.validRecords > 0) {
38+
await this.sendDataImportData(data._jobId, validationResult.validData, 1, undefined, false);
39+
}
40+
if (validationResult.invalidRecords > 0) {
41+
await this.sendDataImportData(data._jobId, validationResult.invalidData, 1, undefined, true);
42+
}
3043
}
3144

3245
return;
3346
}
3447

35-
async getJobImportedData(_jobId: string) {
48+
async getJobImportedData(_jobId: string): Promise<{
49+
importedData: Record<string, unknown>[];
50+
hasInvalidRecords: boolean;
51+
totalRecords: number;
52+
validRecords: number;
53+
invalidRecords: number;
54+
validData: Record<string, unknown>[];
55+
invalidData: Record<string, unknown>[];
56+
}> {
3657
try {
3758
const userJob = await this.userJobRepository.findOne({ _id: _jobId });
3859
if (!userJob) {
@@ -55,17 +76,140 @@ export class GetImportJobDataConsumer extends SendImportJobDataConsumer {
5576
const batchResult = await this.rssXmlService.getBatchXMLKeyValuesByPaths(parsedXMLData.xmlData, mappings);
5677
const mappedData = await this.rssXmlService.mappingFunction(mappings, batchResult);
5778

58-
return mappedData;
79+
const validationResult = await this.validateData(_jobId, mappedData);
80+
81+
// Send data to webhook with validation status
82+
this.sendDataImportData(_jobId, mappedData, 1, undefined, validationResult.hasInvalidRecords);
83+
84+
if (validationResult.hasInvalidRecords) {
85+
await this.userJobRepository.update({ _id: _jobId }, { $set: { isInvalidRecords: true } });
86+
}
87+
88+
return {
89+
importedData: mappedData,
90+
hasInvalidRecords: validationResult.hasInvalidRecords,
91+
totalRecords: validationResult.totalRecords,
92+
validRecords: validationResult.validRecords,
93+
invalidRecords: validationResult.invalidRecords,
94+
validData: validationResult.validData,
95+
invalidData: validationResult.invalidData,
96+
};
97+
} catch (error) {}
98+
}
99+
100+
private async validateData(
101+
_jobId: string,
102+
mappedData: Record<string, unknown>[]
103+
): Promise<{
104+
hasInvalidRecords: boolean;
105+
totalRecords: number;
106+
validRecords: number;
107+
invalidRecords: number;
108+
validData: Record<string, unknown>[];
109+
invalidData: Record<string, unknown>[];
110+
}> {
111+
try {
112+
const userJob = await this.userJobRepository.findOne({ _id: _jobId });
113+
if (!userJob) {
114+
throw new Error(`Job not found for _jobId: ${_jobId}`);
115+
}
116+
117+
// Get template columns (schema)
118+
const columns = await this.columnRepo.find({ _templateId: userJob._templateId });
119+
if (!columns || columns.length === 0) {
120+
return {
121+
hasInvalidRecords: false,
122+
totalRecords: 0,
123+
validRecords: 0,
124+
invalidRecords: 0,
125+
validData: [],
126+
invalidData: [],
127+
};
128+
}
129+
130+
const multiSelectColumnHeadings: Record<string, string> = {};
131+
132+
(columns as unknown as ITemplateSchemaItem[]).forEach((column) => {
133+
if (column.type === ColumnTypesEnum.SELECT && column.allowMultiSelect)
134+
multiSelectColumnHeadings[column.key] = column.delimiter || ColumnDelimiterEnum.COMMA;
135+
});
136+
137+
let totalRecords = 0;
138+
let validRecords = 0;
139+
let invalidRecords = 0;
140+
const validData: Record<string, unknown>[] = [];
141+
const invalidData: Record<string, unknown>[] = [];
142+
143+
for (const recordData of mappedData) {
144+
// Format record for multi-select handling
145+
const checkRecord: Record<string, unknown> = this.formatRecord({
146+
record: { record: recordData },
147+
multiSelectColumnHeadings,
148+
});
149+
150+
const validationResult = this.validateRecordUsingColumnSchema(
151+
checkRecord,
152+
columns as unknown as ITemplateSchemaItem[]
153+
);
154+
155+
totalRecords++;
156+
157+
if (validationResult.isValid) {
158+
validRecords++;
159+
validData.push(recordData);
160+
} else {
161+
invalidRecords++;
162+
invalidData.push(recordData);
163+
}
164+
}
165+
166+
const hasInvalidRecords = invalidRecords > 0;
167+
168+
return {
169+
hasInvalidRecords,
170+
totalRecords,
171+
validRecords,
172+
invalidRecords,
173+
validData,
174+
invalidData,
175+
};
59176
} catch (error) {
60-
throw error;
177+
return {
178+
hasInvalidRecords: false,
179+
totalRecords: 0,
180+
validRecords: 0,
181+
invalidRecords: 0,
182+
validData: [],
183+
invalidData: [],
184+
};
61185
}
62186
}
63187

188+
private formatRecord({
189+
record,
190+
multiSelectColumnHeadings,
191+
}: {
192+
record: { record: Record<string, unknown> };
193+
multiSelectColumnHeadings?: Record<string, string>;
194+
}) {
195+
return Object.keys(multiSelectColumnHeadings || {}).reduce(
196+
(acc, heading) => {
197+
if (typeof record.record[heading] === 'string') {
198+
acc[heading] = (record.record[heading] as string)?.split(multiSelectColumnHeadings[heading]);
199+
}
200+
201+
return acc;
202+
},
203+
{ ...record.record }
204+
);
205+
}
206+
64207
private async sendDataImportData(
65208
_jobId: string,
66-
allDataJson: any[],
209+
allDataJson: Record<string, any>[],
67210
page = 1,
68-
initialCachedData?: SendImportJobCachedData
211+
initialCachedData?: SendImportJobCachedData,
212+
areInvalidRecords?: boolean
69213
) {
70214
try {
71215
let cachedData = null;
@@ -84,6 +228,7 @@ export class GetImportJobDataConsumer extends SendImportJobDataConsumer {
84228
recordFormat: cachedData.recordFormat,
85229
chunkFormat: cachedData.chunkFormat,
86230
...cachedData,
231+
isInvalidRecords: areInvalidRecords,
87232
});
88233

89234
const headers =
@@ -109,15 +254,129 @@ export class GetImportJobDataConsumer extends SendImportJobDataConsumer {
109254
});
110255

111256
if (nextPageNumber) {
112-
// Recursively call for next page with updated page number
113257
await this.sendDataImportData(_jobId, allDataJson, nextPageNumber, { ...cachedData, page: nextPageNumber });
114258
} else {
115-
// Processing is done
116259
await this.finalizeUpload(_jobId);
117260
}
118261
}
119262
} catch (error) {
120263
throw error;
121264
}
122265
}
266+
267+
private validateRecordUsingColumnSchema(
268+
record: Record<string, unknown>,
269+
columns: ITemplateSchemaItem[]
270+
): { isValid: boolean } {
271+
enum ValidationTypesEnum {
272+
RANGE = 'range',
273+
LENGTH = 'length',
274+
UNIQUE_WITH = 'unique_with',
275+
DIGITS = 'digits',
276+
}
277+
let isValid = true;
278+
279+
for (const column of columns) {
280+
const value = record[column.key];
281+
282+
if (value === undefined) {
283+
isValid = false;
284+
continue;
285+
}
286+
287+
if (column.isRequired && (value === null || value === '' || !value)) {
288+
isValid = false;
289+
continue;
290+
}
291+
292+
if (value !== null && value !== '') {
293+
switch (column.type) {
294+
case ColumnTypesEnum.NUMBER:
295+
if (isNaN(Number(value))) {
296+
isValid = false;
297+
}
298+
break;
299+
case ColumnTypesEnum.DOUBLE:
300+
if (isNaN(Number(value)) || !Number.isFinite(Number(value))) {
301+
isValid = false;
302+
}
303+
break;
304+
case ColumnTypesEnum.EMAIL:
305+
const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
306+
if (!emailRegex.test(String(value))) {
307+
isValid = false;
308+
}
309+
break;
310+
case ColumnTypesEnum.DATE:
311+
if (isNaN(Date.parse(String(value)))) {
312+
isValid = false;
313+
}
314+
break;
315+
case ColumnTypesEnum.REGEX:
316+
if (column.regex && !new RegExp(column.regex).test(String(value))) {
317+
isValid = false;
318+
}
319+
break;
320+
case ColumnTypesEnum.SELECT:
321+
if (column.selectValues && !column.selectValues.includes(String(value))) {
322+
isValid = false;
323+
}
324+
break;
325+
case ColumnTypesEnum.IMAGE:
326+
const imageUrlRegex = /^https?:\/\/.+\.(jpg|jpeg|png|gif|bmp|webp|svg)$/i;
327+
if (!imageUrlRegex.test(String(value))) {
328+
isValid = false;
329+
}
330+
break;
331+
}
332+
}
333+
334+
if (
335+
value !== undefined &&
336+
value !== null &&
337+
value !== '' &&
338+
column.validations &&
339+
column.validations.length > 0
340+
) {
341+
for (const validation of column.validations) {
342+
switch (validation.validate) {
343+
case ValidationTypesEnum.RANGE:
344+
const numValue = Number(value);
345+
if (!isNaN(numValue)) {
346+
if (validation.min !== undefined && numValue < validation.min) {
347+
isValid = false;
348+
}
349+
if (validation.max !== undefined && numValue > validation.max) {
350+
isValid = false;
351+
}
352+
}
353+
break;
354+
case ValidationTypesEnum.LENGTH:
355+
const strValue = String(value);
356+
if (validation.min !== undefined && strValue.length < validation.min) {
357+
isValid = false;
358+
}
359+
if (validation.max !== undefined && strValue.length > validation.max) {
360+
isValid = false;
361+
}
362+
break;
363+
case ValidationTypesEnum.DIGITS:
364+
const digitStr = String(value).replace(/[^0-9]/g, '');
365+
if (validation.min !== undefined && digitStr.length < validation.min) {
366+
isValid = false;
367+
}
368+
if (validation.max !== undefined && digitStr.length > validation.max) {
369+
isValid = false;
370+
}
371+
break;
372+
case ValidationTypesEnum.UNIQUE_WITH:
373+
break;
374+
}
375+
if (!isValid) break;
376+
}
377+
}
378+
}
379+
380+
return { isValid };
381+
}
123382
}

apps/queue-manager/src/consumers/send-import-job-data.consumer.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,15 @@ export class SendImportJobDataConsumer extends BaseConsumer {
104104
recordFormat,
105105
extra = '',
106106
multiSelectHeadings,
107-
}: SendImportJobCachedData & { data: any[]; uploadId: string }): { sendData: Record<string, unknown>; page: number } {
107+
isInvalidRecords,
108+
}: SendImportJobCachedData & {
109+
data: any[];
110+
uploadId: string;
111+
isInvalidRecords?: boolean;
112+
}): {
113+
sendData: Record<string, unknown>;
114+
page: number;
115+
} {
108116
const defaultValuesObj = JSON.parse(defaultValues);
109117
let slicedData = data.slice(
110118
Math.max((page - DEFAULT_PAGE) * chunkSize, MIN_LIMIT),
@@ -132,6 +140,7 @@ export class SendImportJobDataConsumer extends BaseConsumer {
132140
chunkSize: slicedData.length,
133141
extra: extra ? JSON.parse(extra) : '',
134142
totalPages: this.getTotalPages(data.length, chunkSize),
143+
...(isInvalidRecords !== undefined && { isInvalidRecords }),
135144
};
136145

137146
return {
@@ -182,6 +191,7 @@ export class SendImportJobDataConsumer extends BaseConsumer {
182191
defaultValues: JSON.stringify(defaultValueObj),
183192
recordFormat: userJob.customRecordFormat,
184193
chunkFormat: userJob.customChunkFormat,
194+
isInvalidRecords: userJob.isInvalidRecords,
185195
};
186196
}
187197

libs/dal/src/repositories/user-job/user-job.entity.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,6 @@ export class UserJobEntity {
2626
customChunkFormat: string;
2727

2828
customSchema: string;
29+
30+
isInvalidRecords?: boolean;
2931
}

libs/dal/src/repositories/user-job/user-job.schema.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ const userJobSchema = new Schema(
4646
customSchema: {
4747
type: Schema.Types.String,
4848
},
49+
isInvalidRecords: {
50+
type: Schema.Types.Boolean,
51+
default: false,
52+
},
4953
},
5054
{ ...schemaOptions }
5155
);

0 commit comments

Comments
 (0)