Skip to content

Commit 6816298

Browse files
authored
fix: Fixed consumer partions assignment handling. (#138)
Signed-off-by: Paolo Insogna <[email protected]>
1 parent e227fa2 commit 6816298

File tree

2 files changed

+73
-14
lines changed

2 files changed

+73
-14
lines changed

src/clients/consumer/consumer.ts

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import {
4747
import { type GenericError, type ProtocolError, UserError } from '../../errors.ts'
4848
import { type ConnectionPool } from '../../network/connection-pool.ts'
4949
import { type Connection } from '../../network/connection.ts'
50+
import { INT32_SIZE } from '../../protocol/definitions.ts'
5051
import { Reader } from '../../protocol/reader.ts'
5152
import { Writer } from '../../protocol/writer.ts'
5253
import {
@@ -117,8 +118,8 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
117118
#coordinatorId: number | null
118119
#heartbeatInterval: NodeJS.Timeout | null
119120
#lastHeartbeat: Date | null
120-
#streams: Set<MessagesStream<Key, Value, HeaderKey, HeaderValue>>
121-
#partitionsAssigner: GroupPartitionsAssigner;
121+
#streams: Set<MessagesStream<Key, Value, HeaderKey, HeaderValue>>;
122+
122123
/*
123124
The following requests are blocking in Kafka:
124125
@@ -158,7 +159,6 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
158159
this.#heartbeatInterval = null
159160
this.#lastHeartbeat = null
160161
this.#streams = new Set()
161-
this.#partitionsAssigner = this[kOptions].partitionAssigner ?? roundRobinAssigner
162162

163163
this.#validateGroupOptions(this[kOptions], groupIdAndOptionsValidator)
164164

@@ -868,12 +868,16 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
868868
)
869869
}
870870

871-
#syncGroup (callback: CallbackWithPromise<GroupAssignment[]>): void {
871+
#syncGroup (
872+
partitionsAssigner: GroupPartitionsAssigner | null,
873+
callback: CallbackWithPromise<GroupAssignment[]>
874+
): void {
872875
consumerGroupChannel.traceCallback(
873876
this.#performSyncGroup,
874-
1,
877+
2,
875878
createDiagnosticContext({ client: this, operation: 'syncGroup' }),
876879
this,
880+
partitionsAssigner,
877881
null,
878882
callback
879883
)
@@ -1108,7 +1112,7 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
11081112
}
11091113

11101114
// Send a syncGroup request
1111-
this.#syncGroup((error, response) => {
1115+
this.#syncGroup(options.partitionAssigner, (error, response) => {
11121116
if (!this.#membershipActive) {
11131117
callback(null, undefined as unknown as string)
11141118
return
@@ -1225,6 +1229,7 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
12251229
}
12261230

12271231
#performSyncGroup (
1232+
partitionsAssigner: GroupPartitionsAssigner | null,
12281233
assignments: SyncGroupRequestAssignment[] | null,
12291234
callback: CallbackWithPromise<GroupAssignment[]>
12301235
): void {
@@ -1257,7 +1262,7 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
12571262
return
12581263
}
12591264

1260-
this.#performSyncGroup(this.#createAssignments(metadata), callback)
1265+
this.#performSyncGroup(partitionsAssigner, this.#createAssignments(partitionsAssigner, metadata), callback)
12611266
})
12621267

12631268
return
@@ -1409,6 +1414,10 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
14091414

14101415
reader.skip(2) // Ignore Version information
14111416

1417+
if (reader.remaining < INT32_SIZE) {
1418+
return []
1419+
}
1420+
14121421
return reader.readArray(
14131422
r => {
14141423
return {
@@ -1421,7 +1430,10 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
14211430
)
14221431
}
14231432

1424-
#createAssignments (metadata: ClusterMetadata): SyncGroupRequestAssignment[] {
1433+
#createAssignments (
1434+
partitionsAssigner: GroupPartitionsAssigner | null,
1435+
metadata: ClusterMetadata
1436+
): SyncGroupRequestAssignment[] {
14251437
const partitionTracker: Map<string, { next: number; max: number }> = new Map()
14261438

14271439
// First of all, layout topics-partitions in a list
@@ -1448,12 +1460,9 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
14481460
}
14491461

14501462
const encodedAssignments: SyncGroupRequestAssignment[] = []
1451-
for (const member of this.#partitionsAssigner(
1452-
this.memberId!,
1453-
this.#members,
1454-
new Set(this.topics.current),
1455-
metadata
1456-
)) {
1463+
1464+
partitionsAssigner ??= this[kOptions].partitionAssigner ?? roundRobinAssigner
1465+
for (const member of partitionsAssigner(this.memberId!, this.#members, new Set(this.topics.current), metadata)) {
14571466
encodedAssignments.push({
14581467
memberId: member.memberId,
14591468
assignment: this.#encodeProtocolAssignment(Array.from(member.assignments.values()))

test/clients/consumer/consumer.test.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2350,6 +2350,56 @@ test('joinGroup should setup assignment with a custom policy', async t => {
23502350
deepStrictEqual(consumer2.assignments, [{ topic, partitions: [0, 1, 2] }])
23512351
})
23522352

2353+
test('joinGroup should not fail when the partition assigner misses a member', async t => {
2354+
const topic = await createTopic(t, true, 3)
2355+
const groupId = createGroupId()
2356+
2357+
function partitionAssigner (
2358+
_current: string,
2359+
_members: Map<string, ExtendedGroupProtocolSubscription>,
2360+
topics: Set<string>,
2361+
metadata: ClusterMetadata
2362+
): GroupPartitionsAssignments[] {
2363+
const assignments: GroupPartitionsAssignments[] = []
2364+
2365+
// Assign all partitions to only consumer1
2366+
2367+
const member = { memberId: consumer1.memberId!, assignments: new Map() }
2368+
assignments.push(member)
2369+
2370+
for (const topic of topics) {
2371+
const partitionsCount = metadata.topics.get(topic)!.partitionsCount
2372+
2373+
for (let i = 0; i < partitionsCount; i++) {
2374+
let topicAssignments = member.assignments.get(topic)
2375+
2376+
if (!topicAssignments) {
2377+
topicAssignments = { topic, partitions: [] }
2378+
member.assignments.set(topic, topicAssignments)
2379+
}
2380+
2381+
topicAssignments?.partitions.push(i)
2382+
}
2383+
}
2384+
2385+
return assignments
2386+
}
2387+
2388+
const consumer1 = createConsumer(t, { groupId, partitionAssigner })
2389+
const consumer2 = createConsumer(t, { groupId, partitionAssigner })
2390+
2391+
await consumer1.topics.trackAll(topic)
2392+
await consumer2.topics.trackAll(topic)
2393+
2394+
await consumer1.joinGroup({ protocols: [{ name: 'roundrobin', version: 1, metadata: '123' }] })
2395+
const rejoinPromise = once(consumer1, 'consumer:group:join')
2396+
await consumer2.joinGroup({ protocols: [{ name: 'roundrobin', version: 1, metadata: Buffer.from('123') }] })
2397+
await rejoinPromise
2398+
2399+
deepStrictEqual(consumer1.assignments, [{ topic, partitions: [0, 1, 2] }])
2400+
deepStrictEqual(consumer2.assignments, [])
2401+
})
2402+
23532403
test('joinGroup might receive no assignment', async t => {
23542404
const topic = await createTopic(t, true)
23552405
const groupId = createGroupId()

0 commit comments

Comments
 (0)