Skip to content

Commit 69a4a3b

Browse files
authored
fix: Fixed lag computation and describeGroup response handling. (#153)
* fix: Fixed lag computation and describeGroup response handling. Signed-off-by: Paolo Insogna <[email protected]>
1 parent 49abd5c commit 69a4a3b

File tree

5 files changed

+74
-27
lines changed

5 files changed

+74
-27
lines changed

src/clients/admin/admin.ts

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ import {
8585
type DescribeLogDirsOptions,
8686
type Group,
8787
type GroupBase,
88+
type GroupMember,
8889
type ListGroupsOptions,
8990
type ListTopicsOptions
9091
} from './types.ts'
@@ -705,26 +706,31 @@ export class Admin extends Base<AdminOptions> {
705706
for (const member of raw.members) {
706707
const reader = Reader.from(member.memberMetadata)
707708

708-
const memberMetadata = {
709-
version: reader.readInt16(),
710-
topics: reader.readArray(r => r.readString(false), false, false),
711-
metadata: reader.readBytes(false)
712-
}
709+
let memberMetadata: GroupMember['metadata'] | undefined
710+
let memberAssignments: Map<string, GroupAssignment> | undefined
711+
712+
if (reader.remaining > 0) {
713+
memberMetadata = {
714+
version: reader.readInt16(),
715+
topics: reader.readArray(r => r.readString(false), false, false),
716+
metadata: reader.readBytes(false)
717+
}
713718

714-
reader.reset(member.memberAssignment)
715-
reader.skip(2) // Ignore Version information
719+
reader.reset(member.memberAssignment)
720+
reader.skip(2) // Ignore Version information
716721

717-
const memberAssignments: Map<string, GroupAssignment> = reader.readMap(
718-
r => {
719-
const topic = r.readString(false)
722+
memberAssignments = reader.readMap(
723+
r => {
724+
const topic = r.readString(false)
720725

721-
return [topic, { topic, partitions: reader.readArray(r => r.readInt32(), false, false) }]
722-
},
723-
false,
724-
false
725-
)
726+
return [topic, { topic, partitions: reader.readArray(r => r.readInt32(), false, false) }]
727+
},
728+
false,
729+
false
730+
)
726731

727-
reader.readBytes() // Ignore the user data
732+
reader.readBytes() // Ignore the user data
733+
}
728734

729735
group.members.set(member.memberId, {
730736
id: member.memberId,

src/clients/admin/types.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import { type AlterClientQuotasRequestEntry } from '../../apis/admin/alter-client-quotas-v1.ts'
2+
import { type CreateTopicsRequestTopicConfig } from '../../apis/admin/create-topics-v7.ts'
23
import { type DescribeClientQuotasRequestComponent } from '../../apis/admin/describe-client-quotas-v0.ts'
34
import {
5+
type DescribeLogDirsRequestTopic,
46
type DescribeLogDirsResponse,
5-
type DescribeLogDirsResponseResult,
6-
type DescribeLogDirsRequestTopic
7+
type DescribeLogDirsResponseResult
78
} from '../../apis/admin/describe-log-dirs-v4.ts'
8-
import { type CreateTopicsRequestTopicConfig } from '../../apis/admin/create-topics-v7.ts'
99
import { type ConsumerGroupState } from '../../apis/enumerations.ts'
1010
import { type NullableString } from '../../protocol/definitions.ts'
1111
import { type BaseOptions } from '../base/types.ts'
@@ -29,8 +29,8 @@ export interface GroupMember {
2929
groupInstanceId: NullableString
3030
clientId: string
3131
clientHost: string
32-
metadata: Omit<ExtendedGroupProtocolSubscription, 'memberId'>
33-
assignments: Map<string, GroupAssignment>
32+
metadata?: Omit<ExtendedGroupProtocolSubscription, 'memberId'>
33+
assignments?: Map<string, GroupAssignment>
3434
}
3535

3636
export interface GroupBase {

src/clients/consumer/consumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
520520
// Now gather the last committed offsets from each stream
521521
const committeds = new Map<string, bigint>()
522522
for (const stream of this.#streams) {
523-
for (const [topic, offset] of stream.committedOffsets) {
523+
for (const [topic, offset] of stream.offsetsCommitted) {
524524
committeds.set(topic, offset)
525525
}
526526
}

src/clients/consumer/messages-stream.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
146146
// will have changed so we may have gone from last with no assignments to
147147
// having some.
148148
this.#consumer.on('consumer:group:join', () => {
149+
this.#offsetsCommitted.clear()
150+
149151
this.#refreshOffsets((error: Error | null) => {
150152
/* c8 ignore next 4 - Hard to test */
151153
if (error) {
@@ -169,6 +171,19 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
169171
notifyCreation('messages-stream', this)
170172
}
171173

174+
get offsetsToFetch (): Map<string, bigint> {
175+
return this.#offsetsToFetch
176+
}
177+
178+
get offsetsToCommit (): Map<string, CommitOptionsPartition> {
179+
return this.#offsetsToCommit
180+
}
181+
182+
get offsetsCommitted (): Map<string, bigint> {
183+
return this.#offsetsCommitted
184+
}
185+
186+
// TODO: This is deprecated alias, remove in future major version
172187
get committedOffsets (): Map<string, bigint> {
173188
return this.#offsetsCommitted
174189
}
@@ -279,6 +294,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
279294
*/
280295
addListener (event: 'autocommit', listener: (err: Error, offsets: CommitOptionsPartition[]) => void): this
281296
addListener (event: 'fetch', listener: () => void): this
297+
addListener (event: 'offsets', listener: () => void): this
282298
addListener (event: 'data', listener: (message: Message<Key, Value, HeaderKey, HeaderValue>) => void): this
283299
addListener (event: 'close', listener: () => void): this
284300
addListener (event: 'end', listener: () => void): this
@@ -293,6 +309,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
293309

294310
on (event: 'autocommit', listener: (err: Error, offsets: CommitOptionsPartition[]) => void): this
295311
on (event: 'fetch', listener: () => void): this
312+
on (event: 'offsets', listener: () => void): this
296313
on (event: 'data', listener: (message: Message<Key, Value, HeaderKey, HeaderValue>) => void): this
297314
on (event: 'close', listener: () => void): this
298315
on (event: 'end', listener: () => void): this
@@ -307,6 +324,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
307324

308325
once (event: 'autocommit', listener: (err: Error, offsets: CommitOptionsPartition[]) => void): this
309326
once (event: 'fetch', listener: () => void): this
327+
once (event: 'offsets', listener: () => void): this
310328
once (event: 'data', listener: (message: Message<Key, Value, HeaderKey, HeaderValue>) => void): this
311329
once (event: 'close', listener: () => void): this
312330
once (event: 'end', listener: () => void): this
@@ -321,6 +339,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
321339

322340
prependListener (event: 'autocommit', listener: (err: Error, offsets: CommitOptionsPartition[]) => void): this
323341
prependListener (event: 'fetch', listener: () => void): this
342+
prependListener (event: 'offsets', listener: () => void): this
324343
prependListener (event: 'data', listener: (message: Message<Key, Value, HeaderKey, HeaderValue>) => void): this
325344
prependListener (event: 'close', listener: () => void): this
326345
prependListener (event: 'end', listener: () => void): this
@@ -335,6 +354,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
335354

336355
prependOnceListener (event: 'autocommit', listener: (err: Error, offsets: CommitOptionsPartition[]) => void): this
337356
prependOnceListener (event: 'fetch', listener: () => void): this
357+
prependOnceListener (event: 'offsets', listener: () => void): this
338358
prependOnceListener (event: 'data', listener: (message: Message<Key, Value, HeaderKey, HeaderValue>) => void): this
339359
prependOnceListener (event: 'close', listener: () => void): this
340360
prependOnceListener (event: 'end', listener: () => void): this
@@ -774,6 +794,24 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
774794
}
775795
}
776796

797+
// Rebuild the list of offsetsCommitted (which is used for consumer lag) out of the offsets to fetch
798+
for (const topic of this.#topics) {
799+
const assignment = this.#assignmentsForTopic(topic)
800+
801+
// This consumer has no assignment for the topic, continue
802+
if (!assignment) {
803+
continue
804+
}
805+
806+
const partitions = assignment.partitions
807+
808+
for (const partition of partitions) {
809+
const committed = this.#offsetsToFetch.get(`${topic}:${partition}`)!
810+
this.#offsetsCommitted.set(`${topic}:${partition}`, committed - 1n)
811+
}
812+
}
813+
814+
this.emit('offsets')
777815
callback(null)
778816
}
779817

test/clients/consumer/consumer.test.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2188,22 +2188,25 @@ test('getLag should allow to filter out topics and partitions', async t => {
21882188
const topic = await createTopic(t, true, 3)
21892189
const consumer = createConsumer(t)
21902190

2191-
await consumer.consume({
2191+
const stream = await consumer.consume({
21922192
topics: [topic],
21932193
autocommit: true,
21942194
mode: 'earliest',
21952195
maxWaitTime: 1000,
21962196
maxBytes: 10
21972197
})
21982198

2199+
// Wait for the topics to be fetched
2200+
await once(stream, 'offsets')
2201+
21992202
await new Promise<void>((resolve, reject) => {
22002203
consumer.getLag({ topics: [topic], partitions: { [topic]: [0, 2] } }, (err, offsets) => {
22012204
if (err) {
22022205
reject(err)
22032206
return
22042207
}
22052208

2206-
deepStrictEqual(offsets, new Map([[topic, [-1n, -2n, -1n]]]))
2209+
deepStrictEqual(offsets, new Map([[topic, [0n, -2n, 0n]]]))
22072210
resolve()
22082211
})
22092212
})
@@ -2364,9 +2367,9 @@ test('startLagMonitoring should regularly check consumer lag', async t => {
23642367
unsubscribe(consumerLagChannel.name, onLag as ChannelListener)
23652368

23662369
deepStrictEqual(lagsViaEvent, [
2367-
new Map([[topic, [-1n, -1n, -1n]]]),
2368-
new Map([[topic, [-1n, -1n, -1n]]]),
2369-
new Map([[topic, [-1n, -1n, -1n]]])
2370+
new Map([[topic, [-1n, -1n, -1n]]]), // Still joining and not assigned to any partition
2371+
new Map([[topic, [0n, 0n, 0n]]]),
2372+
new Map([[topic, [0n, 0n, 0n]]])
23702373
])
23712374
})
23722375

0 commit comments

Comments
 (0)