diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 696abadc8c..674b410eaa 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -56,14 +56,14 @@ jobs: secrets: inherit publish-webapp: - needs: [typecheck] + needs: [typecheck, units] uses: ./.github/workflows/publish-webapp.yml secrets: inherit with: image_tag: ${{ inputs.image_tag }} publish-worker: - needs: [typecheck] + needs: [typecheck, units] uses: ./.github/workflows/publish-worker.yml secrets: inherit with: diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index f0931683b4..589c9a0a33 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -460,6 +460,7 @@ const EnvironmentSchema = z.object({ RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0), RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(), RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000), + RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000), RUN_ENGINE_WORKER_REDIS_HOST: z .string() @@ -717,7 +718,7 @@ const EnvironmentSchema = z.object({ SLACK_BOT_TOKEN: z.string().optional(), SLACK_SIGNUP_REASON_CHANNEL_ID: z.string().optional(), - + // kapa.ai KAPA_AI_WEBSITE_ID: z.string().optional(), }); diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index ad9e1c9aeb..01a9d3e5d4 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -95,6 +95,7 @@ function createRunEngine() { ...(env.RUN_ENGINE_RUN_QUEUE_REDIS_TLS_DISABLED === "true" ? 
{} : { tls: {} }), }, }, + retryWarmStartThresholdMs: env.RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS, }); return engine; diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index d21b1d6795..f1102a195c 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -304,6 +304,7 @@ export class RunEngine { waitpointSystem: this.waitpointSystem, delayedRunSystem: this.delayedRunSystem, machines: this.options.machines, + retryWarmStartThresholdMs: this.options.retryWarmStartThresholdMs, }); this.dequeueSystem = new DequeueSystem({ diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index c34247f11c..b792dda793 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -541,7 +541,7 @@ export class RunQueue { } } - await this.#callNackMessage({ message }); + await this.#callNackMessage({ message, retryAt }); return true; }, diff --git a/internal-packages/run-engine/src/run-queue/tests/nack.test.ts b/internal-packages/run-engine/src/run-queue/tests/nack.test.ts index f7b8aa1449..4b1e832023 100644 --- a/internal-packages/run-engine/src/run-queue/tests/nack.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/nack.test.ts @@ -214,4 +214,65 @@ describe("RunQueue.nackMessage", () => { } } ); + + redisTest( + "nacking a message with retryAt sets the correct requeue time", + async ({ redisContainer }) => { + const queue = new RunQueue({ + ...testOptions, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const envMasterQueue = 
`env:${authenticatedEnvDev.id}`; + + // Enqueue message + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: messageDev, + masterQueues: ["main", envMasterQueue], + }); + + // Dequeue message + const dequeued = await queue.dequeueMessageFromMasterQueue( + "test_12345", + envMasterQueue, + 10 + ); + expect(dequeued.length).toBe(1); + + // Set retryAt to 5 seconds in the future + const retryAt = Date.now() + 5000; + await queue.nackMessage({ + orgId: messageDev.orgId, + messageId: messageDev.runId, + retryAt, + }); + + // Check the score of the message in the queue + const queueKey = queue.keys.queueKey(authenticatedEnvDev, messageDev.queue); + const score = await queue.oldestMessageInQueue(authenticatedEnvDev, messageDev.queue); + expect(typeof score).toBe("number"); + if (typeof score !== "number") { + throw new Error("Expected score to be a number, but got undefined"); + } + // Should be within 100ms of retryAt + expect(Math.abs(score - retryAt)).toBeLessThanOrEqual(100); + } finally { + await queue.quit(); + } + } + ); }); diff --git a/internal-packages/testcontainers/src/index.ts b/internal-packages/testcontainers/src/index.ts index 7253470998..ae5dcc76d6 100644 --- a/internal-packages/testcontainers/src/index.ts +++ b/internal-packages/testcontainers/src/index.ts @@ -53,7 +53,9 @@ const postgresContainer = async ( try { await use(container); } finally { - await container.stop(); + // WARNING: Testcontainers by default will not wait until the container has stopped. It will simply issue the stop command and return immediately. + // If you need to wait for the container to be stopped, you can provide a timeout. The timeout option here is in seconds. + await container.stop({ timeout: 10 }); } }; @@ -92,7 +94,9 @@ const redisContainer = async ( try { await use(container); } finally { - await container.stop(); + // WARNING: Testcontainers by default will not wait until the container has stopped.
It will simply issue the stop command and return immediately. + // If you need to wait for the container to be stopped, you can provide a timeout. The timeout option here is in seconds. + await container.stop({ timeout: 10 }); } }; @@ -142,7 +146,9 @@ const electricOrigin = async ( try { await use(origin); } finally { - await container.stop(); + // WARNING: Testcontainers by default will not wait until the container has stopped. It will simply issue the stop command and return immediately. + // If you need to wait for the container to be stopped, you can provide a timeout. The timeout option here is in seconds. + await container.stop({ timeout: 10 }); } };