Skip to content

Commit 00a015c

Browse files
v1.8.1 (#1100)
2 parents 2dfcbfa + c48e694 commit 00a015c

File tree

32 files changed

+439
-127
lines changed

32 files changed

+439
-127
lines changed

apps/api/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@impler/api",
3-
"version": "1.8.0",
3+
"version": "1.8.1",
44
"author": "implerhq",
55
"license": "MIT",
66
"private": true,

apps/api/src/app/activity/usecases/upload-history/upload-history.usecase.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ export class UploadHistory {
99

1010
async execute({ _projectId, date, limit, name, page }: UploadHistoryCommand): Promise<PaginationResult> {
1111
const uploadResult = await this.uploadRepository.getList(_projectId, name, date, page, limit);
12-
uploadResult.uploads = uploadResult.uploads.map((upload) => {
13-
upload.name = upload._template.name;
14-
delete upload._template;
1512

16-
return upload;
17-
});
13+
const transformedRecords = uploadResult.uploads.map((record) => ({
14+
...record,
15+
name: record._template?.name,
16+
_template: undefined,
17+
}));
1818

1919
return {
20-
data: uploadResult.uploads,
20+
data: transformedRecords,
2121
limit,
2222
page,
2323
totalPages: Math.ceil(uploadResult.totalRecords / limit),

apps/api/src/app/auto-import-jobs-schedular/usecase/auto-import-jobs-schedular.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ export class AutoImportJobsSchedular {
4848

4949
const interval = parser.parseExpression(cronExpression.trim(), {
5050
currentDate: currentNextRun,
51+
tz: 'Asia/Kolkata',
5152
});
5253

5354
const nextRun = interval.next().toDate();

apps/api/src/app/import-jobs/usecase/create-userjob/create-userjob.usecase.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,19 @@ export class CreateUserJob {
7373
return dayjs().add(5, 'minutes').toDate();
7474
}
7575

76+
const interval = parser.parseExpression(cronExpression.trim(), {
77+
tz: 'Asia/Kolkata',
78+
});
79+
const nextCronTime = dayjs(interval.next().toDate());
7680
const now = dayjs();
77-
const fiveMinutesFromNow = now.add(5, 'minutes');
7881

79-
const interval = parser.parseExpression(cronExpression.trim());
80-
const nextCronTime = dayjs(interval.next().toDate());
82+
if (nextCronTime.isBefore(now.add(1, 'minute'))) {
83+
const nextOccurrence = dayjs(interval.next().toDate());
8184

82-
if (nextCronTime.isAfter(fiveMinutesFromNow)) {
83-
return fiveMinutesFromNow.toDate();
84-
} else {
85-
return nextCronTime.toDate();
85+
return nextOccurrence.toDate();
8686
}
87+
88+
return nextCronTime.toDate();
8789
} catch (error) {
8890
return dayjs().add(5, 'minutes').toDate();
8991
}

apps/queue-manager/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@impler/queue-manager",
3-
"version": "1.8.0",
3+
"version": "1.8.1",
44
"author": "implerhq",
55
"license": "MIT",
66
"private": true,

apps/queue-manager/src/bootstrap.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
SendWebhookDataConsumer,
1111
EndImportConsumer,
1212
SendBubbleDataConsumer,
13-
GetImportJobDataConsumer,
13+
SendAutoImportJobDataConsumer,
1414
SendImportJobDataConsumer,
1515
SendFailedWebhookDataConsumer,
1616
} from './consumers';
@@ -47,7 +47,7 @@ export async function bootstrap() {
4747
const sendBubbleDataConsumer = new SendBubbleDataConsumer();
4848
const sendWebhookdataConsumer = new SendWebhookDataConsumer();
4949
const sendFailedWebhookDataConsumer = new SendFailedWebhookDataConsumer();
50-
const getImportJobbDataConsumer = new GetImportJobDataConsumer();
50+
const autoImportJobbDataConsumer = new SendAutoImportJobDataConsumer();
5151
const sendImportJobDataConsumer = new SendImportJobDataConsumer();
5252

5353
// add queues to channel
@@ -84,7 +84,7 @@ export async function bootstrap() {
8484
}),
8585
channel.consume(
8686
QueuesEnum.GET_IMPORT_JOB_DATA,
87-
getImportJobbDataConsumer.message.bind(getImportJobbDataConsumer),
87+
autoImportJobbDataConsumer.message.bind(autoImportJobbDataConsumer),
8888
{
8989
noAck: true,
9090
}

apps/queue-manager/src/consumers/get-import-job-data.consumer.ts renamed to apps/queue-manager/src/consumers/get-auto-import-job-data.consumer.ts

Lines changed: 77 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,69 @@
1-
import { RSSXMLService } from '@impler/services';
1+
import { PaymentAPIService, RSSXMLService } from '@impler/services';
22
import {
3-
ImportJobHistoryStatusEnum,
43
SendImportJobCachedData,
54
ColumnTypesEnum,
65
ITemplateSchemaItem,
76
ColumnDelimiterEnum,
7+
UserJobImportStatusEnum,
88
} from '@impler/shared';
99

10-
import dayjs from 'dayjs';
1110
import { SendImportJobDataConsumer } from './send-import-job-data.consumer';
12-
import { CommonRepository, JobMappingRepository, ColumnRepository } from '@impler/dal';
11+
import { JobMappingRepository, ColumnRepository, UserJobEntity } from '@impler/dal';
12+
13+
interface IValidationResult {
14+
hasInvalidRecords: boolean;
15+
totalRecords: number;
16+
validRecords: number;
17+
invalidRecords: number;
18+
validData: Record<string, unknown>[];
19+
invalidData: Record<string, unknown>[];
20+
}
1321

14-
export class GetImportJobDataConsumer extends SendImportJobDataConsumer {
15-
private commonRepository: CommonRepository = new CommonRepository();
22+
export class SendAutoImportJobDataConsumer extends SendImportJobDataConsumer {
1623
private jobMappingRepository: JobMappingRepository = new JobMappingRepository();
1724
private columnRepo: ColumnRepository = new ColumnRepository();
1825
private rssXmlService: RSSXMLService = new RSSXMLService();
26+
private paymentAPIService: PaymentAPIService = new PaymentAPIService();
1927

2028
async message(message: { content: string }) {
2129
const data = JSON.parse(message.content) as { _jobId: string };
22-
const importJobHistoryId = this.commonRepository.generateMongoId().toString();
2330
const validationResult = await this.getJobImportedData(data._jobId);
2431

25-
// Create history entry
26-
await this.importJobHistoryRepository.create({
27-
_id: importJobHistoryId,
28-
_jobId: data._jobId,
29-
status: ImportJobHistoryStatusEnum.PROCESSING,
30-
});
31-
3232
const userJobInfo = await this.userJobRepository.getUserJobWithTemplate(data._jobId);
3333
const webhookDestination = await this.webhookDestinationRepository.findOne({
3434
_templateId: userJobInfo._templateId,
3535
});
3636

3737
if (webhookDestination?.callbackUrl) {
3838
if (validationResult.validRecords > 0) {
39-
await this.sendDataImportData(data._jobId, validationResult.validData, 1, undefined, false, userJobInfo.endsOn);
39+
await this.sendDataImportData(
40+
data._jobId,
41+
validationResult.validData,
42+
1,
43+
undefined,
44+
false,
45+
userJobInfo.endsOn,
46+
validationResult
47+
);
4048
}
49+
4150
if (validationResult.invalidRecords > 0) {
4251
await this.sendDataImportData(
4352
data._jobId,
4453
validationResult.invalidData,
4554
1,
4655
undefined,
4756
true,
48-
userJobInfo.endsOn
57+
userJobInfo.endsOn,
58+
validationResult
4959
);
5060
}
5161
}
5262

5363
return;
5464
}
5565

56-
async getJobImportedData(_jobId: string): Promise<{
57-
importedData: Record<string, unknown>[];
58-
hasInvalidRecords: boolean;
59-
totalRecords: number;
60-
validRecords: number;
61-
invalidRecords: number;
62-
validData: Record<string, unknown>[];
63-
invalidData: Record<string, unknown>[];
64-
}> {
66+
async getJobImportedData(_jobId: string): Promise<IValidationResult & { importedData: Record<string, unknown>[] }> {
6567
try {
6668
const userJob = await this.userJobRepository.findOne({ _id: _jobId });
6769
if (!userJob) {
@@ -217,7 +219,8 @@ export class GetImportJobDataConsumer extends SendImportJobDataConsumer {
217219
page = 1,
218220
initialCachedData?: SendImportJobCachedData,
219221
areInvalidRecords?: boolean,
220-
endsOn?: Date
222+
endsOn?: Date,
223+
validationResult?: IValidationResult
221224
) {
222225
try {
223226
let cachedData = null;
@@ -268,11 +271,14 @@ export class GetImportJobDataConsumer extends SendImportJobDataConsumer {
268271
nextPageNumber,
269272
{ ...cachedData, page: nextPageNumber },
270273
areInvalidRecords,
271-
endsOn
274+
endsOn,
275+
validationResult
272276
);
273277
} else {
274-
if (endsOn && dayjs(endsOn).isSame(dayjs(), 'day')) {
275-
await this.finalizeUpload(_jobId);
278+
if (validationResult) {
279+
// Get userJobInfo for finalization
280+
const userJobInfo = await this.userJobRepository.getUserJobWithTemplate(_jobId);
281+
await this.finalizeAutoImportJob(_jobId, validationResult, userJobInfo);
276282
}
277283
}
278284
}
@@ -281,6 +287,48 @@ export class GetImportJobDataConsumer extends SendImportJobDataConsumer {
281287
}
282288
}
283289

290+
private async finalizeAutoImportJob(
291+
_jobId: string,
292+
validationResult: IValidationResult,
293+
userJobInfo: UserJobEntity
294+
): Promise<void> {
295+
try {
296+
await this.templateRepository.update(
297+
{ _id: userJobInfo._templateId },
298+
{
299+
$set: {
300+
isInvalidRecords: validationResult.hasInvalidRecords,
301+
invalidRecords: validationResult.invalidRecords,
302+
validRecords: validationResult.validRecords,
303+
totalRecords: validationResult.totalRecords,
304+
},
305+
}
306+
);
307+
308+
await this.paymentAPIService.createEvent(
309+
{
310+
uploadId: _jobId,
311+
totalRecords: validationResult.totalRecords,
312+
validRecords: validationResult.validRecords,
313+
invalidRecords: validationResult.invalidRecords,
314+
},
315+
userJobInfo.externalUserId
316+
);
317+
318+
await this.userJobRepository.update(
319+
{ _id: _jobId },
320+
{
321+
$set: {
322+
status: UserJobImportStatusEnum.RUNNING, //UserJobImportStatusEnum.COMPLETED,
323+
totalRecords: validationResult.totalRecords,
324+
validRecords: validationResult.validRecords,
325+
invalidRecords: validationResult.invalidRecords,
326+
},
327+
}
328+
);
329+
} catch (error) {}
330+
}
331+
284332
private validateRecordUsingColumnSchema(
285333
record: Record<string, unknown>,
286334
columns: ITemplateSchemaItem[]
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
export * from './send-webhook-data.consumer';
22
export * from './end-import.consumer';
33
export * from './send-bubble-data.consumer';
4-
export * from './get-import-job-data.consumer';
4+
export * from './get-auto-import-job-data.consumer';
55
export * from './send-import-job-data.consumer';
66
export * from './send-failed-webhook-data.consumer';

apps/queue-manager/src/consumers/send-import-job-data.consumer.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ import {
55
WebhookDestinationRepository,
66
WebhookLogEntity,
77
UserJobRepository,
8+
WebhookLogRepository,
89
} from '@impler/dal';
9-
import { StorageService } from '@impler/services';
10+
import { PaymentAPIService, StorageService } from '@impler/services';
1011
import {
1112
SendImportJobData,
1213
SendImportJobCachedData,
@@ -20,17 +21,19 @@ import {
2021

2122
import { publishToQueue } from '../bootstrap';
2223
import { BaseConsumer } from './base.consumer';
23-
import { getStorageServiceClass } from '../helpers/services.helper';
24+
import { getPaymentApiServiceClass, getStorageServiceClass } from '../helpers/services.helper';
2425

2526
const MIN_LIMIT = 0;
2627
const DEFAULT_PAGE = 1;
2728

2829
export class SendImportJobDataConsumer extends BaseConsumer {
2930
private columnRepository: ColumnRepository = new ColumnRepository();
3031
public userJobRepository: UserJobRepository = new UserJobRepository();
31-
private templateRepository: TemplateRepository = new TemplateRepository();
32+
public templateRepository: TemplateRepository = new TemplateRepository();
3233
public importJobHistoryRepository: ImportJobHistoryRepository = new ImportJobHistoryRepository();
3334
public webhookDestinationRepository: WebhookDestinationRepository = new WebhookDestinationRepository();
35+
public webhookLogRepository: WebhookLogRepository = new WebhookLogRepository();
36+
public paymentApiService: PaymentAPIService = getPaymentApiServiceClass();
3437
public storageService: StorageService = getStorageServiceClass();
3538

3639
async message(message: { content: string }) {

apps/queue-manager/src/helpers/services.helper.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
11
import { StorageTypeEnum } from '@impler/shared';
2-
import { AzureStorageService, EmailService, S3StorageService, SESEmailService, StorageService } from '@impler/services';
2+
import {
3+
AzureStorageService,
4+
EmailService,
5+
S3StorageService,
6+
SESEmailService,
7+
StorageService,
8+
PaymentAPIService,
9+
} from '@impler/services';
310

411
let storageService: StorageService;
512
let emailService: EmailService;
13+
let paymentApiService: PaymentAPIService;
614

715
// Implementing singleton pattern for storage service
816
export function getStorageServiceClass() {
@@ -19,3 +27,10 @@ export function getEmailServiceClass() {
1927

2028
return emailService;
2129
}
30+
31+
export function getPaymentApiServiceClass() {
32+
if (paymentApiService) return paymentApiService;
33+
paymentApiService = new PaymentAPIService();
34+
35+
return paymentApiService;
36+
}

0 commit comments

Comments
 (0)