Skip to content

Commit db1e1e3

Browse files
committed
Prefilter entries if any exist in accumulator.
1 parent 94bbe85 commit db1e1e3

File tree

2 files changed

+37
-2
lines changed

2 files changed

+37
-2
lines changed

packages/outbox-core/lib/outbox.spec.ts

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ describe('outbox', () => {
131131
let eventEmitter: DomainEventEmitter<TestEventsType>
132132
let outboxEventEmitter: OutboxEventEmitter<TestEventsType>
133133
let outboxStorage: InMemoryOutboxStorage<TestEventsType>
134+
let inMemoryOutboxAccumulator: InMemoryOutboxAccumulator<TestEventsType>
134135

135136
beforeEach(() => {
136137
eventEmitter = new DomainEventEmitter({
@@ -144,11 +145,12 @@ describe('outbox', () => {
144145

145146
outboxStorage = new InMemoryOutboxStorage<TestEventsType>()
146147
outboxEventEmitter = new OutboxEventEmitter<TestEventsType>(outboxStorage)
148+
inMemoryOutboxAccumulator = new InMemoryOutboxAccumulator()
147149
outboxProcessor = new OutboxProcessor<TestEventsType>(
148150
{
149151
outboxStorage,
150152
//@ts-ignore
151-
outboxAccumulator: new InMemoryOutboxAccumulator(),
153+
outboxAccumulator: inMemoryOutboxAccumulator,
152154
eventEmitter,
153155
} satisfies OutboxDependencies<TestEventsType>,
154156
{ maxRetryCount: MAX_RETRY_COUNT, emitBatchSize: 1 },
@@ -294,4 +296,32 @@ describe('outbox', () => {
294296
},
295297
])
296298
})
299+
300+
it("doesn't emit event again if it's already present in accumulator", async () => {
301+
const mockedEventEmitter = vi.spyOn(eventEmitter, 'emit')
302+
303+
await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, {
304+
correlationId: randomUUID(),
305+
})
306+
307+
await inMemoryOutboxAccumulator.add(outboxStorage.entries[0])
308+
309+
await outboxProcessor.processOutboxEntries({
310+
logger: TestLogger,
311+
reqId: randomUUID(),
312+
executorId: randomUUID(),
313+
})
314+
315+
//We pretended that event was emitted in previous run by adding state to accumulator
316+
expect(mockedEventEmitter).toHaveBeenCalledTimes(0)
317+
318+
//But after the loop, if successful, it should be marked as success anyway
319+
expect(outboxStorage.entries).toMatchObject([
320+
{
321+
status: 'SUCCESS',
322+
},
323+
])
324+
//And accumulator should be cleared
325+
expect(await inMemoryOutboxAccumulator.getEntries()).toHaveLength(0)
326+
})
297327
})

packages/outbox-core/lib/outbox.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,12 @@ export class OutboxProcessor<SupportedEvents extends CommonEventDefinition[]> {
4242

4343
const entries = await outboxStorage.getEntries(this.outboxProcessorConfiguration.maxRetryCount)
4444

45-
await PromisePool.for(entries)
45+
const currentEntriesInAccumulator = new Set(
46+
(await outboxAccumulator.getEntries()).map((entry) => entry.id),
47+
)
48+
const filteredEntries = entries.filter((entry) => !currentEntriesInAccumulator.has(entry.id))
49+
50+
await PromisePool.for(filteredEntries)
4651
.withConcurrency(this.outboxProcessorConfiguration.emitBatchSize)
4752
.process(async (entry) => {
4853
try {

0 commit comments

Comments
 (0)