Skip to content

Commit 047dc08

Browse files
committed
Tests WIP.
1 parent cf87550 commit 047dc08

File tree

3 files changed

+83
-30
lines changed

3 files changed

+83
-30
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import {
2+
type SnsAwareEventDefinition,
3+
enrichEventSchemaWithBase,
4+
} from '@message-queue-toolkit/schemas'
5+
import { describe } from 'vitest'
6+
import { z } from 'zod'
7+
import type { OutboxStorage } from './outbox'
8+
9+
const TEST_EVENT_SCHEMA = z.object({
10+
name: z.string(),
11+
age: z.number(),
12+
})
13+
14+
const testEvents = {
15+
'test-event.something-happened': {
16+
...enrichEventSchemaWithBase('test-event.something-happened', TEST_EVENT_SCHEMA),
17+
snsTopic: 'TEST_TOPIC',
18+
producedBy: ['unit tes'],
19+
},
20+
} satisfies Record<string, SnsAwareEventDefinition>
21+
22+
console.log(testEvents)
23+
24+
const testEventsArray = [...Object.values(testEvents)] satisfies SnsAwareEventDefinition[]
25+
26+
type TestEvents = typeof testEventsArray
27+
28+
class InMemoryOutbox implements OutboxStorage<TestEvents> {}
29+
30+
describe('outbox', () => {
31+
it('saves outbox entry to process it later', async () => {})
32+
})

packages/outbox-core/lib/outbox.ts

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,41 @@ export interface OutboxStorage<SupportedEvents extends CommonEventDefinition[]>
5353
getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]>
5454
}
5555

56+
export class OutboxProcessor<SupportedEvents extends CommonEventDefinition[]> {
57+
constructor(
58+
private readonly outboxStorage: OutboxStorage<SupportedEvents>,
59+
private readonly eventEmitter: DomainEventEmitter<SupportedEvents>,
60+
private readonly maxRetryCount: number,
61+
) {}
62+
63+
public async processOutboxEntries(context: JobExecutionContext) {
64+
const entries = await this.outboxStorage.getEntries(this.maxRetryCount)
65+
66+
for (const entry of entries) {
67+
try {
68+
const updatedEntry = await this.outboxStorage.update({
69+
...entry,
70+
updated: new Date(),
71+
status: 'ACKED',
72+
})
73+
74+
await this.eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata)
75+
76+
await this.outboxStorage.update({ ...updatedEntry, updated: new Date(), status: 'SUCCESS' })
77+
} catch (e) {
78+
context.logger.error({ error: e }, 'Failed to process outbox entry.')
79+
80+
await this.outboxStorage.update({
81+
...entry,
82+
updated: new Date(),
83+
status: 'FAILED',
84+
retryCount: entry.retryCount + 1,
85+
})
86+
}
87+
}
88+
}
89+
}
90+
5691
/**
5792
* Periodic job that processes outbox entries every second. If processing takes longer than 1 second, another subsequent job WILL NOT be started.
5893
*
@@ -63,10 +98,12 @@ export interface OutboxStorage<SupportedEvents extends CommonEventDefinition[]>
6398
export class OutboxPeriodicJob<
6499
SupportedEvents extends CommonEventDefinition[],
65100
> extends AbstractPeriodicJob {
101+
private readonly outboxProcessor: OutboxProcessor<SupportedEvents>
102+
66103
constructor(
67-
private readonly outboxStorage: OutboxStorage<SupportedEvents>,
68-
private readonly eventEmitter: DomainEventEmitter<SupportedEvents>,
69-
private readonly maxRetryCount: number,
104+
outboxStorage: OutboxStorage<SupportedEvents>,
105+
eventEmitter: DomainEventEmitter<SupportedEvents>,
106+
maxRetryCount: number,
70107
dependencies: PeriodicJobDependencies,
71108
) {
72109
super(
@@ -87,33 +124,16 @@ export class OutboxPeriodicJob<
87124
scheduler: dependencies.scheduler,
88125
},
89126
)
127+
128+
this.outboxProcessor = new OutboxProcessor<SupportedEvents>(
129+
outboxStorage,
130+
eventEmitter,
131+
maxRetryCount,
132+
)
90133
}
91134

92135
protected async processInternal(context: JobExecutionContext): Promise<void> {
93-
const entries = await this.outboxStorage.getEntries(this.maxRetryCount)
94-
95-
for (const entry of entries) {
96-
try {
97-
const updatedEntry = await this.outboxStorage.update({
98-
...entry,
99-
updated: new Date(),
100-
status: 'ACKED',
101-
})
102-
103-
await this.eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata)
104-
105-
await this.outboxStorage.update({ ...updatedEntry, updated: new Date(), status: 'SUCCESS' })
106-
} catch (e) {
107-
context.logger.error({ error: e }, 'Failed to process outbox entry.')
108-
109-
await this.outboxStorage.update({
110-
...entry,
111-
updated: new Date(),
112-
status: 'FAILED',
113-
retryCount: entry.retryCount + 1,
114-
})
115-
}
116-
}
136+
await this.outboxProcessor.processOutboxEntries(context)
117137
}
118138
}
119139

packages/outbox-core/package.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,18 @@
2929
"@lokalise/id-utils": "^2.2.0"
3030
},
3131
"peerDependencies": {
32-
"@message-queue-toolkit/core": ">=14.0.0"
32+
"@message-queue-toolkit/core": ">=14.0.0",
33+
"@message-queue-toolkit/schemas": ">=4.0.0"
3334
},
3435
"devDependencies": {
3536
"@biomejs/biome": "1.8.3",
3637
"@kibertoad/biome-config": "^1.2.1",
37-
"@message-queue-toolkit/core": "*",
3838
"@types/node": "^22.0.0",
3939
"@vitest/coverage-v8": "^2.0.4",
4040
"del-cli": "^5.1.0",
4141
"typescript": "^5.5.3",
42-
"vitest": "^2.0.4"
42+
"vitest": "^2.0.4",
43+
"zod": "^3.23.8"
4344
},
4445
"homepage": "https://github.com/kibertoad/message-queue-toolkit",
4546
"repository": {

0 commit comments

Comments
 (0)