diff --git a/src/clients/consumer/consumer.ts b/src/clients/consumer/consumer.ts index b925c7b..ae468dd 100644 --- a/src/clients/consumer/consumer.ts +++ b/src/clients/consumer/consumer.ts @@ -74,6 +74,7 @@ import { ensureMetric, type Gauge } from '../metrics.ts' import { MessagesStream } from './messages-stream.ts' import { commitOptionsValidator, + consumeByPartitionOptionsValidator, consumeOptionsValidator, consumerOptionsValidator, defaultConsumerOptions, @@ -86,6 +87,7 @@ import { import { roundRobinAssigner } from './partitions-assigners.ts' import { TopicsMap } from './topics-map.ts' import { + type ConsumeByPartitionOptions, type CommitOptions, type ConsumeOptions, type ConsumerOptions, @@ -297,6 +299,41 @@ export class Consumer, + callback: CallbackWithPromise>> + ): void + consumeByPartition ( + options: ConsumeByPartitionOptions + ): Promise>> + consumeByPartition ( + options: ConsumeByPartitionOptions, + callback?: CallbackWithPromise>> + ): void | Promise>> { + if (!callback) { + callback = createPromisifiedCallback>>() + } + + if (this[kCheckNotClosed](callback)) { + return callback[kCallbackPromise] + } + + const validationError = this[kValidateOptions](options, consumeByPartitionOptionsValidator, '/options', false) + if (validationError) { + callback(validationError, undefined as unknown as Map>) + return callback[kCallbackPromise] + } + + options.autocommit ??= this[kOptions].autocommit! ?? true + options.maxBytes ??= this[kOptions].maxBytes! + options.deserializers = Object.assign({}, options.deserializers, this[kOptions].deserializers) + options.highWaterMark ??= this[kOptions].highWaterMark! + + this.#consumeByPartition(options, callback) + + return callback![kCallbackPromise] + } + fetch (options: FetchOptions, callback: CallbackWithPromise): void fetch (options: FetchOptions): Promise fetch ( @@ -547,6 +584,21 @@ export class Consumer, + callback: CallbackWithPromise>> + ): void { + consumerConsumesChannel.traceCallback( + this.#performConsumeByPartition, + 2, + createDiagnosticContext({ client: this, operation: 'consume', options }), + this, + options, + true, + callback + ) + } + #fetch ( options: FetchOptions, callback: CallbackWithPromise @@ -986,7 +1038,7 @@ export class Consumer( this, - options as ConsumeOptions + options as ConsumeByPartitionOptions ) this.#streams.add(stream) @@ -1000,6 +1052,123 @@ export class Consumer, + trackTopics: boolean, + callback: CallbackWithPromise>> + ): void { + // Subscribe all topics + let joinNeeded = this.memberId === null + + if (trackTopics) { + for (const topic of options.topics) { + if (this.topics.track(topic)) { + joinNeeded = true + } + } + } + + // If we need to (re)join the group, do that first and then try again + if (joinNeeded) { + this.joinGroup(options, error => { + if (error) { + callback(error, undefined as unknown as Map>) + return + } + + this.#performConsumeByPartition(options, false, callback) + }) + + return + } + + let streamMap = new Map>() + + for (const topic of options.topics) { + const assignment = this.assignments!.find(assignment => assignment.topic === topic) + if (!assignment) { + continue + } + + for (const partition of assignment.partitions) { + const stream = new MessagesStream( + this, + options as ConsumeByPartitionOptions, + { topic, partition } + ) + + this.#streams.add(stream) + + this.#metricActiveStreams?.inc() + stream.once('close', () => { + this.#metricActiveStreams?.dec() + this.topics.untrack(topic) + this.#streams.delete(stream) + }) + + streamMap.set(`${topic}:${partition}`, stream) + } + } + + this.on('consumer:group:join', () => { + // calculate new list of topic-partitions vs. the keys of current streamMap + // if new list is identical to old list, do nothing + // if lists are different, create new streamMap and fill it with streams from + // old map that are still in new list of topic-partitions, plus new streams for + // the new topic-partitions (if any) + // replace streamMap let var with new map, and pass it to the onAssignmentChange callback + const oldTopicPartitions = Array.from(streamMap.keys()) + const newTopicPartitions: string[] = [] + + for (const topic of options.topics) { + const assignment = this.assignments!.find(assignment => assignment.topic === topic) + if (!assignment) { + continue + } + + for (const partition of assignment.partitions) { + newTopicPartitions.push(`${topic}:${partition}`) + } + } + + if (oldTopicPartitions.length === newTopicPartitions.length && oldTopicPartitions.sort().join(',') === newTopicPartitions.sort().join(',')) { + return + } + + const newStreamMap = new Map>() + + for (const topicPartition of newTopicPartitions) { + const [topic, partition] = topicPartition.split(':') + const stream = streamMap.get(topicPartition) + if (stream) { + newStreamMap.set(topicPartition, stream) + } else { + const stream = new MessagesStream( + this, + options as ConsumeByPartitionOptions, + { topic, partition: parseInt(partition, 10) } + ) + + this.#streams.add(stream) + + this.#metricActiveStreams?.inc() + stream.once('close', () => { + this.#metricActiveStreams?.dec() + this.topics.untrack(topic) + this.#streams.delete(stream) + }) + + newStreamMap.set(topicPartition, stream) + } + } + + streamMap = newStreamMap + options.onAssignmentChange(streamMap) + }) + + callback(null, streamMap) + } + #performFindGroupCoordinator (callback: CallbackWithPromise): void { this[kPerformDeduplicated]( 'findGroupCoordinator', diff --git a/src/clients/consumer/messages-stream.ts b/src/clients/consumer/messages-stream.ts index 4140a9c..b17d114 100644 --- a/src/clients/consumer/messages-stream.ts +++ b/src/clients/consumer/messages-stream.ts @@ -23,6 +23,7 @@ import { type Deserializer, type DeserializerWithHeaders } from '../serde.ts' import { type Consumer } from './consumer.ts' import { defaultConsumerOptions } from './options.ts' import { + type ConsumeByPartitionOptions, MessagesStreamFallbackModes, MessagesStreamModes, type CommitOptionsPartition, @@ -51,6 +52,7 @@ export class MessagesStream extends Readable #maxFetches: number #options: ConsumeOptions #topics: string[] + #topicPartition?: { topic: string, partition: number } #offsetsToFetch: Map #offsetsToCommit: Map #inflightNodes: Set @@ -68,7 +70,8 @@ export class MessagesStream extends Readable constructor ( consumer: Consumer, - options: ConsumeOptions + options: ConsumeByPartitionOptions, + topicPartition?: { topic: string, partition: number } ) { const { autocommit, @@ -80,6 +83,7 @@ export class MessagesStream extends Readable onCorruptedMessage, // The options below are only destructured to avoid being part of structuredClone below partitionAssigner: _partitionAssigner, + onAssignmentChange: _onAssignmentChange, ...otherOptions } = options @@ -100,6 +104,7 @@ export class MessagesStream extends Readable this.#fetches = 0 this.#maxFetches = maxFetches ?? 0 this.#topics = structuredClone(options.topics) + this.#topicPartition = topicPartition this.#inflightNodes = new Set() this.#keyDeserializer = deserializers?.key ?? (noopDeserializer as Deserializer) this.#valueDeserializer = deserializers?.value ?? (noopDeserializer as Deserializer) @@ -115,7 +120,9 @@ export class MessagesStream extends Readable this.#offsetsToFetch = new Map() if (offsets) { for (const { topic, partition, offset } of offsets) { - this.#offsetsToFetch.set(`${topic}:${partition}`, offset) + if (topicPartition && topicPartition.topic === topic && topicPartition.partition === partition) { + this.#offsetsToFetch.set(`${topic}:${partition}`, offset) + } } } @@ -361,7 +368,7 @@ export class MessagesStream extends Readable // Group topic-partitions by the destination broker const requestedOffsets = new Map() - for (const topic of this.#topics) { + for (const topic of this.#topicPartition ? [this.#topicPartition.topic] : this.#topics) { const assignment = this.#assignmentsForTopic(topic) // This consumer has no assignment for the topic, continue @@ -614,7 +621,8 @@ export class MessagesStream extends Readable // List topic offsets this.#consumer.listOffsets( { - topics: this.#topics, + topics: this.#topicPartition ? [this.#topicPartition.topic] : this.#topics, + partitions: this.#topicPartition ? { [this.#topicPartition.topic]: [this.#topicPartition.partition] } : undefined, timestamp: this.#mode === MessagesStreamModes.EARLIEST || (this.#mode !== MessagesStreamModes.LATEST && this.#fallbackMode === MessagesStreamFallbackModes.EARLIEST) @@ -640,7 +648,7 @@ export class MessagesStream extends Readable // Now restore group offsets const topics: GroupAssignment[] = [] - for (const topic of this.#topics) { + for (const topic of this.#topicPartition ? [this.#topicPartition.topic] : this.#topics) { const assignment = this.#assignmentsForTopic(topic) if (!assignment) { @@ -704,7 +712,28 @@ export class MessagesStream extends Readable } #assignmentsForTopic (topic: string): GroupAssignment | undefined { - return this.#consumer.assignments!.find(assignment => assignment.topic === topic) + if (this.#topicPartition) { + const topicPartition = this.#topicPartition + const assignment = this.#consumer.assignments!.find(assignment => assignment.topic === topic && assignment.partitions.includes(topicPartition.partition)) + + if (assignment) { + // The consumer still has this topic partition assigned, + // so return the assignment and continue working + return { + topic: topicPartition.topic, + partitions: [topicPartition.partition] + } + } else { + // The consumer no longer has this topic partition assigned to it, + // since this stream is specific to this topic partition, + // close the stream so we stop fetching and downstream consumers + // can clean themselves up. + this.close() + return undefined + } + } else { + return this.#consumer.assignments!.find(assignment => assignment.topic === topic) + } } #invokeCloseCallbacks (error: Error | null) { diff --git a/src/clients/consumer/options.ts b/src/clients/consumer/options.ts index 51b1b40..e5d5aa5 100644 --- a/src/clients/consumer/options.ts +++ b/src/clients/consumer/options.ts @@ -54,7 +54,7 @@ export const groupOptionsAdditionalValidations = { } } -export const consumeOptionsProperties = { +export const baseConsumeOptionsProperties = { autocommit: { oneOf: [{ type: 'boolean' }, { type: 'number', minimum: 100 }] }, minBytes: { type: 'number', minimum: 0 }, maxBytes: { type: 'number', minimum: 0 }, @@ -70,36 +70,52 @@ export const groupOptionsSchema = { additionalProperties: true // This is needed as we might forward options from consume } +export const consumeOptionsProperties = { + topics: { type: 'array', items: idProperty }, + mode: { type: 'string', enum: allowedMessagesStreamModes }, + fallbackMode: { type: 'string', enum: allowedMessagesStreamFallbackModes }, + maxFetches: { type: 'number', minimum: 0, default: 0 }, + offsets: { + type: 'array', + items: { + type: 'object', + properties: topicWithPartitionAndOffsetProperties, + required: ['topic', 'partition', 'offset'], + additionalProperties: false + } + }, + onCorruptedMessage: { function: true }, +} + export const consumeOptionsSchema = { type: 'object', properties: { - topics: { type: 'array', items: idProperty }, - mode: { type: 'string', enum: allowedMessagesStreamModes }, - fallbackMode: { type: 'string', enum: allowedMessagesStreamFallbackModes }, - maxFetches: { type: 'number', minimum: 0, default: 0 }, - offsets: { - type: 'array', - items: { - type: 'object', - properties: topicWithPartitionAndOffsetProperties, - required: ['topic', 'partition', 'offset'], - additionalProperties: false - } - }, - onCorruptedMessage: { function: true }, ...groupOptionsProperties, - ...consumeOptionsProperties + ...baseConsumeOptionsProperties, + ...consumeOptionsProperties, }, required: ['topics'], additionalProperties: false } +export const consumeByPartitionOptionsSchema = { + type: 'object', + properties: { + ...groupOptionsProperties, + ...baseConsumeOptionsProperties, + ...consumeOptionsProperties, + onAssignmentChange: { function: true } + }, + required: ['topics', 'onAssignmentChange'], + additionalProperties: false +} + export const consumerOptionsSchema = { type: 'object', properties: { groupId: idProperty, ...groupOptionsProperties, - ...consumeOptionsProperties + ...baseConsumeOptionsProperties }, required: ['groupId'], additionalProperties: true @@ -134,7 +150,7 @@ export const fetchOptionsSchema = { } }, ...groupOptionsProperties, - ...consumeOptionsProperties + ...baseConsumeOptionsProperties }, required: ['node', 'topics'], additionalProperties: false @@ -221,6 +237,7 @@ export const groupIdAndOptionsValidator = ajv.compile({ }) export const consumeOptionsValidator = ajv.compile(consumeOptionsSchema) +export const consumeByPartitionOptionsValidator = ajv.compile(consumeByPartitionOptionsSchema) export const consumerOptionsValidator = ajv.compile(consumerOptionsSchema) export const fetchOptionsValidator = ajv.compile(fetchOptionsSchema) export const commitOptionsValidator = ajv.compile(commitOptionsSchema) diff --git a/src/clients/consumer/types.ts b/src/clients/consumer/types.ts index 6238921..826ab42 100644 --- a/src/clients/consumer/types.ts +++ b/src/clients/consumer/types.ts @@ -3,6 +3,7 @@ import { type FetchIsolationLevel } from '../../apis/enumerations.ts' import { type KafkaRecord, type Message } from '../../protocol/records.ts' import { type BaseOptions, type ClusterMetadata, type TopicWithPartitionAndOffset } from '../base/types.ts' import { type Deserializers } from '../serde.ts' +import type { MessagesStream } from './messages-stream.ts' export interface GroupProtocolSubscription { name: string @@ -100,6 +101,10 @@ export type ConsumeOptions = StreamOptions & ConsumeBaseOptions & GroupOptions +export type ConsumeByPartitionOptions = ConsumeOptions & { + onAssignmentChange: (streams: Map>) => void +} + export type ConsumerOptions = BaseOptions & { groupId: string } & GroupOptions & ConsumeBaseOptions