Skip to content

Commit 3333be5

Browse files
committed
AP-5046 PromisePool
1 parent eb6aa5e commit 3333be5

File tree

4 files changed

+120
-23
lines changed

4 files changed

+120
-23
lines changed

packages/outbox-core/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# outbox-core
2+
3+
WIP

packages/outbox-core/lib/outbox.spec.ts

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,14 @@ import {
1212
import pino, { type Logger } from 'pino'
1313
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
1414
import { z } from 'zod'
15-
import { type OutboxEntry, OutboxEventEmitter, OutboxProcessor, type OutboxStorage } from './outbox'
15+
import {
16+
InMemoryOutboxAccumulator,
17+
type OutboxAccumulator,
18+
type OutboxEntry,
19+
OutboxEventEmitter,
20+
OutboxProcessor,
21+
type OutboxStorage,
22+
} from './outbox'
1623

1724
const TestEvents = {
1825
created: {
@@ -84,6 +91,43 @@ class InMemoryOutboxStorage<SupportedEvents extends CommonEventDefinition[]>
8491

8592
return Promise.resolve(outboxEntry)
8693
}
94+
95+
public async flush(outboxAccumulator: OutboxAccumulator<SupportedEvents>): Promise<void> {
96+
let successEntries = await outboxAccumulator.getEntries()
97+
successEntries = successEntries.map((entry) => {
98+
return {
99+
...entry,
100+
status: 'SUCCESS',
101+
updateAt: new Date(),
102+
}
103+
})
104+
this.entries = this.entries.map((entry) => {
105+
const foundEntry = successEntries.find((successEntry) => successEntry.id === entry.id)
106+
if (foundEntry) {
107+
return foundEntry
108+
}
109+
return entry
110+
})
111+
112+
let failedEntries = await outboxAccumulator.getFailedEntries()
113+
failedEntries = failedEntries.map((entry) => {
114+
return {
115+
...entry,
116+
status: 'FAILED',
117+
updateAt: new Date(),
118+
retryCount: entry.retryCount + 1,
119+
}
120+
})
121+
this.entries = this.entries.map((entry) => {
122+
const foundEntry = failedEntries.find((failedEntry) => failedEntry.id === entry.id)
123+
if (foundEntry) {
124+
return foundEntry
125+
}
126+
return entry
127+
})
128+
129+
outboxAccumulator.clear()
130+
}
87131
}
88132

89133
const MAX_RETRY_COUNT = 2
@@ -108,8 +152,11 @@ describe('outbox', () => {
108152
outboxEventEmitter = new OutboxEventEmitter<TestEventsType>(outboxStorage)
109153
outboxProcessor = new OutboxProcessor<TestEventsType>(
110154
outboxStorage,
155+
//@ts-ignore
156+
new InMemoryOutboxAccumulator(),
111157
eventEmitter,
112158
MAX_RETRY_COUNT,
159+
1,
113160
)
114161
})
115162

packages/outbox-core/lib/outbox.ts

Lines changed: 68 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type {
66
ConsumerMessageMetadataType,
77
DomainEventEmitter,
88
} from '@message-queue-toolkit/core'
9+
import { PromisePool } from '@supercharge/promise-pool'
910
import { uuidv7 } from 'uuidv7'
1011

1112
/**
@@ -28,6 +29,51 @@ export type OutboxEntry<SupportedEvent extends CommonEventDefinition> = {
2829
retryCount: number
2930
}
3031

32+
export interface OutboxAccumulator<SupportedEvents extends CommonEventDefinition[]> {
33+
add(outboxEntry: OutboxEntry<SupportedEvents[number]>): Promise<void>
34+
35+
addFailure(outboxEntry: OutboxEntry<SupportedEvents[number]>): Promise<void>
36+
37+
getEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]>
38+
39+
getFailedEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]>
40+
41+
clear(): Promise<void>
42+
}
43+
44+
export class InMemoryOutboxAccumulator<SupportedEvents extends CommonEventDefinition[]>
45+
implements OutboxAccumulator<SupportedEvents>
46+
{
47+
private entries: OutboxEntry<SupportedEvents[number]>[] = []
48+
private failedEntries: OutboxEntry<SupportedEvents[number]>[] = []
49+
50+
public add(outboxEntry: OutboxEntry<SupportedEvents[number]>) {
51+
this.entries = [...this.entries, outboxEntry]
52+
53+
return Promise.resolve()
54+
}
55+
56+
public addFailure(outboxEntry: OutboxEntry<SupportedEvents[number]>) {
57+
this.failedEntries = [...this.failedEntries, outboxEntry]
58+
59+
return Promise.resolve()
60+
}
61+
62+
getEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]> {
63+
return Promise.resolve(this.entries)
64+
}
65+
66+
getFailedEntries(): Promise<OutboxEntry<SupportedEvents[number]>[]> {
67+
return Promise.resolve(this.failedEntries)
68+
}
69+
70+
public clear(): Promise<void> {
71+
this.entries = []
72+
this.failedEntries = []
73+
return Promise.resolve()
74+
}
75+
}
76+
3177
/**
3278
* Takes care of persisting and retrieving outbox entries.
3379
*
@@ -41,6 +87,8 @@ export interface OutboxStorage<SupportedEvents extends CommonEventDefinition[]>
4187
outboxEntry: OutboxEntry<SupportedEvents[number]>,
4288
): Promise<OutboxEntry<SupportedEvents[number]>>
4389

90+
flush(outboxAccumulator: OutboxAccumulator<SupportedEvents>): Promise<void>
91+
4492
update(
4593
outboxEntry: OutboxEntry<SupportedEvents[number]>,
4694
): Promise<OutboxEntry<SupportedEvents[number]>>
@@ -61,35 +109,29 @@ export interface OutboxStorage<SupportedEvents extends CommonEventDefinition[]>
61109
export class OutboxProcessor<SupportedEvents extends CommonEventDefinition[]> {
62110
constructor(
63111
private readonly outboxStorage: OutboxStorage<SupportedEvents>,
112+
private readonly outboxAccumulator: OutboxAccumulator<SupportedEvents>,
64113
private readonly eventEmitter: DomainEventEmitter<SupportedEvents>,
65114
private readonly maxRetryCount: number,
115+
private readonly emitBatchSize: number,
66116
) {}
67117

68118
public async processOutboxEntries(context: JobExecutionContext) {
69119
const entries = await this.outboxStorage.getEntries(this.maxRetryCount)
70120

71-
for (const entry of entries) {
72-
try {
73-
const updatedEntry = await this.outboxStorage.update({
74-
...entry,
75-
updated: new Date(),
76-
status: 'ACKED',
77-
})
78-
79-
await this.eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata)
80-
81-
await this.outboxStorage.update({ ...updatedEntry, updated: new Date(), status: 'SUCCESS' })
82-
} catch (e) {
83-
context.logger.error({ error: e }, 'Failed to process outbox entry.')
84-
85-
await this.outboxStorage.update({
86-
...entry,
87-
updated: new Date(),
88-
status: 'FAILED',
89-
retryCount: entry.retryCount + 1,
90-
})
91-
}
92-
}
121+
await PromisePool.for(entries)
122+
.withConcurrency(this.emitBatchSize)
123+
.process(async (entry) => {
124+
try {
125+
await this.eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata)
126+
await this.outboxAccumulator.add(entry)
127+
} catch (e) {
128+
context.logger.error({ error: e }, 'Failed to process outbox entry.')
129+
130+
await this.outboxAccumulator.addFailure(entry)
131+
}
132+
})
133+
134+
await this.outboxStorage.flush(this.outboxAccumulator)
93135
}
94136
}
95137

@@ -107,9 +149,11 @@ export class OutboxPeriodicJob<
107149

108150
constructor(
109151
outboxStorage: OutboxStorage<SupportedEvents>,
152+
outboxAccumulator: OutboxAccumulator<SupportedEvents>,
110153
eventEmitter: DomainEventEmitter<SupportedEvents>,
111154
dependencies: PeriodicJobDependencies,
112155
maxRetryCount: number,
156+
emitBatchSize: number,
113157
intervalInMs: number,
114158
) {
115159
super(
@@ -133,8 +177,10 @@ export class OutboxPeriodicJob<
133177

134178
this.outboxProcessor = new OutboxProcessor<SupportedEvents>(
135179
outboxStorage,
180+
outboxAccumulator,
136181
eventEmitter,
137182
maxRetryCount,
183+
emitBatchSize,
138184
)
139185
}
140186

packages/outbox-core/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
},
2727
"dependencies": {
2828
"@lokalise/background-jobs-common": "^7.6.1",
29+
"@supercharge/promise-pool": "^3.2.0",
2930
"uuidv7": "^1.0.2"
3031
},
3132
"peerDependencies": {

0 commit comments

Comments
 (0)