diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 82cb261c..0cdf5067 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -473,9 +473,25 @@ packages/[package-name]/ ### ✅ Required Practices 1. **README Files** - - Every package must have a README.md - - Include: description, installation, usage examples, API overview - - Keep examples up-to-date with code + - README files should stay simple with just an introduction to concepts + - Keep READMEs concise: key features, quick examples, and links to full documentation + - Full documentation belongs in the `docs/` directory to maintain a single source of truth + - README example: brief feature list → quick code snippet → link to docs site + - This prevents documentation drift and duplication + + ```markdown + ## Feature Name + + Brief introduction to the feature (1-2 sentences). + + **Quick Example:** + + \`\`\`typescript + // Minimal working example + \`\`\` + + 📖 **[Learn more →](https://btravers.github.io/amqp-contract/guide/feature)** + ``` 2. **Code Comments** - Use JSDoc for all public APIs diff --git a/package.json b/package.json index aad3a812..5e81016f 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,7 @@ "setup": "pnpm install && pnpm build", "sort-package-json": "sort-package-json '**/package.json' '!**/node_modules/**'", "test": "turbo run test", - "test:integration": "turbo run test:integration", + "test:integration": "turbo run test:integration --concurrency=1", "typecheck": "turbo run typecheck", "version": "changeset version" }, diff --git a/packages/contract/README.md b/packages/contract/README.md index 4c6c6ff0..aeceaa70 100644 --- a/packages/contract/README.md +++ b/packages/contract/README.md @@ -75,6 +75,12 @@ const contract = defineContract({ - ✅ Event-oriented (publisher-first) and command-oriented (consumer-first) patterns - ✅ Flexible routing key patterns for topic exchanges +## Error Handling & Retry Policies + +Error handling and retry policies are configured at the **worker level**, not in contracts. Contracts define message schemas and topology, while workers control retry behavior (including exponential backoff and dead letter integration) during message processing. + +📖 **[Learn more about retry policies and error handling in workers →](https://btravers.github.io/amqp-contract/guide/worker-usage.html#retry-policies)** + ## Documentation 📖 **[Read the full documentation →](https://btravers.github.io/amqp-contract)** diff --git a/packages/contract/src/builder.ts b/packages/contract/src/builder.ts index 7dff3c5b..b70ba258 100644 --- a/packages/contract/src/builder.ts +++ b/packages/contract/src/builder.ts @@ -598,9 +598,14 @@ export function definePublisher( * Consumers are associated with a specific queue and message type. When you create a worker * with this consumer, it will process messages from the queue according to the schema. * + * **Retry policy configuration has moved to worker-level options.** + * This separates contract definition (what messages look like) from deployment configuration + * (how to handle failures). Configure retry policies when creating the worker, similar to + * how you configure prefetch. + * * @param queue - The queue definition to consume from * @param message - The message definition with payload schema - * @param options - Optional consumer configuration + * @param options - Optional consumer configuration (currently unused, kept for future extensibility) * @returns A consumer definition with inferred message types * * @example @@ -616,19 +621,35 @@ export function definePublisher( * }) * ); * + * // Basic consumer (contract-level definition) * const processOrderConsumer = defineConsumer(orderQueue, orderMessage); * - * // Later, when creating a worker, you'll provide a handler for this consumer: - * // const worker = await TypedAmqpWorker.create({ - * // contract, - * // handlers: { - * // processOrder: async (message) => { - * // // message is automatically typed based on the schema - * // console.log(message.orderId); // string - * // } - * // }, - * // connection - * // }); + * // Later, when creating a worker, configure retry policy (deployment-level): + * const worker = await TypedAmqpWorker.create({ + * contract, + * handlers: { + * processOrder: [ + * async (message) => { + * // message is automatically typed based on the schema + * console.log(message.orderId); // string + * }, + * { + * prefetch: 10, + * retryPolicy: { + * maxAttempts: 3, + * backoff: { + * type: 'exponential', + * initialInterval: 1_000, + * maxInterval: 60_000, + * coefficient: 2 + * }, + * nonRetryableErrors: ['ValidationError'] + * } + * } + * ] + * }, + * urls: ['amqp://localhost'] + * }).resultToPromise(); * ``` */ export function defineConsumer( diff --git a/packages/contract/src/types.ts b/packages/contract/src/types.ts index 688c8ee2..96caf6fa 100644 --- a/packages/contract/src/types.ts +++ b/packages/contract/src/types.ts @@ -418,14 +418,36 @@ export type PublisherDefinition { ... }, + * { + * prefetch: 10, + * retryPolicy: { + * maxAttempts: 3, + * backoff: { type: 'exponential', initialInterval: 1_000 } + * } + * } + * ] + * }, + * urls: ['amqp://localhost'] + * }).resultToPromise(); * ``` */ export type ConsumerDefinition = { diff --git a/packages/worker/README.md b/packages/worker/README.md index 0a5e74da..ea973f26 100644 --- a/packages/worker/README.md +++ b/packages/worker/README.md @@ -20,6 +20,8 @@ pnpm add @amqp-contract/worker - ✅ **Type-safe message consumption** — Handlers are fully typed based on your contract - ✅ **Automatic validation** — Messages are validated before reaching your handlers +- ✅ **Retry policies** — Configurable retry limits with exponential backoff to prevent infinite loops +- ✅ **Dead letter exchange support** — Automatically route permanently failed messages to DLX - ✅ **Prefetch configuration** — Control message flow with per-consumer prefetch settings - ✅ **Batch processing** — Process multiple messages at once for better throughput - ✅ **Automatic reconnection** — Built-in connection management with failover support @@ -75,6 +77,77 @@ You can define handlers outside of the worker creation using `defineHandler` and ## Error Handling +### Retry Policies (Production-Ready) + +**For production use, always configure a retry policy** to prevent infinite retry loops and handle permanently failed messages gracefully. + +```typescript +import { defineConsumer, defineQueue, defineMessage } from "@amqp-contract/contract"; +import { z } from "zod"; + +const orderQueue = defineQueue("order-processing", { + durable: true, + deadLetter: { + exchange: dlxExchange, // Messages that exceed retry limit go here + routingKey: "order.failed", + }, +}); + +const orderMessage = defineMessage( + z.object({ + orderId: z.string(), + amount: z.number(), + }), +); + +const processOrderConsumer = defineConsumer(orderQueue, orderMessage); + +// Configure retry policy at worker level, not in contract +const worker = await TypedAmqpWorker.create({ + contract, + handlers: { + processOrder: [ + async (message) => { + /* handler */ + }, + { + prefetch: 10, + retryPolicy: { + maxAttempts: 3, // Maximum 3 attempts (initial + 2 retries) + backoff: { + type: "exponential", // or "fixed" + initialInterval: 1_000, // Start with 1 second + maxInterval: 60_000, // Cap at 60 seconds + coefficient: 2, // Double interval each retry (1s, 2s, 4s, ...) + }, + nonRetryableErrors: ["ValidationError", "AuthenticationError"], + }, + }, + ], + }, + urls: ["amqp://localhost"], +}).resultToPromise(); +``` + +**Retry Policy Options:** + +- `maxAttempts`: Maximum number of attempts (initial + retries, defaults to 1 if not specified) +- `backoff.type`: `"fixed"` (same interval) or `"exponential"` (increasing interval) +- `backoff.initialInterval`: Interval in milliseconds before first retry (default: 1_000) +- `backoff.maxInterval`: Maximum interval for exponential backoff (default: 60_000) +- `backoff.coefficient`: Multiplier for exponential backoff (default: 2) +- `nonRetryableErrors`: Array of error constructor names or message substrings that should not be retried + +**Behavior:** + +- Messages are retried up to `maxAttempts` times with configurable backoff intervals +- Attempt count is tracked in message headers (`x-retry-count`) +- After exhausting attempts, messages are sent to the dead letter exchange (if configured) +- If no DLX is configured, messages are rejected without requeue +- Errors matching `nonRetryableErrors` are immediately rejected (not retried) + +### Basic Error Handling + Worker handlers use standard Promise-based async/await pattern: ```typescript @@ -86,7 +159,7 @@ handlers: { // Message acknowledged automatically on success } catch (error) { // Exception automatically caught by worker - // Message is requeued for retry + // Message is retried according to retry policy throw error; } }; @@ -102,6 +175,16 @@ Worker defines error classes for internal use: These errors are logged but **handlers don't need to use them** - just throw standard exceptions. +### Migration from Legacy Behavior + +**BREAKING CHANGE**: The default behavior has changed. Previously, consumers without retry policies would retry infinitely. Now they default to 1 attempt (no retries) to prevent infinite loops. + +To migrate: + +1. Add a dead letter exchange to your queue configuration (optional but recommended) +2. Configure a retry policy at the worker level in your handler options (not in contract definition) +3. Test with your actual failure scenarios to tune the retry parameters + ## API For complete API documentation, see the [Worker API Reference](https://btravers.github.io/amqp-contract/api/worker). diff --git a/packages/worker/src/__tests__/context.ts b/packages/worker/src/__tests__/context.ts new file mode 100644 index 00000000..9b70e5fe --- /dev/null +++ b/packages/worker/src/__tests__/context.ts @@ -0,0 +1,44 @@ +import type { ContractDefinition } from "@amqp-contract/contract"; +import { TypedAmqpWorker } from "../worker.js"; +import type { WorkerInferConsumerHandlers } from "../types.js"; +import { it as baseIt } from "@amqp-contract/testing/extension"; + +export const it = baseIt.extend<{ + workerFactory: ( + contract: TContract, + handlers: WorkerInferConsumerHandlers, + ) => Promise>; +}>({ + workerFactory: async ({ amqpConnectionUrl }, use) => { + const workers: Array> = []; + + try { + await use( + async ( + contract: TContract, + handlers: WorkerInferConsumerHandlers, + ) => { + const worker = await TypedAmqpWorker.create({ + contract, + handlers, + urls: [amqpConnectionUrl], + }).resultToPromise(); + + workers.push(worker); + return worker; + }, + ); + } finally { + await Promise.all( + workers.map(async (worker) => { + try { + await worker.close().resultToPromise(); + } catch (error) { + // Swallow errors during cleanup + console.error("Failed to close worker during fixture cleanup:", error); + } + }), + ); + } + }, +}); diff --git a/packages/worker/src/__tests__/worker-prefetch-batch.spec.ts b/packages/worker/src/__tests__/worker-prefetch-batch.spec.ts index 91c50d04..013ebfd3 100644 --- a/packages/worker/src/__tests__/worker-prefetch-batch.spec.ts +++ b/packages/worker/src/__tests__/worker-prefetch-batch.spec.ts @@ -1,6 +1,4 @@ -/* oxlint-disable eslint/sort-imports */ import { - ContractDefinition, defineConsumer, defineContract, defineExchange, @@ -11,51 +9,8 @@ import { } from "@amqp-contract/contract"; import { describe, expect, vi } from "vitest"; import { TypedAmqpWorker } from "../worker.js"; -import { it as baseIt } from "@amqp-contract/testing/extension"; +import { it } from "./context.js"; import { z } from "zod"; -import type { WorkerInferConsumerHandlers } from "../types.js"; - -const it = baseIt.extend<{ - workerFactory: ( - contract: TContract, - handlers: WorkerInferConsumerHandlers, - ) => Promise>; -}>({ - workerFactory: async ({ amqpConnectionUrl }, use) => { - const workers: Array> = []; - - try { - await use( - async ( - contract: TContract, - handlers: WorkerInferConsumerHandlers, - ) => { - const worker = await TypedAmqpWorker.create({ - contract, - handlers, - urls: [amqpConnectionUrl], - }).resultToPromise(); - - workers.push(worker); - return worker; - }, - ); - } finally { - // Clean up all workers before fixture cleanup (which deletes the vhost) - await Promise.all( - workers.map(async (worker) => { - try { - await worker.close().resultToPromise(); - } catch (error) { - // Swallow errors during cleanup to avoid unhandled rejections - // eslint-disable-next-line no-console - console.error("Failed to close TypedAmqpWorker during fixture cleanup:", error); - } - }), - ); - } - }, -}); describe("AmqpWorker Prefetch and Batch Integration", () => { it("should apply prefetch configuration to consumer", async ({ diff --git a/packages/worker/src/__tests__/worker-retry.spec.ts b/packages/worker/src/__tests__/worker-retry.spec.ts new file mode 100644 index 00000000..1c0d6eed --- /dev/null +++ b/packages/worker/src/__tests__/worker-retry.spec.ts @@ -0,0 +1,341 @@ +import { + defineConsumer, + defineContract, + defineExchange, + defineMessage, + definePublisher, + defineQueue, + defineQueueBinding, +} from "@amqp-contract/contract"; +import { describe, expect, vi } from "vitest"; +import { it } from "./context.js"; +import { z } from "zod"; + +describe("AmqpWorker Retry Integration", () => { + it("should retry failed messages up to maxAttempts limit", async ({ + workerFactory, + publishMessage, + }) => { + // GIVEN + const TestMessage = z.object({ + id: z.string(), + }); + + const exchange = defineExchange("retry-test-exchange", "topic", { durable: false }); + const queue = defineQueue("retry-test-queue", { durable: false }); + + const contract = defineContract({ + exchanges: { test: exchange }, + queues: { testQueue: queue }, + bindings: { + testBinding: defineQueueBinding(queue, exchange, { routingKey: "test.#" }), + }, + publishers: { + testPublisher: definePublisher(exchange, defineMessage(TestMessage), { + routingKey: "test.message", + }), + }, + consumers: { + testConsumer: defineConsumer(queue, defineMessage(TestMessage), { + retryPolicy: { + maxAttempts: 3, + backoff: { + type: "fixed", + initialInterval: 100, + }, + }, + }), + }, + }); + + let attemptCount = 0; + await workerFactory(contract, { + testConsumer: () => { + attemptCount++; + throw new Error("Simulated failure"); + }, + }); + + // WHEN - Publish a message that will fail + publishMessage("retry-test-exchange", "test.message", { + id: "test-1", + }); + + // THEN - Handler should be called maxAttempts times (3 attempts total) + await vi.waitFor(() => { + if (attemptCount < 3) { + throw new Error("Not enough attempts yet"); + } + }); + + expect(attemptCount).toBe(3); + }); + + it("should apply exponential backoff between retries", async ({ + workerFactory, + publishMessage, + }) => { + // GIVEN + const TestMessage = z.object({ + id: z.string(), + }); + + const exchange = defineExchange("backoff-test-exchange", "topic", { durable: false }); + const queue = defineQueue("backoff-test-queue", { durable: false }); + + const contract = defineContract({ + exchanges: { test: exchange }, + queues: { testQueue: queue }, + bindings: { + testBinding: defineQueueBinding(queue, exchange, { routingKey: "test.#" }), + }, + publishers: { + testPublisher: definePublisher(exchange, defineMessage(TestMessage), { + routingKey: "test.message", + }), + }, + consumers: { + testConsumer: defineConsumer(queue, defineMessage(TestMessage), { + retryPolicy: { + maxAttempts: 3, + backoff: { + type: "exponential", + initialInterval: 100, + coefficient: 2, + }, + }, + }), + }, + }); + + const timestamps: number[] = []; + await workerFactory(contract, { + testConsumer: () => { + timestamps.push(Date.now()); + throw new Error("Simulated failure"); + }, + }); + + // WHEN + publishMessage("backoff-test-exchange", "test.message", { + id: "test-1", + }); + + // THEN - Verify exponential backoff delays + await vi.waitFor(() => { + if (timestamps.length < 3) { + throw new Error("Not enough attempts yet"); + } + }); + + expect(timestamps.length).toBeGreaterThanOrEqual(3); + const [ts0, ts1, ts2] = timestamps; + if (ts0 !== undefined && ts1 !== undefined && ts2 !== undefined) { + const delay1 = ts1 - ts0; + const delay2 = ts2 - ts1; + // Second delay should be roughly 2x the first delay (100ms vs 200ms) + // Allow some margin for execution time + expect(delay2).toBeGreaterThan(delay1 * 1.5); + } + }); + + it("should not retry when maxAttempts is 0", async ({ workerFactory, publishMessage }) => { + // GIVEN + const TestMessage = z.object({ + id: z.string(), + }); + + const exchange = defineExchange("no-retry-exchange", "topic", { durable: false }); + const queue = defineQueue("no-retry-queue", { durable: false }); + + const contract = defineContract({ + exchanges: { test: exchange }, + queues: { testQueue: queue }, + bindings: { + testBinding: defineQueueBinding(queue, exchange, { routingKey: "test.#" }), + }, + publishers: { + testPublisher: definePublisher(exchange, defineMessage(TestMessage), { + routingKey: "test.message", + }), + }, + consumers: { + testConsumer: defineConsumer(queue, defineMessage(TestMessage), { + retryPolicy: { + maxAttempts: 0, // Fail fast + }, + }), + }, + }); + + let attemptCount = 0; + await workerFactory(contract, { + testConsumer: () => { + attemptCount++; + throw new Error("Simulated failure"); + }, + }); + + // WHEN + publishMessage("no-retry-exchange", "test.message", { + id: "test-1", + }); + + // THEN - Handler called once (initial attempt), no retries + // Note: maxAttempts: 0 means "process once with no retries", not "never process" + await vi.waitFor(() => { + if (attemptCount < 1) { + throw new Error("Not enough attempts yet"); + } + }); + + expect(attemptCount).toBe(1); + }); + + it("should send to DLX when maxAttempts exceeded", async ({ workerFactory, publishMessage }) => { + // GIVEN - Create main queue with DLX configured + const TestMessage = z.object({ + id: z.string(), + }); + + const mainExchange = defineExchange("main-exchange", "topic", { durable: false }); + const dlxExchange = defineExchange("dlx-exchange", "topic", { durable: false }); + const mainQueue = defineQueue("main-queue", { + durable: false, + deadLetter: { + exchange: dlxExchange, + routingKey: "failed", + }, + }); + const dlxQueue = defineQueue("dlx-queue", { durable: false }); + + const contract = defineContract({ + exchanges: { + main: mainExchange, + dlx: dlxExchange, + }, + queues: { + mainQueue, + dlxQueue, + }, + bindings: { + mainBinding: defineQueueBinding(mainQueue, mainExchange, { routingKey: "test.#" }), + dlxBinding: defineQueueBinding(dlxQueue, dlxExchange, { routingKey: "failed" }), + }, + publishers: { + mainPublisher: definePublisher(mainExchange, defineMessage(TestMessage), { + routingKey: "test.message", + }), + }, + consumers: { + mainConsumer: defineConsumer(mainQueue, defineMessage(TestMessage), { + retryPolicy: { + maxAttempts: 2, + backoff: { + type: "fixed", + initialInterval: 100, + }, + }, + }), + dlxConsumer: defineConsumer(dlxQueue, defineMessage(TestMessage)), + }, + }); + + let mainAttemptCount = 0; + const dlxMessages: Array<{ id: string }> = []; + + await workerFactory(contract, { + mainConsumer: () => { + mainAttemptCount++; + throw new Error("Simulated failure"); + }, + dlxConsumer: (msg) => { + dlxMessages.push(msg); + return Promise.resolve(); + }, + }); + + // WHEN + publishMessage("main-exchange", "test.message", { + id: "test-dlx-1", + }); + + // THEN + await vi.waitFor(() => { + if (mainAttemptCount < 2) { + throw new Error("Not enough attempts yet"); + } + }); + + expect(mainAttemptCount).toBe(2); // 2 attempts total + expect(dlxMessages).toEqual([{ id: "test-dlx-1" }]); + }); + + it("should successfully process message after transient failure", async ({ + workerFactory, + publishMessage, + }) => { + // GIVEN + const TestMessage = z.object({ + id: z.string(), + value: z.number(), + }); + + const exchange = defineExchange("transient-exchange", "topic", { durable: false }); + const queue = defineQueue("transient-queue", { durable: false }); + + const contract = defineContract({ + exchanges: { test: exchange }, + queues: { testQueue: queue }, + bindings: { + testBinding: defineQueueBinding(queue, exchange, { routingKey: "test.#" }), + }, + publishers: { + testPublisher: definePublisher(exchange, defineMessage(TestMessage), { + routingKey: "test.message", + }), + }, + consumers: { + testConsumer: defineConsumer(queue, defineMessage(TestMessage), { + retryPolicy: { + maxAttempts: 3, + backoff: { + type: "fixed", + initialInterval: 100, + }, + }, + }), + }, + }); + + let attemptCount = 0; + const successfulMessages: Array<{ id: string; value: number }> = []; + await workerFactory(contract, { + testConsumer: (msg: { id: string; value: number }) => { + attemptCount++; + // Fail first 2 attempts, succeed on 3rd + if (attemptCount < 3) { + throw new Error("Transient failure"); + } + successfulMessages.push(msg); + return Promise.resolve(); + }, + }); + + // WHEN + publishMessage("transient-exchange", "test.message", { + id: "test-transient", + value: 42, + }); + + // THEN - Message should eventually succeed + await vi.waitFor(() => { + if (attemptCount < 3) { + throw new Error("Not enough attempts yet"); + } + }); + + expect(attemptCount).toBe(3); + expect(successfulMessages).toEqual([{ id: "test-transient", value: 42 }]); + }); +}); diff --git a/packages/worker/src/__tests__/worker.spec.ts b/packages/worker/src/__tests__/worker.spec.ts index 5eaa08e2..4eaed020 100644 --- a/packages/worker/src/__tests__/worker.spec.ts +++ b/packages/worker/src/__tests__/worker.spec.ts @@ -1,5 +1,4 @@ import { - ContractDefinition, defineConsumer, defineContract, defineExchange, @@ -11,52 +10,9 @@ import { } from "@amqp-contract/contract"; import { describe, expect, vi } from "vitest"; import { TypedAmqpWorker } from "../worker.js"; -import type { WorkerInferConsumerHandlers } from "../types.js"; -import { it as baseIt } from "@amqp-contract/testing/extension"; +import { it } from "./context.js"; import { z } from "zod"; -const it = baseIt.extend<{ - workerFactory: ( - contract: TContract, - handlers: WorkerInferConsumerHandlers, - ) => Promise>; -}>({ - workerFactory: async ({ amqpConnectionUrl }, use) => { - const workers: Array> = []; - - try { - await use( - async ( - contract: TContract, - handlers: WorkerInferConsumerHandlers, - ) => { - const worker = await TypedAmqpWorker.create({ - contract, - handlers, - urls: [amqpConnectionUrl], - }).resultToPromise(); - - workers.push(worker); - return worker; - }, - ); - } finally { - // Clean up all workers before fixture cleanup (which deletes the vhost) - await Promise.all( - workers.map(async (worker) => { - try { - await worker.close().resultToPromise(); - } catch (error) { - // Swallow errors during cleanup to avoid unhandled rejections - // eslint-disable-next-line no-console - console.error("Failed to close worker during fixture cleanup:", error); - } - }), - ); - } - }, -}); - describe("AmqpWorker Integration", () => { it("should consume messages from a real RabbitMQ instance", async ({ workerFactory, diff --git a/packages/worker/src/retry.spec.ts b/packages/worker/src/retry.spec.ts new file mode 100644 index 00000000..bd94cfcf --- /dev/null +++ b/packages/worker/src/retry.spec.ts @@ -0,0 +1,383 @@ +import { + RETRY_COUNT_HEADER, + calculateBackoffDelay, + getRetryCount, + isNonRetryableError, + shouldRetry, +} from "./retry.js"; +import { describe, expect, it } from "vitest"; +import type { Message } from "amqplib"; +import type { RetryPolicy } from "./types.js"; + +describe("Retry utilities", () => { + describe("getRetryCount", () => { + it("should return 0 when retry count header is not present", () => { + const msg: Message = { + properties: { + headers: {}, + }, + } as Message; + + expect(getRetryCount(msg)).toBe(0); + }); + + it("should return 0 when headers are undefined", () => { + const msg: Message = { + properties: {}, + } as Message; + + expect(getRetryCount(msg)).toBe(0); + }); + + it("should return retry count from header", () => { + const msg: Message = { + properties: { + headers: { + [RETRY_COUNT_HEADER]: 3, + }, + }, + } as Message; + + expect(getRetryCount(msg)).toBe(3); + }); + + it("should return 0 for invalid retry count values", () => { + const msg: Message = { + properties: { + headers: { + [RETRY_COUNT_HEADER]: "invalid", + }, + }, + } as Message; + + expect(getRetryCount(msg)).toBe(0); + }); + + it("should return 0 for negative retry count values", () => { + const msg: Message = { + properties: { + headers: { + [RETRY_COUNT_HEADER]: -1, + }, + }, + } as Message; + + expect(getRetryCount(msg)).toBe(0); + }); + }); + + describe("calculateBackoffDelay", () => { + it("should return default delay when no backoff configured", () => { + const policy: RetryPolicy = { + maxAttempts: 3, + }; + + expect(calculateBackoffDelay(0, policy)).toBe(1_000); + }); + + it("should return initial delay for fixed backoff", () => { + const policy: RetryPolicy = { + maxAttempts: 3, + backoff: { + type: "fixed", + initialInterval: 2_000, + }, + }; + + expect(calculateBackoffDelay(0, policy)).toBe(2_000); + expect(calculateBackoffDelay(1, policy)).toBe(2_000); + expect(calculateBackoffDelay(5, policy)).toBe(2_000); + }); + + it("should calculate exponential backoff correctly", () => { + const policy: RetryPolicy = { + maxAttempts: 5, + backoff: { + type: "exponential", + initialInterval: 1_000, + coefficient: 2, + }, + }; + + expect(calculateBackoffDelay(0, policy)).toBe(1_000); // 1000 * 2^0 = 1000 + expect(calculateBackoffDelay(1, policy)).toBe(2_000); // 1000 * 2^1 = 2000 + expect(calculateBackoffDelay(2, policy)).toBe(4_000); // 1000 * 2^2 = 4000 + expect(calculateBackoffDelay(3, policy)).toBe(8_000); // 1000 * 2^3 = 8000 + }); + + it("should respect max delay for exponential backoff", () => { + const policy: RetryPolicy = { + maxAttempts: 10, + backoff: { + type: "exponential", + initialInterval: 1_000, + maxInterval: 5_000, + coefficient: 2, + }, + }; + + expect(calculateBackoffDelay(0, policy)).toBe(1_000); + expect(calculateBackoffDelay(1, policy)).toBe(2_000); + expect(calculateBackoffDelay(2, policy)).toBe(4_000); + expect(calculateBackoffDelay(3, policy)).toBe(5_000); // Capped at maxInterval + expect(calculateBackoffDelay(10, policy)).toBe(5_000); // Still capped + }); + + it("should use default values when not specified", () => { + const policy: RetryPolicy = { + maxAttempts: 3, + backoff: { + type: "exponential", + }, + }; + + // Default: initialInterval=1_000, coefficient=2, maxInterval=60_000 + expect(calculateBackoffDelay(0, policy)).toBe(1_000); + expect(calculateBackoffDelay(1, policy)).toBe(2_000); + }); + }); + + describe("shouldRetry", () => { + it("should default to no retries when no policy configured", () => { + const msg: Message = { + properties: { + headers: { + [RETRY_COUNT_HEADER]: 0, + }, + }, + } as Message; + + const result = shouldRetry(msg, undefined); + + // When no policy is configured, default to 1 attempt (no retries) + expect(result).toEqual({ + shouldRetry: false, + delay: 0, + currentRetryCount: 0, + }); + }); + + it("should allow retry when under max retries", () => { + const policy: RetryPolicy = { + maxAttempts: 3, + backoff: { + type: "fixed", + initialInterval: 1_000, + }, + }; + + const msg: Message = { + properties: { + headers: { + [RETRY_COUNT_HEADER]: 1, + }, + }, + } as Message; + + const result = shouldRetry(msg, policy); + + expect(result).toEqual({ + shouldRetry: true, + delay: 1_000, + currentRetryCount: 1, + }); + }); + + it("should disallow retry when max attempts reached", () => { + const policy: RetryPolicy = { + maxAttempts: 3, + }; + + // After 2 attempts (count 0, 1), we're about to do attempt 3 + // With maxAttempts=3, this should be the last attempt + // So after it fails (count 2), we should NOT retry + const msg: Message = { + properties: { + headers: { + [RETRY_COUNT_HEADER]: 2, + }, + }, + } as Message; + + const result = shouldRetry(msg, policy); + + expect(result).toEqual({ + shouldRetry: false, + delay: 0, + currentRetryCount: 2, + }); + }); + + it("should disallow retry when max retries exceeded", () => { + const policy: RetryPolicy = { + maxAttempts: 3, + }; + + const msg: Message = { + properties: { + headers: { + [RETRY_COUNT_HEADER]: 5, + }, + }, + } as Message; + + const result = shouldRetry(msg, policy); + + expect(result).toEqual({ + shouldRetry: false, + delay: 0, + currentRetryCount: 5, + }); + }); + + it("should calculate exponential backoff delay", () => { + const policy: RetryPolicy = { + maxAttempts: 5, + backoff: { + type: "exponential", + initialInterval: 1_000, + coefficient: 2, + }, + }; + + const msg: Message = { + properties: { + headers: { + [RETRY_COUNT_HEADER]: 2, + }, + }, + } as Message; + + const result = shouldRetry(msg, policy); + + expect(result).toEqual({ + shouldRetry: true, + delay: 4_000, // 1000 * 2^2 = 4000 + currentRetryCount: 2, + }); + }); + + it("should handle zero max retries (fail fast)", () => { + const policy: RetryPolicy = { + maxAttempts: 0, + }; + + const msg: Message = { + properties: { + headers: {}, + }, + } as Message; + + const result = shouldRetry(msg, policy); + + expect(result).toEqual({ + shouldRetry: false, + delay: 0, + currentRetryCount: 0, + }); + }); + }); + + describe("isNonRetryableError", () => { + it("should return false when no policy configured", () => { + const error = new Error("Test error"); + expect(isNonRetryableError(error, undefined)).toBe(false); + }); + + it("should return false when nonRetryableErrors not configured", () => { + const error = new Error("Test error"); + const policy: RetryPolicy = { maxAttempts: 3 }; + expect(isNonRetryableError(error, policy)).toBe(false); + }); + + it("should return false when nonRetryableErrors is empty", () => { + const error = new Error("Test error"); + const policy: RetryPolicy = { maxAttempts: 3, nonRetryableErrors: [] }; + expect(isNonRetryableError(error, policy)).toBe(false); + }); + + it("should match by error constructor name", () => { + class ValidationError extends Error { + constructor(message: string) { + super(message); + this.name = "ValidationError"; + } + } + const error = new ValidationError("Invalid input"); + const policy: RetryPolicy = { + maxAttempts: 3, + nonRetryableErrors: ["ValidationError"], + }; + expect(isNonRetryableError(error, policy)).toBe(true); + }); + + it("should match by error message substring (case-insensitive)", () => { + const error = new Error("Invalid format provided"); + const policy: RetryPolicy = { + maxAttempts: 3, + nonRetryableErrors: ["invalid format"], + }; + expect(isNonRetryableError(error, policy)).toBe(true); + }); + + it("should match by error message substring with different casing", () => { + const error = new Error("AUTHENTICATION FAILED"); + const policy: RetryPolicy = { + maxAttempts: 3, + nonRetryableErrors: ["authentication failed"], + }; + expect(isNonRetryableError(error, policy)).toBe(true); + }); + + it("should match multiple patterns", () => { + class AuthError extends Error { + constructor(message: string) { + super(message); + this.name = "AuthError"; + } + } + const error = new AuthError("Unauthorized access"); + const policy: RetryPolicy = { + maxAttempts: 3, + nonRetryableErrors: ["ValidationError", "AuthError", "timeout"], + }; + expect(isNonRetryableError(error, policy)).toBe(true); + }); + + it("should return false when no patterns match", () => { + const error = new Error("Connection timeout"); + const policy: RetryPolicy = { + maxAttempts: 3, + nonRetryableErrors: ["ValidationError", "AuthenticationError"], + }; + expect(isNonRetryableError(error, policy)).toBe(false); + }); + + it("should handle non-Error objects", () => { + const error = "string error"; + const policy: RetryPolicy = { + maxAttempts: 3, + nonRetryableErrors: ["string error"], + }; + expect(isNonRetryableError(error, policy)).toBe(true); + }); + + it("should handle non-Error objects that don't match", () => { + const error = "network failure"; + const policy: RetryPolicy = { + maxAttempts: 3, + nonRetryableErrors: ["ValidationError"], + }; + expect(isNonRetryableError(error, policy)).toBe(false); + }); + + it("should match partial error message substring", () => { + const error = new Error("The provided email format is invalid"); + const policy: RetryPolicy = { + maxAttempts: 3, + nonRetryableErrors: ["invalid"], + }; + expect(isNonRetryableError(error, policy)).toBe(true); + }); + }); +}); diff --git a/packages/worker/src/retry.ts b/packages/worker/src/retry.ts new file mode 100644 index 00000000..7481b958 --- /dev/null +++ b/packages/worker/src/retry.ts @@ -0,0 +1,103 @@ +import type { Message } from "amqplib"; +import type { RetryPolicy } from "./types.js"; + +/** + * Header key used to track attempt count in AMQP message headers. + * @internal + */ +export const RETRY_COUNT_HEADER = "x-retry-count"; + +/** + * Get the current attempt count from message headers. + * @param msg - The AMQP message + * @returns The current attempt count (0 for initial attempt) + * @internal + */ +export function getRetryCount(msg: Message): number { + const retryCount = msg.properties.headers?.[RETRY_COUNT_HEADER]; + if (typeof retryCount === "number" && retryCount >= 0) { + return retryCount; + } + return 0; +} + +/** + * Calculate the interval before the next retry using the backoff strategy. + * @param attemptNumber - Current attempt index (0 for initial attempt, 1 for first retry, etc.) + * @param policy - The retry policy configuration + * @returns Interval in milliseconds + * @internal + */ +export function calculateBackoffDelay(attemptNumber: number, policy: RetryPolicy): number { + const backoff = policy.backoff; + if (!backoff) { + return 1_000; // Default 1 second + } + + const type = backoff.type ?? "fixed"; + const initialInterval = backoff.initialInterval ?? 1_000; + const maxInterval = backoff.maxInterval ?? 60_000; + const coefficient = backoff.coefficient ?? 2; + + if (type === "fixed") { + return initialInterval; + } + + // Exponential backoff: initialInterval * (coefficient ^ attemptNumber) + const exponentialInterval = initialInterval * Math.pow(coefficient, attemptNumber); + return Math.min(exponentialInterval, maxInterval); +} + +/** + * Check if an error is non-retryable based on the retry policy. + * @param error - The error that was thrown + * @param policy - The retry policy configuration (optional) + * @returns True if the error should not be retried + * @internal + */ +export function isNonRetryableError(error: unknown, policy: RetryPolicy | undefined): boolean { + if (!policy?.nonRetryableErrors || policy.nonRetryableErrors.length === 0) { + return false; + } + + const errorName = error instanceof Error ? error.constructor.name : ""; + const errorMessage = error instanceof Error ? error.message : String(error); + + return policy.nonRetryableErrors.some((pattern) => { + // Match against error constructor name + if (errorName === pattern) { + return true; + } + // Match against error message (case-insensitive substring) + if (errorMessage.toLowerCase().includes(pattern.toLowerCase())) { + return true; + } + return false; + }); +} + +/** + * Check if a message should be retried based on the retry policy. + * @param msg - The AMQP message + * @param policy - The retry policy configuration (optional) + * @returns Object indicating if retry should happen and the interval + * @internal + */ +export function shouldRetry( + msg: Message, + policy: RetryPolicy | undefined, +): { shouldRetry: boolean; delay: number; currentRetryCount: number } { + const currentRetryCount = getRetryCount(msg); + // Default to 1 attempt (no retries) if no policy or maxAttempts specified + const maxAttempts = policy?.maxAttempts ?? 1; + + // Check if performing the next attempt would exceed the attempt limit + if (currentRetryCount + 1 >= maxAttempts) { + return { shouldRetry: false, delay: 0, currentRetryCount }; + } + + // Calculate backoff interval for this retry attempt + const delay = policy ? calculateBackoffDelay(currentRetryCount, policy) : 0; + + return { shouldRetry: true, delay, currentRetryCount }; +} diff --git a/packages/worker/src/types.ts b/packages/worker/src/types.ts index 4d38e32c..863a1ccc 100644 --- a/packages/worker/src/types.ts +++ b/packages/worker/src/types.ts @@ -5,6 +5,105 @@ import type { } from "@amqp-contract/contract"; import type { StandardSchemaV1 } from "@standard-schema/spec"; +/** + * Retry policy for handling failed message processing. + * + * Inspired by Temporal.io's retry policies, this configuration allows you to: + * - Limit the number of retry attempts to prevent infinite loops + * - Configure exponential backoff to reduce load during outages + * - Specify non-retryable errors that should immediately fail + * + * @example + * ```typescript + * const retryPolicy: RetryPolicy = { + * maxAttempts: 3, + * backoff: { + * type: 'exponential', + * initialInterval: 1000, + * maxInterval: 60000, + * coefficient: 2 + * }, + * nonRetryableErrors: ['ValidationError', 'AuthenticationError'] + * }; + * ``` + */ +export type RetryPolicy = { + /** + * Maximum number of attempts (initial attempt + retries). + * After this limit is reached, the message will be: + * - Sent to the dead letter exchange if configured on the queue + * - Rejected (nacked without requeue) if no dead letter exchange + * + * Set to 1 to process once with no retries on failure (fail fast). + * Set to 0 to process once with no retries (effectively same as 1). + * If not specified, defaults to 1 (no retries). + */ + maxAttempts?: number; + + /** + * Backoff strategy for retry intervals. + * Adds delay between retry attempts to avoid overwhelming the system. + */ + backoff?: { + /** + * Type of backoff strategy. + * - `fixed`: Same interval for every retry + * - `exponential`: Interval increases exponentially with each retry + * + * If not specified, defaults to 'fixed'. + */ + type?: "fixed" | "exponential"; + + /** + * Initial interval in milliseconds before the first retry. + * For exponential backoff, this is the base interval. + * + * If not specified, defaults to 1000ms (1 second). + */ + initialInterval?: number; + + /** + * Maximum interval in milliseconds between retries. + * Prevents exponential backoff from growing indefinitely. + * + * Only applies to exponential backoff. + * If not specified, defaults to 60000ms (60 seconds). + */ + maxInterval?: number; + + /** + * Multiplication coefficient for exponential backoff. + * Each retry interval is multiplied by this value. + * + * Formula: interval = initialInterval * (coefficient ^ attemptNumber) + * Capped at maxInterval to prevent unbounded growth. + * + * Only applies to exponential backoff. + * If not specified, defaults to 2. + */ + coefficient?: number; + }; + + /** + * List of error patterns that should not trigger retries. + * + * Errors matching these patterns will cause the message to be immediately + * rejected (sent to DLX if configured, otherwise discarded). + * + * Matching is done by: + * - Error constructor name (e.g., 'ValidationError') + * - Error message substring (e.g., 'invalid format') + * + * Inspired by Temporal.io's NonRetryableErrorTypes. + * + * @example + * ```typescript + * nonRetryableErrors: ['ValidationError', 'AuthenticationError', 'invalid format'] + * ``` + */ + nonRetryableErrors?: readonly string[]; +}; + /** * Infer the TypeScript type from a schema */ @@ -63,8 +162,8 @@ export type WorkerInferConsumerBatchHandler< * * Three patterns are supported: * 1. Simple handler: `async (message) => { ... }` - * 2. Handler with prefetch: `[async (message) => { ... }, { prefetch: 10 }]` - * 3. Batch handler: `[async (messages) => { ... }, { batchSize: 5, batchTimeout: 1000 }]` + * 2. Handler with options: `[async (message) => { ... }, { prefetch: 10, retryPolicy: {...} }]` + * 3. Batch handler: `[async (messages) => { ... }, { batchSize: 5, batchTimeout: 1000, retryPolicy: {...} }]` */ export type WorkerInferConsumerHandlerEntry< TContract extends ContractDefinition, @@ -73,11 +172,11 @@ export type WorkerInferConsumerHandlerEntry< | WorkerInferConsumerHandler | readonly [ WorkerInferConsumerHandler, - { prefetch?: number; batchSize?: never; batchTimeout?: never }, + { prefetch?: number; batchSize?: never; batchTimeout?: never; retryPolicy?: RetryPolicy }, ] | readonly [ WorkerInferConsumerBatchHandler, - { prefetch?: number; batchSize: number; batchTimeout?: number }, + { prefetch?: number; batchSize: number; batchTimeout?: number; retryPolicy?: RetryPolicy }, ]; /** diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 87829acd..761a30b1 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -8,6 +8,7 @@ import type { } from "@amqp-contract/contract"; import { Future, Result } from "@swan-io/boxed"; import { MessageValidationError, TechnicalError } from "./errors.js"; +import { RETRY_COUNT_HEADER, isNonRetryableError, shouldRetry } from "./retry.js"; import type { WorkerInferConsumerBatchHandler, WorkerInferConsumerHandler, @@ -22,6 +23,7 @@ import { decompressBuffer } from "./decompression.js"; * Uses discriminated union to enforce mutual exclusivity: * - Prefetch-only mode: Cannot have batchSize or batchTimeout * - Batch mode: Requires batchSize, allows batchTimeout and prefetch + * Both modes support retryPolicy configuration. */ type ConsumerOptions = | { @@ -29,14 +31,18 @@ type ConsumerOptions = prefetch?: number; batchSize?: never; batchTimeout?: never; + retryPolicy?: RetryPolicy; } | { /** Batch-based processing */ prefetch?: number; batchSize: number; batchTimeout?: number; + retryPolicy?: RetryPolicy; }; +type RetryPolicy = import("./types.js").RetryPolicy; + /** * Options for creating a type-safe AMQP worker. * @@ -476,6 +482,171 @@ export class TypedAmqpWorker { >; } + /** + * Handle message retry logic based on consumer retry policy. + * This method decides whether to retry, send to DLX, or reject the message. + * + * @param msg - The AMQP message + * @param consumer - The consumer definition + * @param consumerName - The consumer name for logging + * @param error - The error that caused the failure (for non-retryable error detection) + * @returns Promise that resolves when retry handling is complete + */ + private async handleMessageRetry>( + msg: Message, + consumer: ConsumerDefinition, + consumerName: TName, + error?: unknown, + ): Promise { + // Get retry policy from consumer options (worker-level configuration) + const retryPolicy = this.consumerOptions[consumerName]?.retryPolicy; + + // No retry policy configured - default to single attempt (no retries) + if (!retryPolicy) { + this.logger?.info("Rejecting message (no retry policy configured, default behavior)", { + consumerName: String(consumerName), + queueName: consumer.queue.name, + }); + this.amqpClient.channel.nack(msg, false, false); + return; + } + + // Check if error is non-retryable + if (error && isNonRetryableError(error, retryPolicy)) { + this.logger?.warn("Message has non-retryable error, rejecting immediately", { + consumerName: String(consumerName), + queueName: consumer.queue.name, + error, + errorType: error instanceof Error ? error.constructor.name : typeof error, + }); + this.amqpClient.channel.nack(msg, false, false); + return; + } + + const { + shouldRetry: shouldRetryMessage, + delay, + currentRetryCount, + } = shouldRetry(msg, retryPolicy); + + if (!shouldRetryMessage) { + // Max attempts exceeded - reject without requeue + // The message will go to DLX if configured on the queue, otherwise discarded + this.logger?.warn("Message attempt limit exceeded, rejecting", { + consumerName: String(consumerName), + queueName: consumer.queue.name, + attemptCount: currentRetryCount, + maxAttempts: retryPolicy.maxAttempts, + }); + this.amqpClient.channel.nack(msg, false, false); + return; + } + + // Increment attempt count and schedule retry + const newAttemptCount = currentRetryCount + 1; + + // Update attempt count in headers for next attempt + // Store original exchange and routing key for proper retry routing + const headers = { + ...msg.properties.headers, + [RETRY_COUNT_HEADER]: newAttemptCount, + "x-original-exchange": msg.properties.headers?.["x-original-exchange"] ?? msg.fields.exchange, + "x-original-routing-key": + msg.properties.headers?.["x-original-routing-key"] ?? msg.fields.routingKey, + }; + + // Get the exchange and routing key to use for republishing + // For retry, publish directly to the consumer queue to avoid broadcasting + // to all queues bound to the exchange. Use empty exchange (default) and queue name as routing key. + // TODO: Consider using RabbitMQ's delayed message exchange or similar for more robust retry + // handling that persists across application crashes (currently ACK then schedule in memory). + const exchange = ""; + const routingKey = consumer.queue.name; + + // Acknowledge the original message immediately to free up the consumer + // callback and avoid blocking the consumer while waiting for retry interval + this.amqpClient.channel.ack(msg); + + // Schedule asynchronous retry after backoff interval + // Use void IIFE to avoid blocking the consumer callback + void (async () => { + try { + if (delay > 0) { + // Apply backoff interval before requeuing + this.logger?.info("Scheduling message retry with backoff", { + consumerName: String(consumerName), + queueName: consumer.queue.name, + attemptCount: newAttemptCount, + intervalMs: delay, + routingKey, + }); + + // Wait for backoff interval + await new Promise((resolve) => setTimeout(resolve, delay)); + } else { + this.logger?.info("Requeuing message for immediate retry", { + consumerName: String(consumerName), + queueName: consumer.queue.name, + attemptCount: newAttemptCount, + routingKey, + }); + } + + // Republish the message directly to the consumer queue with incremented attempt count + const published = await this.amqpClient.channel.publish(exchange, routingKey, msg.content, { + contentType: msg.properties.contentType, + contentEncoding: msg.properties.contentEncoding, + headers, + deliveryMode: msg.properties.deliveryMode, + priority: msg.properties.priority, + correlationId: msg.properties.correlationId, + replyTo: msg.properties.replyTo, + expiration: msg.properties.expiration, + messageId: msg.properties.messageId, + timestamp: msg.properties.timestamp, + type: msg.properties.type, + userId: msg.properties.userId, + appId: msg.properties.appId, + }); + + if (published) { + this.logger?.info("Message republished for retry", { + consumerName: String(consumerName), + queueName: consumer.queue.name, + attemptCount: newAttemptCount, + exchange, + routingKey, + }); + } else { + // The message could not be queued because the channel write buffer is full. + // The original message has already been acknowledged, so this message is lost. + this.logger?.error( + "Failed to republish message for retry - channel write buffer full (message lost)", + { + consumerName: String(consumerName), + queueName: consumer.queue.name, + attemptCount: newAttemptCount, + exchange, + routingKey, + }, + ); + } + } catch (error) { + // The original message has already been acknowledged at this point. + // We can only log the failure; it is not safe to nack after ack. + // This represents permanent message loss. + this.logger?.error("Failed to republish message for retry (message lost)", { + consumerName: String(consumerName), + queueName: consumer.queue.name, + attemptCount: newAttemptCount, + exchange, + routingKey, + error, + }); + } + })(); + } + /** * Consume messages one at a time */ @@ -499,20 +670,15 @@ export class TypedAmqpWorker { // Parse and validate message await this.parseAndValidateMessage(msg, consumer, consumerName) .flatMapOk((validatedMessage) => - Future.fromPromise(handler(validatedMessage)).tapError((error) => { + Future.fromPromise(handler(validatedMessage)).tapError(async (error) => { this.logger?.error("Error processing message", { consumerName: String(consumerName), queueName: consumer.queue.name, error, }); - // Requeue failed messages for retry - // NOTE: This strategy assumes transient failures that may succeed on retry. - // For production use, consider: - // - Implementing retry limits to prevent infinite loops - // - Using dead letter exchanges for permanently failed messages - // - Adding exponential backoff between retries - this.amqpClient.channel.nack(msg, false, true); + // Handle retry logic based on retry policy + await this.handleMessageRetry(msg, consumer, consumerName, error); }), ) .tapOk(() => { @@ -608,15 +774,26 @@ export class TypedAmqpWorker { error, }); - // Requeue all failed messages for retry - // NOTE: This strategy assumes transient failures that may succeed on retry. - // For production use, consider: - // - Implementing retry limits to prevent infinite loops - // - Using dead letter exchanges for permanently failed messages - // - Adding exponential backoff between retries - // - Implementing partial batch success handling + /** + * Handle retry for all messages in the failed batch. + * + * Batch processing is an all-or-nothing operation: if the batch handler + * throws, every message in the batch is treated as failed for this + * attempt and passed through the retry logic. + * + * Retry decisions (based on attempt count and policy) are evaluated + * per message inside handleMessageRetry. However, all messages in a + * failing batch are retried or rejected together according to their + * individual attempt counts. + * + * This is an intentional limitation of batch mode: messages that might + * have succeeded individually are retried/rejected with the batch. + * + * Note: handleMessageRetry ACKs immediately and schedules async retry, + * so this loop completes quickly without blocking on retry intervals. + */ for (const item of currentBatch) { - this.amqpClient.channel.nack(item.amqpMessage, false, true); + await this.handleMessageRetry(item.amqpMessage, consumer, consumerName, error); } } finally { isProcessing = false; diff --git a/samples/basic-order-processing-contract/src/index.ts b/samples/basic-order-processing-contract/src/index.ts index e87895f2..bcf11e59 100644 --- a/samples/basic-order-processing-contract/src/index.ts +++ b/samples/basic-order-processing-contract/src/index.ts @@ -91,8 +91,21 @@ const { publisher: orderCreatedPublisher, createConsumer: createOrderCreatedCons }); // Create consumer for processing queue using publisher-first pattern -const { consumer: processOrderConsumer, binding: processOrderBinding } = - createOrderCreatedConsumer(orderProcessingQueue); +// We use the binding from this but define the actual consumer with retry policy below +const { binding: processOrderBinding } = createOrderCreatedConsumer(orderProcessingQueue); + +// Add retry policy to the consumer for robust error handling +const processOrderConsumerWithRetry = defineConsumer(orderProcessingQueue, orderMessage, { + retryPolicy: { + maxAttempts: 3, + backoff: { + type: "exponential", + initialInterval: 1000, + maxInterval: 60000, + coefficient: 2, + }, + }, +}); /** * RECOMMENDED APPROACH: Consumer-First Pattern (Command-Oriented) @@ -115,6 +128,7 @@ const { * 2. Consumer-First Pattern: shipOrderConsumer ensures publisher matches consumer * 3. Traditional Approach: For advanced scenarios like exchange-to-exchange bindings * 4. Dead Letter Exchange: Failed messages from orderProcessingQueue are routed to DLX + * 5. Retry Policies: Configured with exponential backoff to prevent infinite loops * * Benefits of Publisher-First / Consumer-First: * - Guaranteed message schema consistency @@ -125,6 +139,11 @@ const { * - Failed or rejected messages are automatically routed to a DLX * - Messages that exceed TTL are moved to DLX * - Enables message retry and error handling strategies + * + * Retry Policy Pattern: + * - Prevents infinite retry loops with maxAttempts limit + * - Exponential backoff reduces load during outages + * - Messages exceeding attempt limit are sent to DLX for inspection */ export const orderContract = defineContract({ exchanges: { @@ -207,8 +226,8 @@ export const orderContract = defineContract({ }), }, consumers: { - // Consumer from Publisher-First pattern (same message schema guaranteed) - processOrder: processOrderConsumer, + // Consumer with retry policy for production use (prevents infinite loops) + processOrder: processOrderConsumerWithRetry, // Traditional consumer for notifications (receives all order events via wildcard) notifyOrder: defineConsumer(orderNotificationsQueue, orderUnionMessage), @@ -221,6 +240,7 @@ export const orderContract = defineContract({ processAnalytics: defineConsumer(analyticsProcessingQueue, orderUnionMessage), // Consumer for dead letter queue (handles failed messages) + // No retry policy needed here as these are already failed messages handleFailedOrders: defineConsumer(ordersDlxQueue, orderMessage), }, });