-
Notifications
You must be signed in to change notification settings - Fork 149
Provided a better solution for reading dead outbox entries #1940
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
ed8c785
af2aa4a
a7b6202
e01d281
ccd12db
5ac85af
37e31e0
0d5b5ed
589ee70
44fa225
ce44db5
b6c74ba
f6b9211
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -355,29 +355,65 @@ It is crucial to make the service `OutboxDeadLetterQueueService` accessible for | |
|
|
||
| ::: | ||
|
|
||
| ### Filter for Dead Entries | ||
| ### Reading Dead Entries | ||
|
|
||
| This filtering can't be done on the database since the maximum number of attempts is only available from the CDS properties. | ||
|
|
||
| To ensure that only dead outbox entries are returned when reading `DeadOutboxMessages`, the following code provides the handler for the `DeadLetterQueueService` and the `@After-READ` handler that filters for the dead outbox entries: | ||
| This filtering of dead entries is done on the database by adding `where` clauses for each outbox and their maximum number of retries. The following code provides the handler for the `DeadLetterQueueService` that modifies the where clause by adding | ||
| additional conditions for filtering the outbox entries: | ||
t-bonk marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| ```java | ||
| @Component | ||
| @ServiceName(OutboxDeadLetterQueueService_.CDS_NAME) | ||
| public class DeadOutboxMessagesHandler implements EventHandler { | ||
|
|
||
| @After(entity = DeadOutboxMessages_.CDS_NAME) | ||
| public void filterDeadEntries(CdsReadEventContext context) { | ||
| CdsProperties.Outbox outboxConfigs = context.getCdsRuntime().getEnvironment().getCdsProperties().getOutbox(); | ||
| List<DeadOutboxMessages> deadEntries = context | ||
| .getResult() | ||
| .listOf(DeadOutboxMessages.class) | ||
| .stream() | ||
| .filter(entry -> entry.getAttempts() >= outboxConfigs.getService(entry.getTarget()).getMaxAttempts()) | ||
| .toList(); | ||
|
|
||
| context.setResult(deadEntries); | ||
| } | ||
| private final PersistenceService db; | ||
t-bonk marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| public DeadOutboxMessagesHandler(@Qualifier(PersistenceService.DEFAULT_NAME) PersistenceService db) { | ||
| this.db = db; | ||
| } | ||
|
|
||
| @Before(entity = DeadOutboxMessages_.CDS_NAME) | ||
| public void modifyWhereClause(CdsReadEventContext context) { | ||
t-bonk marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| CqnSelect cqn = context.getCqn(); | ||
t-bonk marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Optional<Predicate> outboxFilters = this.createOutboxFilters(context.getCdsRuntime()); | ||
| CqnSelect modifiedCqn = copy( | ||
| cqn, | ||
t-bonk marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| new Modifier() { | ||
| @Override | ||
| public CqnPredicate where(Predicate where) { | ||
t-bonk marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if (where != null && outboxFilters.isPresent()) { | ||
| return where.and(outboxFilters.get()); | ||
| } else if (where == null && outboxFilters.isPresent()) { | ||
| return outboxFilters.get(); | ||
| } else if (where != null && !outboxFilters.isPresent()) { | ||
| return where; | ||
| } else { | ||
| return null; | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| context.setCqn(modifiedCqn); | ||
| } | ||
|
|
||
| private Optional<Predicate> createOutboxFilters(CdsRuntime runtime) { | ||
| List<OutboxService> outboxServices = runtime.getServiceCatalog().getServices(OutboxService.class) | ||
t-bonk marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| .filter(s -> !s.getName().equals(OutboxService.INMEMORY_NAME)).toList(); | ||
| CdsProperties.Outbox outboxConfigs = runtime.getEnvironment().getCdsProperties().getOutbox(); | ||
|
|
||
| Predicate where = null; | ||
| for(OutboxService service : outboxServices) { | ||
| OutboxServiceConfig config = outboxConfigs.getService(service.getName()); | ||
BraunMatthias marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Predicate targetPredicate = CQL.get(Messages.TARGET).eq(service.getName()).and(CQL.get(Messages.ATTEMPTS).ge(config.getMaxAttempts())); | ||
|
||
|
|
||
| if (where == null) { | ||
| where = targetPredicate; | ||
| } else { | ||
| where = where.or(targetPredicate); | ||
| } | ||
| } | ||
|
|
||
| return Optional.ofNullable(where); | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
|
|
@@ -419,8 +455,7 @@ The injected `PersistenceService` instance is used to perform the operations on | |
| [Learn more about CQL statement inspection.](./working-with-cql/query-introspection#cqnanalyzer){.learn-more} | ||
|
|
||
| ::: tip Use paging logic | ||
| Avoid to read all entries of the `cds.outbox.Messages` or `OutboxDeadLetterQueueService.DeadOutboxMessages` table at once, as the size of an entry is unpredictable | ||
| and depends on the size of the payload. Prefer paging logic instead. | ||
| Avoid to read all entries of the `cds.outbox.Messages` or `OutboxDeadLetterQueueService.DeadOutboxMessages` table at once, as the size of an entry is unpredictable and depends on the size of the payload. Prefer paging logic instead. | ||
t-bonk marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| ::: | ||
|
|
||
| ## Observability using Open Telemetry | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.