Skip to content

Commit 12d24cc

Browse files
authored
Feat: kafka coonsumer observability (#287)
* Adding request context to handlers * Adding request context support * Adding transactionObservabilityManager * Lint fix * Release prepare * Fixing minor issue * Coverage fix * Adding request id header config param * Removing todo * Adding request context to publisher * Improving coverage
1 parent 9963fb6 commit 12d24cc

File tree

10 files changed

+280
-54
lines changed

10 files changed

+280
-54
lines changed

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 64 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1+
import { randomUUID } from 'node:crypto'
12
import { setTimeout } from 'node:timers/promises'
2-
import { InternalError, stringValueSerializer } from '@lokalise/node-core'
3+
import {
4+
InternalError,
5+
type TransactionObservabilityManager,
6+
stringValueSerializer,
7+
} from '@lokalise/node-core'
8+
import type { QueueConsumerDependencies } from '@message-queue-toolkit/core'
39
import {
410
type ConsumeOptions,
511
Consumer,
@@ -10,17 +16,20 @@ import {
1016
stringDeserializer,
1117
} from '@platformatic/kafka'
1218
import { AbstractKafkaService, type BaseKafkaOptions } from './AbstractKafkaService.ts'
13-
import { KafkaHandlerContainer } from './handler-container/KafkaHandlerContainer.js'
14-
import type { KafkaHandlerRouting } from './handler-container/KafkaHandlerRoutingBuilder.js'
15-
import type { KafkaHandler } from './handler-container/index.js'
19+
import { KafkaHandlerContainer } from './handler-container/KafkaHandlerContainer.ts'
20+
import type { KafkaHandlerRouting } from './handler-container/KafkaHandlerRoutingBuilder.ts'
21+
import type { KafkaHandler, RequestContext } from './handler-container/index.ts'
1622
import type { KafkaDependencies, TopicConfig } from './types.ts'
1723

24+
export type KafkaConsumerDependencies = KafkaDependencies &
25+
Pick<QueueConsumerDependencies, 'transactionObservabilityManager'>
26+
1827
export type KafkaConsumerOptions<TopicsConfig extends TopicConfig[]> = BaseKafkaOptions &
1928
Omit<
20-
ConsumerOptions<string, object, string, object>,
29+
ConsumerOptions<string, object, string, string>,
2130
'deserializers' | 'autocommit' | 'bootstrapBrokers'
2231
> &
23-
Omit<ConsumeOptions<string, object, string, object>, 'topics'> & {
32+
Omit<ConsumeOptions<string, object, string, string>, 'topics'> & {
2433
handlers: KafkaHandlerRouting<TopicsConfig>
2534
}
2635

@@ -33,14 +42,19 @@ const MAX_IN_MEMORY_RETRIES = 3
3342
export abstract class AbstractKafkaConsumer<
3443
TopicsConfig extends TopicConfig[],
3544
> extends AbstractKafkaService<TopicsConfig, KafkaConsumerOptions<TopicsConfig>> {
36-
private readonly consumer: Consumer<string, object, string, object>
37-
private consumerStream?: MessagesStream<string, object, string, object>
45+
private readonly consumer: Consumer<string, object, string, string>
46+
private consumerStream?: MessagesStream<string, object, string, string>
3847

48+
private readonly transactionObservabilityManager: TransactionObservabilityManager
3949
private readonly handlerContainer: KafkaHandlerContainer<TopicsConfig>
4050

41-
constructor(dependencies: KafkaDependencies, options: KafkaConsumerOptions<TopicsConfig>) {
51+
constructor(
52+
dependencies: KafkaConsumerDependencies,
53+
options: KafkaConsumerOptions<TopicsConfig>,
54+
) {
4255
super(dependencies, options)
4356

57+
this.transactionObservabilityManager = dependencies.transactionObservabilityManager
4458
this.handlerContainer = new KafkaHandlerContainer<TopicsConfig>(
4559
options.handlers,
4660
options.messageTypeField,
@@ -54,7 +68,7 @@ export abstract class AbstractKafkaConsumer<
5468
key: stringDeserializer,
5569
value: jsonDeserializer,
5670
headerKey: stringDeserializer,
57-
headerValue: jsonDeserializer,
71+
headerValue: stringDeserializer,
5872
},
5973
})
6074
}
@@ -87,18 +101,15 @@ export abstract class AbstractKafkaConsumer<
87101
await this.consumer.close()
88102
}
89103

90-
/*
91-
TODO: https://lokalise.atlassian.net/browse/EDEXP-493
92-
- Improve logging with logger child on constructor + add request context?
93-
- Message logging
94-
- Observability
95-
*/
96-
97-
private async consume(message: Message<string, object, string, object>): Promise<void> {
104+
private async consume(message: Message<string, object, string, string>): Promise<void> {
98105
const handler = this.handlerContainer.resolveHandler(message.topic, message.value)
99106
// if there is no handler for the message, we ignore it (simulating subscription)
100107
if (!handler) return message.commit()
101108

109+
/* v8 ignore next */
110+
const transactionId = this.resolveMessageId(message.value) ?? randomUUID()
111+
this.transactionObservabilityManager?.start(this.buildTransactionName(message), transactionId)
112+
102113
const parseResult = handler.schema.safeParse(message.value)
103114
if (!parseResult.success) {
104115
this.handlerError(parseResult.error, {
@@ -116,13 +127,19 @@ export abstract class AbstractKafkaConsumer<
116127

117128
const validatedMessage = parseResult.data
118129

130+
const requestContext = this.getRequestContext(message)
131+
119132
let retries = 0
120133
let consumed = false
121134
do {
122135
// exponential backoff -> 2^(retry-1)
123136
if (retries > 0) await setTimeout(Math.pow(2, retries - 1))
124137

125-
consumed = await this.tryToConsume({ ...message, value: validatedMessage }, handler.handler)
138+
consumed = await this.tryToConsume(
139+
{ ...message, value: validatedMessage },
140+
handler.handler,
141+
requestContext,
142+
)
126143
if (consumed) break
127144

128145
retries++
@@ -142,15 +159,18 @@ export abstract class AbstractKafkaConsumer<
142159
})
143160
}
144161

162+
this.transactionObservabilityManager?.stop(transactionId)
163+
145164
return message.commit()
146165
}
147166

148167
private async tryToConsume<MessageValue extends object>(
149-
message: Message<string, MessageValue, string, object>,
168+
message: Message<string, MessageValue, string, string>,
150169
handler: KafkaHandler<MessageValue>,
170+
requestContext: RequestContext,
151171
): Promise<boolean> {
152172
try {
153-
await handler(message)
173+
await handler(message, requestContext)
154174
return true
155175
} catch (error) {
156176
this.handlerError(error, {
@@ -161,4 +181,27 @@ export abstract class AbstractKafkaConsumer<
161181

162182
return false
163183
}
184+
185+
private buildTransactionName(message: Message<string, object, string, string>) {
186+
const messageType = this.resolveMessageType(message.value)
187+
188+
let name = `kafka:${message.topic}`
189+
if (messageType?.trim().length) name += `:${messageType.trim()}`
190+
191+
return name
192+
}
193+
194+
private getRequestContext(message: Message<string, object, string, string>): RequestContext {
195+
let reqId = message.headers.get(this.resolveHeaderRequestIdField())
196+
if (!reqId || reqId.trim().length === 0) reqId = randomUUID()
197+
198+
return {
199+
reqId,
200+
logger: this.logger.child({
201+
'x-request-id': reqId,
202+
topic: message.topic,
203+
messageKey: message.key,
204+
}),
205+
}
206+
}
164207
}

packages/kafka/lib/AbstractKafkaPublisher.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
stringSerializer,
99
} from '@platformatic/kafka'
1010
import { AbstractKafkaService, type BaseKafkaOptions } from './AbstractKafkaService.ts'
11+
import type { RequestContext } from './handler-container/index.js'
1112
import type {
1213
KafkaDependencies,
1314
SupportedMessageValuesInput,
@@ -17,12 +18,12 @@ import type {
1718
} from './types.ts'
1819

1920
export type KafkaPublisherOptions<TopicsConfig extends TopicConfig[]> = BaseKafkaOptions &
20-
Omit<ProduceOptions<string, object, string, object>, 'serializers'> & {
21+
Omit<ProduceOptions<string, object, string, string>, 'serializers'> & {
2122
topicsConfig: TopicsConfig
2223
}
2324

2425
export type KafkaMessageOptions = Omit<
25-
MessageToProduce<string, object, string, object>,
26+
MessageToProduce<string, object, string, string>,
2627
'topic' | 'value'
2728
>
2829

@@ -35,7 +36,7 @@ export abstract class AbstractKafkaPublisher<
3536
MessageSchemaContainer<SupportedMessageValuesInput<TopicsConfig>>
3637
>
3738

38-
private readonly producer: Producer<string, object, string, object>
39+
private readonly producer: Producer<string, object, string, string>
3940
private isInitiated: boolean
4041

4142
constructor(dependencies: KafkaDependencies, options: KafkaPublisherOptions<TopicsConfig>) {
@@ -61,7 +62,7 @@ export abstract class AbstractKafkaPublisher<
6162
key: stringSerializer,
6263
value: jsonSerializer,
6364
headerKey: stringSerializer,
64-
headerValue: jsonSerializer,
65+
headerValue: stringSerializer,
6566
},
6667
})
6768
}
@@ -91,6 +92,7 @@ export abstract class AbstractKafkaPublisher<
9192
async publish<Topic extends SupportedTopics<TopicsConfig>>(
9293
topic: Topic,
9394
message: SupportedMessageValuesInputForTopic<TopicsConfig, Topic>,
95+
requestContext?: RequestContext,
9496
options?: KafkaMessageOptions,
9597
): Promise<void> {
9698
const schemaResult = this.schemaContainers[topic]?.resolveSchema(message)
@@ -102,8 +104,15 @@ export abstract class AbstractKafkaPublisher<
102104
try {
103105
const parsedMessage = schemaResult.result.parse(message)
104106

107+
const headers = {
108+
...options?.headers,
109+
[this.resolveHeaderRequestIdField()]: requestContext?.reqId ?? '',
110+
}
111+
105112
// biome-ignore lint/style/noNonNullAssertion: Should always exist due to lazy init
106-
await this.producer!.send({ messages: [{ ...options, topic, value: parsedMessage }] })
113+
await this.producer!.send({
114+
messages: [{ ...options, topic, value: parsedMessage, headers }],
115+
})
107116

108117
this.handleMessageProcessed({
109118
message: parsedMessage,

packages/kafka/lib/AbstractKafkaService.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ export type BaseKafkaOptions = {
2424
kafka: KafkaConfig
2525
messageTypeField?: string
2626
messageIdField?: string
27+
/**
28+
* The field in the message headers that contains the request ID.
29+
* This is used to correlate logs and transactions with the request.
30+
* Defaults to 'x-request-id'.
31+
*/
32+
headerRequestIdField?: string
2733
handlerSpy?: HandlerSpy<object> | HandlerSpyParams | boolean
2834
logMessages?: boolean
2935
} & Omit<BaseOptions, keyof KafkaConfig> // Exclude properties that are already in KafkaConfig
@@ -67,6 +73,10 @@ export abstract class AbstractKafkaService<
6773
return message[this.options.messageIdField]
6874
}
6975

76+
protected resolveHeaderRequestIdField(): string {
77+
return this.options.headerRequestIdField ?? 'x-request-id'
78+
}
79+
7080
protected handleMessageProcessed(params: {
7181
message: SupportedMessageValues<TopicsConfig>
7282
processingResult: MessageProcessingResult

packages/kafka/lib/handler-container/KafkaHandlerConfig.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
1+
import type { CommonLogger } from '@lokalise/node-core'
12
import type { Message } from '@platformatic/kafka'
23
import type { ZodSchema } from 'zod'
34

5+
export interface RequestContext {
6+
logger: CommonLogger
7+
reqId: string
8+
}
9+
410
export type KafkaHandler<MessageValue extends object> = (
5-
message: Message<string, MessageValue, string, object>,
11+
message: Message<string, MessageValue, string, string>,
12+
requestContext: RequestContext,
613
) => Promise<void> | void
714

815
export class KafkaHandlerConfig<MessageValue extends object> {

packages/kafka/lib/handler-container/KafkaHandlerRoutingBuilder.spec.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import type { Message } from '@platformatic/kafka'
22
import { expectTypeOf } from 'vitest'
33
import z from 'zod'
44
import type { TopicConfig } from '../types.ts'
5-
import { KafkaHandlerConfig } from './KafkaHandlerConfig.ts'
5+
import { KafkaHandlerConfig, type RequestContext } from './KafkaHandlerConfig.ts'
66
import { KafkaHandlerRoutingBuilder } from './KafkaHandlerRoutingBuilder.ts'
77

88
const CREATE_SCHEMA = z.object({ type: z.literal('create') })
@@ -17,25 +17,29 @@ type TopicsConfig = typeof topicsConfig
1717

1818
describe('KafkaHandlerRoutingBuilder', () => {
1919
it('should build routing config', () => {
20-
type ExpectedMessage<MessageValue> = Message<string, MessageValue, string, object>
20+
type ExpectedMessage<MessageValue> = Message<string, MessageValue, string, string>
21+
2122
// Given
2223
const builder = new KafkaHandlerRoutingBuilder<TopicsConfig>()
2324
.addConfig(
2425
'all',
25-
new KafkaHandlerConfig(CREATE_SCHEMA, (message) => {
26+
new KafkaHandlerConfig(CREATE_SCHEMA, (message, requestContext) => {
2627
expectTypeOf(message).toEqualTypeOf<ExpectedMessage<z.infer<typeof CREATE_SCHEMA>>>()
28+
expectTypeOf(requestContext).toEqualTypeOf<RequestContext>()
2729
}),
2830
)
2931
.addConfig(
3032
'all',
31-
new KafkaHandlerConfig(UPDATE_SCHEMA, (message) => {
33+
new KafkaHandlerConfig(UPDATE_SCHEMA, (message, requestContext) => {
3234
expectTypeOf(message).toEqualTypeOf<ExpectedMessage<z.infer<typeof UPDATE_SCHEMA>>>()
35+
expectTypeOf(requestContext).toEqualTypeOf<RequestContext>()
3336
}),
3437
)
3538
.addConfig(
3639
'empty',
37-
new KafkaHandlerConfig(EMPTY_SCHEMA, (message) => {
40+
new KafkaHandlerConfig(EMPTY_SCHEMA, (message, requestContext) => {
3841
expectTypeOf(message).toEqualTypeOf<ExpectedMessage<z.infer<typeof EMPTY_SCHEMA>>>()
42+
expectTypeOf(requestContext).toEqualTypeOf<RequestContext>()
3943
}),
4044
)
4145

packages/kafka/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/kafka",
3-
"version": "0.0.0",
3+
"version": "0.1.0",
44
"engines": {
55
"node": ">= 22.14.0"
66
},
@@ -54,7 +54,7 @@
5454
"zod": "^3.25.7"
5555
},
5656
"peerDependencies": {
57-
"@message-queue-toolkit/core": ">=21.0.0",
57+
"@message-queue-toolkit/core": ">=21.3.0",
5858
"@message-queue-toolkit/schemas": ">=6.0.0"
5959
},
6060
"devDependencies": {

0 commit comments

Comments
 (0)