Skip to content

Commit 74d84f9

Browse files
authored
Ensure that all raced promises are resolved after the race (#151)
* Ensure that all raced promises are resolved after the race * Add CHANGELOG.md entry * Reduce flakiness in test * Address review comments
1 parent 3099517 commit 74d84f9

File tree

4 files changed

+31
-17
lines changed

4 files changed

+31
-17
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
12
# confluent-kafka-javascript v0.5.0
23

34
v0.5.0 is a limited availability feature release. It is supported for all usage.
45

56
## Enhancements
67

78
1. Add support for an Admin API to delete records.(#141).
9+
2. Fixes an issue with unresolved raced Promises leaking in the consumer (#151).
810

911

1012
# confluent-kafka-javascript v0.4.0

lib/kafkajs/_common.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,11 @@ class Timer {
669669
* or the passed promise resolves, when it's passed, clearing the timeout
670670
* in any case.
671671
*
672+
* WARNING: it must be avoided to call `withTimeout` with the same promise
673+
* more than once, as `Promise.race` will add more callbacks to it,
674+
* creating a memory leak if the promise is never resolved or not resolved
675+
* soon enough.
676+
*
672677
* @param {number} timeoutMs The timeout in milliseconds.
673678
* @param {Promise|undefined} promise The promise to wait for,
674679
* alternatively to the timeout, or `undefined` to just wait for the timeout.

lib/kafkajs/_consumer.js

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const {
2323
const { Buffer } = require('buffer');
2424
const MessageCache = require('./_consumer_cache');
2525
const { hrtime } = require('process');
26+
const { LinkedList } = require('./_linked-list');
2627

2728
const ConsumerState = Object.freeze({
2829
INIT: 0,
@@ -170,9 +171,9 @@ class Consumer {
170171
#fetchInProgress;
171172

172173
/**
173-
* Promise that resolves when there is something we need to poll for (messages, rebalance, etc).
174+
* List of DeferredPromises waiting on consumer queue to be non-empty.
174175
*/
175-
#queueNonEmpty = new DeferredPromise();
176+
#queueWaiters = new LinkedList();
176177

177178
/**
178179
* Whether any rebalance callback is in progress.
@@ -1270,8 +1271,9 @@ class Consumer {
12701271
}
12711272

12721273
#queueNonEmptyCb() {
1273-
/* Unconditionally resolve the promise - not a problem if it's already resolved. */
1274-
this.#queueNonEmpty.resolve();
1274+
for (const waiter of this.#queueWaiters) {
1275+
waiter.resolve();
1276+
}
12751277
}
12761278

12771279
async #nextFetchRetry() {
@@ -1280,15 +1282,21 @@ class Consumer {
12801282
} else {
12811283
/* Backoff a little. If m is null, we might be without messages
12821284
* or in available partition starvation, and calling consumeSingleCached
1283-
* in a tight loop will help no one. We still keep it to 1000ms because we
1284-
* want to keep polling, though (ideally) we could increase it all the way
1285-
* up to max.poll.interval.ms.
1285+
* in a tight loop will help no one.
12861286
* In case there is any message in the queue, we'll be woken up before the
1287-
* timer expires. */
1288-
await Timer.withTimeout(1000, this.#queueNonEmpty);
1289-
if (this.#queueNonEmpty.resolved) {
1290-
this.#queueNonEmpty = new DeferredPromise();
1291-
}
1287+
* timer expires.
1288+
* We have a per-worker promise, otherwise we end up awakening
1289+
* other workers when they've already looped and just restarted awaiting.
1290+
* The `Promise` passed to `Timer.withTimeout` cannot be reused
1291+
* in next call to this method, to avoid memory leaks caused
1292+
* by `Promise.race`. */
1293+
const waiter = new DeferredPromise();
1294+
const waiterNode = this.#queueWaiters.addLast(waiter);
1295+
await Timer.withTimeout(1000, waiter);
1296+
1297+
/* Resolves the "extra" promise that has been spawned when creating the timer. */
1298+
waiter.resolve();
1299+
this.#queueWaiters.remove(waiterNode);
12921300
}
12931301
}
12941302

@@ -1374,10 +1382,7 @@ class Consumer {
13741382
let interval = Number(cacheExpiration - now) / 1e6;
13751383
if (interval < 100)
13761384
interval = 100;
1377-
const promises = Promise.race([this.#workerTerminationScheduled,
1378-
this.#maxPollIntervalRestart]);
1379-
await Timer.withTimeout(interval,
1380-
promises);
1385+
await Timer.withTimeout(interval, this.#maxPollIntervalRestart);
13811386
if (this.#maxPollIntervalRestart.resolved)
13821387
this.#maxPollIntervalRestart = new DeferredPromise();
13831388
}

test/promisified/consumer/consumeMessages.spec.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -520,8 +520,10 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
520520

521521
let calls = 0;
522522
let failedSeek = false;
523+
let eachMessageStarted = false;
523524
consumer.run({
524525
eachMessage: async ({ message }) => {
526+
eachMessageStarted = true;
525527
/* Take a long time to process the message. */
526528
await sleep(7000);
527529
try {
@@ -540,7 +542,7 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
540542

541543
/* Waiting for assignment and then a bit more means that the first eachMessage starts running. */
542544
await waitFor(() => consumer.assignment().length > 0, () => { }, { delay: 50 });
543-
await sleep(200);
545+
await waitFor(() => eachMessageStarted, () => { }, { delay: 50 });
544546
await consumer.disconnect();
545547

546548
/* Even without explicitly waiting for it, a pending call to eachMessage must complete before disconnect does. */

0 commit comments

Comments
 (0)