From 2ce88da0beb270e16210c7a842a87ae74f623509 Mon Sep 17 00:00:00 2001 From: Mirza Brunjadze Date: Sun, 12 Oct 2025 20:19:26 +0400 Subject: [PATCH] feat: implement pause/resume consumer api Signed-off-by: Mirza Brunjadze --- docs/consumer.md | 33 ++++ src/clients/consumer/consumer.ts | 76 ++++++++- src/clients/consumer/messages-stream.ts | 12 ++ src/clients/consumer/types.ts | 4 +- test/clients/consumer/consumer.test.ts | 218 +++++++++++++++++++++++- 5 files changed, 338 insertions(+), 5 deletions(-) diff --git a/docs/consumer.md b/docs/consumer.md index 776e040..0179965 100644 --- a/docs/consumer.md +++ b/docs/consumer.md @@ -187,6 +187,39 @@ If `force` is not `true`, then the method will throw an error if any `MessagesSt The return value is `void`. +### `pause(topicPartitions)` + +Pauses message consumption for specific topic-partitions. Pausing only prevents new fetch requests for the specified partitions. Messages that are already buffered in the stream's internal buffer may still be emitted after calling `pause()`. This method will throw an error if called before joining a consumer group or if the specified topic is not assigned to this consumer. + +The parameter is an array of `TopicPartitions` objects, where each object has: + +| Property | Type | Description | +| ---------- | ---------- | ------------------- | +| topic | `string` | The topic name. | +| partitions | `number[]` | Array of partitions | + +The return value is `void`. + +### `resume(topicPartitions)` + +Resumes message consumption for specific topic-partitions that were previously paused. This method will throw an error if called before joining a consumer group or if the specified topic is not assigned to this consumer. + +The parameter is an array of `TopicPartitions` objects (same structure as `pause()`). + +The return value is `void`. + +### `paused()` + +Returns an array of all currently paused topic-partitions. + +The return value is an array of `TopicPartitions` objects (same structure as `pause()`). + +### `isPaused(topic, partition)` + +Checks if a specific topic-partition is currently paused. + +The return value is `boolean`. + ## FAQs ### My consumer is not receiving any message when the application restarts diff --git a/src/clients/consumer/consumer.ts b/src/clients/consumer/consumer.ts index 4b18bd8..f4d167a 100644 --- a/src/clients/consumer/consumer.ts +++ b/src/clients/consumer/consumer.ts @@ -99,7 +99,8 @@ import { type ListCommitsOptions, type ListOffsetsOptions, type Offsets, - type OffsetsWithTimestamps + type OffsetsWithTimestamps, + type TopicPartitions } from './types.ts' export class Consumer extends Base< @@ -118,8 +119,8 @@ export class Consumer>; - + #streams: Set> + #pausedPartitions: Map>; /* The following requests are blocking in Kafka: @@ -159,6 +160,7 @@ export class Consumer a.topic === topic) + if (!assignment) { + throw new UserError(`Topic '${topic}' is not assigned to this consumer.`, { topic }) + } + + for (const partition of partitions) { + const existing = this.#pausedPartitions.get(topic) + if (existing) { + existing.add(partition) + } else { + this.#pausedPartitions.set(topic, new Set([partition])) + } + } + } + } + + resume (topicPartitions: TopicPartitions[]): void { + if (!this.assignments) { + throw new UserError('Cannot resume partitions before joining a consumer group.') + } + + let emitResumeEvent = false + for (const { topic, partitions } of topicPartitions) { + const assignment = this.assignments.find(a => a.topic === topic) + if (!assignment) { + throw new UserError(`Topic '${topic}' is not assigned to this consumer.`, { topic }) + } + + for (const partition of partitions) { + const existing = this.#pausedPartitions.get(topic) + if (existing?.has(partition)) { + emitResumeEvent = true + existing.delete(partition) + } + + if (existing?.size === 0) { + this.#pausedPartitions.delete(topic) + } + } + } + + if (emitResumeEvent) { + this.emitWithDebug('consumer', 'user:resume', { partitions: topicPartitions }) + } + } + + paused (): TopicPartitions[] { + const result: TopicPartitions[] = [] + for (const [topic, partitions] of this.#pausedPartitions.entries()) { + result.push({ + topic, + partitions: Array.from(partitions) + }) + } + + return result + } + + isPaused (topic: string, partition: number): boolean { + return !!this.#pausedPartitions.get(topic)?.has(partition) + } + fetch (options: FetchOptions, callback: CallbackWithPromise): void fetch (options: FetchOptions): Promise fetch ( diff --git a/src/clients/consumer/messages-stream.ts b/src/clients/consumer/messages-stream.ts index 4140a9c..e6eb810 100644 --- a/src/clients/consumer/messages-stream.ts +++ b/src/clients/consumer/messages-stream.ts @@ -144,6 +144,14 @@ export class MessagesStream extends Readable }) }) + this.#consumer.on('consumer:user:resume', () => { + if (this.#shouldClose || this.closed || this.destroyed) { + return + } + + this.#fetch() + }) + if (consumer[kPrometheus]) { this.#metricsConsumedMessages = ensureMetric( consumer[kPrometheus], @@ -372,6 +380,10 @@ export class MessagesStream extends Readable const partitions = assignment.partitions for (const partition of partitions) { + if (this.#consumer.isPaused(topic, partition)) { + continue + } + const leader = metadata.topics.get(topic)!.partitions[partition].leader if (this.#inflightNodes.has(leader)) { diff --git a/src/clients/consumer/types.ts b/src/clients/consumer/types.ts index dd2fee7..20c4c22 100644 --- a/src/clients/consumer/types.ts +++ b/src/clients/consumer/types.ts @@ -10,11 +10,13 @@ export interface GroupProtocolSubscription { metadata?: Buffer | string } -export interface GroupAssignment { +export interface TopicPartitions { topic: string partitions: number[] } +export interface GroupAssignment extends TopicPartitions {} + export interface GroupPartitionsAssignments { memberId: string assignments: Map diff --git a/test/clients/consumer/consumer.test.ts b/test/clients/consumer/consumer.test.ts index 53156e5..391b530 100644 --- a/test/clients/consumer/consumer.test.ts +++ b/test/clients/consumer/consumer.test.ts @@ -1,4 +1,4 @@ -import { deepStrictEqual, ok, strictEqual } from 'node:assert' +import { deepStrictEqual, ok, strictEqual, doesNotThrow } from 'node:assert' import { randomUUID } from 'node:crypto' import { once } from 'node:events' import { test, type TestContext } from 'node:test' @@ -3314,3 +3314,219 @@ test('metrics should track the number of active topics', async t => { deepStrictEqual(activeTopics.values[0].value, 0) } }) + +test('pause should prevent fetches from paused partitions during consumption', async t => { + const topic = await createTopic(t, true, 2) + const messages = Array.from({ length: 10 }, (_, i) => ({ + topic, + key: `key-${i}`, + value: `value-${i}` + })) + + const consumer = createConsumer(t) + const stream = await consumer.consume({ + topics: [topic], + mode: MessagesStreamModes.EARLIEST, + autocommit: true + }) + + consumer.pause([{ topic, partitions: [0] }]) + await produceTestMessages({ t, messages: messages.slice(0, 5), overrideOptions: { partitioner: () => 0 } }) + await produceTestMessages({ t, messages: messages.slice(5, 10), overrideOptions: { partitioner: () => 1 } }) + + const received: { key: string; partition: number }[] = [] + let count = 0 + + for await (const message of stream) { + received.push({ key: message.key.toString(), partition: message.partition }) + count++ + + if (count === 5) { + break + } + } + + strictEqual( + received.every(m => m.partition === 1), + true + ) +}) + +test('resume should allow fetches from previously paused partitions', async t => { + const topic = await createTopic(t, true, 2) + const messages = Array.from({ length: 10 }, (_, i) => ({ + topic, + key: `key-${i}`, + value: `value-${i}` + })) + + await produceTestMessages({ t, messages: messages.slice(0, 5), overrideOptions: { partitioner: () => 0 } }) + await produceTestMessages({ t, messages: messages.slice(5, 10), overrideOptions: { partitioner: () => 1 } }) + + const consumer = createConsumer(t) + const stream = await consumer.consume({ + topics: [topic], + mode: MessagesStreamModes.EARLIEST, + autocommit: true, + maxWaitTime: 100 + }) + + consumer.pause([{ topic, partitions: [0] }]) + + const received: { key: string; partition: number }[] = [] + let count = 0 + + for await (const message of stream) { + received.push({ key: message.key.toString(), partition: message.partition }) + count++ + + if (count === 5) { + consumer.resume([{ topic, partitions: [0] }]) + } + + if (count === 10) { + break + } + } + + strictEqual( + received.slice(0, 5).every(m => m.partition === 1), + true + ) + strictEqual( + received.slice(5, 10).every(m => m.partition === 0), + true + ) +}) + +test('resume should handle resuming non-paused partitions gracefully', async t => { + const topic = await createTopic(t, true) + const consumer = createConsumer(t) + + consumer.topics.trackAll(topic) + await consumer.joinGroup({}) + + doesNotThrow(() => consumer.resume([{ topic, partitions: [0] }])) +}) + +test('pause/resume should throw error if consumer has not joined a group', async t => { + const topic = await createTopic(t, true) + const consumer = createConsumer(t) + + try { + consumer.pause([{ topic, partitions: [0] }]) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof UserError, true) + strictEqual(error.message, 'Cannot pause partitions before joining a consumer group.') + } + + try { + consumer.resume([{ topic, partitions: [0] }]) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof UserError, true) + strictEqual(error.message, 'Cannot resume partitions before joining a consumer group.') + } +}) + +test('pause/resume should throw error if topic is not assigned to consumer', async t => { + const topic1 = await createTopic(t, true) + const topic2 = await createTopic(t, true) + const consumer = createConsumer(t) + + consumer.topics.trackAll(topic1) + await consumer.joinGroup({}) + + try { + consumer.pause([{ topic: topic2, partitions: [0] }]) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof UserError, true) + strictEqual(error.message, `Topic '${topic2}' is not assigned to this consumer.`) + } + + try { + consumer.resume([{ topic: topic2, partitions: [0] }]) + throw new Error('Expected error not thrown') + } catch (error) { + strictEqual(error instanceof UserError, true) + strictEqual(error.message, `Topic '${topic2}' is not assigned to this consumer.`) + } +}) + +test('pause/resume should handle multiple topic-partitions', async t => { + const topic1 = await createTopic(t, true) + const topic2 = await createTopic(t, true) + const consumer = createConsumer(t) + + consumer.topics.trackAll(topic1, topic2) + await consumer.joinGroup({}) + + consumer.pause([ + { topic: topic1, partitions: [0] }, + { topic: topic2, partitions: [0] } + ]) + + strictEqual(consumer.isPaused(topic1, 0), true) + strictEqual(consumer.isPaused(topic2, 0), true) + + consumer.resume([ + { topic: topic1, partitions: [0] }, + { topic: topic2, partitions: [0] } + ]) + + strictEqual(consumer.isPaused(topic1, 0), false) + strictEqual(consumer.isPaused(topic2, 0), false) +}) + +test('paused should return all paused topic-partitions', async t => { + const topic1 = await createTopic(t, true, 2) + const topic2 = await createTopic(t, true, 2) + const consumer = createConsumer(t) + + consumer.topics.trackAll(topic1, topic2) + await consumer.joinGroup({}) + + consumer.pause([ + { topic: topic1, partitions: [0, 1] }, + { topic: topic2, partitions: [0] } + ]) + + deepStrictEqual(consumer.paused(), [ + { topic: topic1, partitions: [0, 1] }, + { topic: topic2, partitions: [0] } + ]) +}) + +test('isPaused should return true for paused topic-partitions', async t => { + const topic = await createTopic(t, true) + const consumer = createConsumer(t) + + consumer.topics.trackAll(topic) + await consumer.joinGroup({}) + + consumer.pause([{ topic, partitions: [0] }]) + strictEqual(consumer.isPaused(topic, 0), true) +}) + +test('isPaused should return false for non-paused topic-partitions', async t => { + const topic = await createTopic(t, true) + const consumer = createConsumer(t) + + consumer.topics.trackAll(topic) + await consumer.joinGroup({}) + + strictEqual(consumer.isPaused(topic, 0), false) +}) + +test('isPaused should return false for topic-partitions not assigned to consumer', async t => { + const topic1 = await createTopic(t, true) + const topic2 = await createTopic(t, true) + const consumer = createConsumer(t) + + consumer.topics.trackAll(topic1) + await consumer.joinGroup({}) + + strictEqual(consumer.isPaused(topic2, 0), false) +})