Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions docs/consumer.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,39 @@ If `force` is not `true`, then the method will throw an error if any `MessagesSt

The return value is `void`.

### `pause(topicPartitions)`

Pauses message consumption for specific topic-partitions. Pausing only prevents new fetch requests for the specified partitions. Messages that are already buffered in the stream's internal buffer may still be emitted after calling `pause()`. This method will throw an error if called before joining a consumer group or if the specified topic is not assigned to this consumer.

The parameter is an array of `TopicPartitions` objects, where each object has:

| Property | Type | Description |
| ---------- | ---------- | ------------------- |
| topic | `string` | The topic name. |
| partitions | `number[]` | Array of partitions |

The return value is `void`.

### `resume(topicPartitions)`

Resumes message consumption for specific topic-partitions that were previously paused. This method will throw an error if called before joining a consumer group or if the specified topic is not assigned to this consumer.

The parameter is an array of `TopicPartitions` objects (same structure as `pause()`).

The return value is `void`.

### `paused()`

Returns an array of all currently paused topic-partitions.

The return value is an array of `TopicPartitions` objects (same structure as `pause()`).

### `isPaused(topic, partition)`

Checks if a specific topic-partition is currently paused.

The return value is `boolean`.

## FAQs

### My consumer is not receiving any message when the application restarts
Expand Down
76 changes: 73 additions & 3 deletions src/clients/consumer/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ import {
type ListCommitsOptions,
type ListOffsetsOptions,
type Offsets,
type OffsetsWithTimestamps
type OffsetsWithTimestamps,
type TopicPartitions
} from './types.ts'

export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderValue = Buffer> extends Base<
Expand All @@ -118,8 +119,8 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
#coordinatorId: number | null
#heartbeatInterval: NodeJS.Timeout | null
#lastHeartbeat: Date | null
#streams: Set<MessagesStream<Key, Value, HeaderKey, HeaderValue>>;

#streams: Set<MessagesStream<Key, Value, HeaderKey, HeaderValue>>
#pausedPartitions: Map<string, Set<number>>;
/*
The following requests are blocking in Kafka:

Expand Down Expand Up @@ -159,6 +160,7 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
this.#heartbeatInterval = null
this.#lastHeartbeat = null
this.#streams = new Set()
this.#pausedPartitions = new Map()

this.#validateGroupOptions(this[kOptions], groupIdAndOptionsValidator)

Expand Down Expand Up @@ -297,6 +299,74 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
return callback![kCallbackPromise]
}

pause (topicPartitions: TopicPartitions[]): void {
if (!this.assignments) {
throw new UserError('Cannot pause partitions before joining a consumer group.')
}

for (const { topic, partitions } of topicPartitions) {
const assignment = this.assignments.find(a => a.topic === topic)
if (!assignment) {
throw new UserError(`Topic '${topic}' is not assigned to this consumer.`, { topic })
}

for (const partition of partitions) {
const existing = this.#pausedPartitions.get(topic)
if (existing) {
existing.add(partition)
} else {
this.#pausedPartitions.set(topic, new Set([partition]))
}
}
}
}

resume (topicPartitions: TopicPartitions[]): void {
if (!this.assignments) {
throw new UserError('Cannot resume partitions before joining a consumer group.')
}

let emitResumeEvent = false
for (const { topic, partitions } of topicPartitions) {
const assignment = this.assignments.find(a => a.topic === topic)
if (!assignment) {
throw new UserError(`Topic '${topic}' is not assigned to this consumer.`, { topic })
}

for (const partition of partitions) {
const existing = this.#pausedPartitions.get(topic)
if (existing?.has(partition)) {
emitResumeEvent = true
existing.delete(partition)
}

if (existing?.size === 0) {
this.#pausedPartitions.delete(topic)
}
}
}

if (emitResumeEvent) {
this.emitWithDebug('consumer', 'user:resume', { partitions: topicPartitions })
}
}

paused (): TopicPartitions[] {
const result: TopicPartitions[] = []
for (const [topic, partitions] of this.#pausedPartitions.entries()) {
result.push({
topic,
partitions: Array.from(partitions)
})
}

return result
}

isPaused (topic: string, partition: number): boolean {
return !!this.#pausedPartitions.get(topic)?.has(partition)
}

fetch (options: FetchOptions<Key, Value, HeaderKey, HeaderValue>, callback: CallbackWithPromise<FetchResponse>): void
fetch (options: FetchOptions<Key, Value, HeaderKey, HeaderValue>): Promise<FetchResponse>
fetch (
Expand Down
12 changes: 12 additions & 0 deletions src/clients/consumer/messages-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
})
})

this.#consumer.on('consumer:user:resume', () => {
if (this.#shouldClose || this.closed || this.destroyed) {
return
}

this.#fetch()
})

if (consumer[kPrometheus]) {
this.#metricsConsumedMessages = ensureMetric<Counter>(
consumer[kPrometheus],
Expand Down Expand Up @@ -372,6 +380,10 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
const partitions = assignment.partitions

for (const partition of partitions) {
if (this.#consumer.isPaused(topic, partition)) {
continue
}

const leader = metadata.topics.get(topic)!.partitions[partition].leader

if (this.#inflightNodes.has(leader)) {
Expand Down
4 changes: 3 additions & 1 deletion src/clients/consumer/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ export interface GroupProtocolSubscription {
metadata?: Buffer | string
}

export interface GroupAssignment {
export interface TopicPartitions {
topic: string
partitions: number[]
}

export interface GroupAssignment extends TopicPartitions {}

export interface GroupPartitionsAssignments {
memberId: string
assignments: Map<string, GroupAssignment>
Expand Down
Loading