Skip to content
Closed
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -473,9 +473,25 @@ packages/[package-name]/
### ✅ Required Practices

1. **README Files**
- Every package must have a README.md
- Include: description, installation, usage examples, API overview
- Keep examples up-to-date with code
- README files should stay simple with just an introduction to concepts
- Keep READMEs concise: key features, quick examples, and links to full documentation
- Full documentation belongs in the `docs/` directory to maintain a single source of truth
- README example: brief feature list → quick code snippet → link to docs site
- This prevents documentation drift and duplication

```markdown
## Feature Name

Brief introduction to the feature (1-2 sentences).

**Quick Example:**

\`\`\`typescript
// Minimal working example
\`\`\`

📖 **[Learn more →](https://btravers.github.io/amqp-contract/guide/feature)**
```

2. **Code Comments**
- Use JSDoc for all public APIs
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"setup": "pnpm install && pnpm build",
"sort-package-json": "sort-package-json '**/package.json' '!**/node_modules/**'",
"test": "turbo run test",
"test:integration": "turbo run test:integration",
"test:integration": "turbo run test:integration --concurrency=1",
"typecheck": "turbo run typecheck",
"version": "changeset version"
},
Expand Down
17 changes: 17 additions & 0 deletions packages/contract/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,23 @@ const contract = defineContract({
- ✅ Event-oriented (publisher-first) and command-oriented (consumer-first) patterns
- ✅ Flexible routing key patterns for topic exchanges

## Error Handling & Retry Policies

Configure retry policies on consumers to prevent infinite retry loops and handle permanent failures gracefully. Supports exponential backoff and dead letter exchange integration.

**Quick Example:**

```typescript
const consumer = defineConsumer(queue, message, {
retryPolicy: {
maxAttempts: 3,
backoff: { type: "exponential", initialInterval: 1000 },
},
});
```

📖 **[Learn more about retry policies and error handling →](https://btravers.github.io/amqp-contract/guide/worker-usage.html#retry-policies)**

## Documentation

📖 **[Read the full documentation →](https://btravers.github.io/amqp-contract)**
Expand Down
17 changes: 16 additions & 1 deletion packages/contract/src/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,8 @@ export function definePublisher<TMessage extends MessageDefinition>(
*
* @param queue - The queue definition to consume from
* @param message - The message definition with payload schema
* @param options - Optional consumer configuration
* @param options - Optional consumer configuration including retry policy
* @param options.retryPolicy - Retry policy for handling failed message processing
* @returns A consumer definition with inferred message types
*
* @example
Expand All @@ -616,8 +617,22 @@ export function definePublisher<TMessage extends MessageDefinition>(
* })
* );
*
* // Basic consumer
* const processOrderConsumer = defineConsumer(orderQueue, orderMessage);
*
* // Consumer with retry policy for production use
* const robustConsumer = defineConsumer(orderQueue, orderMessage, {
* retryPolicy: {
* maxAttempts: 3,
* backoff: {
* type: 'exponential',
* initialInterval: 1000,
* maxInterval: 60000,
* coefficient: 2
* }
* }
* });
*
* // Later, when creating a worker, you'll provide a handler for this consumer:
* // const worker = await TypedAmqpWorker.create({
* // contract,
Expand Down
1 change: 1 addition & 0 deletions packages/contract/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ export type {
QueueDefinition,
InferPublisherNames,
InferConsumerNames,
RetryPolicy,
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The contract package exports RetryPolicy in its index file (line 41), but the type is not actually defined in the contract package's types.ts file. According to the PR description, RetryPolicy was moved from the contract package to the worker package as a worker-level configuration option.

This export should be removed from the contract package index, as the type should only be exported from the worker package where it is defined.

Suggested change
RetryPolicy,

Copilot uses AI. Check for mistakes.
} from "./types.js";
124 changes: 124 additions & 0 deletions packages/contract/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,101 @@ export type PublisherDefinition<TMessage extends MessageDefinition = MessageDefi
}
);

/**
* Retry policy configuration for handling failed message processing.
*
* Inspired by Temporal's retry policy design, this configuration allows
* fine-grained control over retry behavior for failed messages.
*
* @example
* ```typescript
* const retryPolicy: RetryPolicy = {
* maxAttempts: 3,
* backoff: {
* type: 'exponential',
* initialInterval: 1000,
* maxInterval: 60000,
* coefficient: 2
* }
* };
* ```
*/
export type RetryPolicy = {
/**
* Maximum number of attempts (initial attempt + retries).
* After this limit is reached, the message will be:
* - Sent to the dead letter exchange if configured on the queue
* - Rejected (nacked without requeue) if no dead letter exchange
*
* Set to 1 to process once with no retries on failure (fail fast).
* Set to 0 to process once with no retries (effectively same as 1).
* If not specified, defaults to 1 (no retries).
*/
maxAttempts?: number;

/**
* Backoff strategy for retry intervals.
* Adds delay between retry attempts to avoid overwhelming the system.
*/
backoff?: {
/**
* Type of backoff strategy.
* - `fixed`: Same interval for every retry
* - `exponential`: Interval increases exponentially with each retry
*
* If not specified, defaults to 'fixed'.
*/
type?: "fixed" | "exponential";

/**
* Initial interval in milliseconds before the first retry.
* For exponential backoff, this is the base interval.
*
* If not specified, defaults to 1000ms (1 second).
*/
initialInterval?: number;

/**
* Maximum interval in milliseconds between retries.
* Prevents exponential backoff from growing indefinitely.
*
* Only applies to exponential backoff.
* If not specified, defaults to 60000ms (60 seconds).
*/
maxInterval?: number;

/**
* Multiplication coefficient for exponential backoff.
* Each retry interval is multiplied by this value.
*
* Formula: interval = initialInterval * (coefficient ^ attemptNumber)
*
* Only applies to exponential backoff.
* If not specified, defaults to 2.
*/
coefficient?: number;
};

/**
* List of error types (constructor names or error messages) that should NOT be retried.
* Similar to Temporal's NonRetryableErrorTypes.
*
* When a handler throws an error matching one of these patterns, the message will be:
* - Sent to the dead letter exchange if configured on the queue
* - Rejected (nacked without requeue) if no dead letter exchange
*
* Patterns can be:
* - Error constructor name (e.g., 'ValidationError', 'TypeError')
* - Substring of error message (case-insensitive)
*
* @example
* ```typescript
* nonRetryableErrors: ['ValidationError', 'AuthenticationError', 'invalid format']
* ```
*/
nonRetryableErrors?: readonly string[];
};

/**
* Definition of a message consumer.
*
Expand All @@ -422,10 +517,26 @@ export type PublisherDefinition<TMessage extends MessageDefinition = MessageDefi
*
* @example
* ```typescript
* // Basic consumer
* const consumer: ConsumerDefinition = {
* queue: orderProcessingQueue,
* message: orderMessage
* };
*
* // Consumer with retry policy
* const consumerWithRetry: ConsumerDefinition = {
* queue: orderProcessingQueue,
* message: orderMessage,
* retryPolicy: {
* maxAttempts: 3,
* backoff: {
* type: 'exponential',
* initialInterval: 1000,
* maxInterval: 60000,
* coefficient: 2
* }
* }
* };
* ```
*/
export type ConsumerDefinition<TMessage extends MessageDefinition = MessageDefinition> = {
Expand All @@ -434,6 +545,19 @@ export type ConsumerDefinition<TMessage extends MessageDefinition = MessageDefin

/** The message definition including the payload schema */
message: TMessage;

/**
* Retry policy for handling failed message processing.
*
* When configured, failed messages will be retried up to maxAttempts times
* with optional exponential backoff between attempts. After exhausting retries,
* messages will be sent to the dead letter exchange if configured on the queue,
* or rejected without requeue.
*
* If not specified, messages will be requeued indefinitely on failure (legacy behavior).
* **For production use, always configure a retry policy to prevent infinite loops.**
*/
retryPolicy?: RetryPolicy;
};

/**
Expand Down
67 changes: 66 additions & 1 deletion packages/worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pnpm add @amqp-contract/worker

- ✅ **Type-safe message consumption** — Handlers are fully typed based on your contract
- ✅ **Automatic validation** — Messages are validated before reaching your handlers
- ✅ **Retry policies** — Configurable retry limits with exponential backoff to prevent infinite loops
- ✅ **Dead letter exchange support** — Automatically route permanently failed messages to DLX
- ✅ **Prefetch configuration** — Control message flow with per-consumer prefetch settings
- ✅ **Batch processing** — Process multiple messages at once for better throughput
- ✅ **Automatic reconnection** — Built-in connection management with failover support
Expand Down Expand Up @@ -75,6 +77,59 @@ You can define handlers outside of the worker creation using `defineHandler` and

## Error Handling

### Retry Policies (Production-Ready)

**For production use, always configure a retry policy** to prevent infinite retry loops and handle permanently failed messages gracefully.

```typescript
import { defineConsumer, defineQueue, defineMessage } from "@amqp-contract/contract";
import { z } from "zod";

const orderQueue = defineQueue("order-processing", {
durable: true,
deadLetter: {
exchange: dlxExchange, // Messages that exceed retry limit go here
routingKey: "order.failed",
},
});

const orderMessage = defineMessage(
z.object({
orderId: z.string(),
amount: z.number(),
}),
);

const processOrderConsumer = defineConsumer(orderQueue, orderMessage, {
retryPolicy: {
maxAttempts: 3, // Maximum 3 attempts (initial + 2 retries)
backoff: {
type: "exponential", // or "fixed"
initialInterval: 1000, // Start with 1 second
maxInterval: 60000, // Cap at 60 seconds
coefficient: 2, // Double interval each retry (1s, 2s, 4s, ...)
},
},
});
```
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The README example shows contract-level retry policy configuration (passing retryPolicy to defineConsumer), but according to the PR description and implementation, retry policy has been moved to worker-level configuration. This documentation should be updated to show the correct worker-level pattern:

const processOrderConsumer = defineConsumer(orderQueue, orderMessage);

// Later, when creating the worker:
const worker = await TypedAmqpWorker.create({
  contract,
  handlers: {
    processOrder: [
      async (message) => { /* handler */ },
      {
        retryPolicy: {
          maxAttempts: 3,
          backoff: { type: "exponential", initialInterval: 1000 }
        }
      }
    ]
  },
  urls: ['amqp://localhost']
});

Copilot uses AI. Check for mistakes.

**Retry Policy Options:**

- `maxAttempts`: Maximum number of attempts (initial + retries, set to `0` for fail-fast behavior)
- `backoff.type`: `"fixed"` (same interval) or `"exponential"` (increasing interval)
- `backoff.initialInterval`: Interval in milliseconds before first retry (default: 1000)
- `backoff.maxInterval`: Maximum interval for exponential backoff (default: 60000)
- `backoff.coefficient`: Multiplier for exponential backoff (default: 2)

**Behavior:**

- Messages are retried up to `maxAttempts` times with configurable backoff intervals
- Attempt count is tracked in message headers (`x-retry-count`)
- After exhausting attempts, messages are sent to the dead letter exchange (if configured)
- If no DLX is configured, messages are rejected without requeue

### Basic Error Handling

Worker handlers use standard Promise-based async/await pattern:

```typescript
Expand All @@ -86,7 +141,7 @@ handlers: {
// Message acknowledged automatically on success
} catch (error) {
// Exception automatically caught by worker
// Message is requeued for retry
// Message is retried according to retry policy
throw error;
}
};
Expand All @@ -102,6 +157,16 @@ Worker defines error classes for internal use:

These errors are logged but **handlers don't need to use them** - just throw standard exceptions.

### Migration from Legacy Behavior

If you have existing consumers without retry policies, they will continue to work with the legacy behavior (infinite retries). However, **this is not recommended for production** as it can lead to infinite retry loops.

To migrate:

1. Add a dead letter exchange to your queue configuration (optional but recommended)
2. Configure a retry policy on your consumer definition
3. Test with your actual failure scenarios to tune the retry parameters

## API

For complete API documentation, see the [Worker API Reference](https://btravers.github.io/amqp-contract/api/worker).
Expand Down
44 changes: 44 additions & 0 deletions packages/worker/src/__tests__/context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import type { ContractDefinition } from "@amqp-contract/contract";
import { TypedAmqpWorker } from "../worker.js";
import type { WorkerInferConsumerHandlers } from "../types.js";
import { it as baseIt } from "@amqp-contract/testing/extension";

export const it = baseIt.extend<{
workerFactory: <TContract extends ContractDefinition>(
contract: TContract,
handlers: WorkerInferConsumerHandlers<TContract>,
) => Promise<TypedAmqpWorker<TContract>>;
}>({
workerFactory: async ({ amqpConnectionUrl }, use) => {
const workers: Array<TypedAmqpWorker<ContractDefinition>> = [];

try {
await use(
async <TContract extends ContractDefinition>(
contract: TContract,
handlers: WorkerInferConsumerHandlers<TContract>,
) => {
const worker = await TypedAmqpWorker.create({
contract,
handlers,
urls: [amqpConnectionUrl],
}).resultToPromise();

workers.push(worker);
return worker;
},
);
} finally {
await Promise.all(
workers.map(async (worker) => {
try {
await worker.close().resultToPromise();
} catch (error) {
// Swallow errors during cleanup
console.error("Failed to close worker during fixture cleanup:", error);
}
}),
);
}
},
});
Loading