diff --git a/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.schedules.recover.ts b/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.schedules.recover.ts new file mode 100644 index 0000000000..a9ada56085 --- /dev/null +++ b/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.schedules.recover.ts @@ -0,0 +1,58 @@ +import { ActionFunctionArgs, json, LoaderFunctionArgs } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { prisma } from "~/db.server"; +import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server"; +import { scheduleEngine } from "~/v3/scheduleEngine.server"; + +const ParamsSchema = z.object({ + environmentId: z.string(), +}); + +export async function action({ request, params }: ActionFunctionArgs) { + // Next authenticate the request + const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request); + + if (!authenticationResult) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + const user = await prisma.user.findUnique({ + where: { + id: authenticationResult.userId, + }, + }); + + if (!user) { + return json({ error: "Invalid or Missing API key" }, { status: 401 }); + } + + if (!user.admin) { + return json({ error: "You must be an admin to perform this action" }, { status: 403 }); + } + + const parsedParams = ParamsSchema.parse(params); + + const environment = await prisma.runtimeEnvironment.findFirst({ + where: { + id: parsedParams.environmentId, + }, + include: { + organization: true, + project: true, + }, + }); + + if (!environment) { + return json({ error: "Environment not found" }, { status: 404 }); + } + + const results = await scheduleEngine.recoverSchedulesInEnvironment( + environment.projectId, + environment.id + ); + + return json({ + success: true, + results, + }); +} diff --git a/apps/webapp/app/runEngine/concerns/traceEvents.server.ts 
b/apps/webapp/app/runEngine/concerns/traceEvents.server.ts index f8193ff112..2aafb068c0 100644 --- a/apps/webapp/app/runEngine/concerns/traceEvents.server.ts +++ b/apps/webapp/app/runEngine/concerns/traceEvents.server.ts @@ -39,6 +39,9 @@ export class DefaultTraceEventsConcern implements TraceEventConcern { }, incomplete: true, immediate: true, + startTime: request.options?.overrideCreatedAt + ? BigInt(request.options.overrideCreatedAt.getTime()) * BigInt(1000000) + : undefined, }, async (event, traceContext, traceparent) => { return await callback({ diff --git a/apps/webapp/app/runEngine/types.ts b/apps/webapp/app/runEngine/types.ts index f9853215b4..524d2ac714 100644 --- a/apps/webapp/app/runEngine/types.ts +++ b/apps/webapp/app/runEngine/types.ts @@ -21,6 +21,7 @@ export type TriggerTaskServiceOptions = { runFriendlyId?: string; skipChecks?: boolean; oneTimeUseToken?: string; + overrideCreatedAt?: Date; }; // domain/triggerTask.ts diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts index 7d1625395f..cfca700209 100644 --- a/apps/webapp/app/v3/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository.server.ts @@ -970,7 +970,7 @@ export class EventRepository { const propagatedContext = extractContextFromCarrier(options.context ?? {}); const start = process.hrtime.bigint(); - const startTime = getNowInNanoseconds(); + const startTime = options.startTime ?? getNowInNanoseconds(); const traceId = options.spanParentAsLink ? this.generateTraceId() diff --git a/apps/webapp/app/v3/services/triggerTaskV1.server.ts b/apps/webapp/app/v3/services/triggerTaskV1.server.ts index f57a0cb6a0..5a913838ef 100644 --- a/apps/webapp/app/v3/services/triggerTaskV1.server.ts +++ b/apps/webapp/app/v3/services/triggerTaskV1.server.ts @@ -312,6 +312,9 @@ export class TriggerTaskServiceV1 extends BaseService { }, incomplete: true, immediate: true, + startTime: options.overrideCreatedAt + ? 
BigInt(options.overrideCreatedAt.getTime()) * BigInt(1000000) + : undefined, }, async (event, traceContext, traceparent) => { const run = await autoIncrementCounter.incrementInTransaction( diff --git a/apps/webapp/app/v3/taskEventStore.server.ts b/apps/webapp/app/v3/taskEventStore.server.ts index 7a87d29b35..269aab84b4 100644 --- a/apps/webapp/app/v3/taskEventStore.server.ts +++ b/apps/webapp/app/v3/taskEventStore.server.ts @@ -82,17 +82,20 @@ export class TaskEventStore { let finalWhere: Prisma.TaskEventWhereInput = where; if (table === "taskEventPartitioned") { - // Add 1 minute to endCreatedAt to make sure we include all events in the range. + // Add buffer to start and end of the range to make sure we include all events in the range. const end = endCreatedAt ? new Date(endCreatedAt.getTime() + env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000) : new Date(); + const startCreatedAtWithBuffer = new Date( + startCreatedAt.getTime() - env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000 + ); finalWhere = { AND: [ where, { createdAt: { - gte: startCreatedAt, + gte: startCreatedAtWithBuffer, lt: end, }, }, @@ -138,6 +141,11 @@ export class TaskEventStore { options?.includeDebugLogs === false || options?.includeDebugLogs === undefined; if (table === "taskEventPartitioned") { + const createdAtBufferInMillis = env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000; + const startCreatedAtWithBuffer = new Date(startCreatedAt.getTime() - createdAtBufferInMillis); + const $endCreatedAt = endCreatedAt ?? new Date(); + const endCreatedAtWithBuffer = new Date($endCreatedAt.getTime() + createdAtBufferInMillis); + return await this.readReplica.$queryRaw` SELECT "spanId", @@ -158,11 +166,8 @@ export class TaskEventStore { FROM "TaskEventPartitioned" WHERE "traceId" = ${traceId} - AND "createdAt" >= ${startCreatedAt.toISOString()}::timestamp - AND "createdAt" < ${(endCreatedAt - ? 
new Date(endCreatedAt.getTime() + env.TASK_EVENT_PARTITIONED_WINDOW_IN_SECONDS * 1000) - : new Date() - ).toISOString()}::timestamp + AND "createdAt" >= ${startCreatedAtWithBuffer.toISOString()}::timestamp + AND "createdAt" < ${endCreatedAtWithBuffer.toISOString()}::timestamp ${ filterDebug ? Prisma.sql`AND \"kind\" <> CAST('LOG'::text AS "public"."TaskEventKind")` diff --git a/internal-packages/schedule-engine/src/engine/index.ts b/internal-packages/schedule-engine/src/engine/index.ts index c1fa859d99..2e4292d6e5 100644 --- a/internal-packages/schedule-engine/src/engine/index.ts +++ b/internal-packages/schedule-engine/src/engine/index.ts @@ -8,7 +8,7 @@ import { Tracer, } from "@internal/tracing"; import { Logger } from "@trigger.dev/core/logger"; -import { PrismaClient } from "@trigger.dev/database"; +import { PrismaClient, TaskSchedule, TaskScheduleInstance } from "@trigger.dev/database"; import { Worker, type JobHandlerParams } from "@trigger.dev/redis-worker"; import { calculateDistributedExecutionTime } from "./distributedScheduling.js"; import { calculateNextScheduledTimestamp, nextScheduledTimestamps } from "./scheduleCalculation.js"; @@ -645,6 +645,140 @@ export class ScheduleEngine { }); } + public recoverSchedulesInEnvironment(projectId: string, environmentId: string) { + return startSpan(this.tracer, "recoverSchedulesInEnvironment", async (span) => { + this.logger.info("Recovering schedules in environment", { + environmentId, + projectId, + }); + + span.setAttribute("environmentId", environmentId); + + const schedules = await this.prisma.taskSchedule.findMany({ + where: { + projectId, + instances: { + some: { + environmentId, + }, + }, + }, + select: { + id: true, + generatorExpression: true, + instances: { + select: { + id: true, + environmentId: true, + lastScheduledTimestamp: true, + nextScheduledTimestamp: true, + }, + }, + }, + }); + + const instancesWithSchedule = schedules + .map((schedule) => ({ + schedule, + instance: 
schedule.instances.find((instance) => instance.environmentId === environmentId), + })) + .filter((instance) => instance.instance) as Array<{ + schedule: Omit<(typeof schedules)[number], "instances">; + instance: NonNullable<(typeof schedules)[number]["instances"][number]>; + }>; + + if (instancesWithSchedule.length === 0) { + this.logger.info("No instances found for environment", { + environmentId, + projectId, + }); + + return { + recovered: [], + skipped: [], + }; + } + + const results = { + recovered: [], + skipped: [], + } as { recovered: string[]; skipped: string[] }; + + for (const { instance, schedule } of instancesWithSchedule) { + this.logger.info("Recovering schedule", { + schedule, + instance, + }); + + const [recoverError, result] = await tryCatch( + this.#recoverTaskScheduleInstance({ instance, schedule }) + ); + + if (recoverError) { + this.logger.error("Error recovering schedule", { + error: recoverError instanceof Error ? recoverError.message : String(recoverError), + }); + + span.setAttribute("recover_error", true); + span.setAttribute( + "recover_error_message", + recoverError instanceof Error ? 
recoverError.message : String(recoverError) + ); + } else { + span.setAttribute("recover_success", true); + + if (result === "recovered") { + results.recovered.push(instance.id); + } else { + results.skipped.push(instance.id); + } + } + } + + return results; + }); + } + + async #recoverTaskScheduleInstance({ + instance, + schedule, + }: { + instance: { + id: string; + environmentId: string; + lastScheduledTimestamp: Date | null; + nextScheduledTimestamp: Date | null; + }; + schedule: { id: string; generatorExpression: string }; + }) { + // inspect the schedule worker to see if there is a job for this instance + const job = await this.worker.getJob(`scheduled-task-instance:${instance.id}`); + + if (job) { + this.logger.info("Job already exists for instance", { + instanceId: instance.id, + job, + schedule, + }); + + return "skipped"; + } + + this.logger.info("No job found for instance, registering next run", { + instanceId: instance.id, + schedule, + }); + + // If the job does not exist, register the next run + await this.registerNextTaskScheduleInstance({ instanceId: instance.id }); + + return "recovered"; + } + + async getJob(id: string) { + return this.worker.getJob(id); + } + async quit() { this.logger.info("Shutting down schedule engine"); diff --git a/internal-packages/schedule-engine/test/scheduleEngine.test.ts b/internal-packages/schedule-engine/test/scheduleEngine.test.ts index e3990a51cf..fdc05e2574 100644 --- a/internal-packages/schedule-engine/test/scheduleEngine.test.ts +++ b/internal-packages/schedule-engine/test/scheduleEngine.test.ts @@ -1,9 +1,9 @@ import { containerTest } from "@internal/testcontainers"; import { trace } from "@internal/tracing"; -import { describe, expect, vi } from "vitest"; -import { ScheduleEngine } from "../src/index.js"; import { setTimeout } from "timers/promises"; +import { describe, expect, vi } from "vitest"; import { TriggerScheduledTaskParams } from "../src/engine/types.js"; +import { ScheduleEngine } from 
"../src/index.js"; describe("ScheduleEngine Integration", () => { containerTest( diff --git a/internal-packages/schedule-engine/test/scheduleRecovery.test.ts b/internal-packages/schedule-engine/test/scheduleRecovery.test.ts new file mode 100644 index 0000000000..9ad345101e --- /dev/null +++ b/internal-packages/schedule-engine/test/scheduleRecovery.test.ts @@ -0,0 +1,396 @@ +import { containerTest } from "@internal/testcontainers"; +import { trace } from "@internal/tracing"; +import { describe, expect, vi } from "vitest"; +import { TriggerScheduledTaskParams } from "../src/engine/types.js"; +import { ScheduleEngine } from "../src/index.js"; + +describe("Schedule Recovery", () => { + containerTest( + "should recover schedules when no existing jobs are found", + { timeout: 30_000 }, + async ({ prisma, redisOptions }) => { + const mockDevConnectedHandler = vi.fn().mockResolvedValue(true); + const triggerCalls: TriggerScheduledTaskParams[] = []; + + const engine = new ScheduleEngine({ + prisma, + redis: redisOptions, + distributionWindow: { seconds: 10 }, + worker: { + concurrency: 1, + disabled: true, // Disable worker to prevent automatic execution + pollIntervalMs: 1000, + }, + tracer: trace.getTracer("test", "0.0.0"), + onTriggerScheduledTask: async (params) => { + triggerCalls.push(params); + return { success: true }; + }, + isDevEnvironmentConnectedHandler: mockDevConnectedHandler, + }); + + try { + // Create test data + const organization = await prisma.organization.create({ + data: { + title: "Recovery Test Org", + slug: "recovery-test-org", + }, + }); + + const project = await prisma.project.create({ + data: { + name: "Recovery Test Project", + slug: "recovery-test-project", + externalRef: "recovery-test-ref", + organizationId: organization.id, + }, + }); + + const environment = await prisma.runtimeEnvironment.create({ + data: { + slug: "recovery-test-env", + type: "PRODUCTION", + projectId: project.id, + organizationId: organization.id, + apiKey: 
"tr_recovery_test_1234", + pkApiKey: "pk_recovery_test_1234", + shortcode: "recovery-test-short", + }, + }); + + const taskSchedule = await prisma.taskSchedule.create({ + data: { + friendlyId: "sched_recovery_123", + taskIdentifier: "recovery-test-task", + projectId: project.id, + deduplicationKey: "recovery-test-dedup", + userProvidedDeduplicationKey: false, + generatorExpression: "0 */5 * * *", // Every 5 minutes + generatorDescription: "Every 5 minutes", + timezone: "UTC", + type: "DECLARATIVE", + active: true, + externalId: "recovery-ext-123", + }, + }); + + const scheduleInstance = await prisma.taskScheduleInstance.create({ + data: { + taskScheduleId: taskSchedule.id, + environmentId: environment.id, + active: true, + }, + }); + + // Verify no job exists initially + const jobBeforeRecovery = await engine.getJob( + `scheduled-task-instance:${scheduleInstance.id}` + ); + expect(jobBeforeRecovery).toBeNull(); + + // Perform recovery + await engine.recoverSchedulesInEnvironment(project.id, environment.id); + + // Verify that a job was created + const jobAfterRecovery = await engine.getJob( + `scheduled-task-instance:${scheduleInstance.id}` + ); + expect(jobAfterRecovery).not.toBeNull(); + expect(jobAfterRecovery?.job).toBe("schedule.triggerScheduledTask"); + + // Verify the instance was updated with next scheduled timestamp + const updatedInstance = await prisma.taskScheduleInstance.findFirst({ + where: { id: scheduleInstance.id }, + }); + expect(updatedInstance?.nextScheduledTimestamp).toBeDefined(); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "should not create duplicate jobs when schedule already has an active job", + { timeout: 30_000 }, + async ({ prisma, redisOptions }) => { + const mockDevConnectedHandler = vi.fn().mockResolvedValue(true); + const triggerCalls: TriggerScheduledTaskParams[] = []; + + const engine = new ScheduleEngine({ + prisma, + redis: redisOptions, + distributionWindow: { seconds: 10 }, + worker: { + 
concurrency: 1, + disabled: true, // Disable worker to prevent automatic execution + pollIntervalMs: 1000, + }, + tracer: trace.getTracer("test", "0.0.0"), + onTriggerScheduledTask: async (params) => { + triggerCalls.push(params); + return { success: true }; + }, + isDevEnvironmentConnectedHandler: mockDevConnectedHandler, + }); + + try { + // Create test data + const organization = await prisma.organization.create({ + data: { + title: "Duplicate Test Org", + slug: "duplicate-test-org", + }, + }); + + const project = await prisma.project.create({ + data: { + name: "Duplicate Test Project", + slug: "duplicate-test-project", + externalRef: "duplicate-test-ref", + organizationId: organization.id, + }, + }); + + const environment = await prisma.runtimeEnvironment.create({ + data: { + slug: "duplicate-test-env", + type: "PRODUCTION", + projectId: project.id, + organizationId: organization.id, + apiKey: "tr_duplicate_test_1234", + pkApiKey: "pk_duplicate_test_1234", + shortcode: "duplicate-test-short", + }, + }); + + const taskSchedule = await prisma.taskSchedule.create({ + data: { + friendlyId: "sched_duplicate_123", + taskIdentifier: "duplicate-test-task", + projectId: project.id, + deduplicationKey: "duplicate-test-dedup", + userProvidedDeduplicationKey: false, + generatorExpression: "0 */10 * * *", // Every 10 minutes + generatorDescription: "Every 10 minutes", + timezone: "UTC", + type: "DECLARATIVE", + active: true, + externalId: "duplicate-ext-123", + }, + }); + + const scheduleInstance = await prisma.taskScheduleInstance.create({ + data: { + taskScheduleId: taskSchedule.id, + environmentId: environment.id, + active: true, + }, + }); + + // First, register the schedule normally + await engine.registerNextTaskScheduleInstance({ instanceId: scheduleInstance.id }); + + // Verify job exists + const jobAfterFirstRegistration = await engine.getJob( + `scheduled-task-instance:${scheduleInstance.id}` + ); + expect(jobAfterFirstRegistration).not.toBeNull(); + const 
firstJobId = jobAfterFirstRegistration?.id; + + // Now run recovery - it should not create a duplicate job + await engine.recoverSchedulesInEnvironment(project.id, environment.id); + + // Verify the same job still exists (no duplicate created) + const jobAfterRecovery = await engine.getJob( + `scheduled-task-instance:${scheduleInstance.id}` + ); + expect(jobAfterRecovery).not.toBeNull(); + expect(jobAfterRecovery?.id).toBe(firstJobId); + expect(jobAfterRecovery?.deduplicationKey).toBe(jobAfterFirstRegistration.deduplicationKey); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "should recover multiple schedules in the same environment", + { timeout: 30_000 }, + async ({ prisma, redisOptions }) => { + const mockDevConnectedHandler = vi.fn().mockResolvedValue(true); + const triggerCalls: TriggerScheduledTaskParams[] = []; + + const engine = new ScheduleEngine({ + prisma, + redis: redisOptions, + distributionWindow: { seconds: 10 }, + worker: { + concurrency: 1, + disabled: true, // Disable worker to prevent automatic execution + pollIntervalMs: 1000, + }, + tracer: trace.getTracer("test", "0.0.0"), + onTriggerScheduledTask: async (params) => { + triggerCalls.push(params); + return { success: true }; + }, + isDevEnvironmentConnectedHandler: mockDevConnectedHandler, + }); + + try { + // Create test data + const organization = await prisma.organization.create({ + data: { + title: "Multiple Test Org", + slug: "multiple-test-org", + }, + }); + + const project = await prisma.project.create({ + data: { + name: "Multiple Test Project", + slug: "multiple-test-project", + externalRef: "multiple-test-ref", + organizationId: organization.id, + }, + }); + + const environment = await prisma.runtimeEnvironment.create({ + data: { + slug: "multiple-test-env", + type: "PRODUCTION", + projectId: project.id, + organizationId: organization.id, + apiKey: "tr_multiple_test_1234", + pkApiKey: "pk_multiple_test_1234", + shortcode: "multiple-test-short", + }, + }); + + 
// Create multiple task schedules + const schedules = []; + const instances = []; + + for (let i = 1; i <= 3; i++) { + const taskSchedule = await prisma.taskSchedule.create({ + data: { + friendlyId: `sched_multiple_${i}`, + taskIdentifier: `multiple-test-task-${i}`, + projectId: project.id, + deduplicationKey: `multiple-test-dedup-${i}`, + userProvidedDeduplicationKey: false, + generatorExpression: `${i} */15 * * *`, // Every 15 minutes at different minute offsets + generatorDescription: `Every 15 minutes (${i})`, + timezone: "UTC", + type: "DECLARATIVE", + active: true, + externalId: `multiple-ext-${i}`, + }, + }); + + const scheduleInstance = await prisma.taskScheduleInstance.create({ + data: { + taskScheduleId: taskSchedule.id, + environmentId: environment.id, + active: true, + }, + }); + + schedules.push(taskSchedule); + instances.push(scheduleInstance); + } + + // Verify no jobs exist initially + for (const instance of instances) { + const job = await engine.getJob(`scheduled-task-instance:${instance.id}`); + expect(job).toBeNull(); + } + + // Perform recovery + await engine.recoverSchedulesInEnvironment(project.id, environment.id); + + // Verify that jobs were created for all instances + for (const instance of instances) { + const job = await engine.getJob(`scheduled-task-instance:${instance.id}`); + expect(job).not.toBeNull(); + expect(job?.job).toBe("schedule.triggerScheduledTask"); + } + + // Verify all instances were updated + for (const instance of instances) { + const updatedInstance = await prisma.taskScheduleInstance.findFirst({ + where: { id: instance.id }, + }); + expect(updatedInstance?.nextScheduledTimestamp).toBeDefined(); + } + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "should handle recovery gracefully when no schedules exist in environment", + { timeout: 30_000 }, + async ({ prisma, redisOptions }) => { + const mockDevConnectedHandler = vi.fn().mockResolvedValue(true); + const triggerCalls: 
TriggerScheduledTaskParams[] = []; + + const engine = new ScheduleEngine({ + prisma, + redis: redisOptions, + distributionWindow: { seconds: 10 }, + worker: { + concurrency: 1, + disabled: true, // Disable worker to prevent automatic execution + pollIntervalMs: 1000, + }, + tracer: trace.getTracer("test", "0.0.0"), + onTriggerScheduledTask: async (params) => { + triggerCalls.push(params); + return { success: true }; + }, + isDevEnvironmentConnectedHandler: mockDevConnectedHandler, + }); + + try { + // Create test data but no schedules + const organization = await prisma.organization.create({ + data: { + title: "Empty Test Org", + slug: "empty-test-org", + }, + }); + + const project = await prisma.project.create({ + data: { + name: "Empty Test Project", + slug: "empty-test-project", + externalRef: "empty-test-ref", + organizationId: organization.id, + }, + }); + + const environment = await prisma.runtimeEnvironment.create({ + data: { + slug: "empty-test-env", + type: "PRODUCTION", + projectId: project.id, + organizationId: organization.id, + apiKey: "tr_empty_test_1234", + pkApiKey: "pk_empty_test_1234", + shortcode: "empty-test-short", + }, + }); + + // Perform recovery on empty environment - should not throw errors + await expect( + engine.recoverSchedulesInEnvironment(project.id, environment.id) + ).resolves.not.toThrow(); + } finally { + await engine.quit(); + } + } + ); +}); diff --git a/packages/redis-worker/src/queue.test.ts b/packages/redis-worker/src/queue.test.ts index ffea46b6b3..6c8b888407 100644 --- a/packages/redis-worker/src/queue.test.ts +++ b/packages/redis-worker/src/queue.test.ts @@ -76,6 +76,58 @@ describe("SimpleQueue", () => { } }); + redisTest("getJob", { timeout: 20_000 }, async ({ redisContainer }) => { + const queue = new SimpleQueue({ + name: "test-1", + schema: { + test: z.object({ + value: z.number(), + }), + }, + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), 
+ }, + logger: new Logger("test", "log"), + }); + + try { + await queue.enqueue({ id: "1", job: "test", item: { value: 1 }, visibilityTimeoutMs: 2000 }); + const job1 = await queue.getJob("1"); + + expect(job1).toEqual( + expect.objectContaining({ + id: "1", + job: "test", + item: { value: 1 }, + visibilityTimeoutMs: 2000, + attempt: 0, + timestamp: expect.any(Date), + }) + ); + + await queue.enqueue({ id: "2", job: "test", item: { value: 2 }, visibilityTimeoutMs: 2000 }); + const job2 = await queue.getJob("2"); + + expect(job2).toEqual( + expect.objectContaining({ + id: "2", + job: "test", + item: { value: 2 }, + visibilityTimeoutMs: 2000, + attempt: 0, + timestamp: expect.any(Date), + }) + ); + + const job3 = await queue.getJob("3"); + expect(job3).toBeNull(); + } finally { + await queue.close(); + } + }); + redisTest("no items", { timeout: 20_000 }, async ({ redisContainer }) => { const queue = new SimpleQueue({ name: "test-1", diff --git a/packages/redis-worker/src/queue.ts b/packages/redis-worker/src/queue.ts index 6a1618404b..dd692f51fd 100644 --- a/packages/redis-worker/src/queue.ts +++ b/packages/redis-worker/src/queue.ts @@ -295,6 +295,27 @@ export class SimpleQueue { } } + async getJob(id: string): Promise<QueueItem<TMessageCatalog> | null> { + const result = await this.redis.getJob(`queue`, `items`, id); + + if (!result) { + return null; + } + + const [_, score, serializedItem] = result; + const item = JSON.parse(serializedItem) as QueueItem; + + return { + id, + job: item.job, + item: item.item, + visibilityTimeoutMs: item.visibilityTimeoutMs, + attempt: item.attempt ?? 0, + timestamp: new Date(Number(score)), + deduplicationKey: item.deduplicationKey ?? 
undefined, + }; + } + async moveToDeadLetterQueue(id: string, errorMessage: string): Promise { try { const result = await this.redis.moveToDeadLetterQueue( @@ -425,6 +446,26 @@ export class SimpleQueue { `, }); + this.redis.defineCommand("getJob", { + numberOfKeys: 2, + lua: ` + local queue = KEYS[1] + local items = KEYS[2] + local jobId = ARGV[1] + + local serializedItem = redis.call('HGET', items, jobId) + + if serializedItem == false then + return nil + end + + -- get the score from the queue sorted set + local score = redis.call('ZSCORE', queue, jobId) + + return { jobId, score, serializedItem } + `, + }); + this.redis.defineCommand("ackItem", { numberOfKeys: 2, lua: ` @@ -597,5 +638,12 @@ declare module "@internal/redis" { serializedItem: string, callback?: Callback ): Result; + + getJob( + queue: string, + items: string, + id: string, + callback?: Callback<[string, string, string] | null> + ): Result<[string, string, string] | null, Context>; } } diff --git a/packages/redis-worker/src/worker.ts b/packages/redis-worker/src/worker.ts index e167611c79..644b804162 100644 --- a/packages/redis-worker/src/worker.ts +++ b/packages/redis-worker/src/worker.ts @@ -382,6 +382,10 @@ class Worker { ); } + async getJob(id: string) { + return this.queue.getJob(id); + } + /** * The main loop that each worker runs. It repeatedly polls for items, * processes them, and then waits before the next iteration.