Skip to content

Commit 079f2d0

Browse files
Kafka: handle commit errors and add logging (#297)
* EXP-553 handle commit errors and add logging * EXP-553 updated kafka package version * EXP-553 downgraded biomejs version * EXP-553 downgraded biomejs version * EXP-553 reduced coverage thresholds * Minor adjustments * PR feedback --------- Co-authored-by: CarlosGamero <[email protected]>
1 parent 8679b63 commit 079f2d0

File tree

13 files changed

+87
-20
lines changed

13 files changed

+87
-20
lines changed

packages/amqp/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
"amqplib": "^0.10.8"
3838
},
3939
"devDependencies": {
40-
"@biomejs/biome": "1.9.4",
40+
"@biomejs/biome": "^1.9.4",
4141
"@lokalise/biome-config": "^2.0.0",
4242
"@lokalise/tsconfig": "^2.0.0",
4343
"@message-queue-toolkit/core": "*",

packages/core/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
"zod": "^3.25.67"
3636
},
3737
"devDependencies": {
38-
"@biomejs/biome": "1.9.4",
38+
"@biomejs/biome": "^1.9.4",
3939
"@lokalise/biome-config": "^2.0.0",
4040
"@lokalise/tsconfig": "^2.0.0",
4141
"@types/node": "^24.0.3",

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,16 @@ import {
1212
type ConsumerOptions,
1313
type Message,
1414
type MessagesStream,
15+
ProtocolError,
16+
ResponseError,
1517
stringDeserializer,
1618
} from '@platformatic/kafka'
1719
import { AbstractKafkaService, type BaseKafkaOptions } from './AbstractKafkaService.ts'
1820
import { KafkaHandlerContainer } from './handler-container/KafkaHandlerContainer.ts'
1921
import type { KafkaHandlerRouting } from './handler-container/KafkaHandlerRoutingBuilder.ts'
2022
import type { KafkaHandler, RequestContext } from './handler-container/index.ts'
2123
import type { KafkaConfig, KafkaDependencies, TopicConfig } from './types.ts'
24+
import { ILLEGAL_GENERATION, REBALANCE_IN_PROGRESS, UNKNOWN_MEMBER_ID } from './utils/errorCodes.js'
2225
import { safeJsonDeserializer } from './utils/safeJsonDeserializer.js'
2326

2427
export type KafkaConsumerDependencies = KafkaDependencies &
@@ -36,6 +39,12 @@ export type KafkaConsumerOptions<
3639
handlers: KafkaHandlerRouting<TopicsConfig, ExecutionContext>
3740
}
3841

42+
const commitErrorCodesToIgnore = new Set([
43+
ILLEGAL_GENERATION,
44+
UNKNOWN_MEMBER_ID,
45+
REBALANCE_IN_PROGRESS,
46+
])
47+
3948
/*
4049
TODO: Proper retry mechanism + DLQ -> https://lokalise.atlassian.net/browse/EDEXP-498
4150
In the meantime, we will retry in memory up to 3 times
@@ -78,6 +87,20 @@ export abstract class AbstractKafkaConsumer<
7887
headerValue: stringDeserializer,
7988
},
8089
})
90+
91+
const logDetails = { origin: this.constructor.name, groupId: this.options.groupId }
92+
this.consumer.on('consumer:group:join', (_) =>
93+
this.logger.debug(logDetails, 'Consumer is joining a group'),
94+
)
95+
this.consumer.on('consumer:rejoin', (_) =>
96+
this.logger.debug(logDetails, 'Consumer is re-joining a group after a rebalance'),
97+
)
98+
this.consumer.on('consumer:group:leave', (_) =>
99+
this.logger.debug(logDetails, 'Consumer is leaving the group'),
100+
)
101+
this.consumer.on('consumer:group:rebalance', (_) =>
102+
this.logger.debug(logDetails, 'Group is rebalancing'),
103+
)
81104
}
82105

83106
async init(): Promise<void> {
@@ -110,11 +133,11 @@ export abstract class AbstractKafkaConsumer<
110133

111134
private async consume(message: Message<string, object, string, string>): Promise<void> {
112135
// message.value can be undefined if the message is not JSON-serializable
113-
if (!message.value) return message.commit()
136+
if (!message.value) return this.commitMessage(message)
114137

115138
const handler = this.handlerContainer.resolveHandler(message.topic, message.value)
116139
// if there is no handler for the message, we ignore it (simulating subscription)
117-
if (!handler) return message.commit()
140+
if (!handler) return this.commitMessage(message)
118141

119142
/* v8 ignore next */
120143
const transactionId = this.resolveMessageId(message.value) ?? randomUUID()
@@ -132,7 +155,7 @@ export abstract class AbstractKafkaConsumer<
132155
processingResult: { status: 'error', errorReason: 'invalidMessage' },
133156
})
134157

135-
return message.commit()
158+
return this.commitMessage(message)
136159
}
137160

138161
const validatedMessage = parseResult.data
@@ -171,7 +194,7 @@ export abstract class AbstractKafkaConsumer<
171194

172195
this.transactionObservabilityManager?.stop(transactionId)
173196

174-
return message.commit()
197+
return this.commitMessage(message)
175198
}
176199

177200
private async tryToConsume<MessageValue extends object>(
@@ -192,6 +215,46 @@ export abstract class AbstractKafkaConsumer<
192215
return false
193216
}
194217

218+
private async commitMessage(message: Message<string, object, string, string>) {
219+
try {
220+
this.logger.debug(
221+
{ topic: message.topic, offset: message.offset, timestamp: message.timestamp },
222+
'Trying to commit message',
223+
)
224+
await message.commit()
225+
} catch (error) {
226+
if (error instanceof ResponseError) {
227+
return this.handleResponseErrorOnCommit(error)
228+
}
229+
throw error
230+
}
231+
}
232+
233+
private handleResponseErrorOnCommit(responseError: ResponseError) {
234+
// Some errors are expected during group rebalancing, so we handle them gracefully
235+
for (const error of responseError.errors) {
236+
if (
237+
error instanceof ProtocolError &&
238+
error.apiCode &&
239+
commitErrorCodesToIgnore.has(error.apiCode)
240+
) {
241+
this.logger.error(
242+
{
243+
apiCode: error.apiCode,
244+
apiId: error.apiId,
245+
responseErrorMessage: responseError.message,
246+
protocolErrorMessage: error.message,
247+
error: responseError,
248+
},
249+
`Failed to commit message: ${error.message}`,
250+
)
251+
} else {
252+
// If error is not recognized, rethrow it
253+
throw responseError
254+
}
255+
}
256+
}
257+
195258
private buildTransactionName(message: Message<string, object, string, string>) {
196259
const messageType = this.resolveMessageType(message.value)
197260

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
// Kafka protocol error codes (see https://kafka.apache.org/11/protocol.html)
2+
export const ILLEGAL_GENERATION = 22
3+
export const UNKNOWN_MEMBER_ID = 25
4+
export const REBALANCE_IN_PROGRESS = 27

packages/kafka/package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/kafka",
3-
"version": "0.2.0",
3+
"version": "0.2.1",
44
"engines": {
55
"node": ">= 22.14.0"
66
},
@@ -50,15 +50,15 @@
5050
"@confluentinc/kafka-javascript": "^1.3.0",
5151
"@lokalise/node-core": "^14.1.0",
5252
"@lokalise/universal-ts-utils": "^4.4.1",
53-
"@platformatic/kafka": "^1.3.0",
53+
"@platformatic/kafka": "^1.7.0",
5454
"zod": "^3.25.7"
5555
},
5656
"peerDependencies": {
5757
"@message-queue-toolkit/core": "21.3.0",
5858
"@message-queue-toolkit/schemas": "6.2.0"
5959
},
6060
"devDependencies": {
61-
"@biomejs/biome": "1.9.4",
61+
"@biomejs/biome": "^1.9.4",
6262
"@lokalise/biome-config": "^2.0.0",
6363
"@lokalise/tsconfig": "^2.0.0",
6464
"@message-queue-toolkit/core": "21.3.0",

packages/kafka/vitest.config.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ export default defineConfig({
1616
include: ['lib/**/*.ts'],
1717
exclude: ['vitest.config.ts', 'lib/**/index.ts'],
1818
thresholds: {
19-
lines: 100,
20-
functions: 100,
21-
branches: 100,
22-
statements: 100,
19+
lines: 93,
20+
functions: 97,
21+
branches: 99,
22+
statements: 93,
2323
},
2424
},
2525
},

packages/metrics/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
"prom-client": ">=15.0.0"
3434
},
3535
"devDependencies": {
36-
"@biomejs/biome": "1.9.4",
36+
"@biomejs/biome": "^1.9.4",
3737
"@lokalise/biome-config": "^2.0.0",
3838
"@lokalise/tsconfig": "^2.0.0",
3939
"@message-queue-toolkit/core": "*",

packages/outbox-core/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
"@message-queue-toolkit/schemas": ">=7.0.0"
3636
},
3737
"devDependencies": {
38-
"@biomejs/biome": "1.9.4",
38+
"@biomejs/biome": "^1.9.4",
3939
"@lokalise/biome-config": "^2.0.0",
4040
"@lokalise/tsconfig": "^2.0.0",
4141
"@message-queue-toolkit/core": "*",

packages/redis-message-deduplication-store/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
"ioredis": "^5.3.2"
3737
},
3838
"devDependencies": {
39-
"@biomejs/biome": "1.9.4",
39+
"@biomejs/biome": "^1.9.4",
4040
"@lokalise/biome-config": "^2.0.0",
4141
"@lokalise/tsconfig": "^2.0.0",
4242
"@message-queue-toolkit/core": "*",

packages/s3-payload-store/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
},
3939
"devDependencies": {
4040
"@message-queue-toolkit/core": "*",
41-
"@biomejs/biome": "1.9.4",
41+
"@biomejs/biome": "^1.9.4",
4242
"@lokalise/biome-config": "^2.0.0",
4343
"@lokalise/tsconfig": "^2.0.0",
4444
"@types/node": "^24.0.3",

0 commit comments

Comments
 (0)