From 04ed9ff65a4992c15b93408d4f309c7bc02b88f4 Mon Sep 17 00:00:00 2001 From: Irfan Hodzic Date: Mon, 15 Dec 2025 15:21:28 +0100 Subject: [PATCH 1/3] pause and resume stream --- .../kafka/lib/AbstractKafkaConsumer.spec.ts | 83 +++++++++++++++++++ packages/kafka/lib/AbstractKafkaConsumer.ts | 2 + 2 files changed, 85 insertions(+) create mode 100644 packages/kafka/lib/AbstractKafkaConsumer.spec.ts diff --git a/packages/kafka/lib/AbstractKafkaConsumer.spec.ts b/packages/kafka/lib/AbstractKafkaConsumer.spec.ts new file mode 100644 index 00000000..5792bdd3 --- /dev/null +++ b/packages/kafka/lib/AbstractKafkaConsumer.spec.ts @@ -0,0 +1,83 @@ +import { AbstractKafkaConsumer } from './AbstractKafkaConsumer.ts' + +describe('AbstractKafkaConsumer - handleSyncStreamBatch', () => { + it('must call kafkaStream.pause before and kafkaStream.resume after consuming messages', async () => { + // Given + const logger = { + debug: vi.fn(), + error: vi.fn(), + child: vi.fn(() => logger), + } + + const dependencies = { + logger, + errorReporter: { report: vi.fn() }, + messageMetricsManager: undefined, + transactionObservabilityManager: { + start: vi.fn(), + stop: vi.fn(), + }, + } as any + + const options = { + kafka: { bootstrapBrokers: ['localhost:9092'], clientId: 'test-client' }, + groupId: 'test-group', + handlers: {}, + batchProcessingEnabled: true, + batchProcessingOptions: { + batchSize: 10, + timeoutMilliseconds: 1_000, + }, + } as any + + const executionContext = {} + + // Bypass abstract/visibility checks with `as any` + const consumer = new (AbstractKafkaConsumer as any)( + dependencies, + options, + executionContext, + ) as any + + const calls: string[] = [] + + consumer.consume = vi.fn(() => { + calls.push('consume') + }) + + const kafkaStream = { + pause: vi.fn(() => { + calls.push('pause') + }), + resume: vi.fn(() => { + calls.push('resume') + }), + } as any + + // Attach mocked stream so handleSyncStreamBatch uses it via this.consumerStream + ;(consumer as any).consumerStream = kafkaStream + + const messageBatch = { + topic: 'test-topic', + partition: 0, + messages: [{ value: { foo: 'bar' } }], + } + + const batchStream: any = { + async *[Symbol.asyncIterator]() { + // Add an await to satisfy async function lint rule + await Promise.resolve() + yield messageBatch + }, + } + + // When + await (consumer as any).handleSyncStreamBatch(batchStream) + + // Then + expect(calls).toEqual(['pause', 'consume', 'resume']) + expect(kafkaStream.pause).toHaveBeenCalledTimes(1) + expect(kafkaStream.resume).toHaveBeenCalledTimes(1) + expect(consumer.consume).toHaveBeenCalledTimes(1) + }) +}) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index 0e5764af..19aa8039 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -228,10 +228,12 @@ export abstract class AbstractKafkaConsumer< stream: KafkaMessageBatchStream>>, ): Promise { for await (const messageBatch of stream) { + this.consumerStream?.pause() await this.consume( messageBatch.topic, messageBatch.messages as DeserializedMessage>, ) + this.consumerStream?.resume() } } From 4b07a2a962ceaf7f347a453f6ff6f882aee7c05b Mon Sep 17 00:00:00 2001 From: Irfan Hodzic Date: Mon, 15 Dec 2025 15:22:02 +0100 Subject: [PATCH 2/3] bump kafka version --- packages/kafka/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/kafka/package.json b/packages/kafka/package.json index cf7bd405..d46543d3 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/kafka", - "version": "0.8.1", + "version": "0.8.2", "engines": { "node": ">= 22.14.0" }, From 187f0672ada56db277baed83ff3f87865513af0c Mon Sep 17 00:00:00 2001 From: Irfan Hodzic Date: Mon, 15 Dec 2025 15:28:06 +0100 Subject: [PATCH 3/3] revert version bump --- packages/kafka/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/kafka/package.json b/packages/kafka/package.json index d46543d3..cf7bd405 100644 --- a/packages/kafka/package.json +++ b/packages/kafka/package.json @@ -1,6 +1,6 @@ { "name": "@message-queue-toolkit/kafka", - "version": "0.8.2", + "version": "0.8.1", "engines": { "node": ">= 22.14.0" },