Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 170 additions & 1 deletion src/clients/consumer/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ import { ensureMetric, type Gauge } from '../metrics.ts'
import { MessagesStream } from './messages-stream.ts'
import {
commitOptionsValidator,
consumeByPartitionOptionsValidator,
consumeOptionsValidator,
consumerOptionsValidator,
defaultConsumerOptions,
Expand All @@ -86,6 +87,7 @@ import {
import { roundRobinAssigner } from './partitions-assigners.ts'
import { TopicsMap } from './topics-map.ts'
import {
type ConsumeByPartitionOptions,
type CommitOptions,
type ConsumeOptions,
type ConsumerOptions,
Expand Down Expand Up @@ -297,6 +299,41 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
return callback![kCallbackPromise]
}

consumeByPartition (
options: ConsumeByPartitionOptions<Key, Value, HeaderKey, HeaderValue>,
callback: CallbackWithPromise<Map<string, MessagesStream<Key, Value, HeaderKey, HeaderValue>>>
): void
consumeByPartition (
options: ConsumeByPartitionOptions<Key, Value, HeaderKey, HeaderValue>
): Promise<Map<string, MessagesStream<Key, Value, HeaderKey, HeaderValue>>>
consumeByPartition (
options: ConsumeByPartitionOptions<Key, Value, HeaderKey, HeaderValue>,
callback?: CallbackWithPromise<Map<string, MessagesStream<Key, Value, HeaderKey, HeaderValue>>>
): void | Promise<Map<string, MessagesStream<Key, Value, HeaderKey, HeaderValue>>> {
if (!callback) {
callback = createPromisifiedCallback<Map<string, MessagesStream<Key, Value, HeaderKey, HeaderValue>>>()
}

if (this[kCheckNotClosed](callback)) {
return callback[kCallbackPromise]
}

const validationError = this[kValidateOptions](options, consumeByPartitionOptionsValidator, '/options', false)
if (validationError) {
callback(validationError, undefined as unknown as Map<string, MessagesStream<Key, Value, HeaderKey, HeaderValue>>)
return callback[kCallbackPromise]
}

options.autocommit ??= this[kOptions].autocommit! ?? true
options.maxBytes ??= this[kOptions].maxBytes!
options.deserializers = Object.assign({}, options.deserializers, this[kOptions].deserializers)
options.highWaterMark ??= this[kOptions].highWaterMark!

this.#consumeByPartition(options, callback)

return callback![kCallbackPromise]
}

fetch (options: FetchOptions<Key, Value, HeaderKey, HeaderValue>, callback: CallbackWithPromise<FetchResponse>): void
fetch (options: FetchOptions<Key, Value, HeaderKey, HeaderValue>): Promise<FetchResponse>
fetch (
Expand Down Expand Up @@ -547,6 +584,21 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
)
}

#consumeByPartition (
options: ConsumeByPartitionOptions<Key, Value, HeaderKey, HeaderValue>,
callback: CallbackWithPromise<Map<string, MessagesStream<Key, Value, HeaderKey, HeaderValue>>>
): void {
consumerConsumesChannel.traceCallback(
this.#performConsumeByPartition,
2,
createDiagnosticContext({ client: this, operation: 'consume', options }),
this,
options,
true,
callback
)
}

#fetch (
options: FetchOptions<Key, Value, HeaderKey, HeaderValue>,
callback: CallbackWithPromise<FetchResponse>
Expand Down Expand Up @@ -986,7 +1038,7 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
// Create the stream and start consuming
const stream = new MessagesStream<Key, Value, HeaderKey, HeaderValue>(
this,
options as ConsumeOptions<Key, Value, HeaderKey, HeaderValue>
options as ConsumeByPartitionOptions<Key, Value, HeaderKey, HeaderValue>
)
this.#streams.add(stream)

Expand All @@ -1000,6 +1052,123 @@ export class Consumer<Key = Buffer, Value = Buffer, HeaderKey = Buffer, HeaderVa
callback(null, stream)
}

#performConsumeByPartition (
options: ConsumeByPartitionOptions<Key, Value, HeaderKey, HeaderValue>,
trackTopics: boolean,
callback: CallbackWithPromise<Map<string, MessagesStream<Key, Value, HeaderKey, HeaderValue>>>
): void {
// Subscribe all topics
let joinNeeded = this.memberId === null

if (trackTopics) {
for (const topic of options.topics) {
if (this.topics.track(topic)) {
joinNeeded = true
}
}
}

// If we need to (re)join the group, do that first and then try again
if (joinNeeded) {
this.joinGroup(options, error => {
if (error) {
callback(error, undefined as unknown as Map<string, MessagesStream<Key, Value, HeaderKey, HeaderValue>>)
return
}

this.#performConsumeByPartition(options, false, callback)
})

return
}

let streamMap = new Map<string, MessagesStream<Key, Value, HeaderKey, HeaderValue>>()

for (const topic of options.topics) {
const assignment = this.assignments!.find(assignment => assignment.topic === topic)
if (!assignment) {
continue
}

for (const partition of assignment.partitions) {
const stream = new MessagesStream<Key, Value, HeaderKey, HeaderValue>(
this,
options as ConsumeByPartitionOptions<Key, Value, HeaderKey, HeaderValue>,
{ topic, partition }
)

this.#streams.add(stream)

this.#metricActiveStreams?.inc()
stream.once('close', () => {
this.#metricActiveStreams?.dec()
this.topics.untrack(topic)
this.#streams.delete(stream)
})

streamMap.set(`${topic}:${partition}`, stream)
}
}

this.on('consumer:group:join', () => {
// calculate new list of topic-partitions vs. the keys of current streamMap
// if new list is identical to old list, do nothing
// if lists are different, create new streamMap and fill it with streams from
// old map that are still in new list of topic-partitions, plus new streams for
// the new topic-partitions (if any)
// replace streamMap let var with new map, and pass it to the onAssignmentChange callback
const oldTopicPartitions = Array.from(streamMap.keys())
const newTopicPartitions: string[] = []

for (const topic of options.topics) {
const assignment = this.assignments!.find(assignment => assignment.topic === topic)
if (!assignment) {
continue
}

for (const partition of assignment.partitions) {
newTopicPartitions.push(`${topic}:${partition}`)
}
}

if (oldTopicPartitions.length === newTopicPartitions.length && oldTopicPartitions.sort().join(',') === newTopicPartitions.sort().join(',')) {
return
}

const newStreamMap = new Map<string, MessagesStream<Key, Value, HeaderKey, HeaderValue>>()

for (const topicPartition of newTopicPartitions) {
const [topic, partition] = topicPartition.split(':')
const stream = streamMap.get(topicPartition)
if (stream) {
newStreamMap.set(topicPartition, stream)
} else {
const stream = new MessagesStream<Key, Value, HeaderKey, HeaderValue>(
this,
options as ConsumeByPartitionOptions<Key, Value, HeaderKey, HeaderValue>,
{ topic, partition: parseInt(partition, 10) }
)

this.#streams.add(stream)

this.#metricActiveStreams?.inc()
stream.once('close', () => {
this.#metricActiveStreams?.dec()
this.topics.untrack(topic)
this.#streams.delete(stream)
})

newStreamMap.set(topicPartition, stream)
}
}

streamMap = newStreamMap
options.onAssignmentChange(streamMap)
})

callback(null, streamMap)
}

#performFindGroupCoordinator (callback: CallbackWithPromise<number>): void {
this[kPerformDeduplicated](
'findGroupCoordinator',
Expand Down
41 changes: 35 additions & 6 deletions src/clients/consumer/messages-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { type Deserializer, type DeserializerWithHeaders } from '../serde.ts'
import { type Consumer } from './consumer.ts'
import { defaultConsumerOptions } from './options.ts'
import {
type ConsumeByPartitionOptions,
MessagesStreamFallbackModes,
MessagesStreamModes,
type CommitOptionsPartition,
Expand Down Expand Up @@ -51,6 +52,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
#maxFetches: number
#options: ConsumeOptions<Key, Value, HeaderKey, HeaderValue>
#topics: string[]
#topicPartition?: { topic: string, partition: number }
#offsetsToFetch: Map<string, bigint>
#offsetsToCommit: Map<string, CommitOptionsPartition>
#inflightNodes: Set<number>
Expand All @@ -68,7 +70,8 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable

constructor (
consumer: Consumer<Key, Value, HeaderKey, HeaderValue>,
options: ConsumeOptions<Key, Value, HeaderKey, HeaderValue>
options: ConsumeByPartitionOptions<Key, Value, HeaderKey, HeaderValue>,
topicPartition?: { topic: string, partition: number }
) {
const {
autocommit,
Expand All @@ -80,6 +83,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
onCorruptedMessage,
// The options below are only destructured to avoid being part of structuredClone below
partitionAssigner: _partitionAssigner,
onAssignmentChange: _onAssignmentChange,
...otherOptions
} = options

Expand All @@ -100,6 +104,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
this.#fetches = 0
this.#maxFetches = maxFetches ?? 0
this.#topics = structuredClone(options.topics)
this.#topicPartition = topicPartition
this.#inflightNodes = new Set()
this.#keyDeserializer = deserializers?.key ?? (noopDeserializer as Deserializer<Key>)
this.#valueDeserializer = deserializers?.value ?? (noopDeserializer as Deserializer<Value>)
Expand All @@ -115,7 +120,9 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
this.#offsetsToFetch = new Map()
if (offsets) {
for (const { topic, partition, offset } of offsets) {
this.#offsetsToFetch.set(`${topic}:${partition}`, offset)
if (topicPartition && topicPartition.topic === topic && topicPartition.partition === partition) {
this.#offsetsToFetch.set(`${topic}:${partition}`, offset)
}
}
}

Expand Down Expand Up @@ -361,7 +368,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable

// Group topic-partitions by the destination broker
const requestedOffsets = new Map<string, bigint>()
for (const topic of this.#topics) {
for (const topic of this.#topicPartition ? [this.#topicPartition.topic] : this.#topics) {
const assignment = this.#assignmentsForTopic(topic)

// This consumer has no assignment for the topic, continue
Expand Down Expand Up @@ -614,7 +621,8 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
// List topic offsets
this.#consumer.listOffsets(
{
topics: this.#topics,
topics: this.#topicPartition ? [this.#topicPartition.topic] : this.#topics,
partitions: this.#topicPartition ? { [this.#topicPartition.topic]: [this.#topicPartition.partition] } : undefined,
timestamp:
this.#mode === MessagesStreamModes.EARLIEST ||
(this.#mode !== MessagesStreamModes.LATEST && this.#fallbackMode === MessagesStreamFallbackModes.EARLIEST)
Expand All @@ -640,7 +648,7 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable

// Now restore group offsets
const topics: GroupAssignment[] = []
for (const topic of this.#topics) {
for (const topic of this.#topicPartition ? [this.#topicPartition.topic] : this.#topics) {
const assignment = this.#assignmentsForTopic(topic)

if (!assignment) {
Expand Down Expand Up @@ -704,7 +712,28 @@ export class MessagesStream<Key, Value, HeaderKey, HeaderValue> extends Readable
}

#assignmentsForTopic (topic: string): GroupAssignment | undefined {
return this.#consumer.assignments!.find(assignment => assignment.topic === topic)
if (this.#topicPartition) {
const topicPartition = this.#topicPartition
const assignment = this.#consumer.assignments!.find(assignment => assignment.topic === topic && assignment.partitions.includes(topicPartition.partition))

if (assignment) {
// The consumer still has this topic partition assigned,
// so return the assignment and continue working
return {
topic: topicPartition.topic,
partitions: [topicPartition.partition]
}
} else {
// The consumer no longer has this topic partition assigned to it,
// since this stream is specific to this topic partition,
// close the stream so we stop fetching and downstream consumers
// can clean themselves up.
this.close()
return undefined
}
} else {
return this.#consumer.assignments!.find(assignment => assignment.topic === topic)
}
}

#invokeCloseCallbacks (error: Error | null) {
Expand Down
Loading
Loading