From 43f0dd7566c7db512b23168b27ed28caf856eed4 Mon Sep 17 00:00:00 2001 From: Mayur Date: Wed, 24 Sep 2025 19:54:46 +0530 Subject: [PATCH] feat: Added Revamped Cron Scheduling, adding 5 minute cron initially --- .../usecase/auto-import-jobs-schedular.ts | 77 +++++++++++-------- .../usecase/index.ts | 5 +- .../import-jobs/dtos/create-userjob.dto.ts | 4 + .../app/import-jobs/import-jobs.controller.ts | 1 + .../create-userjob/create-userjob.command.ts | 1 + .../create-userjob/create-userjob.usecase.ts | 30 +++++++- .../update-userjob/update-userjob.command.ts | 2 + .../update-userjob/update-userjob.usecase.ts | 13 +++- apps/api/src/app/shared/shared.module.ts | 5 +- .../AutoImportPhase3/AutoImportPhase3.tsx | 16 ++-- .../AutoImportPhase4/AutoImportPhase4.tsx | 7 +- apps/widget/src/util/api/api.service.ts | 2 + .../widget/src/util/helpers/common.helpers.ts | 26 ++++++- 13 files changed, 138 insertions(+), 51 deletions(-) diff --git a/apps/api/src/app/auto-import-jobs-schedular/usecase/auto-import-jobs-schedular.ts b/apps/api/src/app/auto-import-jobs-schedular/usecase/auto-import-jobs-schedular.ts index 8d094de40..770d3935b 100644 --- a/apps/api/src/app/auto-import-jobs-schedular/usecase/auto-import-jobs-schedular.ts +++ b/apps/api/src/app/auto-import-jobs-schedular/usecase/auto-import-jobs-schedular.ts @@ -1,26 +1,23 @@ import { UserJobEntity, UserJobRepository, WebhookDestinationRepository } from '@impler/dal'; import { Injectable } from '@nestjs/common'; -import * as dayjs from 'dayjs'; -import * as parser from 'cron-parser'; -import { Cron } from '@nestjs/schedule'; -import { UpdateUserJob } from 'app/import-jobs/usecase'; +import { Cron, CronExpression } from '@nestjs/schedule'; import { UserJobImportStatusEnum } from '@impler/shared'; -import { UserJobTriggerService } from 'app/import-jobs/usecase'; -import { CRON_SCHEDULE } from '@shared/constants'; -const parseCronExpression = require('@impler/shared/src/utils/cronstrue'); +import * as dayjs from 'dayjs'; +const parser = require('cron-parser'); +import parseCronExpression from '@impler/shared/src/utils/cronstrue'; +import { UserJobTriggerService } from 'app/import-jobs/usecase/userjob-usecase/userjob-trigger.usecase'; @Injectable() export class AutoImportJobsSchedular { constructor( private readonly userJobRepository: UserJobRepository, private readonly webhookDestinationRepository: WebhookDestinationRepository, - private readonly updateUserJob: UpdateUserJob, private readonly userJobTriggerService: UserJobTriggerService ) {} - @Cron(CRON_SCHEDULE.AUTO_IMPORT_DEFAULT_CRON_TIME) + @Cron(CronExpression.EVERY_MINUTE) async handleCronSchedular() { - console.log('Crone Running'); + console.log('Cron Running'); await this.fetchAndExecuteScheduledJobs(); } @@ -29,19 +26,13 @@ export class AutoImportJobsSchedular { const userJobs = await this.userJobRepository.find({}); for (const userJob of userJobs) { - console.log('Should run the Cron Job ?', await this.shouldCroneRun({ userJob }), userJob._id); if (await this.shouldCroneRun({ userJob })) { try { - const interval = parser.parseExpression(userJob.cron); + if (this.isJobDueNow(userJob.nextRun, now)) { + const nextScheduledTime = this.calculateNextRun(userJob.cron, userJob.nextRun); - const nextScheduledTime = dayjs(interval.next().toDate().toDateString()); - - if (this.isJobDueToday(userJob.cron, now)) { await this.scheduleUpdateNextRun(userJob._id, nextScheduledTime, dayjs(userJob.endsOn)); - await this.updateUserJob.execute(userJob._id, userJob); - - // await this.scheduleUserJob.execute(userJob._id, userJob.cron); await this.userJobTriggerService.execute(userJob._id); } } catch (error) {} @@ -49,6 +40,24 @@ export class AutoImportJobsSchedular { } } + calculateNextRun(cronExpression: string, currentNextRun: Date): dayjs.Dayjs { + try { + if (!cronExpression || typeof cronExpression !== 'string' || cronExpression.trim() === '') { + return dayjs(currentNextRun).add(24, 'hours'); + } + + const interval = parser.parseExpression(cronExpression.trim(), { + currentDate: currentNextRun, + }); + + const nextRun = interval.next().toDate(); + + return dayjs(nextRun); + } catch (error) { + return dayjs(currentNextRun).add(24, 'hours'); + } + } + async scheduleUpdateNextRun(userJobId: string, nextRunSchedule: dayjs.Dayjs, endsOn: dayjs.Dayjs) { const nextRunValue = dayjs(nextRunSchedule).isAfter(endsOn) && endsOn ? undefined : nextRunSchedule; @@ -63,16 +72,14 @@ export class AutoImportJobsSchedular { return parseCronExpression.toString(userCronExpression); } - private isJobDueToday(cronExpression: string, currentDate: dayjs.Dayjs): boolean { - try { - const interval = parser.parseExpression(cronExpression); - - const nextScheduledTime = dayjs(interval.next().toDate()); - - return nextScheduledTime.isSame(currentDate, 'd'); - } catch (error) { + private isJobDueNow(nextRunDate: Date, currentDate: dayjs.Dayjs): boolean { + if (!nextRunDate) { return false; } + + const nextRun = dayjs(nextRunDate); + + return currentDate.isSame(nextRun, 'minute'); } async fetchDestination(templateId: string) { @@ -96,13 +103,19 @@ export class AutoImportJobsSchedular { return false; } + if (userJob.endsOn && dayjs(userJob.endsOn).isBefore(now)) { + return false; + } + + if (!userJob.nextRun) { + return false; + } + if ( - (userJob.cron && userJob.status === UserJobImportStatusEnum.SCHEDULING) || - userJob.status === UserJobImportStatusEnum.RUNNING || - (userJob.status === UserJobImportStatusEnum.COMPLETED && - (await this.fetchDestination(userJob._templateId)) && - !userJob.endsOn) || - !dayjs(userJob.endsOn).isSame(now, 'd') + userJob.cron && + (userJob.status === UserJobImportStatusEnum.SCHEDULING || + userJob.status === UserJobImportStatusEnum.RUNNING || + (userJob.status === UserJobImportStatusEnum.COMPLETED && (await this.fetchDestination(userJob._templateId)))) ) { return true; } diff --git a/apps/api/src/app/auto-import-jobs-schedular/usecase/index.ts b/apps/api/src/app/auto-import-jobs-schedular/usecase/index.ts index fcdad8e7c..7731af050 100644 --- a/apps/api/src/app/auto-import-jobs-schedular/usecase/index.ts +++ b/apps/api/src/app/auto-import-jobs-schedular/usecase/index.ts @@ -1,4 +1,4 @@ -import { UpdateUserJob, UserJobTriggerService } from 'app/import-jobs/usecase'; +import { CreateUserJob, UpdateUserJob, UserJobTriggerService } from 'app/import-jobs/usecase'; import { AutoImportJobsSchedular } from './auto-import-jobs-schedular'; import { QueueService } from '@shared/services/queue.service'; @@ -8,6 +8,7 @@ export const USE_CASES = [ UserJobTriggerService, UserJobTriggerService, QueueService, + CreateUserJob, // ]; -export { AutoImportJobsSchedular, UpdateUserJob, UserJobTriggerService, QueueService }; +export { AutoImportJobsSchedular, UpdateUserJob, UserJobTriggerService, QueueService, CreateUserJob }; diff --git a/apps/api/src/app/import-jobs/dtos/create-userjob.dto.ts b/apps/api/src/app/import-jobs/dtos/create-userjob.dto.ts index 3dded7d18..909b53fb2 100644 --- a/apps/api/src/app/import-jobs/dtos/create-userjob.dto.ts +++ b/apps/api/src/app/import-jobs/dtos/create-userjob.dto.ts @@ -20,4 +20,8 @@ export class CreateUserJobDto { @IsString() @IsOptional() authHeaderValue?: string; + + @IsString() + @IsOptional() + cron?: string; } diff --git a/apps/api/src/app/import-jobs/import-jobs.controller.ts b/apps/api/src/app/import-jobs/import-jobs.controller.ts index 64b4920db..db7780550 100644 --- a/apps/api/src/app/import-jobs/import-jobs.controller.ts +++ b/apps/api/src/app/import-jobs/import-jobs.controller.ts @@ -43,6 +43,7 @@ export class ImportJobsController { extra: jobData.extra, externalUserId: jobData.externalUserId, authHeaderValue: jobData.authHeaderValue, + cron: jobData.cron, }); } diff --git a/apps/api/src/app/import-jobs/usecase/create-userjob/create-userjob.command.ts b/apps/api/src/app/import-jobs/usecase/create-userjob/create-userjob.command.ts index 60d905c3f..3966d9782 100644 --- a/apps/api/src/app/import-jobs/usecase/create-userjob/create-userjob.command.ts +++ b/apps/api/src/app/import-jobs/usecase/create-userjob/create-userjob.command.ts @@ -5,4 +5,5 @@ export class CreateUserJobCommand { _templateId: string; externalUserId?: string; authHeaderValue?: string; + cron?: string; } diff --git a/apps/api/src/app/import-jobs/usecase/create-userjob/create-userjob.usecase.ts b/apps/api/src/app/import-jobs/usecase/create-userjob/create-userjob.usecase.ts index d49002f9d..135fe2e82 100644 --- a/apps/api/src/app/import-jobs/usecase/create-userjob/create-userjob.usecase.ts +++ b/apps/api/src/app/import-jobs/usecase/create-userjob/create-userjob.usecase.ts @@ -5,7 +5,8 @@ import { APIMessages } from '@shared/constants'; import { RSSXMLService } from '@impler/services'; import { getMimeType, isValidXMLMimeType } from '@shared/helpers/common.helper'; import { WebSocketService } from '@shared/services'; - +const parser = require('cron-parser'); +import * as dayjs from 'dayjs'; @Injectable() export class CreateUserJob { constructor( @@ -20,6 +21,7 @@ export class CreateUserJob { _templateId, externalUserId, authHeaderValue, + cron, }: CreateUserJobCommand): Promise { const rssService = new RSSXMLService(url); @@ -45,6 +47,8 @@ export class CreateUserJob { formattedExtra = JSON.parse(extra); } catch (_) {} + const nextRun = this.calculateInitialNextRun(cron); + return await this.userJobRepository.create({ url, extra, @@ -52,6 +56,8 @@ export class CreateUserJob { headings: rssXmlParsedDataKeys?.keys || [], _templateId: _templateId, externalUserId: externalUserId || (formattedExtra as unknown as Record)?.externalUserId, + nextRun, + cron, }); } else { throw new BadRequestException(APIMessages.INVALID_RSS_URL); @@ -60,4 +66,26 @@ export class CreateUserJob { throw new BadRequestException(error); } } + + calculateInitialNextRun(cronExpression: string): Date { + try { + if (!cronExpression || typeof cronExpression !== 'string' || cronExpression.trim() === '') { + return dayjs().add(5, 'minutes').toDate(); + } + + const now = dayjs(); + const fiveMinutesFromNow = now.add(5, 'minutes'); + + const interval = parser.parseExpression(cronExpression.trim()); + const nextCronTime = dayjs(interval.next().toDate()); + + if (nextCronTime.isAfter(fiveMinutesFromNow)) { + return fiveMinutesFromNow.toDate(); + } else { + return nextCronTime.toDate(); + } + } catch (error) { + return dayjs().add(5, 'minutes').toDate(); + } + } } diff --git a/apps/api/src/app/import-jobs/usecase/update-userjob/update-userjob.command.ts b/apps/api/src/app/import-jobs/usecase/update-userjob/update-userjob.command.ts index 3a3b4b308..aa8c4750d 100644 --- a/apps/api/src/app/import-jobs/usecase/update-userjob/update-userjob.command.ts +++ b/apps/api/src/app/import-jobs/usecase/update-userjob/update-userjob.command.ts @@ -10,4 +10,6 @@ export class UpdateUserJobCommand { headings: string[]; status: string; + + nextRun?: Date; } diff --git a/apps/api/src/app/import-jobs/usecase/update-userjob/update-userjob.usecase.ts b/apps/api/src/app/import-jobs/usecase/update-userjob/update-userjob.usecase.ts index cd8da9e0e..e09d8d197 100644 --- a/apps/api/src/app/import-jobs/usecase/update-userjob/update-userjob.usecase.ts +++ b/apps/api/src/app/import-jobs/usecase/update-userjob/update-userjob.usecase.ts @@ -2,16 +2,23 @@ import { Injectable } from '@nestjs/common'; import { UserJobImportStatusEnum } from '@impler/shared'; import { UserJobEntity, UserJobRepository } from '@impler/dal'; import { UpdateUserJobCommand } from './update-userjob.command'; +import { CreateUserJob } from 'app/import-jobs/usecase/create-userjob/create-userjob.usecase'; @Injectable() export class UpdateUserJob { - constructor(private readonly userJobRepository: UserJobRepository) {} + constructor( + private readonly userJobRepository: UserJobRepository, + private readonly createUserJob: CreateUserJob + ) {} async execute(jobId: string, data: UpdateUserJobCommand): Promise { data.status = UserJobImportStatusEnum.SCHEDULING; - const userJob = await this.userJobRepository.findOneAndUpdate({ _id: jobId }, data); - // this.scheduleRssImportJob(jobId, data.cron); + if (data.cron) { + data.nextRun = this.createUserJob.calculateInitialNextRun(data.cron); + } + + const userJob = await this.userJobRepository.findOneAndUpdate({ _id: jobId }, data); return userJob; } diff --git a/apps/api/src/app/shared/shared.module.ts b/apps/api/src/app/shared/shared.module.ts index 3a46f67dc..4e6b6ab2c 100644 --- a/apps/api/src/app/shared/shared.module.ts +++ b/apps/api/src/app/shared/shared.module.ts @@ -1,6 +1,5 @@ import { Module } from '@nestjs/common'; import { JwtService } from '@nestjs/jwt'; -import { SchedulerRegistry } from '@nestjs/schedule'; import { ColumnRepository, CommonRepository, @@ -49,11 +48,9 @@ const DAL_MODELS = [ ValidatorRepository, WebhookDestinationRepository, BubbleDestinationRepository, - JobMappingRepository, UserJobRepository, - SchedulerRegistry, + JobMappingRepository, ProjectInvitationRepository, - UserJobRepository, FailedWebhookRetryRequestsRepository, WebhookLogRepository, ]; diff --git a/apps/widget/src/components/widget/Phases/AutoImport/AutoImportPhase3/AutoImportPhase3.tsx b/apps/widget/src/components/widget/Phases/AutoImport/AutoImportPhase3/AutoImportPhase3.tsx index cb076db4f..79ddca02b 100644 --- a/apps/widget/src/components/widget/Phases/AutoImport/AutoImportPhase3/AutoImportPhase3.tsx +++ b/apps/widget/src/components/widget/Phases/AutoImport/AutoImportPhase3/AutoImportPhase3.tsx @@ -12,7 +12,7 @@ import { autoImportSchedulerFrequency, AUTOIMPORTSCHEDULERFREQUENCY, colors } fr import { SchedulerFrequency } from './SchedularFrequency'; import useStyles from './AutoImportPhase3.Styles'; -import { generateCronExpression } from 'util/helpers/common.helpers'; +import { generateCronExpression, getFormattedFirstRunTime } from 'util/helpers/common.helpers'; interface IAutoImportPhase3Props { onNextClick: (importJob: IUserJob) => void; @@ -87,8 +87,9 @@ export function AutoImportPhase3({ onNextClick }: IAutoImportPhase3Props) { }, [formValues]); const handleNextClick = () => { + const currentCronExpression = generateCronExpression(formValues); updateUserJob({ - cron: cronExpression, + cron: currentCronExpression || cronExpression, endsOn: formValues.endsNever ? undefined : formValues.endsOn, }); }; @@ -173,9 +174,14 @@ export function AutoImportPhase3({ onNextClick }: IAutoImportPhase3Props) { {cronExpression && ( - - Current Schedule: {parseCronExpression.toString(cronExpression)} - + + + First import will run: {getFormattedFirstRunTime()} + + + Then repeats: {parseCronExpression.toString(cronExpression)} + + )} diff --git a/apps/widget/src/components/widget/Phases/AutoImport/AutoImportPhase4/AutoImportPhase4.tsx b/apps/widget/src/components/widget/Phases/AutoImport/AutoImportPhase4/AutoImportPhase4.tsx index 38e2c6606..48fb77af8 100644 --- a/apps/widget/src/components/widget/Phases/AutoImport/AutoImportPhase4/AutoImportPhase4.tsx +++ b/apps/widget/src/components/widget/Phases/AutoImport/AutoImportPhase4/AutoImportPhase4.tsx @@ -3,6 +3,7 @@ const parseCronExpression = require('@impler/shared/src/utils/cronstrue'); import { colors } from '@config'; import { CheckIcon } from '@icons'; import { useJobsInfo } from '@store/jobinfo.context'; +import { getFormattedFirstRunTime } from 'util/helpers/common.helpers'; import { PhasesEnum } from '@types'; import { Footer } from 'components/Common/Footer'; @@ -14,6 +15,10 @@ interface IAutoImportPhase4Props { export function AutoImportPhase4({ onCloseClick }: IAutoImportPhase4Props) { const { jobsInfo } = useJobsInfo(); + const getNextRunTime = () => { + return getFormattedFirstRunTime(); + }; + return ( <> @@ -33,7 +38,7 @@ export function AutoImportPhase4({ onCloseClick }: IAutoImportPhase4Props) { {jobsInfo.url} - Will be executed on {parseCronExpression.toString(jobsInfo.cron)} + First import will run at {getNextRunTime()}, then repeats {parseCronExpression.toString(jobsInfo.cron)}