Skip to content

Commit 807227a

Browse files
Autofill metadata on domain event emitter (#194)
Co-authored-by: CarlosGamero <[email protected]>
1 parent 267f9ba commit 807227a

File tree

9 files changed

+156
-116
lines changed

9 files changed

+156
-116
lines changed

packages/core/lib/events/DomainEventEmitter.spec.ts

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,41 @@ describe('AutopilotEventEmitter', () => {
8282
expect(fakeListener.receivedEvents[0]).toMatchObject(expectedCreatedPayload)
8383
})
8484

85+
it('emits event to anyListener and populates metadata', async () => {
86+
const { eventEmitter } = diContainer.cradle
87+
const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents)
88+
eventEmitter.onAny(fakeListener)
89+
90+
const emittedEvent = await eventEmitter.emit(TestEvents.created, {
91+
payload: {
92+
message: 'msg',
93+
},
94+
})
95+
96+
const processedEvent = await eventEmitter.handlerSpy.waitForMessageWithId<
97+
ConsumerMessageSchema<typeof TestEvents.created>
98+
>(emittedEvent.id)
99+
100+
expect(processedEvent.message.type).toBe(TestEvents.created.consumerSchema.shape.type.value)
101+
102+
await waitAndRetry(() => {
103+
return fakeListener.receivedEvents.length > 0
104+
})
105+
106+
expect(fakeListener.receivedEvents).toHaveLength(1)
107+
expect(fakeListener.receivedEvents[0]).toMatchObject({
108+
id: expect.any(String),
109+
payload: {
110+
message: 'msg',
111+
},
112+
timestamp: expect.any(String),
113+
type: 'entity.created',
114+
})
115+
expect(fakeListener.receivedEvents[0].metadata).toMatchObject({
116+
correlationId: expect.any(String),
117+
})
118+
})
119+
85120
it('can check spy for messages not being sent', async () => {
86121
const { eventEmitter } = diContainer.cradle
87122
const fakeListener = new FakeListener(diContainer.cradle.eventRegistry.supportedEvents)
@@ -127,7 +162,10 @@ describe('AutopilotEventEmitter', () => {
127162
expect(emitResult.message).toEqual({
128163
id: expect.any(String),
129164
metadata: {
130-
correlationId: 'dummy',
165+
correlationId: createdEventPayload.metadata!.correlationId!,
166+
originatedFrom: 'service',
167+
producedBy: undefined,
168+
schemaVersion: '1',
131169
},
132170
payload: {
133171
message: 'msg',
@@ -150,10 +188,6 @@ describe('AutopilotEventEmitter', () => {
150188
timestamp: expect.any(String),
151189
type: 'entity.created',
152190
})
153-
expect(fakeListener.receivedMetadata).toHaveLength(1)
154-
expect(fakeListener.receivedMetadata[0]).toMatchObject({
155-
correlationId: 'dummy',
156-
})
157191
})
158192

159193
it('emits event to singleListener', async () => {

packages/core/lib/events/DomainEventEmitter.ts

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import type { MetadataFiller } from '../messages/MetadataFiller'
44
import type { HandlerSpy, HandlerSpyParams, PublicHandlerSpy } from '../queues/HandlerSpy'
55
import { resolveHandlerSpy } from '../queues/HandlerSpy'
66

7-
import type { PublisherMessageMetadataType } from '@message-queue-toolkit/schemas'
7+
import type { ConsumerMessageMetadataType } from '@message-queue-toolkit/schemas'
88
import type { EventRegistry } from './EventRegistry'
99
import type {
1010
AnyEventHandler,
@@ -62,7 +62,7 @@ export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]>
6262
public async emit<SupportedEvent extends SupportedEvents[number]>(
6363
supportedEvent: SupportedEvent,
6464
data: Omit<CommonEventDefinitionPublisherSchemaType<SupportedEvent>, 'type'>,
65-
metadata?: PublisherMessageMetadataType,
65+
precedingMessageMetadata?: Partial<ConsumerMessageMetadataType>,
6666
): Promise<Omit<CommonEventDefinitionConsumerSchemaType<SupportedEvent>, 'type'>> {
6767
if (!data.timestamp) {
6868
data.timestamp = this.metadataFiller.produceTimestamp()
@@ -71,6 +71,22 @@ export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]>
7171
data.id = this.metadataFiller.produceId()
7272
}
7373

74+
if (!data.metadata) {
75+
data.metadata = precedingMessageMetadata
76+
? // @ts-ignore
77+
this.metadataFiller.produceMetadata(data, supportedEvent, precedingMessageMetadata)
78+
: {
79+
correlationId: this.metadataFiller.produceId(),
80+
schemaVersion: supportedEvent.schemaVersion,
81+
producedBy: this.metadataFiller.produceCurrentServiceId(),
82+
originatedFrom: this.metadataFiller.produceCurrentServiceId(),
83+
}
84+
}
85+
86+
if (!data.metadata.correlationId) {
87+
data.metadata.correlationId = this.metadataFiller.produceId()
88+
}
89+
7490
const eventTypeName = supportedEvent.publisherSchema.shape.type.value
7591

7692
if (!this.eventRegistry.isSupportedEvent(eventTypeName)) {
@@ -97,12 +113,12 @@ export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]>
97113

98114
if (eventHandlers) {
99115
for (const handler of eventHandlers) {
100-
await handler.handleEvent(validatedEvent, metadata)
116+
await handler.handleEvent(validatedEvent)
101117
}
102118
}
103119

104120
for (const handler of this.anyHandlers) {
105-
await handler.handleEvent(validatedEvent, metadata)
121+
await handler.handleEvent(validatedEvent)
106122
}
107123

108124
if (this._handlerSpy) {
@@ -111,7 +127,6 @@ export class DomainEventEmitter<SupportedEvents extends CommonEventDefinition[]>
111127
// @ts-ignore
112128
message: {
113129
...validatedEvent,
114-
...(metadata !== undefined ? { metadata } : {}),
115130
},
116131
processingResult: 'consumed',
117132
},
Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,15 @@
1-
import type { PublisherMessageMetadataType } from '@message-queue-toolkit/schemas'
21
import type { AnyEventHandler, CommonEventDefinition } from '../eventTypes'
32

43
export class FakeListener<SupportedEvents extends CommonEventDefinition[]>
54
implements AnyEventHandler<SupportedEvents>
65
{
76
public receivedEvents: SupportedEvents[number]['publisherSchema']['_output'][] = []
8-
public receivedMetadata: PublisherMessageMetadataType[] = []
97

108
constructor(_supportedEvents: SupportedEvents) {
119
this.receivedEvents = []
1210
}
1311

14-
handleEvent(
15-
event: SupportedEvents[number]['publisherSchema']['_output'],
16-
metadata: PublisherMessageMetadataType,
17-
): void | Promise<void> {
12+
handleEvent(event: SupportedEvents[number]['publisherSchema']['_output']): void | Promise<void> {
1813
this.receivedEvents.push(event)
19-
this.receivedMetadata.push(metadata)
2014
}
2115
}

packages/core/lib/messages/MetadataFiller.spec.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,11 @@ describe('MetadataFiller', () => {
5353
const metadata = filler.produceMetadata(TEST_MESSAGE, EVENT_DEFINITION, providedMetadata)
5454

5555
// Then
56-
expect(metadata).toEqual({ ...providedMetadata, producedBy: SERVICE_ID })
56+
expect(metadata).toEqual({
57+
...providedMetadata,
58+
producedBy: SERVICE_ID,
59+
schemaVersion: '0.0.0',
60+
})
5761
})
5862
})
5963
})

packages/core/lib/messages/MetadataFiller.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export type MetadataFiller<
2323
produceMetadata(currentMessage: T, eventDefinition: D, precedingMessageMetadata?: M): M
2424
produceId(): string
2525
produceTimestamp(): string
26+
produceCurrentServiceId(): string
2627
}
2728

2829
export class CommonMetadataFiller implements MetadataFiller {
@@ -46,6 +47,10 @@ export class CommonMetadataFiller implements MetadataFiller {
4647
})
4748
}
4849

50+
produceCurrentServiceId(): string {
51+
return this.serviceId
52+
}
53+
4954
produceMetadata(
5055
_currentMessage: PublisherBaseEventType,
5156
eventDefinition: Pick<CommonEventDefinition, 'schemaVersion'>,
@@ -54,10 +59,7 @@ export class CommonMetadataFiller implements MetadataFiller {
5459
return {
5560
producedBy: this.serviceId,
5661
originatedFrom: precedingMessageMetadata?.originatedFrom ?? this.serviceId,
57-
schemaVersion:
58-
precedingMessageMetadata?.schemaVersion ??
59-
eventDefinition.schemaVersion ??
60-
this.defaultVersion,
62+
schemaVersion: eventDefinition.schemaVersion ?? this.defaultVersion,
6163
correlationId: precedingMessageMetadata?.correlationId ?? this.produceId(),
6264
}
6365
}

packages/core/lib/messages/baseMessageSchemas.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
export {
22
PUBLISHER_MESSAGE_METADATA_SCHEMA,
33
CONSUMER_MESSAGE_METADATA_SCHEMA,
4-
PUBLISHER_MESSAGE_SCHEMA_EXTENSION,
5-
CONSUMER_MESSAGE_SCHEMA_EXTENSION,
64
PUBLISHER_BASE_MESSAGE_SCHEMA,
75
CONSUMER_BASE_MESSAGE_SCHEMA,
86
ConsumerBaseMessageType,

packages/schemas/lib/events/baseEventSchemas.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,55 @@
11
import type { ZodLiteral, ZodObject, ZodOptional, ZodString, ZodRawShape } from 'zod'
22
import { z } from 'zod'
33

4+
// External message metadata that describe the context in which the message was created, primarily used for debugging purposes
5+
export const PUBLISHER_MESSAGE_METADATA_SCHEMA = z
6+
.object({
7+
schemaVersion: z.optional(z.string().min(1).describe('message schema version')),
8+
// this is always set to a service that created the message
9+
producedBy: z.optional(z.string().min(1).describe('app/service that produced the message')),
10+
// this is always propagated within the message chain. For the first message in the chain it is equal to "producedBy"
11+
originatedFrom: z.optional(
12+
z
13+
.string()
14+
.min(1)
15+
.describe('app/service that initiated entire workflow that led to creating this message'),
16+
),
17+
// this is always propagated within the message chain.
18+
correlationId: z.optional(
19+
z.string().describe('unique identifier passed to all events in workflow chain'),
20+
),
21+
})
22+
.describe('external message metadata')
23+
24+
export const CONSUMER_MESSAGE_METADATA_SCHEMA = z
25+
.object({
26+
schemaVersion: z.string().min(1).describe('message schema version'),
27+
// this is always set to a service that created the message
28+
producedBy: z.string().min(1).describe('app/service that produced the message'),
29+
// this is always propagated within the message chain. For the first message in the chain it is equal to "producedBy"
30+
originatedFrom: z
31+
.string()
32+
.min(1)
33+
.describe('app/service that initiated entire workflow that led to creating this message'),
34+
// this is always propagated within the message chain.
35+
correlationId: z.string().describe('unique identifier passed to all events in workflow chain'),
36+
})
37+
.describe('external message metadata')
38+
439
// Base event fields that are typically autogenerated
540
export const GENERATED_BASE_EVENT_SCHEMA = z.object({
641
id: z.string().describe('event unique identifier'),
742
timestamp: z.string().datetime().describe('iso 8601 datetime'),
43+
// For internal domain events that did not originate within a message chain metadata field can be omitted, producer should then assume it is initiating a new chain
44+
metadata: CONSUMER_MESSAGE_METADATA_SCHEMA,
845
})
946

1047
// Base event fields that are typically autogenerated, marked as optional
1148
export const OPTIONAL_GENERATED_BASE_EVENT_SCHEMA = z.object({
1249
id: z.string().describe('event unique identifier').optional(),
1350
timestamp: z.string().datetime().describe('iso 8601 datetime').optional(),
51+
// For internal domain events that did not originate within a message chain metadata field can be omitted, producer should then assume it is initiating a new chain
52+
metadata: PUBLISHER_MESSAGE_METADATA_SCHEMA.optional(),
1453
})
1554

1655
// Base event fields that are always defined manually

packages/schemas/lib/events/eventTypes.ts

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import type { ZodObject, ZodTypeAny } from 'zod'
22
import type z from 'zod'
33

4-
import type { PublisherMessageMetadataType } from '../messages/baseMessageSchemas'
5-
64
import type { CONSUMER_BASE_EVENT_SCHEMA, PUBLISHER_BASE_EVENT_SCHEMA } from './baseEventSchemas'
75

86
export type EventTypeNames<EventDefinition extends CommonEventDefinition> =
@@ -40,13 +38,8 @@ export type CommonEventDefinitionPublisherSchemaType<T extends CommonEventDefini
4038
export type EventHandler<
4139
EventDefinitionSchema extends
4240
CommonEventDefinitionPublisherSchemaType<CommonEventDefinition> = CommonEventDefinitionPublisherSchemaType<CommonEventDefinition>,
43-
MetadataDefinitionSchema extends
44-
Partial<PublisherMessageMetadataType> = Partial<PublisherMessageMetadataType>,
4541
> = {
46-
handleEvent(
47-
event: EventDefinitionSchema,
48-
metadata?: MetadataDefinitionSchema,
49-
): void | Promise<void>
42+
handleEvent(event: EventDefinitionSchema): void | Promise<void>
5043
}
5144

5245
export type AnyEventHandler<EventDefinitions extends CommonEventDefinition[]> = EventHandler<

0 commit comments

Comments
 (0)