Skip to content

Commit faf3e6a

Browse files
Merge pull request #14540 from nestjs/feat/rmq-topic-exchange
feat(microservices): add support for topic exchange (rabbitmq)
2 parents 23ef863 + d2948d9 commit faf3e6a

File tree

6 files changed

+335
-68
lines changed

6 files changed

+335
-68
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { INestApplication } from '@nestjs/common';
2+
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
3+
import { Test } from '@nestjs/testing';
4+
import * as request from 'supertest';
5+
import { RMQTopicExchangeController } from '../src/rmq/topic-exchange-rmq.controller';
6+
7+
describe('RabbitMQ transport (Topic Exchange - wildcards)', () => {
8+
let server: any;
9+
let app: INestApplication;
10+
11+
beforeEach(async () => {
12+
const module = await Test.createTestingModule({
13+
controllers: [RMQTopicExchangeController],
14+
}).compile();
15+
16+
app = module.createNestApplication();
17+
server = app.getHttpAdapter().getInstance();
18+
19+
app.connectMicroservice<MicroserviceOptions>({
20+
transport: Transport.RMQ,
21+
options: {
22+
urls: [`amqp://0.0.0.0:5672`],
23+
queue: 'test2',
24+
wildcards: true,
25+
},
26+
});
27+
await app.startAllMicroservices();
28+
await app.init();
29+
});
30+
31+
it(`should send message to wildcard topic exchange`, () => {
32+
return request(server).get('/topic-exchange').expect(200, 'wildcard.a.b');
33+
});
34+
35+
afterEach(async () => {
36+
await app.close();
37+
});
38+
});
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { Controller, Get } from '@nestjs/common';
2+
import {
3+
ClientProxy,
4+
ClientProxyFactory,
5+
Ctx,
6+
MessagePattern,
7+
RmqContext,
8+
Transport,
9+
} from '@nestjs/microservices';
10+
import { lastValueFrom } from 'rxjs';
11+
12+
@Controller()
13+
export class RMQTopicExchangeController {
14+
client: ClientProxy;
15+
16+
constructor() {
17+
this.client = ClientProxyFactory.create({
18+
transport: Transport.RMQ,
19+
options: {
20+
urls: [`amqp://localhost:5672`],
21+
queue: 'test2',
22+
wildcards: true,
23+
},
24+
});
25+
}
26+
27+
@Get('topic-exchange')
28+
async topicExchange() {
29+
return lastValueFrom(this.client.send<string>('wildcard.a.b', 1));
30+
}
31+
32+
@MessagePattern('wildcard.*.*')
33+
handleTopicExchange(@Ctx() ctx: RmqContext): string {
34+
return ctx.getPattern();
35+
}
36+
}

packages/microservices/client/client-rmq.ts

Lines changed: 94 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
/* eslint-disable @typescript-eslint/no-redundant-type-constituents */
12
import { Logger } from '@nestjs/common/services/logger.service';
23
import { loadPackage } from '@nestjs/common/utils/load-package.util';
34
import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util';
4-
import { isFunction } from '@nestjs/common/utils/shared.utils';
5+
import { isFunction, isString } from '@nestjs/common/utils/shared.utils';
56
import { EventEmitter } from 'events';
67
import {
78
EmptyError,
@@ -55,8 +56,8 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
5556
protected readonly logger = new Logger(ClientProxy.name);
5657
protected connection$: ReplaySubject<any>;
5758
protected connectionPromise: Promise<void>;
58-
protected client: AmqpConnectionManager = null;
59-
protected channel: ChannelWrapper = null;
59+
protected client: AmqpConnectionManager | null = null;
60+
protected channel: ChannelWrapper | null = null;
6061
protected pendingEventListeners: Array<{
6162
event: keyof RmqEvents;
6263
callback: RmqEvents[keyof RmqEvents];
@@ -113,7 +114,7 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
113114
this.registerDisconnectListener(this.client);
114115
this.registerConnectListener(this.client);
115116
this.pendingEventListeners.forEach(({ event, callback }) =>
116-
this.client.on(event, callback),
117+
this.client!.on(event, callback),
117118
);
118119
this.pendingEventListeners = [];
119120

@@ -140,7 +141,7 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
140141

141142
public createChannel(): Promise<void> {
142143
return new Promise(resolve => {
143-
this.channel = this.client.createChannel({
144+
this.channel = this.client!.createChannel({
144145
json: false,
145146
setup: (channel: Channel) => this.setupChannel(channel, resolve),
146147
});
@@ -215,6 +216,22 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
215216
);
216217
}
217218

219+
if (this.options.wildcards) {
220+
const exchange = this.getOptionsProp(
221+
this.options,
222+
'exchange',
223+
this.options.queue,
224+
);
225+
const exchangeType = this.getOptionsProp(
226+
this.options,
227+
'exchangeType',
228+
'topic',
229+
);
230+
await channel.assertExchange(exchange, exchangeType, {
231+
durable: true,
232+
});
233+
}
234+
218235
await channel.prefetch(prefetchCount, isGlobalPrefetchCount);
219236
await this.consumeChannel(channel);
220237
resolve();
@@ -224,8 +241,8 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
224241
const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK);
225242
await channel.consume(
226243
this.replyQueue,
227-
(msg: ConsumeMessage) =>
228-
this.responseEmitter.emit(msg.properties.correlationId, msg),
244+
(msg: ConsumeMessage | null) =>
245+
this.responseEmitter.emit(msg!.properties.correlationId, msg),
229246
{
230247
noAck,
231248
},
@@ -359,23 +376,44 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
359376
delete serializedPacket.options;
360377

361378
this.responseEmitter.on(correlationId, listener);
362-
this.channel
363-
.sendToQueue(
379+
380+
const content = Buffer.from(JSON.stringify(serializedPacket));
381+
const sendOptions = {
382+
replyTo: this.replyQueue,
383+
persistent: this.getOptionsProp(
384+
this.options,
385+
'persistent',
386+
RQM_DEFAULT_PERSISTENT,
387+
),
388+
...options,
389+
headers: this.mergeHeaders(options?.headers),
390+
correlationId,
391+
};
392+
393+
if (this.options.wildcards) {
394+
const stringifiedPattern = isString(message.pattern)
395+
? message.pattern
396+
: JSON.stringify(message.pattern);
397+
398+
// The exchange is the same as the queue when wildcards are enabled
399+
// and the exchange is not explicitly set
400+
const exchange = this.getOptionsProp(
401+
this.options,
402+
'exchange',
364403
this.queue,
365-
Buffer.from(JSON.stringify(serializedPacket)),
366-
{
367-
replyTo: this.replyQueue,
368-
persistent: this.getOptionsProp(
369-
this.options,
370-
'persistent',
371-
RQM_DEFAULT_PERSISTENT,
372-
),
373-
...options,
374-
headers: this.mergeHeaders(options?.headers),
375-
correlationId,
376-
},
377-
)
378-
.catch(err => callback({ err }));
404+
);
405+
406+
this.channel!.publish(
407+
exchange,
408+
stringifiedPattern,
409+
content,
410+
sendOptions,
411+
).catch(err => callback({ err }));
412+
} else {
413+
this.channel!.sendToQueue(this.queue, content, sendOptions).catch(err =>
414+
callback({ err }),
415+
);
416+
}
379417
return () => this.responseEmitter.removeListener(correlationId, listener);
380418
} catch (err) {
381419
callback({ err });
@@ -390,22 +428,39 @@ export class ClientRMQ extends ClientProxy<RmqEvents, RmqStatus> {
390428
const options = serializedPacket.options;
391429
delete serializedPacket.options;
392430

393-
return new Promise<void>((resolve, reject) =>
394-
this.channel.sendToQueue(
395-
this.queue,
396-
Buffer.from(JSON.stringify(serializedPacket)),
397-
{
398-
persistent: this.getOptionsProp(
399-
this.options,
400-
'persistent',
401-
RQM_DEFAULT_PERSISTENT,
402-
),
403-
...options,
404-
headers: this.mergeHeaders(options?.headers),
405-
},
406-
(err: unknown) => (err ? reject(err as Error) : resolve()),
407-
),
408-
);
431+
return new Promise<void>((resolve, reject) => {
432+
const content = Buffer.from(JSON.stringify(serializedPacket));
433+
const sendOptions = {
434+
persistent: this.getOptionsProp(
435+
this.options,
436+
'persistent',
437+
RQM_DEFAULT_PERSISTENT,
438+
),
439+
...options,
440+
headers: this.mergeHeaders(options?.headers),
441+
};
442+
const errorCallback = (err: unknown) =>
443+
err ? reject(err as Error) : resolve();
444+
445+
return this.options.wildcards
446+
? this.channel!.publish(
447+
// The exchange is the same as the queue when wildcards are enabled
448+
// and the exchange is not explicitly set
449+
this.getOptionsProp(this.options, 'exchange', this.queue),
450+
isString(packet.pattern)
451+
? packet.pattern
452+
: JSON.stringify(packet.pattern),
453+
content,
454+
sendOptions,
455+
errorCallback,
456+
)
457+
: this.channel!.sendToQueue(
458+
this.queue,
459+
content,
460+
sendOptions,
461+
errorCallback,
462+
);
463+
});
409464
}
410465

411466
protected initializeSerializer(options: RmqOptions['options']) {

packages/microservices/interfaces/microservice-configuration.interface.ts

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,22 +215,89 @@ export interface NatsOptions {
215215
export interface RmqOptions {
216216
transport?: Transport.RMQ;
217217
options?: {
218+
/**
219+
* An array of connection URLs to try in order.
220+
*/
218221
urls?: string[] | RmqUrl[];
222+
/**
223+
* The name of the queue.
224+
*/
219225
queue?: string;
226+
/**
227+
* A prefetch count for this channel. The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement;
228+
* once there are count messages outstanding, the server will not send more messages on this channel until one or more have been acknowledged.
229+
*/
220230
prefetchCount?: number;
231+
/**
232+
* Sets the per-channel behavior for prefetching messages.
233+
*/
221234
isGlobalPrefetchCount?: boolean;
235+
/**
236+
* Amqplib queue options.
237+
* @see https://amqp-node.github.io/amqplib/channel_api.html#channel_assertQueue
238+
*/
222239
queueOptions?: AmqplibQueueOptions;
240+
/**
241+
* AMQP Connection Manager socket options.
242+
*/
223243
socketOptions?: AmqpConnectionManagerSocketOptions;
224-
exchange?: string;
225-
routingKey?: string;
244+
/**
245+
* Iif true, the broker won’t expect an acknowledgement of messages delivered to this consumer; i.e., it will dequeue messages as soon as they’ve been sent down the wire.
246+
* @default false
247+
*/
226248
noAck?: boolean;
249+
/**
250+
* A name which the server will use to distinguish message deliveries for the consumer; mustn’t be already in use on the channel. It’s usually easier to omit this, in which case the server will create a random name and supply it in the reply.
251+
*/
227252
consumerTag?: string;
253+
/**
254+
* A serializer for the message payload.
255+
*/
228256
serializer?: Serializer;
257+
/**
258+
* A deserializer for the message payload.
259+
*/
229260
deserializer?: Deserializer;
261+
/**
262+
* A reply queue for the producer.
263+
* @default 'amq.rabbitmq.reply-to'
264+
*/
230265
replyQueue?: string;
266+
/**
267+
* If truthy, the message will survive broker restarts provided it’s in a queue that also survives restarts.
268+
*/
231269
persistent?: boolean;
270+
/**
271+
* Additional headers to be sent with every message.
272+
* Applies only to the producer configuration.
273+
*/
232274
headers?: Record<string, string>;
275+
/**
276+
* When false, a queue will not be asserted before consuming.
277+
* @default false
278+
*/
233279
noAssert?: boolean;
280+
/**
281+
* Name for the exchange. Defaults to the queue name when "wildcards" is set to true.
282+
* @default ''
283+
*/
284+
exchange?: string;
285+
/**
286+
* Type of the exchange
287+
* @default 'topic'
288+
*/
289+
exchangeType?: 'direct' | 'fanout' | 'topic' | 'headers';
290+
/**
291+
* Additional routing key for the topic exchange.
292+
*/
293+
routingKey?: string;
294+
/**
295+
* Set to true only if you want to use Topic Exchange for routing messages to queues.
296+
* Enabling this will allow you to use wildcards (*, #) as message and event patterns.
297+
* @see https://www.rabbitmq.com/tutorials/tutorial-five-python#topic-exchange
298+
* @default false
299+
*/
300+
wildcards?: boolean;
234301
/**
235302
* Maximum number of connection attempts.
236303
* Applies only to the consumer configuration.

0 commit comments

Comments
 (0)