Skip to content

Commit 0adefab

Browse files
authored
Kafka improve logging (#383)
* Reverting prev changes * Improving commit logs
1 parent 61e2b57 commit 0adefab

File tree

2 files changed

+11
-92
lines changed

2 files changed

+11
-92
lines changed

packages/kafka/lib/AbstractKafkaConsumer.spec.ts

Lines changed: 0 additions & 83 deletions
This file was deleted.

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -228,12 +228,10 @@ export abstract class AbstractKafkaConsumer<
228228
stream: KafkaMessageBatchStream<DeserializedMessage<SupportedMessageValues<TopicsConfig>>>,
229229
): Promise<void> {
230230
for await (const messageBatch of stream) {
231-
this.consumerStream?.pause()
232231
await this.consume(
233232
messageBatch.topic,
234233
messageBatch.messages as DeserializedMessage<SupportedMessageValues<TopicsConfig>>,
235234
)
236-
this.consumerStream?.resume()
237235
}
238236
}
239237

@@ -434,9 +432,8 @@ export abstract class AbstractKafkaConsumer<
434432

435433
private commit(messageOrBatch: MessageOrBatch<SupportedMessageValues<TopicsConfig>>) {
436434
if (Array.isArray(messageOrBatch)) {
437-
if (messageOrBatch.length === 0) {
438-
return Promise.resolve()
439-
}
435+
if (messageOrBatch.length === 0) return Promise.resolve()
436+
440437
// biome-ignore lint/style/noNonNullAssertion: we check the length above
441438
return this.commitMessage(messageOrBatch[messageOrBatch.length - 1]!)
442439
} else {
@@ -445,13 +442,18 @@ export abstract class AbstractKafkaConsumer<
445442
}
446443

447444
private async commitMessage(message: DeserializedMessage<SupportedMessageValues<TopicsConfig>>) {
445+
const logDetails = {
446+
topic: message.topic,
447+
offset: message.offset,
448+
timestamp: message.timestamp,
449+
}
450+
this.logger.debug(logDetails, 'Trying to commit message')
451+
448452
try {
449-
this.logger.debug(
450-
{ topic: message.topic, offset: message.offset, timestamp: message.timestamp },
451-
'Trying to commit message',
452-
)
453453
await message.commit()
454+
this.logger.debug(logDetails, 'Message committed successfully')
454455
} catch (error) {
456+
this.logger.debug(logDetails, 'Message commit failed')
455457
if (error instanceof ResponseError) return this.handleResponseErrorOnCommit(error)
456458
throw error
457459
}

0 commit comments

Comments
 (0)