Skip to content

Commit 2ce88da

Browse files
committed
feat: implement pause/resume consumer api
Signed-off-by: Mirza Brunjadze <[email protected]>
1 parent 265c9fc commit 2ce88da

File tree

5 files changed

+338
-5
lines changed

5 files changed

+338
-5
lines changed

docs/consumer.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,39 @@ If `force` is not `true`, then the method will throw an error if any `MessagesSt
187187

188188
The return value is `void`.
189189

190+
### `pause(topicPartitions)`
191+
192+
Pauses message consumption for specific topic-partitions. Pausing only prevents new fetch requests for the specified partitions. Messages that are already buffered in the stream's internal buffer may still be emitted after calling `pause()`. This method will throw an error if called before joining a consumer group or if the specified topic is not assigned to this consumer.
193+
194+
The parameter is an array of `TopicPartitions` objects, where each object has:
195+
196+
| Property | Type | Description |
197+
| ---------- | ---------- | ------------------- |
198+
| topic | `string` | The topic name. |
199+
| partitions | `number[]` | Array of partitions |
200+
201+
The return value is `void`.
202+
203+
### `resume(topicPartitions)`
204+
205+
Resumes message consumption for specific topic-partitions that were previously paused. This method will throw an error if called before joining a consumer group or if the specified topic is not assigned to this consumer.
206+
207+
The parameter is an array of `TopicPartitions` objects (same structure as `pause()`).
208+
209+
The return value is `void`.
210+
211+
### `paused()`
212+
213+
Returns an array of all currently paused topic-partitions.
214+
215+
The return value is an array of `TopicPartitions` objects (same structure as `pause()`).
216+
217+
### `isPaused(topic, partition)`
218+
219+
Checks if a specific topic-partition is currently paused.
220+
221+
The return value is `boolean`.
222+
190223
## FAQs
191224

192225
### My consumer is not receiving any message when the application restarts

src/clients/consumer/consumer.ts

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ import {
9999
type ListCommitsOptions,
100100
type ListOffsetsOptions,
101101
type Offsets,
102-
type OffsetsWithTimestamps
102+
type OffsetsWithTimestamps,
103+
type TopicPartitions
103104
} from './types.ts'
104105

105106
export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderValue = Buffer> extends Base<
@@ -118,8 +119,8 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
118119
#coordinatorId: number | null
119120
#heartbeatInterval: NodeJS.Timeout | null
120121
#lastHeartbeat: Date | null
121-
#streams: Set<MessagesStream<Key, Value, HeaderKey, HeaderValue>>;
122-
122+
#streams: Set<MessagesStream<Key, Value, HeaderKey, HeaderValue>>
123+
#pausedPartitions: Map<string, Set<number>>;
123124
/*
124125
The following requests are blocking in Kafka:
125126
@@ -159,6 +160,7 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
159160
this.#heartbeatInterval = null
160161
this.#lastHeartbeat = null
161162
this.#streams = new Set()
163+
this.#pausedPartitions = new Map()
162164

163165
this.#validateGroupOptions(this[kOptions], groupIdAndOptionsValidator)
164166

@@ -297,6 +299,74 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
297299
return callback![kCallbackPromise]
298300
}
299301

302+
pause (topicPartitions: TopicPartitions[]): void {
303+
if (!this.assignments) {
304+
throw new UserError('Cannot pause partitions before joining a consumer group.')
305+
}
306+
307+
for (const { topic, partitions } of topicPartitions) {
308+
const assignment = this.assignments.find(a => a.topic === topic)
309+
if (!assignment) {
310+
throw new UserError(`Topic '${topic}' is not assigned to this consumer.`, { topic })
311+
}
312+
313+
for (const partition of partitions) {
314+
const existing = this.#pausedPartitions.get(topic)
315+
if (existing) {
316+
existing.add(partition)
317+
} else {
318+
this.#pausedPartitions.set(topic, new Set([partition]))
319+
}
320+
}
321+
}
322+
}
323+
324+
resume (topicPartitions: TopicPartitions[]): void {
325+
if (!this.assignments) {
326+
throw new UserError('Cannot resume partitions before joining a consumer group.')
327+
}
328+
329+
let emitResumeEvent = false
330+
for (const { topic, partitions } of topicPartitions) {
331+
const assignment = this.assignments.find(a => a.topic === topic)
332+
if (!assignment) {
333+
throw new UserError(`Topic '${topic}' is not assigned to this consumer.`, { topic })
334+
}
335+
336+
for (const partition of partitions) {
337+
const existing = this.#pausedPartitions.get(topic)
338+
if (existing?.has(partition)) {
339+
emitResumeEvent = true
340+
existing.delete(partition)
341+
}
342+
343+
if (existing?.size === 0) {
344+
this.#pausedPartitions.delete(topic)
345+
}
346+
}
347+
}
348+
349+
if (emitResumeEvent) {
350+
this.emitWithDebug('consumer', 'user:resume', { partitions: topicPartitions })
351+
}
352+
}
353+
354+
paused (): TopicPartitions[] {
355+
const result: TopicPartitions[] = []
356+
for (const [topic, partitions] of this.#pausedPartitions.entries()) {
357+
result.push({
358+
topic,
359+
partitions: Array.from(partitions)
360+
})
361+
}
362+
363+
return result
364+
}
365+
366+
isPaused (topic: string, partition: number): boolean {
367+
return !!this.#pausedPartitions.get(topic)?.has(partition)
368+
}
369+
300370
fetch (options: FetchOptions<Key, Value, HeaderKey, HeaderValue>, callback: CallbackWithPromise<FetchResponse>): void
301371
fetch (options: FetchOptions<Key, Value, HeaderKey, HeaderValue>): Promise<FetchResponse>
302372
fetch (

src/clients/consumer/messages-stream.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,14 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
144144
})
145145
})
146146

147+
this.#consumer.on('consumer:user:resume', () => {
148+
if (this.#shouldClose || this.closed || this.destroyed) {
149+
return
150+
}
151+
152+
this.#fetch()
153+
})
154+
147155
if (consumer[kPrometheus]) {
148156
this.#metricsConsumedMessages = ensureMetric<Counter>(
149157
consumer[kPrometheus],
@@ -372,6 +380,10 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
372380
const partitions = assignment.partitions
373381

374382
for (const partition of partitions) {
383+
if (this.#consumer.isPaused(topic, partition)) {
384+
continue
385+
}
386+
375387
const leader = metadata.topics.get(topic)!.partitions[partition].leader
376388

377389
if (this.#inflightNodes.has(leader)) {

src/clients/consumer/types.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ export interface GroupProtocolSubscription {
1010
metadata?: Buffer | string
1111
}
1212

13-
export interface GroupAssignment {
13+
export interface TopicPartitions {
1414
topic: string
1515
partitions: number[]
1616
}
1717

18+
export interface GroupAssignment extends TopicPartitions {}
19+
1820
export interface GroupPartitionsAssignments {
1921
memberId: string
2022
assignments: Map<string, GroupAssignment>

0 commit comments

Comments
 (0)