Skip to content

Commit c416d21

Browse files
committed
fix: call fetch on resume if necessary
Signed-off-by: Mirza Brunjadze <[email protected]>
1 parent d6a116d commit c416d21

File tree

3 files changed

+23
-10
lines changed

3 files changed

+23
-10
lines changed

src/clients/consumer/consumer.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -325,19 +325,26 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
325325
throw new UserError('Cannot resume partitions before joining a consumer group.')
326326
}
327327

328+
let emitResumeEvent = false
328329
for (const { topic, partition } of partitions) {
329330
const assignment = this.assignments.find(a => a.topic === topic)
330331
if (!assignment) {
331332
throw new UserError(`Topic '${topic}' is not assigned to this consumer.`, { topic })
332333
}
333334

334335
const existing = this.#pausedPartitions.get(topic)
335-
if (existing) {
336+
if (existing?.has(partition)) {
337+
emitResumeEvent = true
336338
existing.delete(partition)
337-
if (existing.size === 0) {
338-
this.#pausedPartitions.delete(topic)
339-
}
340339
}
340+
341+
if (existing?.size === 0) {
342+
this.#pausedPartitions.delete(topic)
343+
}
344+
}
345+
346+
if (emitResumeEvent) {
347+
this.emitWithDebug('consumer', 'user:resume', { partitions })
341348
}
342349
}
343350

src/clients/consumer/messages-stream.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,14 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
144144
})
145145
})
146146

147+
this.#consumer.on('consumer:user:resume', () => {
148+
if (this.#inflightNodes.size > 0 || this.#shouldClose || this.closed || this.destroyed) {
149+
return
150+
}
151+
152+
this.#fetch()
153+
})
154+
147155
if (consumer[kPrometheus]) {
148156
this.#metricsConsumedMessages = ensureMetric<Counter>(
149157
consumer[kPrometheus],
@@ -563,9 +571,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
563571
}
564572

565573
if (canPush && !(this.#shouldClose || this.closed || this.destroyed)) {
566-
process.nextTick(() => {
567-
this.#fetch()
568-
})
574+
this.#fetch()
569575
}
570576
}
571577

test/clients/consumer/consumer.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3270,11 +3270,11 @@ test('pause should prevent fetches from paused partitions during consumption', a
32703270
const messages = Array.from({ length: 10 }, (_, i) => ({
32713271
topic,
32723272
key: `key-${i}`,
3273-
value: `value-${i}`,
3274-
partition: i % 2
3273+
value: `value-${i}`
32753274
}))
32763275

3277-
await produceTestMessages({ t, messages })
3276+
await produceTestMessages({ t, messages: messages.slice(0, 5), overrideOptions: { partitioner: () => 0 } })
3277+
await produceTestMessages({ t, messages: messages.slice(5, 10), overrideOptions: { partitioner: () => 1 } })
32783278

32793279
const consumer = createConsumer(t)
32803280
const stream = await consumer.consume({

0 commit comments

Comments
 (0)