Skip to content
Merged
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 46 additions & 18 deletions java/outbox.md
Original file line number Diff line number Diff line change
Expand Up @@ -424,29 +424,58 @@ It is crucial to make the service `OutboxDeadLetterQueueService` accessible for

:::

### Filter for Dead Entries
### Reading Dead Entries

This filtering can't be done on the database since the maximum number of attempts is only available from the CDS properties.

To ensure that only dead outbox entries are returned when reading `DeadOutboxMessages`, the following code provides the handler for the `DeadLetterQueueService` and the `@After-READ` handler that filters for the dead outbox entries:
Filtering the dead entries is done by adding an appropriate `where`-clause to all `READ`-queries which covers all outbox message entries with maximum number of retries. The following code provides an example handler implementation defining this behaviour for the `DeadLetterQueueService`:

```java
@Component
@ServiceName(OutboxDeadLetterQueueService_.CDS_NAME)
public class DeadOutboxMessagesHandler implements EventHandler {

@After(entity = DeadOutboxMessages_.CDS_NAME)
public void filterDeadEntries(CdsReadEventContext context) {
CdsProperties.Outbox outboxConfigs = context.getCdsRuntime().getEnvironment().getCdsProperties().getOutbox();
List<DeadOutboxMessages> deadEntries = context
.getResult()
.listOf(DeadOutboxMessages.class)
.stream()
.filter(entry -> entry.getAttempts() >= outboxConfigs.getService(entry.getTarget()).getMaxAttempts())
.toList();

context.setResult(deadEntries);
}
private final PersistenceService db;

public DeadOutboxMessagesHandler(@Qualifier(PersistenceService.DEFAULT_NAME) PersistenceService db) {
this.db = db;
}

@Before(entity = DeadOutboxMessages_.CDS_NAME)
public void addDeadEntryFilter(CdsReadEventContext context) {
Optional<Predicate> outboxFilters = this.createOutboxFilters(context.getCdsRuntime());

if (outboxFilters.isPresent()) {
CqnSelect modifiedCqn =
copy(
context.getCqn(),
new Modifier() {
@Override
public CqnPredicate where(Predicate where) {
return outboxFilters.get().and(where);
}
});
context.setCqn(modifiedCqn);
}
}

private Optional<Predicate> createOutboxFilters(CdsRuntime runtime) {
List<OutboxService> outboxServices = runtime.getServiceCatalog().getServices(OutboxService.class)
.filter(s -> !s.getName().equals(OutboxService.INMEMORY_NAME)).toList();
CdsProperties.Outbox outboxConfigs = runtime.getEnvironment().getCdsProperties().getOutbox();

Predicate where = null;
for(OutboxService service : outboxServices) {
OutboxServiceConfig config = outboxConfigs.getService(service.getName());
Predicate targetPredicate = CQL.get(Messages.TARGET).eq(service.getName()).and(CQL.get(Messages.ATTEMPTS).ge(config.getMaxAttempts()));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to filter by individual outbox and then OR the condition meeting all outboxes?

Copy link
Copy Markdown
Contributor Author

@t-bonk t-bonk Jul 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BraunMatthias Every outbox can have its own configuration for maxAttempts. To read only the dead entries per outbox, this filter is required.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same reason - I just overlooked this aspect


if (where == null) {
where = targetPredicate;
} else {
where = where.or(targetPredicate);
}
}

return Optional.ofNullable(where);
}
}
```

Expand Down Expand Up @@ -488,8 +517,7 @@ The injected `PersistenceService` instance is used to perform the operations on
[Learn more about CQL statement inspection.](./working-with-cql/query-introspection#cqnanalyzer){.learn-more}

::: tip Use paging logic
Avoid to read all entries of the `cds.outbox.Messages` or `OutboxDeadLetterQueueService.DeadOutboxMessages` table at once, as the size of an entry is unpredictable
and depends on the size of the payload. Prefer paging logic instead.
Avoid to read all outbox entries at once as a single entry can have significant size reflecting the request's payload. Prefer `READ`-queries with paging instead.
:::

## Observability using Open Telemetry
Expand Down