Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 102 additions & 1 deletion docs/ts/primitives/pubsub.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ export const signups = new Topic<SignupEvent>("signups", {
});
```

### Topic Configuration

The `TopicConfig` object accepts the following parameters:

- **`deliveryGuarantee`** (required): `"at-least-once"` | `"exactly-once"`
- Controls the delivery guarantee for messages published to this topic
- See [Delivery guarantees](#customizing-message-delivery) for more details

- **`orderingAttribute`** (optional): A string matching the name of an `Attribute` field in your event type
- Configures message ordering by attribute value
- Messages with the same attribute value are delivered in publish order
- See [Ordered Topics](#ordered-topics) for more details

## Publishing events

To publish an **Event**, call `publish` on the topic passing in the event object (which is the type specified in the `new Topic<Type>` constructor).
Expand Down Expand Up @@ -89,12 +102,100 @@ subscription to a single topic receives the events independently of any other su
that if one subscription is running very slowly, it will grow a backlog of unprocessed events.
However, any other subscriptions will still be processing events in real-time as they are published.

### Subscription Configuration

The subscription configuration object supports several optional parameters to customize message processing behavior:

- **`handler`** (required): `(msg: Msg) => Promise<unknown>`
- The function that processes each message
- If the handler throws an error or returns a rejected promise, the message will be retried based on the retry policy

- **`maxConcurrency`** (optional): `number`
- Maximum number of messages processed simultaneously per service instance
- If set to a negative value, there's no limit on concurrent processing
- Note: This is per instance, so with 10 instances and `maxConcurrency: 10`, 100 messages could be processed simultaneously
- This setting has no effect on Encore Cloud environments
- Not supported by all cloud providers (GCP push subscriptions use adaptive concurrency)
- Defaults to a reasonable provider-specific value if not set

- **`ackDeadline`** (optional): `DurationString` (e.g., `"60s"`, `"2m"`, `"1h"`)
- Time a consumer has to process a message before it's returned to the subscription for redelivery
- Must be at least 1 second
- Default: 30 seconds

- **`messageRetention`** (optional): `DurationString` (e.g., `"3d"`, `"14d"`, `"24h"`)
- How long an undelivered message is kept on the topic before it's purged
- Default: 7 days

- **`retryPolicy`** (optional): `RetryPolicy`
- Configuration for how messages are retried after errors
- See [Retry Policy](#retry-policy) for details

#### Retry Policy

The `RetryPolicy` object allows you to configure retry behavior for failed message processing:

```ts
const _ = new Subscription(signups, "send-welcome-email", {
handler: async (event) => {
// Process the event
},
retryPolicy: {
minBackoff: "10s", // Minimum wait time between retries
maxBackoff: "10m", // Maximum wait time between retries
maxRetries: 5, // Maximum number of retries before DLQ
},
});
```

- **`minBackoff`** (optional): `DurationString`
- Minimum time to wait between retry attempts
- Default: 10 seconds

- **`maxBackoff`** (optional): `DurationString`
- Maximum time to wait between retry attempts
- Default: 10 minutes

- **`maxRetries`** (optional): `number`
- Controls dead-letter queue behavior:
- `0` or `undefined`: Uses default of 100 retries
- `n > 0`: Forwards message to dead-letter queue after `n` retries
- After max retries, events are placed in a dead-letter queue (DLQ) for manual inspection

<Callout type="info">

Retry policy values may be clamped to supported ranges by the target cloud provider.

</Callout>

#### Complete Subscription Example

Here's an example showing all available subscription configuration options:

```ts
import { Subscription } from "encore.dev/pubsub";

const _ = new Subscription(signups, "send-welcome-email", {
handler: async (event) => {
// Send a welcome email using the event
},
maxConcurrency: 10,
ackDeadline: "60s",
messageRetention: "7d",
retryPolicy: {
minBackoff: "10s",
maxBackoff: "10m",
maxRetries: 5,
},
});
```

### Error Handling

If a subscription function returns an error, the event being processed will be retried, based on the retry policy
configured on that subscription.

After the max number of retries is reached,the event will be placed into a dead-letter queue (DLQ) for that subscriber.
After the max number of retries is reached, the event will be placed into a dead-letter queue (DLQ) for that subscriber.
This allows the subscription to continue processing events until the bug which caused the event to fail can be fixed.
Once fixed, the messages on the dead-letter queue can be manually released to be processed again by the subscriber.

Expand Down