Skip to content

Commit 2f63f81

Browse files
author
Lasim
committed
feat(backend): add cleanup old jobs cron and worker functionality
1 parent e8098dd commit 2f63f81

File tree

4 files changed

+113
-0
lines changed

4 files changed

+113
-0
lines changed

services/backend/src/cron/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { FastifyBaseLogger } from 'fastify';
22
import type { JobQueueService } from '../services/jobQueueService';
33
import { CronManager } from './cronManager';
44
import { createMcpClientActivityMetricsCleanupJob } from './jobs/mcpClientActivityMetricsCleanup';
5+
import { createCleanupOldJobsJob } from './jobs/cleanupOldJobs';
56
// import { createExampleCronJob } from './jobs/exampleJob';
67

78
/**
@@ -26,6 +27,9 @@ export function initializeCronJobs(
2627
// MCP client activity metrics cleanup (every 30 minutes)
2728
cronManager.register(createMcpClientActivityMetricsCleanupJob(jobQueueService));
2829

30+
// Cleanup old queue jobs (every 6 hours)
31+
cronManager.register(createCleanupOldJobsJob(jobQueueService));
32+
2933
// Example cron job - commented out, uncomment to test
3034
// cronManager.register(createExampleCronJob(jobQueueService));
3135

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import type { CronJob } from '../cronManager';
2+
import type { JobQueueService } from '../../services/jobQueueService';
3+
4+
export function createCleanupOldJobsJob(jobQueueService: JobQueueService): CronJob {
5+
return {
6+
name: 'cleanup-old-jobs',
7+
schedule: '0 */6 * * *', // Every 6 hours
8+
9+
task: async () => {
10+
await jobQueueService.createJob('cleanup_old_jobs', {
11+
olderThanDays: 14
12+
});
13+
}
14+
};
15+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import type { AnyDatabase } from '../db';
2+
import type { FastifyBaseLogger } from 'fastify';
3+
import type { Worker, WorkerResult } from './types';
4+
import { queueJobs, queueJobBatches } from '../db/schema.sqlite';
5+
import { lt, sql } from 'drizzle-orm';
6+
7+
interface CleanupPayload {
8+
olderThanDays: number;
9+
}
10+
11+
export class CleanupOldJobsWorker implements Worker {
12+
constructor(
13+
private readonly db: AnyDatabase,
14+
private readonly logger: FastifyBaseLogger
15+
) {}
16+
17+
async execute(payload: unknown, jobId: string): Promise<WorkerResult> {
18+
if (!this.isValidPayload(payload)) {
19+
return {
20+
success: false,
21+
message: 'Invalid payload format'
22+
};
23+
}
24+
25+
const { olderThanDays } = payload as CleanupPayload;
26+
27+
this.logger.info({
28+
jobId,
29+
olderThanDays,
30+
operation: 'cleanup_old_jobs'
31+
}, 'Starting cleanup of old queue jobs');
32+
33+
try {
34+
const cutoffDate = new Date();
35+
cutoffDate.setDate(cutoffDate.getDate() - olderThanDays);
36+
37+
// Delete old jobs
38+
const jobsResult = await this.db
39+
.delete(queueJobs)
40+
.where(lt(queueJobs.created_at, cutoffDate));
41+
42+
const jobsDeleted = (jobsResult.changes || jobsResult.rowsAffected || 0);
43+
44+
// Delete orphaned batches (batches with no jobs)
45+
const batchesResult = await this.db
46+
.delete(queueJobBatches)
47+
.where(
48+
sql`${queueJobBatches.id} NOT IN (SELECT DISTINCT ${queueJobs.batch_id} FROM ${queueJobs} WHERE ${queueJobs.batch_id} IS NOT NULL)`
49+
);
50+
51+
const batchesDeleted = (batchesResult.changes || batchesResult.rowsAffected || 0);
52+
53+
this.logger.info({
54+
jobId,
55+
jobsDeleted,
56+
batchesDeleted,
57+
cutoffDate: cutoffDate.toISOString(),
58+
operation: 'cleanup_old_jobs'
59+
}, 'Cleanup completed successfully');
60+
61+
return {
62+
success: true,
63+
message: `Deleted ${jobsDeleted} jobs and ${batchesDeleted} orphaned batches older than ${olderThanDays} days`,
64+
data: {
65+
jobsDeleted,
66+
batchesDeleted,
67+
cutoffDate: cutoffDate.toISOString()
68+
}
69+
};
70+
} catch (error) {
71+
this.logger.error({
72+
jobId,
73+
error,
74+
operation: 'cleanup_old_jobs'
75+
}, 'Cleanup job failed');
76+
77+
throw error;
78+
}
79+
}
80+
81+
private isValidPayload(payload: unknown): payload is CleanupPayload {
82+
if (typeof payload !== 'object' || payload === null) return false;
83+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
84+
const p = payload as any;
85+
return typeof p.olderThanDays === 'number' && p.olderThanDays > 0;
86+
}
87+
}

services/backend/src/workers/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { McpServerSyncWorker } from './mcpServerSyncWorker';
55
import { RegistryCoordinatorWorker } from './registryCoordinatorWorker';
66
import { EmailWorker } from './emailWorker';
77
import { McpClientActivityMetricsCleanupWorker } from './mcpClientActivityMetricsCleanupWorker';
8+
import { CleanupOldJobsWorker } from './cleanupOldJobsWorker';
89

910
/**
1011
* Register all workers with the job processor
@@ -56,6 +57,12 @@ export function registerWorkers(
5657
new McpClientActivityMetricsCleanupWorker(db, logger)
5758
);
5859

60+
// Register Cleanup Old Jobs Worker
61+
processor.registerWorker(
62+
'cleanup_old_jobs',
63+
new CleanupOldJobsWorker(db, logger)
64+
);
65+
5966
// Log all registered workers dynamically
6067
const registeredWorkers = processor.getRegisteredWorkerTypes();
6168
logger.info({

0 commit comments

Comments
 (0)