Skip to content

Commit d9f2518

Browse files
authored
EXP-585 Kafka metrics manager (#308)
* EXP-585 Using metrics manager
* Lint fix
* EXP-585 Improving id resolving logic
* Fixing tests
* EXP-585 Adjusting processed message
* EXP-585 Adding publisher tests
* EXP-585 Adding consumer tests
* EXP-585 Fixing timestamp issue
* release prepare
* Coverage fix
1 parent 7296eb8 commit d9f2518

File tree

9 files changed

+269
-71
lines changed

9 files changed

+269
-71
lines changed

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ export abstract class AbstractKafkaConsumer<
8989
})
9090

9191
const logDetails = { origin: this.constructor.name, groupId: this.options.groupId }
92+
/* v8 ignore start */
9293
this.consumer.on('consumer:group:join', (_) =>
9394
this.logger.debug(logDetails, 'Consumer is joining a group'),
9495
)
@@ -101,6 +102,7 @@ export abstract class AbstractKafkaConsumer<
101102
this.consumer.on('consumer:group:rebalance', (_) =>
102103
this.logger.debug(logDetails, 'Group is rebalancing'),
103104
)
105+
/* v8 ignore stop */
104106
}
105107

106108
async init(): Promise<void> {
@@ -135,6 +137,8 @@ export abstract class AbstractKafkaConsumer<
135137
// message.value can be undefined if the message is not JSON-serializable
136138
if (!message.value) return this.commitMessage(message)
137139

140+
const messageProcessingStartTimestamp = Date.now()
141+
138142
const handler = this.handlerContainer.resolveHandler(message.topic, message.value)
139143
// if there is no handler for the message, we ignore it (simulating subscription)
140144
if (!handler) return this.commitMessage(message)
@@ -150,9 +154,9 @@ export abstract class AbstractKafkaConsumer<
150154
message: stringValueSerializer(message.value),
151155
})
152156
this.handleMessageProcessed({
153-
topic: message.topic,
154-
message: message.value,
157+
message: message,
155158
processingResult: { status: 'error', errorReason: 'invalidMessage' },
159+
messageProcessingStartTimestamp,
156160
})
157161

158162
return this.commitMessage(message)
@@ -180,15 +184,15 @@ export abstract class AbstractKafkaConsumer<
180184

181185
if (consumed) {
182186
this.handleMessageProcessed({
183-
topic: message.topic,
184-
message: validatedMessage,
187+
message: message,
185188
processingResult: { status: 'consumed' },
189+
messageProcessingStartTimestamp,
186190
})
187191
} else {
188192
this.handleMessageProcessed({
189-
topic: message.topic,
190-
message: validatedMessage,
193+
message: message,
191194
processingResult: { status: 'error', errorReason: 'handlerError' },
195+
messageProcessingStartTimestamp,
192196
})
193197
}
194198

@@ -223,9 +227,7 @@ export abstract class AbstractKafkaConsumer<
223227
)
224228
await message.commit()
225229
} catch (error) {
226-
if (error instanceof ResponseError) {
227-
return this.handleResponseErrorOnCommit(error)
228-
}
230+
if (error instanceof ResponseError) return this.handleResponseErrorOnCommit(error)
229231
throw error
230232
}
231233
}

packages/kafka/lib/AbstractKafkaPublisher.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ export abstract class AbstractKafkaPublisher<
9595
requestContext?: RequestContext,
9696
options?: KafkaMessageOptions,
9797
): Promise<void> {
98+
const messageProcessingStartTimestamp = Date.now()
99+
98100
const schemaResult = this.schemaContainers[topic]?.resolveSchema(message)
99101
if (!schemaResult) throw new Error(`Message schemas not found for topic: ${topic}`)
100102
if (schemaResult.error) throw schemaResult.error
@@ -109,15 +111,15 @@ export abstract class AbstractKafkaPublisher<
109111
[this.resolveHeaderRequestIdField()]: requestContext?.reqId ?? '',
110112
}
111113

112-
// biome-ignore lint/style/noNonNullAssertion: Should always exist due to lazy init
113-
await this.producer!.send({
114-
messages: [{ ...options, topic, value: parsedMessage, headers }],
114+
const kafkaMessage = { ...options, topic, value: parsedMessage, headers }
115+
await this.producer?.send({
116+
messages: [kafkaMessage],
115117
})
116118

117119
this.handleMessageProcessed({
118-
message: parsedMessage,
120+
message: kafkaMessage,
119121
processingResult: { status: 'published' },
120-
topic,
122+
messageProcessingStartTimestamp,
121123
})
122124
} catch (error) {
123125
const errorDetails = {

packages/kafka/lib/AbstractKafkaService.ts

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@ import {
55
resolveGlobalErrorLogObject,
66
stringValueSerializer,
77
} from '@lokalise/node-core'
8+
import type { MakeRequired, MayOmit } from '@lokalise/universal-ts-utils/node'
89
import {
910
type HandlerSpy,
1011
type HandlerSpyParams,
12+
type MessageMetricsManager,
1113
type MessageProcessingResult,
1214
type PublicHandlerSpy,
1315
resolveHandlerSpy,
1416
} from '@message-queue-toolkit/core'
15-
import type { BaseOptions } from '@platformatic/kafka'
17+
import type { BaseOptions, Message } from '@platformatic/kafka'
1618
import type {
1719
KafkaConfig,
1820
KafkaDependencies,
@@ -34,20 +36,29 @@ export type BaseKafkaOptions = {
3436
logMessages?: boolean
3537
} & Omit<BaseOptions, keyof KafkaConfig> // Exclude properties that are already in KafkaConfig
3638

39+
type ProcessedMessage = MayOmit<
40+
Pick<Message<string, object, string, string>, 'topic' | 'value' | 'timestamp'>,
41+
'timestamp'
42+
>
43+
3744
export abstract class AbstractKafkaService<
3845
TopicsConfig extends TopicConfig[],
3946
KafkaOptions extends BaseKafkaOptions,
4047
> {
4148
protected readonly errorReporter: ErrorReporter
4249
protected readonly logger: CommonLogger
50+
protected readonly messageMetricsManager?: MessageMetricsManager<
51+
SupportedMessageValues<TopicsConfig>
52+
>
4353

44-
protected readonly options: KafkaOptions
54+
protected readonly options: MakeRequired<KafkaOptions, 'messageIdField'>
4555
protected readonly _handlerSpy?: HandlerSpy<SupportedMessageValues<TopicsConfig>>
4656

4757
constructor(dependencies: KafkaDependencies, options: KafkaOptions) {
4858
this.logger = dependencies.logger
4959
this.errorReporter = dependencies.errorReporter
50-
this.options = options
60+
this.messageMetricsManager = dependencies.messageMetricsManager
61+
this.options = { ...options, messageIdField: options.messageIdField ?? 'id' }
5162

5263
this._handlerSpy = resolveHandlerSpy(options)
5364
}
@@ -65,40 +76,54 @@ export abstract class AbstractKafkaService<
6576

6677
protected resolveMessageType(message: SupportedMessageValues<TopicsConfig>): string | undefined {
6778
if (!this.options.messageTypeField) return undefined
68-
return message[this.options.messageTypeField]
79+
return message[this.options.messageTypeField] as string | undefined
6980
}
7081

7182
protected resolveMessageId(message: SupportedMessageValues<TopicsConfig>): string | undefined {
72-
if (!this.options.messageIdField) return undefined
73-
return message[this.options.messageIdField]
83+
// @ts-expect-error
84+
return message[this.options.messageIdField] as string | undefined
7485
}
7586

7687
protected resolveHeaderRequestIdField(): string {
7788
return this.options.headerRequestIdField ?? 'x-request-id'
7889
}
7990

8091
protected handleMessageProcessed(params: {
81-
message: SupportedMessageValues<TopicsConfig>
92+
message: ProcessedMessage
8293
processingResult: MessageProcessingResult
83-
topic: string
94+
messageProcessingStartTimestamp: number
8495
}) {
85-
const { message, processingResult, topic } = params
86-
const messageId = this.resolveMessageId(message)
96+
const { message, processingResult } = params
97+
const messageId = this.resolveMessageId(message.value)
98+
const messageType = this.resolveMessageType(message.value)
8799

88-
this._handlerSpy?.addProcessedMessage({ message, processingResult }, messageId)
100+
this._handlerSpy?.addProcessedMessage({ message: message.value, processingResult }, messageId)
89101

90102
if (this.options.logMessages) {
91103
this.logger.debug(
92104
{
93-
message: stringValueSerializer(message),
94-
topic,
105+
message: stringValueSerializer(message.value),
106+
topic: message.topic,
95107
processingResult,
96108
messageId,
97-
messageType: this.resolveMessageType(message),
109+
messageType,
98110
},
99111
`Finished processing message ${messageId}`,
100112
)
101113
}
114+
115+
if (this.messageMetricsManager) {
116+
this.messageMetricsManager.registerProcessedMessage({
117+
message: message.value,
118+
processingResult,
119+
queueName: message.topic,
120+
messageId: messageId ?? 'unknown',
121+
messageType: messageType ?? 'unknown',
122+
messageTimestamp: message.timestamp ? Number(message.timestamp) : undefined,
123+
messageProcessingStartTimestamp: params.messageProcessingStartTimestamp,
124+
messageProcessingEndTimestamp: Date.now(),
125+
})
126+
}
102127
}
103128

104129
protected handlerError(error: unknown, context: Record<string, unknown> = {}): void {

packages/kafka/lib/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import type { ConnectionOptions } from '@platformatic/kafka'
33
import type { ZodSchema } from 'zod'
44
import type z from 'zod/v3'
55

6-
export type KafkaDependencies = Omit<QueueDependencies, 'messageMetricsManager'>
6+
export type KafkaDependencies = QueueDependencies
77

88
export type KafkaConfig = {
99
bootstrapBrokers: string[]

packages/kafka/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/kafka",
3-
"version": "0.2.3",
3+
"version": "0.3.0",
44
"engines": {
55
"node": ">= 22.14.0"
66
},

0 commit comments

Comments (0)