Skip to content

Commit 83b3690

Browse files
committed
feat: add pause/resume consumer api
Signed-off-by: Mirza Brunjadze <[email protected]>
1 parent 87aac69 commit 83b3690

File tree

5 files changed

+323
-3
lines changed

5 files changed

+323
-3
lines changed

docs/consumer.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,43 @@ If `force` is not `true`, then the method will throw an error if any `MessagesSt
183183

184184
The return value is `void`.
185185

186+
### `pause(partitions)`
187+
188+
Pauses message consumption for specific topic-partitions.
189+
190+
The parameter is an array of `TopicPartition` objects, where each object has:
191+
192+
| Property | Type | Description |
193+
| --------- | -------- | ----------------------------- |
194+
| topic | `string` | The topic name. |
195+
| partition | `number` | The partition number to pause |
196+
197+
**Important:** Pausing only prevents new fetch requests for the specified partitions. Messages that are already buffered in the stream's internal buffer may still be emitted after calling `pause()`.
198+
199+
The return value is `void`.
200+
201+
### `resume(partitions)`
202+
203+
Resumes message consumption for specific topic-partitions that were previously paused.
204+
205+
The parameter is an array of `TopicPartition` objects with the same structure as `pause()`.
206+
207+
The return value is `void`.
208+
209+
### `paused()`
210+
211+
Returns an array of all currently paused topic-partitions.
212+
213+
The return value is an array of `TopicPartition` objects.
214+
215+
### `isPaused(partition)`
216+
217+
Checks if a specific topic-partition is currently paused.
218+
219+
The parameter is a `TopicPartition` object.
220+
221+
The return value is `boolean`.
222+
186223
## FAQs
187224

188225
### My consumer is not receiving any message when the application restarts

src/clients/consumer/consumer.ts

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ import {
9898
type ListCommitsOptions,
9999
type ListOffsetsOptions,
100100
type Offsets,
101-
type OffsetsWithTimestamps
101+
type OffsetsWithTimestamps,
102+
type TopicPartition
102103
} from './types.ts'
103104

104105
export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderValue = Buffer> extends Base<
@@ -118,7 +119,8 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
118119
#heartbeatInterval: NodeJS.Timeout | null
119120
#lastHeartbeat: Date | null
120121
#streams: Set<MessagesStream<Key, Value, HeaderKey, HeaderValue>>
121-
#partitionsAssigner: GroupPartitionsAssigner;
122+
#partitionsAssigner: GroupPartitionsAssigner
123+
#pausedPartitions: Map<string, Set<number>>;
122124
/*
123125
The following requests are blocking in Kafka:
124126
@@ -158,6 +160,7 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
158160
this.#heartbeatInterval = null
159161
this.#lastHeartbeat = null
160162
this.#streams = new Set()
163+
this.#pausedPartitions = new Map()
161164
this.#partitionsAssigner = this[kOptions].partitionAssigner ?? roundRobinAssigner
162165

163166
this.#validateGroupOptions(this[kOptions], groupIdAndOptionsValidator)
@@ -297,6 +300,58 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
297300
return callback![kCallbackPromise]
298301
}
299302

303+
pause (partitions: TopicPartition[]): void {
304+
if (!this.assignments) {
305+
throw new UserError('Cannot pause partitions before joining a consumer group.')
306+
}
307+
308+
for (const { topic, partition } of partitions) {
309+
const assignment = this.assignments.find(a => a.topic === topic)
310+
if (!assignment) {
311+
throw new UserError(`Topic '${topic}' is not assigned to this consumer.`, { topic })
312+
}
313+
314+
const existing = this.#pausedPartitions.get(topic)
315+
if (existing) {
316+
existing.add(partition)
317+
} else {
318+
this.#pausedPartitions.set(topic, new Set([partition]))
319+
}
320+
}
321+
}
322+
323+
resume (partitions: TopicPartition[]): void {
324+
if (!this.assignments) {
325+
throw new UserError('Cannot resume partitions before joining a consumer group.')
326+
}
327+
328+
for (const { topic, partition } of partitions) {
329+
const assignment = this.assignments.find(a => a.topic === topic)
330+
if (!assignment) {
331+
throw new UserError(`Topic '${topic}' is not assigned to this consumer.`, { topic })
332+
}
333+
334+
const existing = this.#pausedPartitions.get(topic)
335+
if (existing) {
336+
existing.delete(partition)
337+
if (existing.size === 0) {
338+
this.#pausedPartitions.delete(topic)
339+
}
340+
}
341+
}
342+
}
343+
344+
paused (): TopicPartition[] {
345+
return this.#pausedPartitions
346+
.entries()
347+
.flatMap(([topic, partitions]) => Array.from(partitions).map(partition => ({ topic, partition })))
348+
.toArray()
349+
}
350+
351+
isPaused (partition: TopicPartition): boolean {
352+
return !!this.#pausedPartitions.get(partition.topic)?.has(partition.partition)
353+
}
354+
300355
fetch (options: FetchOptions<Key, Value, HeaderKey, HeaderValue>, callback: CallbackWithPromise<FetchResponse>): void
301356
fetch (options: FetchOptions<Key, Value, HeaderKey, HeaderValue>): Promise<FetchResponse>
302357
fetch (

src/clients/consumer/messages-stream.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,10 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
372372
const partitions = assignment.partitions
373373

374374
for (const partition of partitions) {
375+
if (this.#consumer.isPaused({ topic, partition })) {
376+
continue
377+
}
378+
375379
const leader = metadata.topics.get(topic)!.partitions[partition].leader
376380

377381
if (this.#inflightNodes.has(leader)) {

src/clients/consumer/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,11 @@ export interface GroupProtocolSubscription {
1010
metadata?: Buffer | string
1111
}
1212

13+
export interface TopicPartition {
14+
topic: string
15+
partition: number
16+
}
17+
1318
export interface GroupAssignment {
1419
topic: string
1520
partitions: number[]

test/clients/consumer/consumer.test.ts

Lines changed: 220 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { deepStrictEqual, ok, strictEqual } from 'node:assert'
1+
import { deepStrictEqual, ok, strictEqual, doesNotThrow } from 'node:assert'
22
import { randomUUID } from 'node:crypto'
33
import { once } from 'node:events'
44
import { test, type TestContext } from 'node:test'
@@ -3264,3 +3264,222 @@ test('metrics should track the number of active topics', async t => {
32643264
deepStrictEqual(activeTopics.values[0].value, 0)
32653265
}
32663266
})
3267+
3268+
test('pause should prevent fetches from paused partitions during consumption', async t => {
3269+
const topic = await createTopic(t, true, 2)
3270+
const messages = Array.from({ length: 10 }, (_, i) => ({
3271+
topic,
3272+
key: `key-${i}`,
3273+
value: `value-${i}`,
3274+
partition: i % 2
3275+
}))
3276+
3277+
await produceTestMessages({ t, messages })
3278+
3279+
const consumer = createConsumer(t)
3280+
const stream = await consumer.consume({
3281+
topics: [topic],
3282+
mode: MessagesStreamModes.EARLIEST,
3283+
autocommit: true
3284+
})
3285+
3286+
consumer.pause([{ topic, partition: 0 }])
3287+
3288+
const received: number[] = []
3289+
let count = 0
3290+
3291+
for await (const message of stream) {
3292+
received.push(message.partition)
3293+
count++
3294+
3295+
if (count === 5) {
3296+
break
3297+
}
3298+
}
3299+
3300+
strictEqual(
3301+
received.every(p => p === 1),
3302+
true
3303+
)
3304+
})
3305+
3306+
test('resume should allow fetches from previously paused partitions', async t => {
3307+
const topic = await createTopic(t, true, 2)
3308+
const messages = Array.from({ length: 10 }, (_, i) => ({
3309+
topic,
3310+
key: `key-${i}`,
3311+
value: `value-${i}`
3312+
}))
3313+
3314+
await produceTestMessages({ t, messages: messages.slice(0, 5), overrideOptions: { partitioner: () => 0 } })
3315+
await produceTestMessages({ t, messages: messages.slice(5, 10), overrideOptions: { partitioner: () => 1 } })
3316+
3317+
const consumer = createConsumer(t)
3318+
const stream = await consumer.consume({
3319+
topics: [topic],
3320+
mode: MessagesStreamModes.EARLIEST,
3321+
autocommit: true,
3322+
maxWaitTime: 100
3323+
})
3324+
3325+
consumer.pause([{ topic, partition: 0 }])
3326+
3327+
const received: { key: string; partition: number }[] = []
3328+
let count = 0
3329+
3330+
for await (const message of stream) {
3331+
received.push({ key: message.key.toString(), partition: message.partition })
3332+
count++
3333+
3334+
if (count === 5) {
3335+
consumer.resume([{ topic, partition: 0 }])
3336+
}
3337+
3338+
if (count === 10) {
3339+
break
3340+
}
3341+
}
3342+
3343+
strictEqual(
3344+
received.slice(0, 5).every(m => m.partition === 1),
3345+
true
3346+
)
3347+
strictEqual(
3348+
received.slice(5, 10).every(m => m.partition === 0),
3349+
true
3350+
)
3351+
})
3352+
3353+
test('resume should handle resuming non-paused partitions gracefully', async t => {
3354+
const topic = await createTopic(t, true)
3355+
const consumer = createConsumer(t)
3356+
3357+
consumer.topics.trackAll(topic)
3358+
await consumer.joinGroup({})
3359+
3360+
doesNotThrow(() => consumer.resume([{ topic, partition: 0 }]))
3361+
})
3362+
3363+
test('pause/resume should throw error if consumer has not joined a group', async t => {
3364+
const topic = await createTopic(t, true)
3365+
const consumer = createConsumer(t)
3366+
3367+
try {
3368+
consumer.pause([{ topic, partition: 0 }])
3369+
throw new Error('Expected error not thrown')
3370+
} catch (error) {
3371+
strictEqual(error instanceof UserError, true)
3372+
strictEqual(error.message, 'Cannot pause partitions before joining a consumer group.')
3373+
}
3374+
3375+
try {
3376+
consumer.resume([{ topic, partition: 0 }])
3377+
throw new Error('Expected error not thrown')
3378+
} catch (error) {
3379+
strictEqual(error instanceof UserError, true)
3380+
strictEqual(error.message, 'Cannot resume partitions before joining a consumer group.')
3381+
}
3382+
})
3383+
3384+
test('pause/resume should throw error if topic is not assigned to consumer', async t => {
3385+
const topic1 = await createTopic(t, true)
3386+
const topic2 = await createTopic(t, true)
3387+
const consumer = createConsumer(t)
3388+
3389+
consumer.topics.trackAll(topic1)
3390+
await consumer.joinGroup({})
3391+
3392+
try {
3393+
consumer.pause([{ topic: topic2, partition: 0 }])
3394+
throw new Error('Expected error not thrown')
3395+
} catch (error) {
3396+
strictEqual(error instanceof UserError, true)
3397+
strictEqual(error.message, `Topic '${topic2}' is not assigned to this consumer.`)
3398+
}
3399+
3400+
try {
3401+
consumer.resume([{ topic: topic2, partition: 0 }])
3402+
throw new Error('Expected error not thrown')
3403+
} catch (error) {
3404+
strictEqual(error instanceof UserError, true)
3405+
strictEqual(error.message, `Topic '${topic2}' is not assigned to this consumer.`)
3406+
}
3407+
})
3408+
3409+
test('pause/resume should handle multiple topic-partitions', async t => {
3410+
const topic1 = await createTopic(t, true)
3411+
const topic2 = await createTopic(t, true)
3412+
const consumer = createConsumer(t)
3413+
3414+
consumer.topics.trackAll(topic1, topic2)
3415+
await consumer.joinGroup({})
3416+
3417+
consumer.pause([
3418+
{ topic: topic1, partition: 0 },
3419+
{ topic: topic2, partition: 0 }
3420+
])
3421+
3422+
strictEqual(consumer.isPaused({ topic: topic1, partition: 0 }), true)
3423+
strictEqual(consumer.isPaused({ topic: topic2, partition: 0 }), true)
3424+
3425+
consumer.resume([
3426+
{ topic: topic1, partition: 0 },
3427+
{ topic: topic2, partition: 0 }
3428+
])
3429+
3430+
strictEqual(consumer.isPaused({ topic: topic1, partition: 0 }), false)
3431+
strictEqual(consumer.isPaused({ topic: topic2, partition: 0 }), false)
3432+
})
3433+
3434+
test('paused should return all paused topic-partitions', async t => {
3435+
const topic1 = await createTopic(t, true, 2)
3436+
const topic2 = await createTopic(t, true, 2)
3437+
const consumer = createConsumer(t)
3438+
3439+
consumer.topics.trackAll(topic1, topic2)
3440+
await consumer.joinGroup({})
3441+
3442+
consumer.pause([
3443+
{ topic: topic1, partition: 0 },
3444+
{ topic: topic1, partition: 1 },
3445+
{ topic: topic2, partition: 0 }
3446+
])
3447+
3448+
deepStrictEqual(consumer.paused(), [
3449+
{ topic: topic1, partition: 0 },
3450+
{ topic: topic1, partition: 1 },
3451+
{ topic: topic2, partition: 0 }
3452+
])
3453+
})
3454+
3455+
test('isPaused should return true for paused topic-partitions', async t => {
3456+
const topic = await createTopic(t, true)
3457+
const consumer = createConsumer(t)
3458+
3459+
consumer.topics.trackAll(topic)
3460+
await consumer.joinGroup({})
3461+
3462+
consumer.pause([{ topic, partition: 0 }])
3463+
strictEqual(consumer.isPaused({ topic, partition: 0 }), true)
3464+
})
3465+
3466+
test('isPaused should return false for non-paused topic-partitions', async t => {
3467+
const topic = await createTopic(t, true)
3468+
const consumer = createConsumer(t)
3469+
3470+
consumer.topics.trackAll(topic)
3471+
await consumer.joinGroup({})
3472+
3473+
strictEqual(consumer.isPaused({ topic, partition: 0 }), false)
3474+
})
3475+
3476+
test('isPaused should return false for topic-partitions not assigned to consumer', async t => {
3477+
const topic1 = await createTopic(t, true)
3478+
const topic2 = await createTopic(t, true)
3479+
const consumer = createConsumer(t)
3480+
3481+
consumer.topics.trackAll(topic1)
3482+
await consumer.joinGroup({})
3483+
3484+
strictEqual(consumer.isPaused({ topic: topic2, partition: 0 }), false)
3485+
})

0 commit comments

Comments
 (0)