Skip to content

Commit 3b880f9

Browse files
committed
feat: basic mdc support, fix correlationId reset
1 parent 6149923 commit 3b880f9

File tree

5 files changed

+160
-28
lines changed

5 files changed

+160
-28
lines changed

src/integration.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
IrisChannels,
44
} from './lib/asyncapi/schema/channels'
55
import { type CustomErrorI, registerRejectableErrors } from './lib/errors'
6+
import { MdcI, registerMdcProvider } from './lib/mdc'
67
import Logger, { type LoggerI } from './logger'
78

89
export interface IrisIntegrationI {
@@ -11,6 +12,19 @@ export interface IrisIntegrationI {
1112
asyncapi?: {
1213
customChannelClasses?: CustomChannelClassesI
1314
}
15+
16+
/**
17+
* Since MDC is not integrated in same way as with Java IRIS
18+
* counterpart it's up to integration whether to use it or not.
19+
* In case it's used, this method can be used to obtain MDC instance.
20+
*
21+
* Current usage within node IRIS is to:
22+
* - use reqiestId value as correlationId (for HTTP requests that
23+
* produce events)
24+
* - be able to omit userId agument with the `publish.publishToUser()`
25+
* method
26+
*/
27+
mdcProvider?: () => MdcI | undefined | Promise<MdcI | undefined>
1428
}
1529

1630
/**
@@ -26,4 +40,7 @@ export function update(config: IrisIntegrationI): void {
2640
if (config.asyncapi?.customChannelClasses !== undefined) {
2741
IrisChannels.setCustomChannelClasses(config.asyncapi.customChannelClasses)
2842
}
43+
if (config.mdcProvider !== undefined) {
44+
registerMdcProvider(config.mdcProvider)
45+
}
2946
}

src/lib/mdc.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,24 @@ import { MDC_CLASS, SetMetadata } from './storage'
1111

1212
const { MESSAGE: MH } = MESSAGE_HEADERS
1313

14+
let mdcProvider: undefined | (() => MDC | undefined | Promise<MDC | undefined>)
15+
1416
export type MdcI = {
1517
sessionId?: string
1618
userId?: string
1719
clientTraceId?: string
1820
correlationId?: string
1921
eventType?: string
2022
clientVersion?: string
23+
24+
/**
25+
* HTTP related
26+
* if MDC provider funciton is provided then
27+
* when requestId is available within the MDC
28+
* it is used as a correlationId instaed of
29+
* generating a new one.
30+
*/
31+
requestId?: string
2132
}
2233

2334
export class MDC implements MdcI {
@@ -27,6 +38,7 @@ export class MDC implements MdcI {
2738
correlationId?: string
2839
eventType?: string
2940
clientVersion?: string
41+
requestId?: string
3042
}
3143

3244
SetMetadata(MDC_CLASS, true)(MDC)
@@ -60,6 +72,16 @@ export const isMDCClass = (target: unknown): boolean => {
6072
return false
6173
}
6274

75+
export async function getMdc(): Promise<MdcI | undefined> {
76+
if (mdcProvider !== undefined) {
77+
return await mdcProvider()
78+
}
79+
}
80+
81+
export function registerMdcProvider(provider: typeof mdcProvider) {
82+
mdcProvider = provider
83+
}
84+
6385
function setFromHeader(
6486
amqpMessage: Pick<AmqpMessage, 'properties'>,
6587
ref: MdcI,

src/lib/message_handler.param.decorator.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ export class AmqpMessage implements amqplib.ConsumeMessage {
1212

1313
SetMetadata(AMQP_MESSAGE_CLASS, true)(AmqpMessage)
1414

15-
export const isAmqpMessageClass = (target: unknown): boolean => {
15+
export const isAmqpMessageClass = (target: unknown): target is AmqpMessage => {
1616
if (typeof target === 'object' && target !== null) {
1717
if (target instanceof AmqpMessage) {
1818
return true

src/lib/publish.ts

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import * as constants from './constants'
99
import { asError } from './errors'
1010
import flags from './flags'
1111
import * as helper from './helper'
12+
import { amqpToMDC, getMdc } from './mdc'
1213
import * as message from './message'
1314
import { getPublishExchangeProps } from './message.process'
1415
import {
@@ -42,7 +43,7 @@ export function getUserPublisher<T>(messageClass: ClassConstructor<T>) {
4243

4344
return async (
4445
msg: T,
45-
user: string | AmqpMessage,
46+
user?: string | AmqpMessage,
4647
pubOpts?: publishI.PublishOptionsI,
4748
): Promise<boolean> => publishToUser(messageClass, msg, user, pubOpts)
4849
}
@@ -57,21 +58,21 @@ export const publish = async <T>(
5758
* Send the mesaage to `user` exchange,
5859
* ignoring the exchange set on event being published.
5960
*
60-
* @{param user} - user id or AmqpMessage
61+
* @param {string} [user] - user id or AmqpMessage
6162
* when processing a message from user, AmqpMessage can
6263
* be used to get user id from consumed message.
64+
*
65+
* When omitted it is obtained from the MDC via mdcProvider.
66+
*
67+
*
6368
*/
6469
export const publishToUser = async <T>(
6570
messageClass: ClassConstructor<T>,
6671
msg: T,
67-
user: string | AmqpMessage,
72+
user?: string | AmqpMessage,
6873
pubOpts?: publishI.PublishOptionsI,
6974
): Promise<boolean> => {
70-
const userIdString = typeof user === 'string'
71-
const hasOriginalMsg = !userIdString && isAmqpMessageClass(user)
72-
const userId = userIdString
73-
? user
74-
: _.get(user, `properties.headers[${MESSAGE_HEADERS.MESSAGE.USER_ID}]`)
75+
const userId = await obtainUserId(user || pubOpts?.amqpPublishOpts)
7576

7677
if (!_.isString(userId) || _.isEmpty(userId)) {
7778
throw new Error('ERR_IRIS_PUBLISHER_USER_ID_NOT_RESOLVED')
@@ -81,7 +82,7 @@ export const publishToUser = async <T>(
8182
messageClass,
8283
msg,
8384
Object.assign({}, pubOpts, { userId }),
84-
hasOriginalMsg ? <AmqpMessage>user : undefined,
85+
isAmqpMessageClass(user) ? <AmqpMessage>user : undefined,
8586
message.Scope.USER,
8687
)
8788
}
@@ -122,7 +123,12 @@ async function internalPublish<T>(
122123
msgMeta,
123124
msgString,
124125
routingKey,
125-
getAmqpBasicProperties(exchangeName, msgMeta, originalMessage, pubOpts),
126+
await getAmqpBasicProperties(
127+
exchangeName,
128+
msgMeta,
129+
originalMessage,
130+
pubOpts,
131+
),
126132
overrideScope,
127133
)
128134
}
@@ -133,6 +139,7 @@ export async function doPublish(
133139
routingKeyArg: string,
134140
options?: amqplib.Options.Publish,
135141
overrideScope?: message.Scope,
142+
originalMessage?: Pick<amqplib.Message, 'properties'>,
136143
): Promise<boolean> {
137144
validateBeforePublish(msgMeta, options, overrideScope)
138145

@@ -146,12 +153,25 @@ export async function doPublish(
146153

147154
const routingKey = publishingExchangeRoutingKey ?? routingKeyArg
148155

149-
logger.debug(TAG, `Publishing message to "${publishingExchangeName}"`, {
156+
const logInfo: any = {
150157
evt: msg,
151158
routingKey,
152159
publishingExchangeName,
153160
options: amqpHelper.safeAmqpObjectForLogging(options),
154-
})
161+
}
162+
163+
const mdc =
164+
originalMessage !== undefined ? amqpToMDC(originalMessage) : await getMdc()
165+
166+
if (mdc !== undefined) {
167+
logInfo.mdc = mdc
168+
}
169+
170+
logger.debug(
171+
TAG,
172+
`Publishing message to "${publishingExchangeName}"`,
173+
logInfo,
174+
)
155175

156176
const channel = await connection.assureDefaultChannel()
157177

@@ -190,13 +210,13 @@ async function msg2String<T>(
190210
return JSON.stringify(msg)
191211
}
192212

193-
function getAmqpBasicProperties(
213+
async function getAmqpBasicProperties(
194214
exchangeName: string,
195215
msgMeta: message.ProcessedMessageMetadataI,
196216
originalMsg?: Pick<amqplib.Message, 'properties'>,
197217
pubOpts?: publishI.PublishOptionsI,
198-
): Partial<amqplib.MessageProperties> {
199-
const amqpProperties = getAmqpPropsWithoutHeaders(originalMsg, pubOpts)
218+
): Promise<Partial<amqplib.MessageProperties>> {
219+
const amqpProperties = await getAmqpPropsWithoutHeaders(originalMsg, pubOpts)
200220
const amqpHeaders = getAmqpHeaders(exchangeName, originalMsg, pubOpts)
201221

202222
// TODO: subscription updates, set cache ttl, subcription id etc.
@@ -208,11 +228,7 @@ function getAmqpBasicProperties(
208228

209229
if (pubOpts?.userId !== undefined) {
210230
const serviceId = helper.getServiceName()
211-
const correlationId = randomUUID()
212231

213-
// when overriding user header make sure
214-
// to clean possible existing event context properties
215-
amqpProperties.correlationId = correlationId
216232
amqpHeaders[MESSAGE_HEADERS.MESSAGE.ORIGIN_SERVICE_ID] = serviceId
217233
amqpHeaders[MESSAGE_HEADERS.MESSAGE.USER_ID] = pubOpts.userId
218234
delete amqpHeaders[MESSAGE_HEADERS.MESSAGE.ROUTER]
@@ -225,12 +241,12 @@ function getAmqpBasicProperties(
225241
}
226242
}
227243

228-
function getAmqpPropsWithoutHeaders(
244+
async function getAmqpPropsWithoutHeaders(
229245
originalMsg?: Pick<amqplib.Message, 'properties'>,
230246
pubOpts?: publishI.PublishOptionsI,
231-
): Partial<Omit<amqplib.MessageProperties, 'headers'>> {
247+
): Promise<Partial<Omit<amqplib.MessageProperties, 'headers'>>> {
232248
const forceOptions = pubOpts?.amqpPublishOpts
233-
const correlationId = randomUUID()
249+
const correlationId = (await getMdc())?.requestId ?? randomUUID()
234250

235251
const inheritedProperties:
236252
| Omit<amqplib.MessageProperties, 'headers'>
@@ -295,3 +311,35 @@ function validateBeforePublish(
295311
}
296312
}
297313
}
314+
315+
async function obtainUserId(
316+
user?:
317+
| string
318+
| AmqpMessage
319+
| Pick<amqplib.Options.Publish, 'headers' | 'userId'>,
320+
): Promise<string | undefined> {
321+
if (typeof user === 'string') {
322+
return user
323+
}
324+
325+
if (isAmqpMessageClass(user)) {
326+
return (
327+
_.get(user, 'properties.userId') ||
328+
_.get(user, `properties.headers[${MESSAGE_HEADERS.MESSAGE.USER_ID}]`)
329+
)
330+
}
331+
332+
// MessageProperties
333+
if (typeof user === 'object') {
334+
return (
335+
_.get(user, 'userId') ||
336+
_.get(user, `headers[${MESSAGE_HEADERS.MESSAGE.USER_ID}]`)
337+
)
338+
}
339+
340+
const mdc = await getMdc()
341+
342+
if (mdc !== undefined) {
343+
return mdc.userId
344+
}
345+
}

test/e2e/publish_to_user.spec.ts

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
import { randomUUID } from 'node:crypto'
12
import { IsString } from 'class-validator'
23
import {
34
constants,
45
AmqpMessage,
56
Message,
67
MessageHandler,
78
Scope,
9+
mdc,
810
publish,
911
} from '../../src'
1012
import { irisTesting } from '../setup'
@@ -54,13 +56,14 @@ describe('Publishing non Scope.USER messages to user via publish.[publishToUser/
5456
})
5557

5658
test('Sending SESSION event to USER should work', async () => {
59+
const userId = randomUUID()
5760
const spyHandler = vi.spyOn(handler, 'publishInternalToUser')
5861
const headers = { foo: `bar_${Date.now()}` }
5962

6063
await publish.publishToUser(
6164
EvtSession,
6265
{ msg: 'do-publish-internal-to-user' },
63-
'user-id',
66+
userId,
6467
{ amqpPublishOpts: { headers } },
6568
)
6669

@@ -78,21 +81,28 @@ describe('Publishing non Scope.USER messages to user via publish.[publishToUser/
7881
fields: expect.any(Object),
7982
content: expect.any(Buffer),
8083
properties: expect.objectContaining({
81-
headers: expect.objectContaining(headers),
84+
headers: expect.objectContaining({
85+
...headers,
86+
[constants.MESSAGE_HEADERS.MESSAGE.USER_ID]: userId,
87+
}),
8288
}),
8389
}),
8490
)
8591
})
8692
})
8793

8894
test('Sending INTERNAL event to USER should work', async () => {
95+
const userId = randomUUID()
8996
const spyHandler = vi.spyOn(handler, 'publishInternalToUser')
90-
const headers = { foo: `bar_${Date.now()}` }
97+
const headers = {
98+
foo: `bar_${Date.now()}`,
99+
[constants.MESSAGE_HEADERS.MESSAGE.USER_ID]: userId,
100+
}
91101

92102
await publish.publishToUser(
93103
EvtInternal,
94104
{ msg: 'do-publish-internal-to-user' },
95-
'user-id',
105+
undefined,
96106
{ amqpPublishOpts: { headers } },
97107
)
98108

@@ -117,7 +127,42 @@ describe('Publishing non Scope.USER messages to user via publish.[publishToUser/
117127
})
118128
})
119129

120-
test('Should throw if user-id is missing', async () => {
130+
test('Sending INTERNAL event to USER should work via registered mdcProvider', async () => {
131+
const userId = randomUUID()
132+
const spyHandler = vi.spyOn(handler, 'publishInternalToUser')
133+
134+
mdc.registerMdcProvider(() => ({ userId }))
135+
136+
await publish.publishToUser(EvtInternal, {
137+
msg: 'do-publish-internal-to-user',
138+
})
139+
140+
await vi.waitFor(() => {
141+
expect(spyHandler).toHaveBeenCalledTimes(2)
142+
expect(spyHandler).toHaveBeenNthCalledWith(
143+
1,
144+
{ msg: 'do-publish-internal-to-user' },
145+
expect.anything(),
146+
)
147+
expect(spyHandler).toHaveBeenNthCalledWith(
148+
2,
149+
{ msg: 'hello' },
150+
expect.objectContaining({
151+
fields: expect.any(Object),
152+
content: expect.any(Buffer),
153+
properties: expect.objectContaining({
154+
headers: expect.objectContaining({
155+
[constants.MESSAGE_HEADERS.MESSAGE.USER_ID]: userId,
156+
}),
157+
}),
158+
}),
159+
)
160+
})
161+
162+
mdc.registerMdcProvider(undefined)
163+
})
164+
165+
test('Should throw if userId is missing', async () => {
121166
await expect(
122167
publish.publishToUser(
123168
EvtInternal,

0 commit comments

Comments
 (0)