diff --git a/packages/kafka/lib/AbstractKafkaConsumer.spec.ts b/packages/kafka/lib/AbstractKafkaConsumer.spec.ts deleted file mode 100644 index 5792bdd3..00000000 --- a/packages/kafka/lib/AbstractKafkaConsumer.spec.ts +++ /dev/null @@ -1,83 +0,0 @@ -import { AbstractKafkaConsumer } from './AbstractKafkaConsumer.ts' - -describe('AbstractKafkaConsumer - handleSyncStreamBatch', () => { - it('must call kafkaStream.pause before and kafkaStream.resume after consuming messages', async () => { - // Given - const logger = { - debug: vi.fn(), - error: vi.fn(), - child: vi.fn(() => logger), - } - - const dependencies = { - logger, - errorReporter: { report: vi.fn() }, - messageMetricsManager: undefined, - transactionObservabilityManager: { - start: vi.fn(), - stop: vi.fn(), - }, - } as any - - const options = { - kafka: { bootstrapBrokers: ['localhost:9092'], clientId: 'test-client' }, - groupId: 'test-group', - handlers: {}, - batchProcessingEnabled: true, - batchProcessingOptions: { - batchSize: 10, - timeoutMilliseconds: 1_000, - }, - } as any - - const executionContext = {} - - // Bypass abstract/visibility checks with `as any` - const consumer = new (AbstractKafkaConsumer as any)( - dependencies, - options, - executionContext, - ) as any - - const calls: string[] = [] - - consumer.consume = vi.fn(() => { - calls.push('consume') - }) - - const kafkaStream = { - pause: vi.fn(() => { - calls.push('pause') - }), - resume: vi.fn(() => { - calls.push('resume') - }), - } as any - - // Attach mocked stream so handleSyncStreamBatch uses it via this.consumerStream - ;(consumer as any).consumerStream = kafkaStream - - const messageBatch = { - topic: 'test-topic', - partition: 0, - messages: [{ value: { foo: 'bar' } }], - } - - const batchStream: any = { - async *[Symbol.asyncIterator]() { - // Add an await to satisfy async function lint rule - await Promise.resolve() - yield messageBatch - }, - } - - // When - await (consumer as any).handleSyncStreamBatch(batchStream) - - // Then - expect(calls).toEqual(['pause', 'consume', 'resume']) - expect(kafkaStream.pause).toHaveBeenCalledTimes(1) - expect(kafkaStream.resume).toHaveBeenCalledTimes(1) - expect(consumer.consume).toHaveBeenCalledTimes(1) - }) -}) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 19aa8039..a64d5e0d 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -228,12 +228,10 @@ export abstract class AbstractKafkaConsumer< stream: KafkaMessageBatchStream>>, ): Promise { for await (const messageBatch of stream) { - this.consumerStream?.pause() await this.consume( messageBatch.topic, messageBatch.messages as DeserializedMessage>, ) - this.consumerStream?.resume() } } @@ -434,9 +432,8 @@ export abstract class AbstractKafkaConsumer< private commit(messageOrBatch: MessageOrBatch>) { if (Array.isArray(messageOrBatch)) { - if (messageOrBatch.length === 0) { - return Promise.resolve() - } + if (messageOrBatch.length === 0) return Promise.resolve() + // biome-ignore lint/style/noNonNullAssertion: we check the length above return this.commitMessage(messageOrBatch[messageOrBatch.length - 1]!) } else { @@ -445,13 +442,18 @@ export abstract class AbstractKafkaConsumer< } private async commitMessage(message: DeserializedMessage>) { + const logDetails = { + topic: message.topic, + offset: message.offset, + timestamp: message.timestamp, + } + this.logger.debug(logDetails, 'Trying to commit message') + try { - this.logger.debug( - { topic: message.topic, offset: message.offset, timestamp: message.timestamp }, - 'Trying to commit message', - ) await message.commit() + this.logger.debug(logDetails, 'Message committed successfully') } catch (error) { + this.logger.debug(logDetails, 'Message commit failed') if (error instanceof ResponseError) return this.handleResponseErrorOnCommit(error) throw error }