diff --git a/packages/microservices/interfaces/microservice-configuration.interface.ts b/packages/microservices/interfaces/microservice-configuration.interface.ts index 809486914f5..b5a1d8c5117 100644 --- a/packages/microservices/interfaces/microservice-configuration.interface.ts +++ b/packages/microservices/interfaces/microservice-configuration.interface.ts @@ -347,5 +347,17 @@ export interface KafkaOptions { deserializer?: Deserializer; parser?: KafkaParserConfig; producerOnlyMode?: boolean; + /** + * When enabled, creates a separate Kafka consumer per registered topic, + * allowing concurrent message processing across topics. + * Each consumer uses `{groupId}-{topic}` as its consumer group ID, + * providing independent offset tracking per topic. + * + * Note: topic name is appended as groupId suffix. Ensure topic names + * contain only valid Kafka consumer group ID characters (alphanumeric, '.', '_', '-'). + * + * @default false + */ + topicConsumers?: boolean; }; } diff --git a/packages/microservices/server/server-kafka.ts b/packages/microservices/server/server-kafka.ts index 939096e6443..594329b3132 100644 --- a/packages/microservices/server/server-kafka.ts +++ b/packages/microservices/server/server-kafka.ts @@ -35,8 +35,6 @@ import { import { KafkaRequestSerializer } from '../serializers/kafka-request.serializer'; import { Server } from './server'; -let kafkaPackage: any = {}; - /** * @publicApi */ @@ -46,6 +44,7 @@ export class ServerKafka extends Server { protected logger = new Logger(ServerKafka.name); protected client: Kafka | null = null; protected consumer: Consumer | null = null; + protected consumers: Map = new Map(); protected producer: Producer | null = null; protected parser: KafkaParser | null = null; protected brokers: string[] | BrokersFunction; @@ -75,10 +74,6 @@ export class ServerKafka extends Server { (clientOptions.clientId || KAFKA_DEFAULT_CLIENT) + postfixId; this.groupId = (consumerOptions.groupId || KAFKA_DEFAULT_GROUP) + postfixId; - kafkaPackage = this.loadPackage('kafkajs', ServerKafka.name, () => - require('kafkajs'), - ); - this.parser = new KafkaParser((options && options.parser) || undefined); this.initializeSerializer(options); @@ -89,7 +84,7 @@ export class ServerKafka extends Server { callback: (err?: unknown, ...optionalParams: unknown[]) => void, ): Promise { try { - this.client = this.createClient(); + this.client = await this.createClient(); await this.start(callback); } catch (err) { callback(err); @@ -97,6 +92,12 @@ export class ServerKafka extends Server { } public async close(): Promise { + if (this.consumers.size > 0) { + await Promise.allSettled( + [...this.consumers.values()].map(consumer => consumer.disconnect()), + ); + this.consumers.clear(); + } this.consumer && (await this.consumer.disconnect()); this.producer && (await this.producer.disconnect()); this.consumer = null; @@ -105,17 +106,24 @@ export class ServerKafka extends Server { } public async start(callback: () => void): Promise { - const consumerOptions = Object.assign(this.options.consumer || {}, { + const consumerOptions = Object.assign({}, this.options.consumer || {}, { groupId: this.groupId, }); - this.consumer = this.client!.consumer(consumerOptions); - this.producer = this.client!.producer(this.options.producer); - this.registerConsumerEventListeners(); - this.registerProducerEventListeners(); - - await this.consumer.connect(); - await this.producer.connect(); - await this.bindEvents(this.consumer); + + if (this.getOptionsProp(this.options, 'topicConsumers', false)) { + this.producer = this.client!.producer(this.options.producer); + this.registerProducerEventListeners(); + await this.producer.connect(); + await this.bindEventsPerTopic(consumerOptions); + } else { + this.consumer = this.client!.consumer(consumerOptions); + this.producer = this.client!.producer(this.options.producer); + this.registerConsumerEventListeners(); + this.registerProducerEventListeners(); + await this.consumer.connect(); + await this.producer.connect(); + await this.bindEvents(this.consumer); + } callback(); } @@ -123,19 +131,23 @@ export class ServerKafka extends Server { if (!this.consumer) { return; } - this.consumer.on(this.consumer.events.CONNECT, () => + this.registerConsumerEventListenersFor(this.consumer); + } + + protected registerConsumerEventListenersFor(consumer: Consumer) { + consumer.on(consumer.events.CONNECT, () => this._status$.next(KafkaStatus.CONNECTED), ); - this.consumer.on(this.consumer.events.DISCONNECT, () => + consumer.on(consumer.events.DISCONNECT, () => this._status$.next(KafkaStatus.DISCONNECTED), ); - this.consumer.on(this.consumer.events.REBALANCING, () => + consumer.on(consumer.events.REBALANCING, () => this._status$.next(KafkaStatus.REBALANCING), ); - this.consumer.on(this.consumer.events.STOP, () => + consumer.on(consumer.events.STOP, () => this._status$.next(KafkaStatus.STOPPED), ); - this.consumer.on(this.consumer.events.CRASH, () => + consumer.on(consumer.events.CRASH, () => this._status$.next(KafkaStatus.CRASHED), ); } @@ -152,14 +164,19 @@ export class ServerKafka extends Server { ); } - public createClient(): T { + public async createClient(): Promise { + const kafkaPackage = await this.loadPackage( + 'kafkajs', + ServerKafka.name, + () => import('kafkajs'), + ); return new kafkaPackage.Kafka( Object.assign( { logCreator: KafkaLogger.bind(null, this.logger) }, this.options.client, { clientId: this.clientId, brokers: this.brokers }, ) as KafkaConfig, - ); + ) as T; } public async bindEvents(consumer: Consumer) { @@ -167,18 +184,58 @@ export class ServerKafka extends Server { const consumerSubscribeOptions = this.options.subscribe || {}; if (registeredPatterns.length > 0) { - await this.consumer!.subscribe({ + await consumer.subscribe({ ...consumerSubscribeOptions, topics: registeredPatterns, }); } - const consumerRunOptions = Object.assign(this.options.run || {}, { + const consumerRunOptions = { + ...(this.options.run || {}), eachMessage: this.getMessageHandler(), - }); + }; await consumer.run(consumerRunOptions); } + public async bindEventsPerTopic(consumerOptions: ConsumerConfig) { + const registeredPatterns = [...this.messageHandlers.keys()]; + const consumerSubscribeOptions = this.options.subscribe || {}; + + try { + for (const topic of registeredPatterns) { + const composedGroupId = `${this.groupId}-${topic}`; + if (!/^[a-zA-Z0-9._-]{1,255}$/.test(composedGroupId)) { + this.logger.warn( + `Consumer group ID "${composedGroupId}" may be invalid: ` + + `must be 1–255 characters and contain only alphanumeric, '.', '_', or '-'.`, + ); + } + const topicConsumer = this.client!.consumer({ + ...consumerOptions, + groupId: composedGroupId, + }); + this.registerConsumerEventListenersFor(topicConsumer); + await topicConsumer.connect(); + this.consumers.set(topic, topicConsumer); + await topicConsumer.subscribe({ + ...consumerSubscribeOptions, + topics: [topic], + }); + const consumerRunOptions = { + ...(this.options.run || {}), + eachMessage: this.getMessageHandler(), + }; + await topicConsumer.run(consumerRunOptions); + } + } catch (err) { + await Promise.allSettled( + [...this.consumers.values()].map(consumer => consumer.disconnect()), + ); + this.consumers.clear(); + throw err; + } + } + public getMessageHandler() { return async (payload: EachMessagePayload) => this.handleMessage(payload); } @@ -213,11 +270,12 @@ export class ServerKafka extends Server { const replyPartition = headers[KafkaHeaders.REPLY_PARTITION]; const packet = await this.deserializer.deserialize(rawMessage, { channel }); + const consumer = this.consumers.get(payload.topic) ?? this.consumer!; const kafkaContext = new KafkaContext([ rawMessage, payload.partition, payload.topic, - this.consumer!, + consumer, payload.heartbeat, this.producer!, ]); @@ -263,6 +321,9 @@ export class ServerKafka extends Server { 'Not initialized. Please call the "listen"/"startAllMicroservices" method before accessing the server.', ); } + if (this.consumers.size > 0) { + return [this.client, this.consumers, this.producer] as T; + } return [this.client, this.consumer, this.producer] as T; } diff --git a/packages/microservices/test/server/server-kafka.spec.ts b/packages/microservices/test/server/server-kafka.spec.ts index 855299200cf..bfb040d5d81 100644 --- a/packages/microservices/test/server/server-kafka.spec.ts +++ b/packages/microservices/test/server/server-kafka.spec.ts @@ -535,19 +535,214 @@ describe('ServerKafka', () => { }); }); + describe('topicConsumers mode', () => { + const mockConsumerEvents = { + CONNECT: 'consumer.connect', + DISCONNECT: 'consumer.disconnect', + STOP: 'consumer.stop', + CRASH: 'consumer.crash', + REBALANCING: 'consumer.rebalancing', + }; + + let perTopicServer: ServerKafka; + let perTopicUntyped: any; + let perTopicConnect: sinon.SinonStub; + let perTopicSubscribe: sinon.SinonStub; + let perTopicRun: sinon.SinonStub; + let perTopicOn: sinon.SinonStub; + let perTopicConsumerFactory: sinon.SinonStub; + + beforeEach(() => { + perTopicServer = new ServerKafka({ topicConsumers: true }); + perTopicUntyped = perTopicServer as any; + + perTopicConnect = sinon.stub(); + perTopicSubscribe = sinon.stub(); + perTopicRun = sinon.stub(); + perTopicOn = sinon.stub(); + + const mockConsumer = () => ({ + connect: perTopicConnect, + subscribe: perTopicSubscribe, + run: perTopicRun, + on: perTopicOn, + events: mockConsumerEvents, + }); + + perTopicConsumerFactory = sinon.stub().callsFake(mockConsumer); + + sinon.stub(perTopicServer, 'createClient').resolves({ + consumer: perTopicConsumerFactory, + producer: sinon.stub().returns({ + connect: perTopicConnect, + send: sinon.stub(), + on: perTopicOn, + events: { + CONNECT: 'producer.connect', + DISCONNECT: 'producer.disconnect', + }, + }), + } as any); + }); + + afterEach(() => sinon.restore()); + + describe('bindEventsPerTopic', () => { + it('should create a separate consumer for each registered topic', async () => { + perTopicUntyped.messageHandlers = objectToMap({ + 'topic-a': sinon.stub(), + 'topic-b': sinon.stub(), + }); + + await perTopicServer.listen(sinon.stub()); + + expect(perTopicConsumerFactory.callCount).to.equal(2); + }); + + it('should suffix groupId with topic name for each consumer', async () => { + perTopicUntyped.messageHandlers = objectToMap({ + 'topic-a': sinon.stub(), + 'topic-b': sinon.stub(), + }); + + await perTopicServer.listen(sinon.stub()); + + const groupIds = perTopicConsumerFactory.args.map( + args => args[0].groupId, + ); + expect(groupIds.some(id => id.endsWith('-topic-a'))).to.be.true; + expect(groupIds.some(id => id.endsWith('-topic-b'))).to.be.true; + }); + + it('should subscribe each consumer to exactly one topic', async () => { + perTopicUntyped.messageHandlers = objectToMap({ + 'topic-a': sinon.stub(), + 'topic-b': sinon.stub(), + }); + + await perTopicServer.listen(sinon.stub()); + + expect(perTopicSubscribe.callCount).to.equal(2); + perTopicSubscribe.args.forEach(args => { + expect(args[0].topics.length).to.equal(1); + }); + const subscribedTopics = perTopicSubscribe.args + .map(args => args[0].topics[0]) + .sort(); + expect(subscribedTopics).to.deep.equal(['topic-a', 'topic-b']); + }); + + it('should call run on each per-topic consumer', async () => { + perTopicUntyped.messageHandlers = objectToMap({ + 'topic-a': sinon.stub(), + 'topic-b': sinon.stub(), + }); + + await perTopicServer.listen(sinon.stub()); + + expect(perTopicRun.callCount).to.equal(2); + perTopicRun.args.forEach(args => { + expect(args[0]).to.have.property('eachMessage'); + }); + }); + + it('should populate consumers map with one entry per topic', async () => { + perTopicUntyped.messageHandlers = objectToMap({ + 'topic-a': sinon.stub(), + 'topic-b': sinon.stub(), + }); + + await perTopicServer.listen(sinon.stub()); + + expect(perTopicUntyped.consumers.size).to.equal(2); + expect(perTopicUntyped.consumers.has('topic-a')).to.be.true; + expect(perTopicUntyped.consumers.has('topic-b')).to.be.true; + }); + + it('should not create any consumer when there are no messageHandlers', async () => { + await perTopicServer.listen(sinon.stub()); + + expect(perTopicConsumerFactory.called).to.be.false; + expect(perTopicUntyped.consumers.size).to.equal(0); + }); + + it('should clean up connected consumers and rethrow when a topic connect fails', async () => { + const disconnectOk = sinon.stub(); + const connectError = new Error('connect failed'); + let callCount = 0; + + perTopicConsumerFactory.callsFake(() => ({ + connect: sinon.stub().callsFake(() => { + callCount++; + if (callCount === 2) throw connectError; + }), + subscribe: sinon.stub(), + run: sinon.stub(), + on: perTopicOn, + events: mockConsumerEvents, + disconnect: disconnectOk, + })); + + perTopicUntyped.messageHandlers = objectToMap({ + 'topic-a': sinon.stub(), + 'topic-b': sinon.stub(), + }); + + const cb = sinon.stub(); + await perTopicServer.listen(cb); + + expect(cb.calledWith(connectError)).to.be.true; + expect(disconnectOk.calledOnce).to.be.true; + expect(perTopicUntyped.consumers.size).to.equal(0); + }); + }); + + describe('close with topicConsumers', () => { + it('should disconnect all per-topic consumers and null refs', async () => { + const disconnectA = sinon.stub(); + const disconnectB = sinon.stub(); + perTopicUntyped.consumers = new Map([ + ['topic-a', { disconnect: disconnectA }], + ['topic-b', { disconnect: disconnectB }], + ]); + perTopicUntyped.producer = { disconnect: sinon.stub() }; + + await perTopicServer.close(); + + expect(disconnectA.calledOnce).to.be.true; + expect(disconnectB.calledOnce).to.be.true; + expect(perTopicUntyped.consumers.size).to.equal(0); + expect(perTopicUntyped.producer).to.be.null; + expect(perTopicUntyped.client).to.be.null; + }); + }); + }); + describe('createClient', () => { - it('should accept a custom logCreator in client options', () => { + it('should accept a custom logCreator in client options', async () => { const logCreatorSpy = sinon.spy(() => 'test'); const logCreator = () => logCreatorSpy; + class MockKafka { + private logFn: any; + constructor({ logCreator: lc }: any) { + this.logFn = lc(1); + } + logger() { + return { info: (entry: any) => this.logFn(entry) }; + } + } + server = new ServerKafka({ client: { brokers: [], logCreator, }, }); + sinon.stub(server as any, 'loadPackage').resolves({ Kafka: MockKafka }); - const logger = server.createClient().logger(); + const kafkaClient = await server.createClient(); + const logger = kafkaClient.logger(); logger.info({ namespace: '', level: 1, log: 'test' });