Skip to content

Commit 64cba18

Browse files
committed
Add Producer flush
1 parent 40119c6 commit 64cba18

File tree

3 files changed

+149
-1
lines changed

3 files changed

+149
-1
lines changed

lib/kafkajs/_producer.js

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,61 @@ class Producer {
679679

680680
this.#internalClient.setSaslCredentials(args.username, args.password);
681681
}
682+
683+
/**
684+
* Flushes any pending messages.
685+
*
686+
* Messages are batched internally by librdkafka for performance reasons.
687+
* Continously sent messages are batched upto a timeout, or upto a maximum
688+
* size. Calling flush sends any pending messages immediately without
689+
* waiting for this size or timeout.
690+
*
691+
* @param {number} args.timeout Time to try flushing for in milliseconds.
692+
* @returns {Promise<void>} Resolves on successful flush.
693+
* @throws {KafkaJSTimeout} if the flush times out.
694+
*
695+
* @note This is only useful when using asynchronous sends.
696+
* For example, the following code does not get any benefit from flushing,
697+
* since `await`ing the send waits for the delivery report, and the message
698+
* has already been sent by the time we start flushing:
699+
* for (let i = 0; i < 100; i++) await send(...);
700+
* await flush(...) // Not useful.
701+
*
702+
* However, using the following code may put these 5 messages into a batch
703+
* and then the subsequent `flush` will send the batch altogether (as long as
704+
* batch size, etc. are conducive to batching):
705+
* for (let i = 0; i < 5; i++) send(...);
706+
* await flush({timeout: 5000});
707+
*/
708+
async flush(args = { timeout: 500 }) {
709+
if (this.#state !== ProducerState.CONNECTED) {
710+
throw new error.KafkaJSError("Cannot flush without awaiting connect()", { code: error.ErrorCodes.ERR__STATE });
711+
}
712+
713+
if (!Object.hasOwn(args, 'timeout')) {
714+
throw new error.KafkaJSError("timeout must be set for flushing", { code: error.ErrorCodes.ERR__INVALID_ARG });
715+
}
716+
717+
return new Promise((resolve, reject) => {
718+
this.#internalClient.flush(args.timeout, (err) => {
719+
if (err) {
720+
const kjsErr = createKafkaJsErrorFromLibRdKafkaError(err);
721+
if (err.code === error.ErrorCodes.ERR__TIMED_OUT) {
722+
/* See reason below for yield. Same here - but for partially processed delivery reports. */
723+
setTimeout(() => reject(kjsErr), 0);
724+
} else {
725+
reject(kjsErr);
726+
}
727+
return;
728+
}
729+
/* Yielding here allows any 'then's and 'awaits' on associated sends to be scheduled
730+
* before flush completes, which means that the user doesn't have to yield themselves.
731+
* It's not necessary that all the 'then's and 'awaits' will be able to run, but
732+
* it's better than nothing. */
733+
setTimeout(resolve, 0);
734+
});
735+
});
736+
}
682737
}
683738

684739
module.exports = { Producer, CompressionTypes };

src/workers.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,13 @@ class ErrorAwareWorker : public Nan::AsyncWorker {
3636
  // Invokes the JS error callback with a v8 Error built from this worker's
  // message, augmented with a numeric `code` property so JS callers can
  // branch on error codes (rather than parsing the message string).
  void HandleErrorCallback() {
    Nan::HandleScope scope;

    // Construct error and add code to it.
    // NOTE(review): GetErrorCode() presumably returns the librdkafka error
    // code for this failure — confirm against its definition elsewhere in
    // the project.
    v8::Local<v8::Value> error = Nan::Error(ErrorMessage());
    Nan::Set(error.As<v8::Object>(), Nan::New("code").ToLocalChecked(),
      Nan::New(GetErrorCode()));

    // Single-argument node-style callback: callback(err).
    const unsigned int argc = 1;
    v8::Local<v8::Value> argv[argc] = { error };

    callback->Call(argc, argv);
  }
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
jest.setTimeout(10000);

const {
  secureRandom,
  createProducer,
  createTopic,
} = require('../testhelpers');
const { Kafka } = require('../../../lib').KafkaJS;
const process = require('process');

/* Integration tests for Producer.flush(): a large linger.ms keeps sent
 * messages queued inside the producer, so delivery only happens once the
 * linger timeout expires or an explicit flush forces it. */
describe('Producer > Flush', () => {
  let producer, topicName, message;

  beforeEach(async () => {
    producer = createProducer({
    }, {
      'linger.ms': 5000, /* large linger ms to test flush */
      'queue.buffering.max.kbytes': 2147483647, /* effectively unbounded */
    });

    // Fresh topic and unique message per test to avoid cross-test interference.
    topicName = `test-topic-${secureRandom()}`;
    message = { key: `key-${secureRandom()}`, value: `value-${secureRandom()}` };

    await createTopic({ topic: topicName });
  })

  afterEach(async () => {
    producer && (await producer.disconnect());
  })


  it('does not wait for linger.ms',
    async () => {
      await producer.connect();
      let messageSent = false;
      const startTime = process.hrtime();
      let diffTime;

      /* Deliberately NOT awaited: the send's promise resolves on the delivery
       * report, which the flush below should force well before the 5s linger
       * timeout elapses. */
      producer.send({ topic: topicName, messages: [message] }).then(() => {
        messageSent = true;
        diffTime = process.hrtime(startTime);
      });

      await producer.flush({ timeout: 5000 });
      expect(messageSent).toBe(true);

      // Delivery must have completed in under linger.ms (5s) thanks to flush.
      const diffTimeSeconds = diffTime[0] + diffTime[1] / 1e9;
      expect(diffTimeSeconds).toBeLessThan(5);
    }
  );

  it('does not matter when awaiting sends',
    async () => {
      await producer.connect();
      let messageSent = false;
      const startTime = process.hrtime();
      let diffTime;

      /* Awaiting the send waits out the full linger.ms for the delivery
       * report, so the flush afterwards has nothing left to do. */
      await producer.send({ topic: topicName, messages: [message] }).then(() => {
        messageSent = true;
        diffTime = process.hrtime(startTime);
      });

      await producer.flush({ timeout: 1000 });
      expect(messageSent).toBe(true);

      /* NOTE(review): linger.ms is exactly 5000, so this strict > 5 check is
       * marginal — verify it cannot flake if delivery lands at ~5.000s. */
      const diffTimeSeconds = diffTime[0] + diffTime[1] / 1e9;
      expect(diffTimeSeconds).toBeGreaterThan(5);
    }
  );

  it('times out if messages are pending',
    async () => {
      await producer.connect();
      let messageSent = false;

      /* Larger number of messages */
      producer.send({ topic: topicName, messages: Array(100).fill(message) }).then(() => {
        messageSent = true;
      });

      /* Small timeout */
      // A 1ms flush cannot drain 100 queued messages, so it must reject with
      // a timeout error and leave the send unresolved.
      await expect(producer.flush({ timeout: 1 })).rejects.toThrow(Kafka.KafkaJSTimeout);
      expect(messageSent).toBe(false);
    }
  );

})

0 commit comments

Comments
 (0)