Skip to content

Commit cf87550

Browse files
committed
AP-5046 Implementation + interfaces snippets.
1 parent 64756c4 commit cf87550

File tree

2 files changed

+143
-7
lines changed

2 files changed

+143
-7
lines changed

packages/outbox-core/lib/outbox.ts

Lines changed: 140 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,144 @@
1-
export interface OutboxStorage {
2-
saveMessage(message: unknown): Promise<void>
3-
getMessage(): Promise<unknown>
4-
deleteMessage(): Promise<void>
1+
import { AbstractPeriodicJob, type JobExecutionContext } from '@lokalise/background-jobs-common'
2+
import type { PeriodicJobDependencies } from '@lokalise/background-jobs-common/dist/periodic-jobs/periodicJobTypes'
3+
import { generateUuid7 } from '@lokalise/id-utils'
4+
import type {
5+
CommonEventDefinition,
6+
CommonEventDefinitionPublisherSchemaType,
7+
ConsumerMessageMetadataType,
8+
DomainEventEmitter,
9+
} from '@message-queue-toolkit/core'
10+
11+
/**
12+
* Status of the outbox entry.
13+
* - CREATED - entry was created and is waiting to be processed to publish actual event
14+
* - ACKED - entry was picked up by outbox job and is being processed
15+
* - SUCCESS - entry was successfully processed, event was published
16+
* - FAILED - entry processing failed, it will be retried
17+
*/
18+
export type OutboxEntryStatus = 'CREATED' | 'ACKED' | 'SUCCESS' | 'FAILED'
19+
20+
export type OutboxEntry<SupportedEvent extends CommonEventDefinition> = {
21+
id: string
22+
event: SupportedEvent
23+
data: Omit<CommonEventDefinitionPublisherSchemaType<SupportedEvent>, 'type'>
24+
precedingMessageMetadata?: Partial<ConsumerMessageMetadataType>
25+
status: OutboxEntryStatus
26+
created: Date
27+
updated?: Date
28+
retryCount: number
29+
}
30+
31+
/**
32+
* Takes care of persisting and retrieving outbox entries.
33+
*
34+
* Implementation is required:
35+
* - in order to fulfill at least once delivery guarantee, persisting entries should be performed inside isolated transaction
36+
* - to return entries in the order they were created (UUID7 is used to create entries in OutboxEventEmitter)
37+
* - returned entries should not include the ones with 'SUCCESS' status
38+
*/
39+
export interface OutboxStorage<SupportedEvents extends CommonEventDefinition[]> {
40+
create(
41+
outboxEntry: OutboxEntry<SupportedEvents[number]>,
42+
): Promise<OutboxEntry<SupportedEvents[number]>>
43+
44+
update(
45+
outboxEntry: OutboxEntry<SupportedEvents[number]>,
46+
): Promise<OutboxEntry<SupportedEvents[number]>>
47+
48+
/**
49+
* Returns entries in the order they were created. It doesn't return entries with 'SUCCESS' status. It doesn't return entries that have been retried more than maxRetryCount times.
50+
*
51+
* For example if entry retryCount is 1 and maxRetryCount is 1, entry MUST be returned. If it fails again then retry count is 2, in that case entry MUST NOT be returned.
52+
*/
53+
getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]>
54+
}
55+
56+
/**
57+
* Periodic job that processes outbox entries every second. If processing takes longer than 1 second, another subsequent job WILL NOT be started.
58+
*
59+
* Each entry is ACKed, then event is published, and then entry is marked as SUCCESS. If processing fails, entry is marked as FAILED and will be retried.
60+
*
61+
* Max retry count is defined by the user.
62+
*/
63+
export class OutboxPeriodicJob<
64+
SupportedEvents extends CommonEventDefinition[],
65+
> extends AbstractPeriodicJob {
66+
constructor(
67+
private readonly outboxStorage: OutboxStorage<SupportedEvents>,
68+
private readonly eventEmitter: DomainEventEmitter<SupportedEvents>,
69+
private readonly maxRetryCount: number,
70+
dependencies: PeriodicJobDependencies,
71+
) {
72+
super(
73+
{
74+
jobId: 'OutboxJob',
75+
schedule: {
76+
intervalInMs: 1000,
77+
},
78+
singleConsumerMode: {
79+
enabled: true,
80+
},
81+
},
82+
{
83+
redis: dependencies.redis,
84+
logger: dependencies.logger,
85+
transactionObservabilityManager: dependencies.transactionObservabilityManager,
86+
errorReporter: dependencies.errorReporter,
87+
scheduler: dependencies.scheduler,
88+
},
89+
)
90+
}
91+
92+
protected async processInternal(context: JobExecutionContext): Promise<void> {
93+
const entries = await this.outboxStorage.getEntries(this.maxRetryCount)
94+
95+
for (const entry of entries) {
96+
try {
97+
const updatedEntry = await this.outboxStorage.update({
98+
...entry,
99+
updated: new Date(),
100+
status: 'ACKED',
101+
})
102+
103+
await this.eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata)
104+
105+
await this.outboxStorage.update({ ...updatedEntry, updated: new Date(), status: 'SUCCESS' })
106+
} catch (e) {
107+
context.logger.error({ error: e }, 'Failed to process outbox entry.')
108+
109+
await this.outboxStorage.update({
110+
...entry,
111+
updated: new Date(),
112+
status: 'FAILED',
113+
retryCount: entry.retryCount + 1,
114+
})
115+
}
116+
}
117+
}
5118
}
6119

7-
export class OutboxProcessor {
120+
export class OutboxEventEmitter<SupportedEvents extends CommonEventDefinition[]> {
121+
constructor(private storage: OutboxStorage<SupportedEvents>) {}
8122

123+
/**
124+
* Persists outbox entry in persistence layer, later it will be picked up by outbox job.
125+
* @param supportedEvent
126+
* @param data
127+
* @param precedingMessageMetadata
128+
*/
129+
public async emit<SupportedEvent extends SupportedEvents[number]>(
130+
supportedEvent: SupportedEvent,
131+
data: Omit<CommonEventDefinitionPublisherSchemaType<SupportedEvent>, 'type'>,
132+
precedingMessageMetadata?: Partial<ConsumerMessageMetadataType>,
133+
) {
134+
await this.storage.create({
135+
id: generateUuid7(),
136+
event: supportedEvent,
137+
data,
138+
precedingMessageMetadata,
139+
status: 'CREATED',
140+
created: new Date(),
141+
retryCount: 0,
142+
})
143+
}
9144
}

packages/outbox-core/package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,16 @@
2525
"prepublishOnly": "npm run build:release"
2626
},
2727
"dependencies": {
28-
"@lokalise/background-jobs-common": "^7.6.1"
28+
"@lokalise/background-jobs-common": "^7.6.1",
29+
"@lokalise/id-utils": "^2.2.0"
2930
},
3031
"peerDependencies": {
3132
"@message-queue-toolkit/core": ">=14.0.0"
3233
},
3334
"devDependencies": {
34-
"@message-queue-toolkit/core": "*",
3535
"@biomejs/biome": "1.8.3",
3636
"@kibertoad/biome-config": "^1.2.1",
37+
"@message-queue-toolkit/core": "*",
3738
"@types/node": "^22.0.0",
3839
"@vitest/coverage-v8": "^2.0.4",
3940
"del-cli": "^5.1.0",

0 commit comments

Comments
 (0)