Skip to content

Commit de4d382

Browse files
feat: Added Revamped Cron Scheduling, adding 5 minute cron initially (#1091)
2 parents 3678958 + 43f0dd7 commit de4d382

File tree

13 files changed

+138
-51
lines changed

13 files changed

+138
-51
lines changed

apps/api/src/app/auto-import-jobs-schedular/usecase/auto-import-jobs-schedular.ts

Lines changed: 45 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,23 @@
11
import { UserJobEntity, UserJobRepository, WebhookDestinationRepository } from '@impler/dal';
22
import { Injectable } from '@nestjs/common';
3-
import * as dayjs from 'dayjs';
4-
import * as parser from 'cron-parser';
5-
import { Cron } from '@nestjs/schedule';
6-
import { UpdateUserJob } from 'app/import-jobs/usecase';
3+
import { Cron, CronExpression } from '@nestjs/schedule';
74
import { UserJobImportStatusEnum } from '@impler/shared';
8-
import { UserJobTriggerService } from 'app/import-jobs/usecase';
9-
import { CRON_SCHEDULE } from '@shared/constants';
10-
const parseCronExpression = require('@impler/shared/src/utils/cronstrue');
5+
import * as dayjs from 'dayjs';
6+
const parser = require('cron-parser');
7+
import parseCronExpression from '@impler/shared/src/utils/cronstrue';
8+
import { UserJobTriggerService } from 'app/import-jobs/usecase/userjob-usecase/userjob-trigger.usecase';
119

1210
@Injectable()
1311
export class AutoImportJobsSchedular {
1412
constructor(
1513
private readonly userJobRepository: UserJobRepository,
1614
private readonly webhookDestinationRepository: WebhookDestinationRepository,
17-
private readonly updateUserJob: UpdateUserJob,
1815
private readonly userJobTriggerService: UserJobTriggerService
1916
) {}
2017

21-
@Cron(CRON_SCHEDULE.AUTO_IMPORT_DEFAULT_CRON_TIME)
18+
@Cron(CronExpression.EVERY_MINUTE)
2219
async handleCronSchedular() {
23-
console.log('Crone Running');
20+
console.log('Cron Running');
2421
await this.fetchAndExecuteScheduledJobs();
2522
}
2623

@@ -29,26 +26,38 @@ export class AutoImportJobsSchedular {
2926
const userJobs = await this.userJobRepository.find({});
3027

3128
for (const userJob of userJobs) {
32-
console.log('Should run the Cron Job ?', await this.shouldCroneRun({ userJob }), userJob._id);
3329
if (await this.shouldCroneRun({ userJob })) {
3430
try {
35-
const interval = parser.parseExpression(userJob.cron);
31+
if (this.isJobDueNow(userJob.nextRun, now)) {
32+
const nextScheduledTime = this.calculateNextRun(userJob.cron, userJob.nextRun);
3633

37-
const nextScheduledTime = dayjs(interval.next().toDate().toDateString());
38-
39-
if (this.isJobDueToday(userJob.cron, now)) {
4034
await this.scheduleUpdateNextRun(userJob._id, nextScheduledTime, dayjs(userJob.endsOn));
4135

42-
await this.updateUserJob.execute(userJob._id, userJob);
43-
44-
// await this.scheduleUserJob.execute(userJob._id, userJob.cron);
4536
await this.userJobTriggerService.execute(userJob._id);
4637
}
4738
} catch (error) {}
4839
}
4940
}
5041
}
5142

43+
calculateNextRun(cronExpression: string, currentNextRun: Date): dayjs.Dayjs {
44+
try {
45+
if (!cronExpression || typeof cronExpression !== 'string' || cronExpression.trim() === '') {
46+
return dayjs(currentNextRun).add(24, 'hours');
47+
}
48+
49+
const interval = parser.parseExpression(cronExpression.trim(), {
50+
currentDate: currentNextRun,
51+
});
52+
53+
const nextRun = interval.next().toDate();
54+
55+
return dayjs(nextRun);
56+
} catch (error) {
57+
return dayjs(currentNextRun).add(24, 'hours');
58+
}
59+
}
60+
5261
async scheduleUpdateNextRun(userJobId: string, nextRunSchedule: dayjs.Dayjs, endsOn: dayjs.Dayjs) {
5362
const nextRunValue = dayjs(nextRunSchedule).isAfter(endsOn) && endsOn ? undefined : nextRunSchedule;
5463

@@ -63,16 +72,14 @@ export class AutoImportJobsSchedular {
6372
return parseCronExpression.toString(userCronExpression);
6473
}
6574

66-
private isJobDueToday(cronExpression: string, currentDate: dayjs.Dayjs): boolean {
67-
try {
68-
const interval = parser.parseExpression(cronExpression);
69-
70-
const nextScheduledTime = dayjs(interval.next().toDate());
71-
72-
return nextScheduledTime.isSame(currentDate, 'd');
73-
} catch (error) {
75+
private isJobDueNow(nextRunDate: Date, currentDate: dayjs.Dayjs): boolean {
76+
if (!nextRunDate) {
7477
return false;
7578
}
79+
80+
const nextRun = dayjs(nextRunDate);
81+
82+
return currentDate.isSame(nextRun, 'minute');
7683
}
7784

7885
async fetchDestination(templateId: string) {
@@ -96,13 +103,19 @@ export class AutoImportJobsSchedular {
96103
return false;
97104
}
98105

106+
if (userJob.endsOn && dayjs(userJob.endsOn).isBefore(now)) {
107+
return false;
108+
}
109+
110+
if (!userJob.nextRun) {
111+
return false;
112+
}
113+
99114
if (
100-
(userJob.cron && userJob.status === UserJobImportStatusEnum.SCHEDULING) ||
101-
userJob.status === UserJobImportStatusEnum.RUNNING ||
102-
(userJob.status === UserJobImportStatusEnum.COMPLETED &&
103-
(await this.fetchDestination(userJob._templateId)) &&
104-
!userJob.endsOn) ||
105-
!dayjs(userJob.endsOn).isSame(now, 'd')
115+
userJob.cron &&
116+
(userJob.status === UserJobImportStatusEnum.SCHEDULING ||
117+
userJob.status === UserJobImportStatusEnum.RUNNING ||
118+
(userJob.status === UserJobImportStatusEnum.COMPLETED && (await this.fetchDestination(userJob._templateId))))
106119
) {
107120
return true;
108121
}
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { UpdateUserJob, UserJobTriggerService } from 'app/import-jobs/usecase';
1+
import { CreateUserJob, UpdateUserJob, UserJobTriggerService } from 'app/import-jobs/usecase';
22
import { AutoImportJobsSchedular } from './auto-import-jobs-schedular';
33
import { QueueService } from '@shared/services/queue.service';
44

@@ -8,6 +8,7 @@ export const USE_CASES = [
88
UserJobTriggerService,
99
UserJobTriggerService,
1010
QueueService,
11+
CreateUserJob,
1112
//
1213
];
13-
export { AutoImportJobsSchedular, UpdateUserJob, UserJobTriggerService, QueueService };
14+
export { AutoImportJobsSchedular, UpdateUserJob, UserJobTriggerService, QueueService, CreateUserJob };

apps/api/src/app/import-jobs/dtos/create-userjob.dto.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,8 @@ export class CreateUserJobDto {
2020
@IsString()
2121
@IsOptional()
2222
authHeaderValue?: string;
23+
24+
@IsString()
25+
@IsOptional()
26+
cron?: string;
2327
}

apps/api/src/app/import-jobs/import-jobs.controller.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ export class ImportJobsController {
4343
extra: jobData.extra,
4444
externalUserId: jobData.externalUserId,
4545
authHeaderValue: jobData.authHeaderValue,
46+
cron: jobData.cron,
4647
});
4748
}
4849

apps/api/src/app/import-jobs/usecase/create-userjob/create-userjob.command.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ export class CreateUserJobCommand {
55
_templateId: string;
66
externalUserId?: string;
77
authHeaderValue?: string;
8+
cron?: string;
89
}

apps/api/src/app/import-jobs/usecase/create-userjob/create-userjob.usecase.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ import { APIMessages } from '@shared/constants';
55
import { RSSXMLService } from '@impler/services';
66
import { getMimeType, isValidXMLMimeType } from '@shared/helpers/common.helper';
77
import { WebSocketService } from '@shared/services';
8-
8+
const parser = require('cron-parser');
9+
import * as dayjs from 'dayjs';
910
@Injectable()
1011
export class CreateUserJob {
1112
constructor(
@@ -20,6 +21,7 @@ export class CreateUserJob {
2021
_templateId,
2122
externalUserId,
2223
authHeaderValue,
24+
cron,
2325
}: CreateUserJobCommand): Promise<UserJobEntity> {
2426
const rssService = new RSSXMLService(url);
2527

@@ -45,13 +47,17 @@ export class CreateUserJob {
4547
formattedExtra = JSON.parse(extra);
4648
} catch (_) {}
4749

50+
const nextRun = this.calculateInitialNextRun(cron);
51+
4852
return await this.userJobRepository.create({
4953
url,
5054
extra,
5155
authHeaderValue,
5256
headings: rssXmlParsedDataKeys?.keys || [],
5357
_templateId: _templateId,
5458
externalUserId: externalUserId || (formattedExtra as unknown as Record<string, any>)?.externalUserId,
59+
nextRun,
60+
cron,
5561
});
5662
} else {
5763
throw new BadRequestException(APIMessages.INVALID_RSS_URL);
@@ -60,4 +66,26 @@ export class CreateUserJob {
6066
throw new BadRequestException(error);
6167
}
6268
}
69+
70+
calculateInitialNextRun(cronExpression: string): Date {
71+
try {
72+
if (!cronExpression || typeof cronExpression !== 'string' || cronExpression.trim() === '') {
73+
return dayjs().add(5, 'minutes').toDate();
74+
}
75+
76+
const now = dayjs();
77+
const fiveMinutesFromNow = now.add(5, 'minutes');
78+
79+
const interval = parser.parseExpression(cronExpression.trim());
80+
const nextCronTime = dayjs(interval.next().toDate());
81+
82+
if (nextCronTime.isAfter(fiveMinutesFromNow)) {
83+
return fiveMinutesFromNow.toDate();
84+
} else {
85+
return nextCronTime.toDate();
86+
}
87+
} catch (error) {
88+
return dayjs().add(5, 'minutes').toDate();
89+
}
90+
}
6391
}

apps/api/src/app/import-jobs/usecase/update-userjob/update-userjob.command.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,6 @@ export class UpdateUserJobCommand {
1010
headings: string[];
1111

1212
status: string;
13+
14+
nextRun?: Date;
1315
}

apps/api/src/app/import-jobs/usecase/update-userjob/update-userjob.usecase.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,23 @@ import { Injectable } from '@nestjs/common';
22
import { UserJobImportStatusEnum } from '@impler/shared';
33
import { UserJobEntity, UserJobRepository } from '@impler/dal';
44
import { UpdateUserJobCommand } from './update-userjob.command';
5+
import { CreateUserJob } from 'app/import-jobs/usecase/create-userjob/create-userjob.usecase';
56

67
@Injectable()
78
export class UpdateUserJob {
8-
constructor(private readonly userJobRepository: UserJobRepository) {}
9+
constructor(
10+
private readonly userJobRepository: UserJobRepository,
11+
private readonly createUserJob: CreateUserJob
12+
) {}
913

1014
async execute(jobId: string, data: UpdateUserJobCommand): Promise<UserJobEntity> {
1115
data.status = UserJobImportStatusEnum.SCHEDULING;
12-
const userJob = await this.userJobRepository.findOneAndUpdate({ _id: jobId }, data);
1316

14-
// this.scheduleRssImportJob(jobId, data.cron);
17+
if (data.cron) {
18+
data.nextRun = this.createUserJob.calculateInitialNextRun(data.cron);
19+
}
20+
21+
const userJob = await this.userJobRepository.findOneAndUpdate({ _id: jobId }, data);
1522

1623
return userJob;
1724
}

apps/api/src/app/shared/shared.module.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { Module } from '@nestjs/common';
22
import { JwtService } from '@nestjs/jwt';
3-
import { SchedulerRegistry } from '@nestjs/schedule';
43
import {
54
ColumnRepository,
65
CommonRepository,
@@ -49,11 +48,9 @@ const DAL_MODELS = [
4948
ValidatorRepository,
5049
WebhookDestinationRepository,
5150
BubbleDestinationRepository,
52-
JobMappingRepository,
5351
UserJobRepository,
54-
SchedulerRegistry,
52+
JobMappingRepository,
5553
ProjectInvitationRepository,
56-
UserJobRepository,
5754
FailedWebhookRetryRequestsRepository,
5855
WebhookLogRepository,
5956
];

apps/widget/src/components/widget/Phases/AutoImport/AutoImportPhase3/AutoImportPhase3.tsx

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { autoImportSchedulerFrequency, AUTOIMPORTSCHEDULERFREQUENCY, colors } fr
1212
import { SchedulerFrequency } from './SchedularFrequency';
1313
import useStyles from './AutoImportPhase3.Styles';
1414

15-
import { generateCronExpression } from 'util/helpers/common.helpers';
15+
import { generateCronExpression, getFormattedFirstRunTime } from 'util/helpers/common.helpers';
1616

1717
interface IAutoImportPhase3Props {
1818
onNextClick: (importJob: IUserJob) => void;
@@ -87,8 +87,9 @@ export function AutoImportPhase3({ onNextClick }: IAutoImportPhase3Props) {
8787
}, [formValues]);
8888

8989
const handleNextClick = () => {
90+
const currentCronExpression = generateCronExpression(formValues);
9091
updateUserJob({
91-
cron: cronExpression,
92+
cron: currentCronExpression || cronExpression,
9293
endsOn: formValues.endsNever ? undefined : formValues.endsOn,
9394
});
9495
};
@@ -173,9 +174,14 @@ export function AutoImportPhase3({ onNextClick }: IAutoImportPhase3Props) {
173174

174175
<Stack spacing="xl">
175176
{cronExpression && (
176-
<Text fw="bolder" color={colors.StrokeLight}>
177-
Current Schedule: {parseCronExpression.toString(cronExpression)}
178-
</Text>
177+
<Stack spacing="xs">
178+
<Text fw="bolder" color={colors.StrokeLight}>
179+
First import will run: {getFormattedFirstRunTime()}
180+
</Text>
181+
<Text fw="normal" color={colors.StrokeLight} size="sm">
182+
Then repeats: {parseCronExpression.toString(cronExpression)}
183+
</Text>
184+
</Stack>
179185
)}
180186
</Stack>
181187
</Stack>

0 commit comments

Comments
 (0)