Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -347,5 +347,17 @@ export interface KafkaOptions {
deserializer?: Deserializer;
parser?: KafkaParserConfig;
producerOnlyMode?: boolean;
/**
* When enabled, creates a separate Kafka consumer per registered topic,
* allowing concurrent message processing across topics.
* Each consumer uses `{groupId}-{topic}` as its consumer group ID,
* providing independent offset tracking per topic.
*
* Note: topic name is appended as groupId suffix. Ensure topic names
* contain only valid Kafka consumer group ID characters (alphanumeric, '.', '_', '-').
*
* @default false
*/
topicConsumers?: boolean;
};
}
115 changes: 88 additions & 27 deletions packages/microservices/server/server-kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ import {
import { KafkaRequestSerializer } from '../serializers/kafka-request.serializer';
import { Server } from './server';

let kafkaPackage: any = {};

/**
* @publicApi
*/
Expand All @@ -46,6 +44,7 @@ export class ServerKafka extends Server<never, KafkaStatus> {
protected logger = new Logger(ServerKafka.name);
protected client: Kafka | null = null;
protected consumer: Consumer | null = null;
protected consumers: Map<string, Consumer> = new Map();
protected producer: Producer | null = null;
protected parser: KafkaParser | null = null;
protected brokers: string[] | BrokersFunction;
Expand Down Expand Up @@ -75,10 +74,6 @@ export class ServerKafka extends Server<never, KafkaStatus> {
(clientOptions.clientId || KAFKA_DEFAULT_CLIENT) + postfixId;
this.groupId = (consumerOptions.groupId || KAFKA_DEFAULT_GROUP) + postfixId;

kafkaPackage = this.loadPackage('kafkajs', ServerKafka.name, () =>
require('kafkajs'),
);

this.parser = new KafkaParser((options && options.parser) || undefined);

this.initializeSerializer(options);
Expand All @@ -89,14 +84,20 @@ export class ServerKafka extends Server<never, KafkaStatus> {
callback: (err?: unknown, ...optionalParams: unknown[]) => void,
): Promise<void> {
try {
this.client = this.createClient();
this.client = await this.createClient();
await this.start(callback);
} catch (err) {
callback(err);
}
}

public async close(): Promise<void> {
if (this.consumers.size > 0) {
await Promise.allSettled(
[...this.consumers.values()].map(consumer => consumer.disconnect()),
);
this.consumers.clear();
}
this.consumer && (await this.consumer.disconnect());
this.producer && (await this.producer.disconnect());
this.consumer = null;
Expand All @@ -105,37 +106,48 @@ export class ServerKafka extends Server<never, KafkaStatus> {
}

public async start(callback: () => void): Promise<void> {
const consumerOptions = Object.assign(this.options.consumer || {}, {
const consumerOptions = Object.assign({}, this.options.consumer || {}, {
groupId: this.groupId,
});
this.consumer = this.client!.consumer(consumerOptions);
this.producer = this.client!.producer(this.options.producer);
this.registerConsumerEventListeners();
this.registerProducerEventListeners();

await this.consumer.connect();
await this.producer.connect();
await this.bindEvents(this.consumer);

if (this.getOptionsProp(this.options, 'topicConsumers', false)) {
this.producer = this.client!.producer(this.options.producer);
this.registerProducerEventListeners();
await this.producer.connect();
await this.bindEventsPerTopic(consumerOptions);
} else {
this.consumer = this.client!.consumer(consumerOptions);
this.producer = this.client!.producer(this.options.producer);
this.registerConsumerEventListeners();
this.registerProducerEventListeners();
await this.consumer.connect();
await this.producer.connect();
await this.bindEvents(this.consumer);
}
callback();
}

/**
 * Wires status-event listeners onto the shared consumer.
 * No-op when no shared consumer exists (e.g. producer-only or
 * per-topic-consumer mode, where listeners are attached per consumer).
 */
protected registerConsumerEventListeners() {
  if (!this.consumer) {
    return;
  }
  this.registerConsumerEventListenersFor(this.consumer);
}

/**
 * Attaches lifecycle listeners to the given consumer so its connection
 * state is mirrored into the server's `_status$` stream.
 *
 * @param consumer the kafkajs consumer instance to observe.
 */
protected registerConsumerEventListenersFor(consumer: Consumer) {
  consumer.on(consumer.events.CONNECT, () =>
    this._status$.next(KafkaStatus.CONNECTED),
  );
  consumer.on(consumer.events.DISCONNECT, () =>
    this._status$.next(KafkaStatus.DISCONNECTED),
  );
  consumer.on(consumer.events.REBALANCING, () =>
    this._status$.next(KafkaStatus.REBALANCING),
  );
  consumer.on(consumer.events.STOP, () =>
    this._status$.next(KafkaStatus.STOPPED),
  );
  consumer.on(consumer.events.CRASH, () =>
    this._status$.next(KafkaStatus.CRASHED),
  );
}
Expand All @@ -152,33 +164,78 @@ export class ServerKafka extends Server<never, KafkaStatus> {
);
}

/**
 * Lazily loads the optional `kafkajs` package via dynamic import and
 * constructs a Kafka client from the configured options.
 *
 * User-provided `client` options are layered over the default log creator,
 * while `clientId` and `brokers` always take precedence.
 *
 * @returns the instantiated kafkajs `Kafka` client.
 */
public async createClient<T = any>(): Promise<T> {
  const kafkaPackage = await this.loadPackage(
    'kafkajs',
    ServerKafka.name,
    () => import('kafkajs'),
  );
  return new kafkaPackage.Kafka(
    Object.assign(
      { logCreator: KafkaLogger.bind(null, this.logger) },
      this.options.client,
      { clientId: this.clientId, brokers: this.brokers },
    ) as KafkaConfig,
  ) as T;
}

/**
 * Subscribes the given consumer to every registered message pattern and
 * starts it with the configured run options.
 *
 * @param consumer the connected kafkajs consumer to subscribe and run.
 */
public async bindEvents(consumer: Consumer) {
  const registeredPatterns = [...this.messageHandlers.keys()];
  const consumerSubscribeOptions = this.options.subscribe || {};

  // Skip the subscribe call entirely when no handlers are registered.
  if (registeredPatterns.length > 0) {
    await consumer.subscribe({
      ...consumerSubscribeOptions,
      topics: registeredPatterns,
    });
  }

  // Spread into a new object so `this.options.run` is never mutated.
  const consumerRunOptions = {
    ...(this.options.run || {}),
    eachMessage: this.getMessageHandler(),
  };
  await consumer.run(consumerRunOptions);
}

/**
 * Creates one dedicated consumer per registered topic, giving each its own
 * consumer group (`{groupId}-{topic}`) and therefore independent offset
 * tracking. Consumers are connected sequentially; on any failure, every
 * consumer created so far is disconnected before the error is rethrown.
 *
 * @param consumerOptions base consumer configuration; its groupId is
 *   overridden per topic with the composed group ID.
 * @throws rethrows the first error encountered during setup.
 */
public async bindEventsPerTopic(consumerOptions: ConsumerConfig) {
  const topics = [...this.messageHandlers.keys()];
  const subscribeOptions = this.options.subscribe || {};

  try {
    for (const topic of topics) {
      const groupId = `${this.groupId}-${topic}`;
      // Warn (but proceed) when the derived group ID falls outside the
      // character set / length Kafka accepts for consumer group IDs.
      const isValidGroupId = /^[a-zA-Z0-9._-]{1,255}$/.test(groupId);
      if (!isValidGroupId) {
        this.logger.warn(
          `Consumer group ID "${groupId}" may be invalid: ` +
            `must be 1–255 characters and contain only alphanumeric, '.', '_', or '-'.`,
        );
      }

      const consumer = this.client!.consumer({
        ...consumerOptions,
        groupId,
      });
      this.registerConsumerEventListenersFor(consumer);
      await consumer.connect();
      // Track the consumer before subscribing so a later failure still
      // tears it down in the catch block below.
      this.consumers.set(topic, consumer);
      await consumer.subscribe({
        ...subscribeOptions,
        topics: [topic],
      });
      await consumer.run({
        ...(this.options.run || {}),
        eachMessage: this.getMessageHandler(),
      });
    }
  } catch (err) {
    // Best-effort teardown of everything created before the failure.
    await Promise.allSettled(
      [...this.consumers.values()].map(c => c.disconnect()),
    );
    this.consumers.clear();
    throw err;
  }
}

public getMessageHandler() {
return async (payload: EachMessagePayload) => this.handleMessage(payload);
}
Expand Down Expand Up @@ -213,11 +270,12 @@ export class ServerKafka extends Server<never, KafkaStatus> {
const replyPartition = headers[KafkaHeaders.REPLY_PARTITION];

const packet = await this.deserializer.deserialize(rawMessage, { channel });
const consumer = this.consumers.get(payload.topic) ?? this.consumer!;
const kafkaContext = new KafkaContext([
rawMessage,
payload.partition,
payload.topic,
this.consumer!,
consumer,
payload.heartbeat,
this.producer!,
]);
Expand Down Expand Up @@ -263,6 +321,9 @@ export class ServerKafka extends Server<never, KafkaStatus> {
'Not initialized. Please call the "listen"/"startAllMicroservices" method before accessing the server.',
);
}
if (this.consumers.size > 0) {
return [this.client, this.consumers, this.producer] as T;
}
return [this.client, this.consumer, this.producer] as T;
}

Expand Down
Loading
Loading