Skip to content

Commit ed4ce02

Browse files
committed
Simplified outbox entry.
1 parent ab58167 commit ed4ce02

File tree

4 files changed

+15
-13
lines changed

4 files changed

+15
-13
lines changed

packages/outbox-core/lib/objects.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ export type OutboxEntryStatus = 'CREATED' | 'ACKED' | 'SUCCESS' | 'FAILED'
1515

1616
export type OutboxEntry<SupportedEvent extends CommonEventDefinition> = {
1717
id: string
18-
event: SupportedEvent
19-
data: Omit<CommonEventDefinitionPublisherSchemaType<SupportedEvent>, 'type'>
18+
event: CommonEventDefinitionPublisherSchemaType<SupportedEvent>
2019
precedingMessageMetadata?: Partial<ConsumerMessageMetadataType>
2120
status: OutboxEntryStatus
2221
created: Date

packages/outbox-core/lib/outbox.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type {
66
ConsumerMessageMetadataType,
77
DomainEventEmitter,
88
} from '@message-queue-toolkit/core'
9+
import { enrichMessageSchemaWithBase } from '@message-queue-toolkit/schemas'
910
import { PromisePool } from '@supercharge/promise-pool'
1011
import { uuidv7 } from 'uuidv7'
1112
import type { OutboxAccumulator } from './accumulators'
@@ -50,7 +51,11 @@ export class OutboxProcessor<SupportedEvents extends CommonEventDefinition[]> {
5051
.withConcurrency(this.outboxProcessorConfiguration.emitBatchSize)
5152
.process(async (entry) => {
5253
try {
53-
await eventEmitter.emit(entry.event, entry.data, entry.precedingMessageMetadata)
54+
const event = entry.event
55+
56+
const schema = { ...enrichMessageSchemaWithBase(event.type, event.payload) }
57+
58+
await eventEmitter.emit(schema, entry.event, entry.precedingMessageMetadata)
5459
await outboxAccumulator.add(entry)
5560
} catch (e) {
5661
context.logger.error({ error: e }, 'Failed to process outbox entry.')
@@ -133,14 +138,12 @@ export class OutboxEventEmitter<SupportedEvents extends CommonEventDefinition[]>
133138
* @param precedingMessageMetadata
134139
*/
135140
public async emit<SupportedEvent extends SupportedEvents[number]>(
136-
supportedEvent: SupportedEvent,
137-
data: Omit<CommonEventDefinitionPublisherSchemaType<SupportedEvent>, 'type'>,
141+
data: CommonEventDefinitionPublisherSchemaType<SupportedEvent>,
138142
precedingMessageMetadata?: Partial<ConsumerMessageMetadataType>,
139143
) {
140144
await this.storage.createEntry({
141145
id: uuidv7(),
142-
event: supportedEvent,
143-
data,
146+
event: data,
144147
precedingMessageMetadata,
145148
status: 'CREATED',
146149
created: new Date(),

packages/outbox-core/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/outbox-core",
3-
"version": "0.1.0",
3+
"version": "0.2.0",
44
"private": false,
55
"license": "MIT",
66
"description": "Outbox pattern implementation for message queue toolkit",

packages/outbox-core/test/outbox.spec.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ describe('outbox', () => {
9191
})
9292

9393
it('saves outbox entry to storage', async () => {
94-
await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, {
94+
await outboxEventEmitter.emit(createdEventPayload, {
9595
correlationId: randomUUID(),
9696
})
9797

@@ -101,7 +101,7 @@ describe('outbox', () => {
101101
})
102102

103103
it('saves outbox entry and process it', async () => {
104-
await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, {
104+
await outboxEventEmitter.emit(createdEventPayload, {
105105
correlationId: randomUUID(),
106106
})
107107

@@ -140,7 +140,7 @@ describe('outbox', () => {
140140
}),
141141
)
142142

143-
await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, {
143+
await outboxEventEmitter.emit(createdEventPayload, {
144144
correlationId: randomUUID(),
145145
})
146146

@@ -184,7 +184,7 @@ describe('outbox', () => {
184184
})
185185

186186
//Persist the event
187-
await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, {
187+
await outboxEventEmitter.emit(createdEventPayload, {
188188
correlationId: randomUUID(),
189189
})
190190

@@ -229,7 +229,7 @@ describe('outbox', () => {
229229
it("doesn't emit event again if it's already present in accumulator", async () => {
230230
const mockedEventEmitter = vi.spyOn(eventEmitter, 'emit')
231231

232-
await outboxEventEmitter.emit(TestEvents.created, createdEventPayload, {
232+
await outboxEventEmitter.emit(createdEventPayload, {
233233
correlationId: randomUUID(),
234234
})
235235

0 commit comments

Comments
 (0)