You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: content/microservices/kafka.md
+20-14Lines changed: 20 additions & 14 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -8,8 +8,6 @@
8
8
9
9
The Kafka project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. It integrates very well with Apache Storm and Spark for real-time streaming data analysis.
10
10
11
-
**Kafka transporter is experimental.**
12
-
13
11
#### Installation
14
12
15
13
To start building Kafka-based microservices, first install the required package:
@@ -163,8 +161,22 @@ Use the `@Client()` decorator as follows:
163
161
client: ClientKafka;
164
162
```
165
163
164
+
#### Message pattern
165
+
166
+
The Kafka microservice message pattern utilizes two topics for the request and reply channels. The `ClientKafka#send()` method sends messages with a [return address](https://www.enterpriseintegrationpatterns.com/patterns/messaging/ReturnAddress.html) by associating a [correlation id](https://www.enterpriseintegrationpatterns.com/patterns/messaging/CorrelationIdentifier.html), reply topic, and reply partition with the request message. This requires the `ClientKafka` instance to be subscribed to the reply topic and assigned to at least one partition before sending a message.
167
+
168
+
Subsequently, you need to have at least one reply topic partition for every Nest application running. For example, if you are running 4 Nest applications but the reply topic only has 3 partitions, then 1 of the Nest applications will error out when trying to send a message.
169
+
170
+
When new `ClientKafka` instances are launched they join the consumer group and subscribe to their respective topics. This process triggers a rebalance of topic partitions assigned to consumers of the consumer group.
171
+
172
+
Normally, topic partitions are assigned using the round robin partitioner, which assigns topic partitions to a collection of consumers sorted by consumer names which are randomly set on application launch. However, when a new consumer joins the consumer group, the new consumer can be positioned anywhere within the collection of consumers. This creates a condition where pre-existing consumers can be assigned different partitions when the pre-existing consumer is positioned after the new consumer. As a result, the consumers that are assigned different partitions will lose response messages of requests sent before the rebalance.
173
+
174
+
To prevent the `ClientKafka` consumers from losing response messages, a Nest-specific built-in custom partitioner is utilized. This custom partitioner assigns partitions to a collection of consumers sorted by high-resolution timestamps (`process.hrtime()`) that are set on application launch.
175
+
166
176
#### Message response subscription
167
177
178
+
> warning **Note** This section is only relevant if you use [request-response](/microservices/basics#request-response) message style (with the `@MessagePatern` decorator and the `ClientKafka#send` method). Subscribing to the response topic is not necessary for the [event-based](/microservices/basics#event-based) communication (`@EventPattern` decorator and `ClientKafka#emit` method).
179
+
168
180
The `ClientKafka` class provides the `subscribeToResponseOf()` method. The `subscribeToResponseOf()` method takes a request's topic name as an argument and adds the derived reply topic name to a collection of reply topics. This method is required when implementing the message pattern.
169
181
170
182
```typescript
@@ -184,18 +196,6 @@ async onModuleInit() {
184
196
}
185
197
```
186
198
187
-
#### Message pattern
188
-
189
-
The Kafka microservice message pattern utilizes two topics for the request and reply channels. The `ClientKafka#send()` method sends messages with a [return address](https://www.enterpriseintegrationpatterns.com/patterns/messaging/ReturnAddress.html) by associating a [correlation id](https://www.enterpriseintegrationpatterns.com/patterns/messaging/CorrelationIdentifier.html), reply topic, and reply partition with the request message. This requires the `ClientKafka` instance to be subscribed to the reply topic and assigned to at least one partition before sending a message.
190
-
191
-
Subsequently, you need to have at least one reply topic partition for every Nest application running. For example, if you are running 4 Nest applications but the reply topic only has 3 partitions, then 1 of the Nest applications will error out when trying to send a message.
192
-
193
-
When new `ClientKafka` instances are launched they join the consumer group and subscribe to their respective topics. This process triggers a rebalance of topic partitions assigned to consumers of the consumer group.
194
-
195
-
Normally, topic partitions are assigned using the round robin partitioner, which assigns topic partitions to a collection of consumers sorted by consumer names which are randomly set on application launch. However, when a new consumer joins the consumer group, the new consumer can be positioned anywhere within the collection of consumers. This creates a condition where pre-existing consumers can be assigned different partitions when the pre-existing consumer is positioned after the new consumer. As a result, the consumers that are assigned different partitions will lose response messages of requests sent before the rebalance.
196
-
197
-
To prevent the `ClientKafka` consumers from losing response messages, a Nest-specific built-in custom partitioner is utilized. This custom partitioner assigns partitions to a collection of consumers sorted by high-resolution timestamps (`process.hrtime()`) that are set on application launch.
198
-
199
199
#### Incoming
200
200
201
201
Nest receives incoming Kafka messages as an object with `key`, `value`, and `headers` properties that have values of type `Buffer`. Nest then parses these values by transforming the buffers into strings. If the string is "object like", Nest attempts to parse the string as `JSON`. The `value` is then passed to its associated handler.
@@ -278,6 +278,12 @@ export class HeroesController {
278
278
}
279
279
```
280
280
281
+
#### Event-based
282
+
283
+
While the request-response method is ideal for exchanging messages between services, it is less suitable when your message style is event-based (which in turn is ideal for Kafka) - when you just want to publish events **without waiting for a response**. In that case, you do not want the overhead required by request-response for maintaining two topics.
284
+
285
+
Check out these two sections to learn more about this: [Overview: Event-based](/microservices/basics#event-based) and [Overview: Publishing events](/microservices/basics#publishing-events).
286
+
281
287
#### Context
282
288
283
289
In more sophisticated scenarios, you may want to access more information about the incoming request. When using the Kafka transporter, you can access the `KafkaContext` object.
<p>If you need to add some custom logic around the serialization of responses on the client side, you can use a custom class that extends the <code>ClientProxy</code> class or one of its child classes. For modifying successful requests you can override the <code>serializeResponse</code> method, and for modifying any errors that go through this client you can override the <code>serializeError</code> method. To make use of this custom class, you can pass the class itself to the <code>ClientsModule.register()</code> method using the <code>customClass</code> property. Below is an example of a custom <code>ClientProxy</code> that serializes each error into an <code>RpcException</code>.</p>
0 commit comments