You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Additionally you can add serializers to modify the value of a produce for a key or value before it is sent over to Kafka.
331
331
332
332
```js
333
-
producer.setValueSerializer(function(value) {
333
+
producer.setValueSerializer((value)=> {
334
334
returnBuffer.from(JSON.stringify(value));
335
335
});
336
336
```
@@ -342,7 +342,7 @@ Otherwise the behavior of the class should be exactly the same.
342
342
To read messages from Kafka, you use a `KafkaConsumer`. You instantiate a `KafkaConsumer` object as follows:
343
343
344
344
```js
345
-
var consumer =newKafka.KafkaConsumer({
345
+
constconsumer=newKafka.KafkaConsumer({
346
346
'group.id':'kafka',
347
347
'metadata.broker.list':'localhost:9092',
348
348
}, {});
@@ -357,10 +357,10 @@ The `group.id` and `metadata.broker.list` properties are required for a consumer
357
357
Rebalancing is managed internally by `librdkafka` by default. If you would like to override this functionality, you may provide your own logic as a rebalance callback.
358
358
359
359
```js
360
-
var consumer =newKafka.KafkaConsumer({
360
+
constconsumer=newKafka.KafkaConsumer({
361
361
'group.id':'kafka',
362
362
'metadata.broker.list':'localhost:9092',
363
-
'rebalance_cb':function(err, assignment) {
363
+
'rebalance_cb': (err, assignment)=> {
364
364
365
365
if (err.code===Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
366
366
// Note: this can throw when you are disconnected. Take care and wrap it in
@@ -385,10 +385,10 @@ var consumer = new Kafka.KafkaConsumer({
385
385
When you commit in `node-rdkafka`, the standard way is to queue the commit request up with the next `librdkafka` request to the broker. When doing this, there isn't a way to know the result of the commit. Luckily there is another callback you can listen to to get this information
@@ -455,15 +455,15 @@ The following example illustrates flowing mode:
455
455
consumer.connect();
456
456
457
457
consumer
458
-
.on('ready', function() {
458
+
.on('ready', () => {
459
459
consumer.subscribe(['librdtesting-01']);
460
460
461
461
// Consume from the librdtesting-01 topic. This is what determines
462
462
// the mode we are running in. By not specifying a callback (or specifying
463
463
// only a callback) we get messages as soon as they are available.
464
464
consumer.consume();
465
465
})
466
-
.on('data', function(data) {
466
+
.on('data', (data)=> {
467
467
// Output the actual message contents
468
468
console.log(data.value.toString());
469
469
});
@@ -474,17 +474,17 @@ The following example illustrates non-flowing mode:
474
474
consumer.connect();
475
475
476
476
consumer
477
-
.on('ready', function() {
477
+
.on('ready', () => {
478
478
// Subscribe to the librdtesting-01 topic
479
479
// This makes subsequent consumes read from that topic.
480
480
consumer.subscribe(['librdtesting-01']);
481
481
482
482
// Read one message every 1000 milliseconds
483
-
setInterval(function() {
483
+
setInterval(() => {
484
484
consumer.consume(1);
485
485
}, 1000);
486
486
})
487
-
.on('data', function(data) {
487
+
.on('data', (data)=> {
488
488
console.log('Message found! Contents below.');
489
489
console.log(data.value.toString());
490
490
});
@@ -524,15 +524,15 @@ The following table lists events for this API.
524
524
Some times you find yourself in the situation where you need to know the latest (and earliest) offset for one of your topics. Connected producers and consumers both allow you to query for these through `queryWaterMarkOffsets` like follows:
0 commit comments