Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
import { UserJobEntity, UserJobRepository, WebhookDestinationRepository } from '@impler/dal';
import { Injectable } from '@nestjs/common';
import * as dayjs from 'dayjs';
import * as parser from 'cron-parser';
import { Cron } from '@nestjs/schedule';
import { UpdateUserJob } from 'app/import-jobs/usecase';
import { Cron, CronExpression } from '@nestjs/schedule';
import { UserJobImportStatusEnum } from '@impler/shared';
import { UserJobTriggerService } from 'app/import-jobs/usecase';
import { CRON_SCHEDULE } from '@shared/constants';
const parseCronExpression = require('@impler/shared/src/utils/cronstrue');
import * as dayjs from 'dayjs';
const parser = require('cron-parser');
import parseCronExpression from '@impler/shared/src/utils/cronstrue';
import { UserJobTriggerService } from 'app/import-jobs/usecase/userjob-usecase/userjob-trigger.usecase';

@Injectable()
export class AutoImportJobsSchedular {
constructor(
private readonly userJobRepository: UserJobRepository,
private readonly webhookDestinationRepository: WebhookDestinationRepository,
private readonly updateUserJob: UpdateUserJob,
private readonly userJobTriggerService: UserJobTriggerService
) {}

@Cron(CRON_SCHEDULE.AUTO_IMPORT_DEFAULT_CRON_TIME)
@Cron(CronExpression.EVERY_MINUTE)
async handleCronSchedular() {
console.log('Crone Running');
console.log('Cron Running');
await this.fetchAndExecuteScheduledJobs();
}

Expand All @@ -29,26 +26,38 @@ export class AutoImportJobsSchedular {
const userJobs = await this.userJobRepository.find({});

for (const userJob of userJobs) {
console.log('Should run the Cron Job ?', await this.shouldCroneRun({ userJob }), userJob._id);
if (await this.shouldCroneRun({ userJob })) {
try {
const interval = parser.parseExpression(userJob.cron);
if (this.isJobDueNow(userJob.nextRun, now)) {
const nextScheduledTime = this.calculateNextRun(userJob.cron, userJob.nextRun);

const nextScheduledTime = dayjs(interval.next().toDate().toDateString());

if (this.isJobDueToday(userJob.cron, now)) {
await this.scheduleUpdateNextRun(userJob._id, nextScheduledTime, dayjs(userJob.endsOn));

await this.updateUserJob.execute(userJob._id, userJob);

// await this.scheduleUserJob.execute(userJob._id, userJob.cron);
await this.userJobTriggerService.execute(userJob._id);
}
} catch (error) {}
}
}
}

calculateNextRun(cronExpression: string, currentNextRun: Date): dayjs.Dayjs {
try {
if (!cronExpression || typeof cronExpression !== 'string' || cronExpression.trim() === '') {
return dayjs(currentNextRun).add(24, 'hours');
}

const interval = parser.parseExpression(cronExpression.trim(), {
currentDate: currentNextRun,
});

const nextRun = interval.next().toDate();

return dayjs(nextRun);
} catch (error) {
return dayjs(currentNextRun).add(24, 'hours');
}
}

async scheduleUpdateNextRun(userJobId: string, nextRunSchedule: dayjs.Dayjs, endsOn: dayjs.Dayjs) {
const nextRunValue = dayjs(nextRunSchedule).isAfter(endsOn) && endsOn ? undefined : nextRunSchedule;

Expand All @@ -63,16 +72,14 @@ export class AutoImportJobsSchedular {
return parseCronExpression.toString(userCronExpression);
}

private isJobDueToday(cronExpression: string, currentDate: dayjs.Dayjs): boolean {
try {
const interval = parser.parseExpression(cronExpression);

const nextScheduledTime = dayjs(interval.next().toDate());

return nextScheduledTime.isSame(currentDate, 'd');
} catch (error) {
private isJobDueNow(nextRunDate: Date, currentDate: dayjs.Dayjs): boolean {
if (!nextRunDate) {
return false;
}

const nextRun = dayjs(nextRunDate);

return currentDate.isSame(nextRun, 'minute');
}

async fetchDestination(templateId: string) {
Expand All @@ -96,13 +103,19 @@ export class AutoImportJobsSchedular {
return false;
}

if (userJob.endsOn && dayjs(userJob.endsOn).isBefore(now)) {
return false;
}

if (!userJob.nextRun) {
return false;
}

if (
(userJob.cron && userJob.status === UserJobImportStatusEnum.SCHEDULING) ||
userJob.status === UserJobImportStatusEnum.RUNNING ||
(userJob.status === UserJobImportStatusEnum.COMPLETED &&
(await this.fetchDestination(userJob._templateId)) &&
!userJob.endsOn) ||
!dayjs(userJob.endsOn).isSame(now, 'd')
userJob.cron &&
(userJob.status === UserJobImportStatusEnum.SCHEDULING ||
userJob.status === UserJobImportStatusEnum.RUNNING ||
(userJob.status === UserJobImportStatusEnum.COMPLETED && (await this.fetchDestination(userJob._templateId))))
) {
return true;
}
Expand Down
5 changes: 3 additions & 2 deletions apps/api/src/app/auto-import-jobs-schedular/usecase/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { UpdateUserJob, UserJobTriggerService } from 'app/import-jobs/usecase';
import { CreateUserJob, UpdateUserJob, UserJobTriggerService } from 'app/import-jobs/usecase';
import { AutoImportJobsSchedular } from './auto-import-jobs-schedular';
import { QueueService } from '@shared/services/queue.service';

Expand All @@ -8,6 +8,7 @@ export const USE_CASES = [
UserJobTriggerService,
UserJobTriggerService,
QueueService,
CreateUserJob,
//
];
export { AutoImportJobsSchedular, UpdateUserJob, UserJobTriggerService, QueueService };
export { AutoImportJobsSchedular, UpdateUserJob, UserJobTriggerService, QueueService, CreateUserJob };
4 changes: 4 additions & 0 deletions apps/api/src/app/import-jobs/dtos/create-userjob.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ export class CreateUserJobDto {
@IsString()
@IsOptional()
authHeaderValue?: string;

@IsString()
@IsOptional()
cron?: string;
}
1 change: 1 addition & 0 deletions apps/api/src/app/import-jobs/import-jobs.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export class ImportJobsController {
extra: jobData.extra,
externalUserId: jobData.externalUserId,
authHeaderValue: jobData.authHeaderValue,
cron: jobData.cron,
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ export class CreateUserJobCommand {
_templateId: string;
externalUserId?: string;
authHeaderValue?: string;
cron?: string;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { APIMessages } from '@shared/constants';
import { RSSXMLService } from '@impler/services';
import { getMimeType, isValidXMLMimeType } from '@shared/helpers/common.helper';
import { WebSocketService } from '@shared/services';

const parser = require('cron-parser');
import * as dayjs from 'dayjs';
@Injectable()
export class CreateUserJob {
constructor(
Expand All @@ -20,6 +21,7 @@ export class CreateUserJob {
_templateId,
externalUserId,
authHeaderValue,
cron,
}: CreateUserJobCommand): Promise<UserJobEntity> {
const rssService = new RSSXMLService(url);

Expand All @@ -45,13 +47,17 @@ export class CreateUserJob {
formattedExtra = JSON.parse(extra);
} catch (_) {}

const nextRun = this.calculateInitialNextRun(cron);

return await this.userJobRepository.create({
url,
extra,
authHeaderValue,
headings: rssXmlParsedDataKeys?.keys || [],
_templateId: _templateId,
externalUserId: externalUserId || (formattedExtra as unknown as Record<string, any>)?.externalUserId,
nextRun,
cron,
});
} else {
throw new BadRequestException(APIMessages.INVALID_RSS_URL);
Expand All @@ -60,4 +66,26 @@ export class CreateUserJob {
throw new BadRequestException(error);
}
}

calculateInitialNextRun(cronExpression: string): Date {
try {
if (!cronExpression || typeof cronExpression !== 'string' || cronExpression.trim() === '') {
return dayjs().add(5, 'minutes').toDate();
}

const now = dayjs();
const fiveMinutesFromNow = now.add(5, 'minutes');

const interval = parser.parseExpression(cronExpression.trim());
const nextCronTime = dayjs(interval.next().toDate());

if (nextCronTime.isAfter(fiveMinutesFromNow)) {
return fiveMinutesFromNow.toDate();
} else {
return nextCronTime.toDate();
}
} catch (error) {
return dayjs().add(5, 'minutes').toDate();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ export class UpdateUserJobCommand {
headings: string[];

status: string;

nextRun?: Date;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,23 @@ import { Injectable } from '@nestjs/common';
import { UserJobImportStatusEnum } from '@impler/shared';
import { UserJobEntity, UserJobRepository } from '@impler/dal';
import { UpdateUserJobCommand } from './update-userjob.command';
import { CreateUserJob } from 'app/import-jobs/usecase/create-userjob/create-userjob.usecase';

@Injectable()
export class UpdateUserJob {
constructor(private readonly userJobRepository: UserJobRepository) {}
constructor(
private readonly userJobRepository: UserJobRepository,
private readonly createUserJob: CreateUserJob
) {}

async execute(jobId: string, data: UpdateUserJobCommand): Promise<UserJobEntity> {
data.status = UserJobImportStatusEnum.SCHEDULING;
const userJob = await this.userJobRepository.findOneAndUpdate({ _id: jobId }, data);

// this.scheduleRssImportJob(jobId, data.cron);
if (data.cron) {
data.nextRun = this.createUserJob.calculateInitialNextRun(data.cron);
}

const userJob = await this.userJobRepository.findOneAndUpdate({ _id: jobId }, data);

return userJob;
}
Expand Down
5 changes: 1 addition & 4 deletions apps/api/src/app/shared/shared.module.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Module } from '@nestjs/common';
import { JwtService } from '@nestjs/jwt';
import { SchedulerRegistry } from '@nestjs/schedule';
import {
ColumnRepository,
CommonRepository,
Expand Down Expand Up @@ -49,11 +48,9 @@ const DAL_MODELS = [
ValidatorRepository,
WebhookDestinationRepository,
BubbleDestinationRepository,
JobMappingRepository,
UserJobRepository,
SchedulerRegistry,
JobMappingRepository,
ProjectInvitationRepository,
UserJobRepository,
FailedWebhookRetryRequestsRepository,
WebhookLogRepository,
];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { autoImportSchedulerFrequency, AUTOIMPORTSCHEDULERFREQUENCY, colors } fr
import { SchedulerFrequency } from './SchedularFrequency';
import useStyles from './AutoImportPhase3.Styles';

import { generateCronExpression } from 'util/helpers/common.helpers';
import { generateCronExpression, getFormattedFirstRunTime } from 'util/helpers/common.helpers';

interface IAutoImportPhase3Props {
onNextClick: (importJob: IUserJob) => void;
Expand Down Expand Up @@ -87,8 +87,9 @@ export function AutoImportPhase3({ onNextClick }: IAutoImportPhase3Props) {
}, [formValues]);

const handleNextClick = () => {
const currentCronExpression = generateCronExpression(formValues);
updateUserJob({
cron: cronExpression,
cron: currentCronExpression || cronExpression,
endsOn: formValues.endsNever ? undefined : formValues.endsOn,
});
};
Expand Down Expand Up @@ -173,9 +174,14 @@ export function AutoImportPhase3({ onNextClick }: IAutoImportPhase3Props) {

<Stack spacing="xl">
{cronExpression && (
<Text fw="bolder" color={colors.StrokeLight}>
Current Schedule: {parseCronExpression.toString(cronExpression)}
</Text>
<Stack spacing="xs">
<Text fw="bolder" color={colors.StrokeLight}>
First import will run: {getFormattedFirstRunTime()}
</Text>
<Text fw="normal" color={colors.StrokeLight} size="sm">
Then repeats: {parseCronExpression.toString(cronExpression)}
</Text>
</Stack>
)}
</Stack>
</Stack>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const parseCronExpression = require('@impler/shared/src/utils/cronstrue');
import { colors } from '@config';
import { CheckIcon } from '@icons';
import { useJobsInfo } from '@store/jobinfo.context';
import { getFormattedFirstRunTime } from 'util/helpers/common.helpers';

import { PhasesEnum } from '@types';
import { Footer } from 'components/Common/Footer';
Expand All @@ -14,6 +15,10 @@ interface IAutoImportPhase4Props {
export function AutoImportPhase4({ onCloseClick }: IAutoImportPhase4Props) {
const { jobsInfo } = useJobsInfo();

const getNextRunTime = () => {
return getFormattedFirstRunTime();
};

return (
<>
<Stack mt="md" align="center" style={{ flexGrow: 1 }}>
Expand All @@ -33,7 +38,7 @@ export function AutoImportPhase4({ onCloseClick }: IAutoImportPhase4Props) {
<Text color={colors.lightBlue}>{jobsInfo.url}</Text>
</Paper>
<Text fw="bold" color={colors.softGrey}>
Will be executed on {parseCronExpression.toString(jobsInfo.cron)}
First import will run at {getNextRunTime()}, then repeats {parseCronExpression.toString(jobsInfo.cron)}
</Text>
</Stack>
<Footer onNextClick={onCloseClick} active={PhasesEnum.CONFIRM} />
Expand Down
2 changes: 2 additions & 0 deletions apps/widget/src/util/api/api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ export class ApiService {
extra?: string;
schema?: string;
output?: string;
cron?: string;
}) {
return this.httpClient.post(`/import-jobs/${data.templateId}`, {
webSocketSessionId: data.webSocketSessionId,
Expand All @@ -218,6 +219,7 @@ export class ApiService {
extra: data.extra,
schema: data.schema,
output: data.output,
cron: data.cron,
});
}

Expand Down
Loading
Loading