Skip to content

Commit 92a6fdc

Browse files
committed
Poll producer in send() loops
1 parent b1036e0 commit 92a6fdc

File tree

2 files changed

+33
-3
lines changed

2 files changed

+33
-3
lines changed

CONTRIBUTING.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,16 @@ gdb node
192192

193193
You can add breakpoints and so on after that.
194194

195+
### Debugging and Profiling JavaScript
196+
197+
Run the code with the `--inspect` flag, and then open `chrome://inspect` in Chrome and connect to the debugger.
198+
199+
Example:
200+
201+
```
202+
node --inspect path/to/file.js
203+
```
204+
195205
## Updating librdkafka version
196206

197207
The librdkafka should be periodically updated to the latest release in https://github.com/confluentinc/librdkafka/releases

lib/kafkajs/_producer.js

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@ const ProducerState = Object.freeze({
2525
});
2626

2727
const CompressionTypes = Object.freeze({
28-
None: 'none',
28+
NONE: 'none',
2929
GZIP: 'gzip',
3030
SNAPPY: 'snappy',
3131
LZ4: 'lz4',
3232
ZSTD: 'zstd',
33-
})
33+
});
34+
35+
const producerPollIntervalMs = 500;
3436

3537
class Producer {
3638
/**
@@ -75,6 +77,14 @@ class Producer {
7577
*/
7678
#logger = new DefaultLogger();
7779

80+
/**
81+
* Stores the time of the last poll.
82+
* In case we are producing in a tight loop, the interval timer will not
83+
* fire, and we won't poll. By maintaining the last poll time, we can
84+
* poll at the end of send() and sendBatch().
85+
*/
86+
#lastPollTime = process.hrtime();
87+
7888
/**
7989
* @constructor
8090
* @param {import("../../types/kafkajs").ProducerConfig} kJSConfig
@@ -273,7 +283,8 @@ class Producer {
273283
return;
274284
}
275285
this.#internalClient.poll();
276-
}, 500);
286+
this.#lastPollTime = process.hrtime();
287+
}, producerPollIntervalMs);
277288

278289
this.#internalClient.on('delivery-report', this.#deliveryCallback.bind(this));
279290

@@ -541,6 +552,15 @@ class Producer {
541552
}
542553
}));
543554

555+
/* Poll if we haven't polled in a while. This can be the case if we're producing
556+
* in a tight loop without awaiting the produce. */
557+
const elapsed = process.hrtime(this.#lastPollTime);
558+
const elapsedInNanos = elapsed[0] * 1e9 + elapsed[1];
559+
if (elapsedInNanos > producerPollIntervalMs * 1000) {
560+
this.#lastPollTime = process.hrtime();
561+
this.#internalClient.poll();
562+
}
563+
544564
}
545565

546566
/* The delivery report will be handled by the delivery-report event handler, and we can simply wait for it here. */

0 commit comments

Comments
 (0)