Skip to content

Commit 44442d0

Browse files
committed
refactor: use same type as assignments for easier usage
Signed-off-by: Mirza Brunjadze <[email protected]>
1 parent c416d21 commit 44442d0

File tree

4 files changed

+64
-63
lines changed

4 files changed

+64
-63
lines changed

src/clients/consumer/consumer.ts

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ import {
9999
type ListOffsetsOptions,
100100
type Offsets,
101101
type OffsetsWithTimestamps,
102-
type TopicPartition
102+
type TopicPartitions
103103
} from './types.ts'
104104

105105
export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderValue = Buffer> extends Base<
@@ -300,67 +300,72 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
300300
return callback![kCallbackPromise]
301301
}
302302

303-
pause (partitions: TopicPartition[]): void {
303+
pause (topicPartitions: TopicPartitions[]): void {
304304
if (!this.assignments) {
305305
throw new UserError('Cannot pause partitions before joining a consumer group.')
306306
}
307307

308-
for (const { topic, partition } of partitions) {
308+
for (const { topic, partitions } of topicPartitions) {
309309
const assignment = this.assignments.find(a => a.topic === topic)
310310
if (!assignment) {
311311
throw new UserError(`Topic '${topic}' is not assigned to this consumer.`, { topic })
312312
}
313313

314-
const existing = this.#pausedPartitions.get(topic)
315-
if (existing) {
316-
existing.add(partition)
317-
} else {
318-
this.#pausedPartitions.set(topic, new Set([partition]))
314+
for (const partition of partitions) {
315+
const existing = this.#pausedPartitions.get(topic)
316+
if (existing) {
317+
existing.add(partition)
318+
} else {
319+
this.#pausedPartitions.set(topic, new Set([partition]))
320+
}
319321
}
320322
}
321323
}
322324

323-
resume (partitions: TopicPartition[]): void {
325+
resume (topicPartitions: TopicPartitions[]): void {
324326
if (!this.assignments) {
325327
throw new UserError('Cannot resume partitions before joining a consumer group.')
326328
}
327329

328330
let emitResumeEvent = false
329-
for (const { topic, partition } of partitions) {
331+
for (const { topic, partitions } of topicPartitions) {
330332
const assignment = this.assignments.find(a => a.topic === topic)
331333
if (!assignment) {
332334
throw new UserError(`Topic '${topic}' is not assigned to this consumer.`, { topic })
333335
}
334336

335-
const existing = this.#pausedPartitions.get(topic)
336-
if (existing?.has(partition)) {
337-
emitResumeEvent = true
338-
existing.delete(partition)
339-
}
337+
for (const partition of partitions) {
338+
const existing = this.#pausedPartitions.get(topic)
339+
if (existing?.has(partition)) {
340+
emitResumeEvent = true
341+
existing.delete(partition)
342+
}
340343

341-
if (existing?.size === 0) {
342-
this.#pausedPartitions.delete(topic)
344+
if (existing?.size === 0) {
345+
this.#pausedPartitions.delete(topic)
346+
}
343347
}
344348
}
345349

346350
if (emitResumeEvent) {
347-
this.emitWithDebug('consumer', 'user:resume', { partitions })
351+
this.emitWithDebug('consumer', 'user:resume', { partitions: topicPartitions })
348352
}
349353
}
350354

351-
paused (): TopicPartition[] {
352-
const result: TopicPartition[] = []
355+
paused (): TopicPartitions[] {
356+
const result: TopicPartitions[] = []
353357
for (const [topic, partitions] of this.#pausedPartitions.entries()) {
354-
for (const partition of partitions) {
355-
result.push({ topic, partition })
356-
}
358+
result.push({
359+
topic,
360+
partitions: Array.from(partitions)
361+
})
357362
}
358363

359364
return result
360365
}
361366

362-
isPaused (partition: TopicPartition): boolean {
363-
return !!this.#pausedPartitions.get(partition.topic)?.has(partition.partition)
367+
isPaused (topic: string, partition: number): boolean {
368+
return !!this.#pausedPartitions.get(topic)?.has(partition)
364369
}
365370

366371
fetch (options: FetchOptions<Key, Value, HeaderKey, HeaderValue>, callback: CallbackWithPromise<FetchResponse>): void

src/clients/consumer/messages-stream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
380380
const partitions = assignment.partitions
381381

382382
for (const partition of partitions) {
383-
if (this.#consumer.isPaused({ topic, partition })) {
383+
if (this.#consumer.isPaused(topic, partition)) {
384384
continue
385385
}
386386

src/clients/consumer/types.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,13 @@ export interface GroupProtocolSubscription {
1010
metadata?: Buffer | string
1111
}
1212

13-
export interface TopicPartition {
13+
export interface TopicPartitions {
1414
topic: string
15-
partition: number
15+
partitions: number[]
1616
}
1717

18-
export interface GroupAssignment {
19-
topic: string
20-
partitions: number[]
18+
export interface GroupAssignment extends TopicPartitions {
19+
// Pulic export, left to not break existing usage
2120
}
2221

2322
export interface GroupPartitionsAssignments {

test/clients/consumer/consumer.test.ts

Lines changed: 29 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3273,23 +3273,22 @@ test('pause should prevent fetches from paused partitions during consumption', a
32733273
value: `value-${i}`
32743274
}))
32753275

3276-
await produceTestMessages({ t, messages: messages.slice(0, 5), overrideOptions: { partitioner: () => 0 } })
3277-
await produceTestMessages({ t, messages: messages.slice(5, 10), overrideOptions: { partitioner: () => 1 } })
3278-
32793276
const consumer = createConsumer(t)
32803277
const stream = await consumer.consume({
32813278
topics: [topic],
32823279
mode: MessagesStreamModes.EARLIEST,
32833280
autocommit: true
32843281
})
32853282

3286-
consumer.pause([{ topic, partition: 0 }])
3283+
consumer.pause([{ topic, partitions: [0] }])
3284+
await produceTestMessages({ t, messages: messages.slice(0, 5), overrideOptions: { partitioner: () => 0 } })
3285+
await produceTestMessages({ t, messages: messages.slice(5, 10), overrideOptions: { partitioner: () => 1 } })
32873286

3288-
const received: number[] = []
3287+
const received: { key: string; partition: number }[] = []
32893288
let count = 0
32903289

32913290
for await (const message of stream) {
3292-
received.push(message.partition)
3291+
received.push({ key: message.key.toString(), partition: message.partition })
32933292
count++
32943293

32953294
if (count === 5) {
@@ -3298,7 +3297,7 @@ test('pause should prevent fetches from paused partitions during consumption', a
32983297
}
32993298

33003299
strictEqual(
3301-
received.every(p => p === 1),
3300+
received.every(m => m.partition === 1),
33023301
true
33033302
)
33043303
})
@@ -3322,7 +3321,7 @@ test('resume should allow fetches from previously paused partitions', async t =>
33223321
maxWaitTime: 100
33233322
})
33243323

3325-
consumer.pause([{ topic, partition: 0 }])
3324+
consumer.pause([{ topic, partitions: [0] }])
33263325

33273326
const received: { key: string; partition: number }[] = []
33283327
let count = 0
@@ -3332,7 +3331,7 @@ test('resume should allow fetches from previously paused partitions', async t =>
33323331
count++
33333332

33343333
if (count === 5) {
3335-
consumer.resume([{ topic, partition: 0 }])
3334+
consumer.resume([{ topic, partitions: [0] }])
33363335
}
33373336

33383337
if (count === 10) {
@@ -3357,23 +3356,23 @@ test('resume should handle resuming non-paused partitions gracefully', async t =
33573356
consumer.topics.trackAll(topic)
33583357
await consumer.joinGroup({})
33593358

3360-
doesNotThrow(() => consumer.resume([{ topic, partition: 0 }]))
3359+
doesNotThrow(() => consumer.resume([{ topic, partitions: [0] }]))
33613360
})
33623361

33633362
test('pause/resume should throw error if consumer has not joined a group', async t => {
33643363
const topic = await createTopic(t, true)
33653364
const consumer = createConsumer(t)
33663365

33673366
try {
3368-
consumer.pause([{ topic, partition: 0 }])
3367+
consumer.pause([{ topic, partitions: [0] }])
33693368
throw new Error('Expected error not thrown')
33703369
} catch (error) {
33713370
strictEqual(error instanceof UserError, true)
33723371
strictEqual(error.message, 'Cannot pause partitions before joining a consumer group.')
33733372
}
33743373

33753374
try {
3376-
consumer.resume([{ topic, partition: 0 }])
3375+
consumer.resume([{ topic, partitions: [0] }])
33773376
throw new Error('Expected error not thrown')
33783377
} catch (error) {
33793378
strictEqual(error instanceof UserError, true)
@@ -3390,15 +3389,15 @@ test('pause/resume should throw error if topic is not assigned to consumer', asy
33903389
await consumer.joinGroup({})
33913390

33923391
try {
3393-
consumer.pause([{ topic: topic2, partition: 0 }])
3392+
consumer.pause([{ topic: topic2, partitions: [0] }])
33943393
throw new Error('Expected error not thrown')
33953394
} catch (error) {
33963395
strictEqual(error instanceof UserError, true)
33973396
strictEqual(error.message, `Topic '${topic2}' is not assigned to this consumer.`)
33983397
}
33993398

34003399
try {
3401-
consumer.resume([{ topic: topic2, partition: 0 }])
3400+
consumer.resume([{ topic: topic2, partitions: [0] }])
34023401
throw new Error('Expected error not thrown')
34033402
} catch (error) {
34043403
strictEqual(error instanceof UserError, true)
@@ -3415,20 +3414,20 @@ test('pause/resume should handle multiple topic-partitions', async t => {
34153414
await consumer.joinGroup({})
34163415

34173416
consumer.pause([
3418-
{ topic: topic1, partition: 0 },
3419-
{ topic: topic2, partition: 0 }
3417+
{ topic: topic1, partitions: [0] },
3418+
{ topic: topic2, partitions: [0] }
34203419
])
34213420

3422-
strictEqual(consumer.isPaused({ topic: topic1, partition: 0 }), true)
3423-
strictEqual(consumer.isPaused({ topic: topic2, partition: 0 }), true)
3421+
strictEqual(consumer.isPaused(topic1, 0), true)
3422+
strictEqual(consumer.isPaused(topic2, 0), true)
34243423

34253424
consumer.resume([
3426-
{ topic: topic1, partition: 0 },
3427-
{ topic: topic2, partition: 0 }
3425+
{ topic: topic1, partitions: [0] },
3426+
{ topic: topic2, partitions: [0] }
34283427
])
34293428

3430-
strictEqual(consumer.isPaused({ topic: topic1, partition: 0 }), false)
3431-
strictEqual(consumer.isPaused({ topic: topic2, partition: 0 }), false)
3429+
strictEqual(consumer.isPaused(topic1, 0), false)
3430+
strictEqual(consumer.isPaused(topic2, 0), false)
34323431
})
34333432

34343433
test('paused should return all paused topic-partitions', async t => {
@@ -3440,15 +3439,13 @@ test('paused should return all paused topic-partitions', async t => {
34403439
await consumer.joinGroup({})
34413440

34423441
consumer.pause([
3443-
{ topic: topic1, partition: 0 },
3444-
{ topic: topic1, partition: 1 },
3445-
{ topic: topic2, partition: 0 }
3442+
{ topic: topic1, partitions: [0, 1] },
3443+
{ topic: topic2, partitions: [0] }
34463444
])
34473445

34483446
deepStrictEqual(consumer.paused(), [
3449-
{ topic: topic1, partition: 0 },
3450-
{ topic: topic1, partition: 1 },
3451-
{ topic: topic2, partition: 0 }
3447+
{ topic: topic1, partitions: [0, 1] },
3448+
{ topic: topic2, partitions: [0] }
34523449
])
34533450
})
34543451

@@ -3459,8 +3456,8 @@ test('isPaused should return true for paused topic-partitions', async t => {
34593456
consumer.topics.trackAll(topic)
34603457
await consumer.joinGroup({})
34613458

3462-
consumer.pause([{ topic, partition: 0 }])
3463-
strictEqual(consumer.isPaused({ topic, partition: 0 }), true)
3459+
consumer.pause([{ topic, partitions: [0] }])
3460+
strictEqual(consumer.isPaused(topic, 0), true)
34643461
})
34653462

34663463
test('isPaused should return false for non-paused topic-partitions', async t => {
@@ -3470,7 +3467,7 @@ test('isPaused should return false for non-paused topic-partitions', async t =>
34703467
consumer.topics.trackAll(topic)
34713468
await consumer.joinGroup({})
34723469

3473-
strictEqual(consumer.isPaused({ topic, partition: 0 }), false)
3470+
strictEqual(consumer.isPaused(topic, 0), false)
34743471
})
34753472

34763473
test('isPaused should return false for topic-partitions not assigned to consumer', async t => {
@@ -3481,5 +3478,5 @@ test('isPaused should return false for topic-partitions not assigned to consumer
34813478
consumer.topics.trackAll(topic1)
34823479
await consumer.joinGroup({})
34833480

3484-
strictEqual(consumer.isPaused({ topic: topic2, partition: 0 }), false)
3481+
strictEqual(consumer.isPaused(topic2, 0), false)
34853482
})

0 commit comments

Comments
 (0)