Skip to content

Commit 811e57a

Browse files
committed
feat: schedule daily log cleanup
1 parent 5dc831d commit 811e57a

File tree

5 files changed

+173
-0
lines changed

5 files changed

+173
-0
lines changed

apps/workers/src/workers/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,11 @@ export async function setupSchedules() {
8282
{ pattern: '0 0 4 * * *' },
8383
{ opts: { attempts: 1 } },
8484
)
85+
86+
// Every day at 1 AM - Schedule cleanup jobs for free plan workspaces
87+
await maintenanceQueue.upsertJobScheduler(
88+
'scheduleWorkspaceCleanupJobs',
89+
{ pattern: '0 0 1 * * *' },
90+
{ opts: { attempts: 1 } },
91+
)
8592
}

apps/workers/src/workers/worker-definitions/maintenanceWorker.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ const jobMappings = {
1414
refreshProjectStatsCacheJob: jobs.refreshProjectStatsCacheJob,
1515
refreshDocumentsStatsCacheJob: jobs.refreshDocumentsStatsCacheJob,
1616
refreshDocumentStatsCacheJob: jobs.refreshDocumentStatsCacheJob,
17+
scheduleWorkspaceCleanupJobs: jobs.scheduleWorkspaceCleanupJobs,
18+
cleanupWorkspaceOldLogsJob: jobs.cleanupWorkspaceOldLogsJob,
1719
}
1820

1921
export function startMaintenanceWorker() {
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
import { Job } from 'bullmq'
import { and, eq, exists, inArray, lt, sql } from 'drizzle-orm'
import { commits, documentLogs, projects, providerLogs } from '../../../schema'
import Transaction from '../../../lib/Transaction'
import { Result } from '../../../lib/Result'
6+
7+
export type CleanupWorkspaceOldLogsJobData = {
  workspaceId: number
}

/**
 * Job that deletes document logs, and the provider logs that reference them,
 * once they are older than 30 days, for a single workspace.
 *
 * Steps:
 * 1. Computes the cutoff date (now minus 30 days, using the local calendar day).
 * 2. Deletes matching provider logs in batches of at most 1000 rows. A provider
 *    log matches when it belongs to the workspace and its `documentLogUuid`
 *    points at a document log created before the cutoff whose commit's project
 *    belongs to the same workspace.
 * 3. Deletes the old document logs themselves, also in batches of at most
 *    1000 rows.
 *
 * Each batch is selected inside the database (id subquery with LIMIT), so no
 * candidate rows are loaded into the worker before deletion. A batch loop
 * stops as soon as a pass deletes fewer rows than the batch size.
 *
 * NOTE(review): every batch, including the pauses between batches, runs inside
 * the single `transaction.call` callback. If that maps to one database
 * transaction, batching bounds the size of each statement but not the total
 * transaction duration - confirm this is intended.
 *
 * @param job - BullMQ job whose `data.workspaceId` identifies the workspace.
 * @returns The value produced by `Transaction.call` for a `Result.ok` payload
 *   containing the deletion counters and the ISO cutoff date.
 */
export const cleanupWorkspaceOldLogsJob = async (
  job: Job<CleanupWorkspaceOldLogsJobData>,
) => {
  const { workspaceId } = job.data

  const retentionDays = 30
  const batchSize = 1000
  const batchPauseMs = 100

  // Calculate cutoff date (30 days ago)
  const cutoffDate = new Date()
  cutoffDate.setDate(cutoffDate.getDate() - retentionDays)

  // Small delay between full batches to avoid hammering the database.
  const pauseBetweenBatches = () =>
    new Promise<void>((resolve) => setTimeout(resolve, batchPauseMs))

  const transaction = new Transaction()
  return await transaction.call(async (tx) => {
    let totalDeletedProviderLogs = 0
    let totalDeletedDocumentLogs = 0

    // Delete provider logs in batches
    let deletedProviderLogsBatch: number

    do {
      // Pick at most `batchSize` provider logs of this workspace whose document
      // log is past the cutoff. Without this LIMIT the DELETE would remove (and
      // return the ids of) every matching row in a single statement, which
      // defeats the batch loop.
      const expiredProviderLogIds = tx
        .select({ id: providerLogs.id })
        .from(providerLogs)
        .where(
          and(
            eq(providerLogs.workspaceId, workspaceId),
            exists(
              tx
                .select({ dummy: sql`1` })
                .from(documentLogs)
                .innerJoin(commits, eq(commits.id, documentLogs.commitId))
                .innerJoin(projects, eq(projects.id, commits.projectId))
                .where(
                  and(
                    eq(projects.workspaceId, workspaceId),
                    lt(documentLogs.createdAt, cutoffDate),
                    eq(documentLogs.uuid, providerLogs.documentLogUuid),
                  ),
                )
                .limit(1),
            ),
          ),
        )
        .limit(batchSize)

      const deletedProviderLogsResult = await tx
        .delete(providerLogs)
        .where(inArray(providerLogs.id, expiredProviderLogIds))
        .returning({ id: providerLogs.id })

      deletedProviderLogsBatch = deletedProviderLogsResult.length
      totalDeletedProviderLogs += deletedProviderLogsBatch

      // A full batch means there may be more rows left; pause, then go again.
      if (deletedProviderLogsBatch === batchSize) {
        await pauseBetweenBatches()
      }
    } while (deletedProviderLogsBatch === batchSize)

    // Delete document logs in batches
    let deletedDocumentLogsBatch: number

    do {
      // Delete old document logs from this workspace. The batch is chosen by an
      // id subquery with LIMIT, because DELETE itself cannot be limited.
      const deletedDocumentLogsResult = await tx.execute(sql`
        DELETE FROM ${documentLogs}
        WHERE ${documentLogs.id} IN (
          SELECT dl.id
          FROM ${documentLogs} dl
          INNER JOIN ${commits} c ON c.id = dl.commit_id
          INNER JOIN ${projects} p ON p.id = c.project_id
          WHERE p.workspace_id = ${workspaceId}
          AND dl.created_at < ${cutoffDate}
          LIMIT ${batchSize}
        )
      `)

      // `rowCount` may be null/undefined depending on the driver result.
      deletedDocumentLogsBatch = deletedDocumentLogsResult.rowCount ?? 0
      totalDeletedDocumentLogs += deletedDocumentLogsBatch

      // A full batch means there may be more rows left; pause, then go again.
      if (deletedDocumentLogsBatch === batchSize) {
        await pauseBetweenBatches()
      }
    } while (deletedDocumentLogsBatch === batchSize)

    return Result.ok({
      success: true,
      workspaceId,
      deletedDocumentLogs: totalDeletedDocumentLogs,
      deletedProviderLogs: totalDeletedProviderLogs,
      cutoffDate: cutoffDate.toISOString(),
    })
  })
}

packages/core/src/jobs/job-definitions/maintenance/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,5 @@ export * from './refreshDocumentsStatsCacheJob'
22
export * from './refreshDocumentStatsCacheJob'
33
export * from './refreshProjectsStatsCacheJob'
44
export * from './refreshProjectStatsCacheJob'
5+
export * from './scheduleWorkspaceCleanupJobs'
6+
export * from './cleanupWorkspaceOldLogsJob'
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import { Job } from 'bullmq'
2+
import { eq, inArray } from 'drizzle-orm'
3+
import { FREE_PLANS } from '../../../plans'
4+
import { database } from '../../../client'
5+
import { subscriptions, workspaces } from '../../../schema'
6+
import { maintenanceQueue } from '../../queues'
7+
8+
export type ScheduleWorkspaceCleanupJobsData = Record<string, never>

/**
 * Nightly fan-out job for log cleanup on free-plan workspaces.
 *
 * It selects the id of every workspace whose current subscription has a plan
 * listed in `FREE_PLANS`, then adds one `cleanupWorkspaceOldLogsJob` to the
 * maintenance queue per workspace (up to 3 attempts each). Jobs are enqueued
 * one at a time, in query order; a failure to enqueue aborts the run.
 *
 * @param _ - The BullMQ job; its payload is empty and unused.
 * @returns A summary with the number of free workspaces found and the number
 *   of cleanup jobs enqueued.
 */
export const scheduleWorkspaceCleanupJobs = async (
  _: Job<ScheduleWorkspaceCleanupJobsData>,
) => {
  // Join each workspace to its current subscription and keep free plans only.
  const isCurrentSubscription = eq(
    subscriptions.id,
    workspaces.currentSubscriptionId,
  )
  const isFreePlan = inArray(subscriptions.plan, FREE_PLANS)

  const freeWorkspaces = await database
    .select({ id: workspaces.id })
    .from(workspaces)
    .innerJoin(subscriptions, isCurrentSubscription)
    .where(isFreePlan)

  // One cleanup job per workspace, enqueued sequentially.
  let enqueuedJobs = 0
  for (const { id: workspaceId } of freeWorkspaces) {
    await maintenanceQueue.add(
      'cleanupWorkspaceOldLogsJob',
      { workspaceId },
      { attempts: 3 },
    )
    enqueuedJobs += 1
  }

  return {
    success: true,
    freeWorkspacesCount: freeWorkspaces.length,
    enqueuedJobs,
  }
}

0 commit comments

Comments
 (0)