Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -473,9 +473,25 @@ packages/[package-name]/
### ✅ Required Practices

1. **README Files**
- Every package must have a README.md
- Include: description, installation, usage examples, API overview
- Keep examples up-to-date with code
- README files should stay simple with just an introduction to concepts
- Keep READMEs concise: key features, quick examples, and links to full documentation
- Full documentation belongs in the `docs/` directory to maintain a single source of truth
- README example: brief feature list → quick code snippet → link to docs site
- This prevents documentation drift and duplication

```markdown
## Feature Name

Brief introduction to the feature (1-2 sentences).

**Quick Example:**

\`\`\`typescript
// Minimal working example
\`\`\`

📖 **[Learn more →](https://btravers.github.io/amqp-contract/guide/feature)**
```

2. **Code Comments**
- Use JSDoc for all public APIs
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"setup": "pnpm install && pnpm build",
"sort-package-json": "sort-package-json '**/package.json' '!**/node_modules/**'",
"test": "turbo run test",
"test:integration": "turbo run test:integration",
"test:integration": "turbo run test:integration --concurrency=1",
"typecheck": "turbo run typecheck",
"version": "changeset version"
},
Expand Down
6 changes: 6 additions & 0 deletions packages/contract/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ const contract = defineContract({
- ✅ Event-oriented (publisher-first) and command-oriented (consumer-first) patterns
- ✅ Flexible routing key patterns for topic exchanges

## Error Handling & Retry Policies

Error handling and retry policies are configured at the **worker level**, not in contracts. Contracts define message schemas and topology, while workers control retry behavior (including exponential backoff and dead letter integration) during message processing.

📖 **[Learn more about retry policies and error handling in workers →](https://btravers.github.io/amqp-contract/guide/worker-usage.html#retry-policies)**

## Documentation

📖 **[Read the full documentation →](https://btravers.github.io/amqp-contract)**
Expand Down
45 changes: 33 additions & 12 deletions packages/contract/src/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -598,9 +598,14 @@ export function definePublisher<TMessage extends MessageDefinition>(
* Consumers are associated with a specific queue and message type. When you create a worker
* with this consumer, it will process messages from the queue according to the schema.
*
* **Retry policy configuration has moved to worker-level options.**
* This separates contract definition (what messages look like) from deployment configuration
* (how to handle failures). Configure retry policies when creating the worker, similar to
* how you configure prefetch.
*
* @param queue - The queue definition to consume from
* @param message - The message definition with payload schema
* @param options - Optional consumer configuration
* @param options - Optional consumer configuration (currently unused, kept for future extensibility)
* @returns A consumer definition with inferred message types
*
* @example
Expand All @@ -616,19 +621,35 @@ export function definePublisher<TMessage extends MessageDefinition>(
* })
* );
*
* // Basic consumer (contract-level definition)
* const processOrderConsumer = defineConsumer(orderQueue, orderMessage);
*
* // Later, when creating a worker, you'll provide a handler for this consumer:
* // const worker = await TypedAmqpWorker.create({
* // contract,
* // handlers: {
* // processOrder: async (message) => {
* // // message is automatically typed based on the schema
* // console.log(message.orderId); // string
* // }
* // },
* // connection
* // });
* // Later, when creating a worker, configure retry policy (deployment-level):
* const worker = await TypedAmqpWorker.create({
* contract,
* handlers: {
* processOrder: [
* async (message) => {
* // message is automatically typed based on the schema
* console.log(message.orderId); // string
* },
* {
* prefetch: 10,
* retryPolicy: {
* maxAttempts: 3,
* backoff: {
* type: 'exponential',
* initialInterval: 1_000,
* maxInterval: 60_000,
* coefficient: 2
* },
* nonRetryableErrors: ['ValidationError']
* }
* }
* ]
* },
* urls: ['amqp://localhost']
* }).resultToPromise();
* ```
*/
export function defineConsumer<TMessage extends MessageDefinition>(
Expand Down
22 changes: 22 additions & 0 deletions packages/contract/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -418,14 +418,36 @@ export type PublisherDefinition<TMessage extends MessageDefinition = MessageDefi
* If the message is compressed (indicated by the content-encoding header), it will be
* automatically decompressed before validation.
*
* Retry policy configuration has been moved to worker-level options (similar to prefetch).
* This separates contract definition (message schemas) from deployment configuration (retry behavior).
*
* @template TMessage - The message definition with payload schema
*
* @example
* ```typescript
* // Basic consumer definition (contract-level)
* const consumer: ConsumerDefinition = {
* queue: orderProcessingQueue,
* message: orderMessage
* };
*
* // Retry policy configured at worker level (deployment-level)
* const worker = await TypedAmqpWorker.create({
* contract,
* handlers: {
* processOrder: [
* async (message) => { ... },
* {
* prefetch: 10,
* retryPolicy: {
* maxAttempts: 3,
* backoff: { type: 'exponential', initialInterval: 1_000 }
* }
* }
* ]
* },
* urls: ['amqp://localhost']
* }).resultToPromise();
* ```
*/
export type ConsumerDefinition<TMessage extends MessageDefinition = MessageDefinition> = {
Expand Down
85 changes: 84 additions & 1 deletion packages/worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pnpm add @amqp-contract/worker

- ✅ **Type-safe message consumption** — Handlers are fully typed based on your contract
- ✅ **Automatic validation** — Messages are validated before reaching your handlers
- ✅ **Retry policies** — Configurable retry limits with exponential backoff to prevent infinite loops
- ✅ **Dead letter exchange support** — Automatically route permanently failed messages to DLX
- ✅ **Prefetch configuration** — Control message flow with per-consumer prefetch settings
- ✅ **Batch processing** — Process multiple messages at once for better throughput
- ✅ **Automatic reconnection** — Built-in connection management with failover support
Expand Down Expand Up @@ -75,6 +77,77 @@ You can define handlers outside of the worker creation using `defineHandler` and

## Error Handling

### Retry Policies (Production-Ready)

**For production use, always configure a retry policy** to prevent infinite retry loops and handle permanently failed messages gracefully.

```typescript
import { defineConsumer, defineQueue, defineMessage } from "@amqp-contract/contract";
import { z } from "zod";

const orderQueue = defineQueue("order-processing", {
durable: true,
deadLetter: {
exchange: dlxExchange, // Messages that exceed retry limit go here
routingKey: "order.failed",
},
});

const orderMessage = defineMessage(
z.object({
orderId: z.string(),
amount: z.number(),
}),
);

const processOrderConsumer = defineConsumer(orderQueue, orderMessage);

// Configure retry policy at worker level, not in contract
const worker = await TypedAmqpWorker.create({
contract,
handlers: {
processOrder: [
async (message) => {
/* handler */
},
{
prefetch: 10,
retryPolicy: {
maxAttempts: 3, // Maximum 3 attempts (initial + 2 retries)
backoff: {
type: "exponential", // or "fixed"
initialInterval: 1_000, // Start with 1 second
maxInterval: 60_000, // Cap at 60 seconds
coefficient: 2, // Double interval each retry (1s, 2s, 4s, ...)
},
nonRetryableErrors: ["ValidationError", "AuthenticationError"],
},
},
],
},
urls: ["amqp://localhost"],
}).resultToPromise();
```

**Retry Policy Options:**

- `maxAttempts`: Maximum number of attempts (initial + retries, defaults to 1 if not specified)
- `backoff.type`: `"fixed"` (same interval) or `"exponential"` (increasing interval)
- `backoff.initialInterval`: Interval in milliseconds before first retry (default: 1_000)
- `backoff.maxInterval`: Maximum interval for exponential backoff (default: 60_000)
- `backoff.coefficient`: Multiplier for exponential backoff (default: 2)
- `nonRetryableErrors`: Array of error constructor names or message substrings that should not be retried

**Behavior:**

- Messages are retried up to `maxAttempts` times with configurable backoff intervals
- Attempt count is tracked in message headers (`x-retry-count`)
- After exhausting attempts, messages are sent to the dead letter exchange (if configured)
- If no DLX is configured, messages are rejected without requeue
- Errors matching `nonRetryableErrors` are immediately rejected (not retried)

### Basic Error Handling

Worker handlers use standard Promise-based async/await pattern:

```typescript
Expand All @@ -86,7 +159,7 @@ handlers: {
// Message acknowledged automatically on success
} catch (error) {
// Exception automatically caught by worker
// Message is requeued for retry
// Message is retried according to retry policy
throw error;
}
};
Expand All @@ -102,6 +175,16 @@ Worker defines error classes for internal use:

These errors are logged but **handlers don't need to use them** - just throw standard exceptions.

### Migration from Legacy Behavior

**BREAKING CHANGE**: The default behavior has changed. Previously, consumers without retry policies would retry infinitely. Now they default to 1 attempt (no retries) to prevent infinite loops.

To migrate:

1. Add a dead letter exchange to your queue configuration (optional but recommended)
2. Configure a retry policy at the worker level in your handler options (not in contract definition)
3. Test with your actual failure scenarios to tune the retry parameters

## API

For complete API documentation, see the [Worker API Reference](https://btravers.github.io/amqp-contract/api/worker).
Expand Down
44 changes: 44 additions & 0 deletions packages/worker/src/__tests__/context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import type { ContractDefinition } from "@amqp-contract/contract";
import { TypedAmqpWorker } from "../worker.js";
import type { WorkerInferConsumerHandlers } from "../types.js";
import { it as baseIt } from "@amqp-contract/testing/extension";

export const it = baseIt.extend<{
workerFactory: <TContract extends ContractDefinition>(
contract: TContract,
handlers: WorkerInferConsumerHandlers<TContract>,
) => Promise<TypedAmqpWorker<TContract>>;
}>({
workerFactory: async ({ amqpConnectionUrl }, use) => {
const workers: Array<TypedAmqpWorker<ContractDefinition>> = [];

try {
await use(
async <TContract extends ContractDefinition>(
contract: TContract,
handlers: WorkerInferConsumerHandlers<TContract>,
) => {
const worker = await TypedAmqpWorker.create({
contract,
handlers,
urls: [amqpConnectionUrl],
}).resultToPromise();

workers.push(worker);
return worker;
},
);
} finally {
await Promise.all(
workers.map(async (worker) => {
try {
await worker.close().resultToPromise();
} catch (error) {
// Swallow errors during cleanup
console.error("Failed to close worker during fixture cleanup:", error);
}
}),
);
}
},
});
47 changes: 1 addition & 46 deletions packages/worker/src/__tests__/worker-prefetch-batch.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
/* oxlint-disable eslint/sort-imports */
import {
ContractDefinition,
defineConsumer,
defineContract,
defineExchange,
Expand All @@ -11,51 +9,8 @@ import {
} from "@amqp-contract/contract";
import { describe, expect, vi } from "vitest";
import { TypedAmqpWorker } from "../worker.js";
import { it as baseIt } from "@amqp-contract/testing/extension";
import { it } from "./context.js";
import { z } from "zod";
import type { WorkerInferConsumerHandlers } from "../types.js";

const it = baseIt.extend<{
workerFactory: <TContract extends ContractDefinition>(
contract: TContract,
handlers: WorkerInferConsumerHandlers<TContract>,
) => Promise<TypedAmqpWorker<TContract>>;
}>({
workerFactory: async ({ amqpConnectionUrl }, use) => {
const workers: Array<TypedAmqpWorker<ContractDefinition>> = [];

try {
await use(
async <TContract extends ContractDefinition>(
contract: TContract,
handlers: WorkerInferConsumerHandlers<TContract>,
) => {
const worker = await TypedAmqpWorker.create({
contract,
handlers,
urls: [amqpConnectionUrl],
}).resultToPromise();

workers.push(worker);
return worker;
},
);
} finally {
// Clean up all workers before fixture cleanup (which deletes the vhost)
await Promise.all(
workers.map(async (worker) => {
try {
await worker.close().resultToPromise();
} catch (error) {
// Swallow errors during cleanup to avoid unhandled rejections
// eslint-disable-next-line no-console
console.error("Failed to close TypedAmqpWorker during fixture cleanup:", error);
}
}),
);
}
},
});

describe("AmqpWorker Prefetch and Batch Integration", () => {
it("should apply prefetch configuration to consumer", async ({
Expand Down
Loading