|
| 1 | +### Custom transporters |
| 2 | + |
| 3 | +Nest provides a variety of **transporters** out-of-the-box, as well as an API allowing developers to build new custom transport strategies. |
| 4 | +Transporters enable you to connect components over a network using a pluggable communications layer and a very simple application-level message protocol (read full [article](https://dev.to/nestjs/integrate-nestjs-with-external-services-using-microservice-transporters-part-1-p3)). |
| 5 | + |
| 6 | +> info **Hint** Building a microservice with Nest does not necessarily mean you must use the `@nestjs/microservices` package. For example, if you want to communicate with external services (let's say other microservices written in different languages), you may not need all the features provided by `@nestjs/microservice` library. |
| 7 | +> In fact, if you don't need decorators (`@EventPattern` or `@MessagePattern`) that let you declaratively define subscribers, running a [Standalone Application](/application-context) and manually maintaining connection/subscribing to channels should be enough for most use-cases and will provide you with more flexibility. |
| 8 | +
|
| 9 | +With a custom transporter, you can integrate any messaging system/protocol (including Google Cloud Pub/Sub, Amazon Kinesis, and others) or extend the existing one, adding extra features on top (for example, [QoS](https://github.com/mqttjs/MQTT.js/blob/master/README.md#qos) for MQTT). |
| 10 | + |
| 11 | +> info **Hint** To better understand how Nest microservices work and how you can extend the capabilities of existing transporters, we recommend reading the [NestJS Microservices in Action]() and [Advanced NestJS Microservices](https://dev.to/nestjs/part-1-introduction-and-setup-1a2l) article series. |
| 12 | +
|
| 13 | +#### Creating a strategy |
| 14 | + |
| 15 | +First, let's define a class representing our custom transporter. |
| 16 | + |
| 17 | +```typescript |
| 18 | +import { CustomTransportStrategy, Server } from '@nestjs/microservices'; |
| 19 | + |
| 20 | +class GoogleCloudPubSubServer |
| 21 | + extends Server |
| 22 | + implements CustomTransportStrategy { |
| 23 | + /** |
| 24 | + * This method is triggered when you run "app.listen()". |
| 25 | + */ |
| 26 | + listen(callback: () => void) { |
| 27 | + callback(); |
| 28 | + } |
| 29 | + |
| 30 | + /** |
| 31 | + * This method is triggered on application shutdown. |
| 32 | + */ |
| 33 | + close() {} |
| 34 | +} |
| 35 | +``` |
| 36 | + |
| 37 | +> warning **Warning** Please, note we won't be implementing a fully-featured Google Cloud Pub/Sub server in this chapter as this would require diving into transporter specific technical details. |
| 38 | +
|
| 39 | +In our example above, we declared the `GoogleCloudPubSubServer` class and provided `listen()` and `close()` methods enforced by the `CustomTransportStrategy` interface. |
| 40 | +Also, our class extends the `Server` class imported from the `@nestjs/microservices` package that provides a few useful methods, for example, methods used by Nest runtime to register message handlers. Alternatively, in case you want to extend the capabilities of an existing transport strategy, you could extend the corresponding server class, for example, `ServerRedis`. |
| 41 | +Conventionally, we added the `"Server"` suffix to our class as it will be responsible for subscribing to messages/events (and responding to them, if necessary). |
| 42 | + |
| 43 | +With this in place, we can now use our custom strategy instead of using a built-in transporter, as follows: |
| 44 | + |
| 45 | +```typescript |
| 46 | +const app = await NestFactory.createMicroservice<MicroserviceOptions>( |
| 47 | + AppModule, |
| 48 | + { |
| 49 | + strategy: new GoogleCloudPubSubServer(), |
| 50 | + }, |
| 51 | +); |
| 52 | +``` |
| 53 | + |
| 54 | +Basically, instead of passing the normal transporter options object with `transport` and `options` properties, we pass a single property, `strategy`, whose value is an instance of our custom transporter class. |
| 55 | + |
| 56 | +Back to our `GoogleCloudPubSubServer` class, in a real-world application, we would be establishing a connection to our message broker/external service and registering subscribers/listening to specific channels in `listen()` method (and then removing subscriptions & closing the connection in the `close()` teardown method), |
| 57 | +but since this requires a good understanding of how Nest microservices communicate with each other, we recommend reading this [article series](https://dev.to/nestjs/part-1-introduction-and-setup-1a2l). |
| 58 | +In this chapter instead, we'll focus on the capabilities the `Server` class provides and how you can leverage them to build custom strategies. |
| 59 | + |
| 60 | +For example, let's say that somewhere in our application, the following message handler is defined: |
| 61 | + |
| 62 | +```typescript |
| 63 | +@MessagePattern('echo') |
| 64 | +echo(@Payload() data: object) { |
| 65 | + return data; |
| 66 | +} |
| 67 | +``` |
| 68 | + |
| 69 | +This message handler will be automatically registered by Nest runtime. With `Server` class, you can see what message patterns have been registered and also, access and execute the actual methods that were assigned to them. |
| 70 | +To test this out, let's add a simple `console.log` inside `listen()` method before `callback` function is called: |
| 71 | + |
| 72 | +```typescript |
| 73 | +listen(callback: () => void) { |
| 74 | + console.log(this.messageHandlers); |
| 75 | + callback(); |
| 76 | +} |
| 77 | +``` |
| 78 | + |
| 79 | +After your application restarts, you'll see the following log in your terminal: |
| 80 | + |
| 81 | +```typescript |
| 82 | +Map { 'echo' => [AsyncFunction] { isEventHandler: false } } |
| 83 | +``` |
| 84 | + |
| 85 | +> info **Hint** If we used the `@EventPattern` decorator, you would see the same output, but with the `isEventHandler` property set to `true`. |
| 86 | +
|
| 87 | +As you can see, the `messageHandlers` property is a `Map` collection of all message (and event) handlers, in which patterns are being used as keys. |
| 88 | +Now, you can use a key (for example, `"echo"`) to receive a reference to the message handler: |
| 89 | + |
| 90 | +```typescript |
| 91 | +async listen(callback: () => void) { |
| 92 | + const echoHandler = this.messageHandlers.get('echo'); |
| 93 | + console.log(await echoHandler('Hello world!')); |
| 94 | + callback(); |
| 95 | +} |
| 96 | +``` |
| 97 | + |
| 98 | +Once we execute the `echoHandler` passing an arbitrary string as an argument (`"Hello world!"` here), we should see it in the console: |
| 99 | + |
| 100 | +```json |
| 101 | +Hello world! |
| 102 | +``` |
| 103 | + |
| 104 | +Which means that our method handler was properly executed. |
| 105 | + |
| 106 | +#### Client proxy |
| 107 | + |
| 108 | +As we mentioned in the first section, you don't necessarily need to use the `@nestjs/microservices` package to create microservices, but if you decide to do so and you need to integrate a custom strategy, you will need to provide a "client" class too. |
| 109 | + |
| 110 | +> info **Hint** Again, implementing a fully-featured client class compatible with all `@nestjs/microservices` features (e.g., streaming) requires a good understading of communication techniques used by the framework. To learn more, check out this [article](https://dev.to/nestjs/part-4-basic-client-component-16f9). |
| 111 | +
|
| 112 | +To communicate with an external service/emit & publish messages (or events) you can either use a library-specific SDK package, or implement a custom client class that extends the `ClientProxy`, as follows: |
| 113 | + |
| 114 | +```typescript |
| 115 | +import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices'; |
| 116 | + |
| 117 | +class GoogleCloudPubSubClient extends ClientProxy { |
| 118 | + async connect(): Promise<any> {} |
| 119 | + async close() {} |
| 120 | + async dispatchEvent(packet: ReadPacket<any>): Promise<any> {} |
| 121 | + publish( |
| 122 | + packet: ReadPacket<any>, |
| 123 | + callback: (packet: WritePacket<any>) => void, |
| 124 | + ): Function {} |
| 125 | +} |
| 126 | +``` |
| 127 | + |
| 128 | +> warning **Warning** Please, note we won't be implementing a fully-featured Google Cloud Pub/Sub client in this chapter as this would require diving into transporter specific technical details. |
| 129 | +
|
| 130 | +As you can see, `ClientProxy` class requires us to provide several methods for establishing & closing the connection and publishing messages (`publish`) and events (`dispatchEvent`). |
| 131 | +Note, if you don't need a request-response communication style support, you can leave the `publish()` method empty. Likewise, if you don't need to support event-based communication, skip the `dispatchEvent()` method. |
| 132 | + |
| 133 | +To observe what and when those methods are executed, let's add multiple `console.log` calls, as follows: |
| 134 | + |
| 135 | +```typescript |
| 136 | +class GoogleCloudPubSubClient extends ClientProxy { |
| 137 | + async connect(): Promise<any> { |
| 138 | + console.log('connect'); |
| 139 | + } |
| 140 | + |
| 141 | + async close() { |
| 142 | + console.log('close'); |
| 143 | + } |
| 144 | + |
| 145 | + async dispatchEvent(packet: ReadPacket<any>): Promise<any> { |
| 146 | + return console.log('event to dispatch: ', packet); |
| 147 | + } |
| 148 | + |
| 149 | + publish( |
| 150 | + packet: ReadPacket<any>, |
| 151 | + callback: (packet: WritePacket<any>) => void, |
| 152 | + ): Function { |
| 153 | + console.log('message:', packet); |
| 154 | + |
| 155 | + // In a real-world application, the "callback" function should be executed |
| 156 | + // with payload sent back from the responder. Here, we'll simply simulate (5 seconds delay) |
| 157 | + // that response came through by passing the same "data" as we've originally passed in. |
| 158 | + setTimeout(() => callback({ response: packet.data }), 5000); |
| 159 | + |
| 160 | + return () => console.log('teardown'); |
| 161 | + } |
| 162 | +} |
| 163 | +``` |
| 164 | + |
| 165 | +With this in place, let's create an instance of `GoogleCloudPubSubClient` class and run the `send()` method (which you might have seen in earlier chapters), subscribing to the returned observable stream. |
| 166 | + |
| 167 | +```typescript |
| 168 | +const googlePubSubClient = new GoogleCloudPubSubClient(); |
| 169 | +googlePubSubClient |
| 170 | + .send('pattern', 'Hello world!') |
| 171 | + .subscribe((response) => console.log(response)); |
| 172 | +``` |
| 173 | + |
| 174 | +Now, you should see the following output in your terminal: |
| 175 | + |
| 176 | +```typescript |
| 177 | +connect |
| 178 | +message: { pattern: 'pattern', data: 'Hello world!' } |
| 179 | +Hello world! // <-- after 5 seconds |
| 180 | +``` |
| 181 | + |
| 182 | +To test if our "teardown" method (which our `publish()` method returns) is properly executed, let's apply a timeout operator to our stream, setting it to 2 seconds to make sure it throws earlier then our `setTimeout` calls the `callback` function. |
| 183 | + |
| 184 | +```typescript |
| 185 | +const googlePubSubClient = new GoogleCloudPubSubClient(); |
| 186 | +googlePubSubClient |
| 187 | + .send('pattern', 'Hello world!') |
| 188 | + .pipe(timeout(2000)) |
| 189 | + .subscribe( |
| 190 | + (response) => console.log(response), |
| 191 | + (error) => console.error(error.message), |
| 192 | + ); |
| 193 | +``` |
| 194 | + |
| 195 | +> info **Hint** The `timeout` operator is imported from the `rxjs/operators` package. |
| 196 | +
|
| 197 | +With `timeout` operator applied, your terminal output should look as follows: |
| 198 | + |
| 199 | +```typescript |
| 200 | +connect |
| 201 | +message: { pattern: 'pattern', data: 'Hello world!' } |
| 202 | +teardown // <-- teardown |
| 203 | +Timeout has occurred |
| 204 | +``` |
| 205 | + |
| 206 | +To dispatch an event (instead of sending a message), use the `emit()` method: |
| 207 | + |
| 208 | +```typescript |
| 209 | +googlePubSubClient.emit('event', 'Hello world!'); |
| 210 | +``` |
| 211 | + |
| 212 | +And that's what you should see in the console: |
| 213 | + |
| 214 | +```typescript |
| 215 | +connect |
| 216 | +event to dispatch: { pattern: 'event', data: 'Hello world!' } |
| 217 | +``` |
0 commit comments