Skip to content

Commit 1aeda94

Browse files
authored
feat: Fetch all topics offsets when no topics is provided. (#151)
Signed-off-by: Paolo Insogna <[email protected]>
1 parent a738f5c commit 1aeda94

File tree

3 files changed

+52
-2
lines changed

3 files changed

+52
-2
lines changed

docs/consumer.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ Options:
140140

141141
Lists available offsets for topics.
142142

143+
If the topics list is empty, then it will fetch the offsets of all topics currently consumed by the consumer.
144+
143145
The return value is a map where keys are in the form `$topic:$partition` and values are arrays of offsets (where the position represents the partition).
144146

145147
Options:
@@ -167,6 +169,8 @@ Options:
167169

168170
Calculates the consumer lag for specified topics.
169171

172+
If the topics list is empty, then it will calculate the lag of all topics currently consumed by the consumer.
173+
170174
The return value is a map where keys are topic names and values are arrays of lag values (where the position represents the partition). A value of `-1n` indicates that the consumer is not assigned to that partition.
171175

172176
If a partition is filtered out via the `partitions` option, then a `-2n` will be returned for that partition.

src/clients/consumer/consumer.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -799,15 +799,21 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
799799
options: ListOffsetsOptions,
800800
callback: CallbackWithPromise<Offsets | OffsetsWithTimestamps>
801801
): void {
802-
this[kMetadata]({ topics: options.topics }, (error, metadata) => {
802+
let topics = options.topics
803+
804+
if (!topics || topics.length === 0) {
805+
topics = this.topics.current
806+
}
807+
808+
this[kMetadata]({ topics }, (error, metadata) => {
803809
if (error) {
804810
callback(error, undefined as unknown as Offsets)
805811
return
806812
}
807813

808814
const requests = new Map<number, Map<string, ListOffsetsRequestTopic>>()
809815

810-
for (const name of options.topics) {
816+
for (const name of topics) {
811817
const topic = metadata.topics.get(name)!
812818
const toInclude = new Set(options.partitions?.[name] ?? [])
813819
const hasPartitionsFilter = toInclude.size > 0

test/clients/consumer/consumer.test.ts

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1599,6 +1599,46 @@ test('listOffsets should return offset values for topics and partitions and supp
15991599
verifyTracingChannel()
16001600
})
16011601

1602+
test('listOffsets should return offset values for all consumed topics when none are explicitly requested', async t => {
1603+
const consumer = createConsumer(t)
1604+
const topic = await createTopic(t, true, 2)
1605+
1606+
const verifyTracingChannel = createTracingChannelVerifier(consumerOffsetsChannel, 'client', {
1607+
start (context: ClientDiagnosticEvent) {
1608+
deepStrictEqual(context, {
1609+
client: consumer,
1610+
operation: 'listOffsets',
1611+
options: { topics: [] },
1612+
operationId: mockedOperationId
1613+
})
1614+
},
1615+
asyncStart (context: ClientDiagnosticEvent) {
1616+
deepStrictEqual((context.result as Offsets).get(topic), [0n, 0n])
1617+
},
1618+
error (context: ClientDiagnosticEvent) {
1619+
ok(typeof context === 'undefined')
1620+
}
1621+
})
1622+
1623+
// Get offsets for the test topic
1624+
consumer.topics.track(topic)
1625+
const offsets = await consumer.listOffsets({ topics: [] })
1626+
1627+
// Verify the offsets structure
1628+
strictEqual(offsets instanceof Map, true, 'Should return a Map of offsets')
1629+
strictEqual(offsets.has(topic), true, 'Should contain the requested topic')
1630+
1631+
const topicOffsets = offsets.get(topic)!
1632+
strictEqual(Array.isArray(topicOffsets), true, 'Topic offsets should be an array')
1633+
strictEqual(topicOffsets.length, 2, 'Should have offsets for all partitions')
1634+
1635+
// For new topics, offsets should typically be 0
1636+
strictEqual(typeof topicOffsets[0], 'bigint', 'Offset should be a bigint')
1637+
strictEqual(typeof topicOffsets[1], 'bigint', 'Offset should be a bigint')
1638+
1639+
verifyTracingChannel()
1640+
})
1641+
16021642
test('listOffsets should support both promise and callback API', async t => {
16031643
const consumer = createConsumer(t)
16041644
const topic = await createTopic(t, true)

0 commit comments

Comments
 (0)