Skip to content

Commit d1c30f4

Browse files
committed
Add first test for promisified API: consumer.
Adds test helpers and jest.
1 parent 8d2daeb commit d1c30f4

File tree

10 files changed

+4068
-1054
lines changed

10 files changed

+4068
-1054
lines changed

MIGRATION.md

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,6 @@
139139

140140
#### Semantic and Per-Method Changes
141141

142-
* `sendBatch` is not supported (YET). However, the actual batching semantics are handled by librdkafka.
143142
* Changes to `send`:
144143
* `acks`, `compression` and `timeout` are not set on a per-send basis. Rather, they must be configured in the configuration.
145144
Before:
@@ -178,6 +177,7 @@
178177
```
179178

180179
* Error-handling for a failed `send` is stricter. While sending multiple messages, even if one of the messages fails, the method throws an error.
180+
* `sendBatch` is supported. However, the actual batching semantics are handled by librdkafka, and it just acts as a wrapper around `send` (See `send` for changes).
181181

182182
### Consumer
183183

@@ -219,7 +219,10 @@
219219
#### Semantic and Per-Method Changes
220220

221221

222-
* While passing a list of topics to `subscribe`, the `fromBeginning` property is not supported. Instead, the property `auto.offset.reset` needs to be used.
222+
* Changes to `subscribe`:
223+
  * Regex flags (like 'i' or 'g') are ignored when a regular expression is passed as the topic subscription.
224+
  * `subscribe` must be called after `connect`.
225+
* While passing a list of topics to `subscribe`, the `fromBeginning` property is not supported. Instead, the property `auto.offset.reset` needs to be used.
223226
Before:
224227
```javascript
225228
const kafka = new Kafka({ /* ... */ });
@@ -229,7 +232,6 @@
229232
await consumer.connect();
230233
await consumer.subscribe({ topics: ["topic"], fromBeginning: true});
231234
```
232-
233235
After:
234236
```javascript
235237
const kafka = new Kafka({ /* ... */ });

Makefile

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ endif
1313
NODE ?= node
1414
CPPLINT ?= cpplint.py
1515
BUILDTYPE ?= Release
16-
TESTS = "test/**/*.js"
16+
TESTS = $(ls test/producer/*.js test/*.js test/tools/*.js)
17+
PROMISIFIED_TESTS = "test/promisified"
1718
E2E_TESTS = $(wildcard e2e/*.spec.js)
1819
TEST_REPORTER =
1920
TEST_OUTPUT =
@@ -24,7 +25,7 @@ CONFIG_OUTPUTS = \
2425

2526
CPPLINT_FILES = $(wildcard src/*.cc src/*.h)
2627
CPPLINT_FILTER = -legal/copyright
27-
JSLINT_FILES = lib/*.js test/*.js e2e/*.js
28+
JSLINT_FILES = lib/*.js test/*.js e2e/*.js lib/kafkajs/*.js
2829

2930
PACKAGE = $(shell node -pe 'require("./package.json").name.split("/")[1]')
3031
VERSION = $(shell node -pe 'require("./package.json").version')
@@ -58,6 +59,7 @@ $(CONFIG_OUTPUTS): node_modules/.dirstamp binding.gyp
5859

5960
test: node_modules/.dirstamp
6061
@./node_modules/.bin/mocha --ui exports $(TEST_REPORTER) $(TESTS) $(TEST_OUTPUT)
62+
@./node_modules/.bin/jest --ci --runInBand $(PROMISIFIED_TESTS)
6163

6264
check: node_modules/.dirstamp
6365
@$(NODE) util/test-compile.js

lib/kafkajs/_admin.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,12 @@ class Admin {
9090
* @returns {Promise<void>} Resolves when disconnect is complete, rejects on error.
9191
*/
9292
async disconnect() {
93+
/* Not yet connected - no error. */
94+
if (this.#state == AdminState.INIT) {
95+
return;
96+
}
97+
98+
/* Already disconnecting, or disconnected. */
9399
if (this.#state >= AdminState.DISCONNECTING) {
94100
return;
95101
}

lib/kafkajs/_consumer.js

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -342,16 +342,48 @@ class Consumer {
342342

343343
/**
344344
* Subscribes the consumer to the given topics.
345-
* @param {import("../../types/kafkajs").ConsumerSubscribeTopics} subscription
345+
* @param {import("../../types/kafkajs").ConsumerSubscribeTopics | import("../../types/kafkajs").ConsumerSubscribeTopic} subscription
346346
*/
347347
async subscribe(subscription) {
348-
if (typeof subscription.fromBeginning == 'boolean') {
348+
if (this.#state !== ConsumerState.CONNECTED) {
349+
throw new error.KafkaJSError('Subscribe can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
350+
}
351+
352+
if (typeof subscription.fromBeginning === 'boolean') {
349353
throw new error.KafkaJSError(
350-
'fromBeginning is not supported by subscribe(), but must be passed as rdKafka properties to the consumer',
351-
{ code: error.ErrorCodes.ERR__INVALID_ARG });
354+
'fromBeginning is not supported by subscribe(), but must be passed as an rdKafka property to the consumer.',
355+
{ code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
352356
}
353357

354-
this.#internalClient.subscribe(subscription.topics);
358+
if (!Object.hasOwn(subscription, 'topics') && !Object.hasOwn(subscription, 'topic')) {
359+
throw new error.KafkaJSError('Either topics or topic must be specified.', { code: error.ErrorCodes.ERR__INVALID_ARG });
360+
}
361+
362+
let topics = [];
363+
if (subscription.topic) {
364+
topics.push(subscription.topic);
365+
} else if (Array.isArray(subscription.topics)) {
366+
topics = subscription.topics;
367+
} else {
368+
throw new error.KafkaJSError('topics must be an object of the type ConsumerSubscribeTopics.', { code: error.ErrorCodes.ERR__INVALID_ARG });
369+
}
370+
371+
topics = topics.map(topic => {
372+
if (typeof topic === 'string') {
373+
return topic;
374+
} else if (topic instanceof RegExp) {
375+
// Flags are not supported, and librdkafka only considers a regex match if the first character of the regex is ^.
376+
const regexSource = topic.source;
377+
if (regexSource.charAt(0) !== '^')
378+
return '^' + regexSource;
379+
else
380+
return regexSource;
381+
} else {
382+
throw new error.KafkaJSError('Invalid topic ' + topic + ' (' + typeof topic + '), the topic name has to be a String or a RegExp', { code: error.ErrorCodes.ERR__INVALID_ARG });
383+
}
384+
});
385+
386+
this.#internalClient.subscribe(topics);
355387
}
356388

357389
async stop() {
@@ -541,10 +573,18 @@ class Consumer {
541573
* @returns {Promise<void>} a promise that resolves when the consumer has disconnected.
542574
*/
543575
async disconnect() {
544-
if (this.#state === ConsumerState.INIT) {
545-
throw new error.KafkaJSError('Disconnect can only be called once consumer is connected.', { code: error.ErrorCodes.ERR__STATE });
576+
/* Not yet connected - no error. */
577+
if (this.#state == ConsumerState.INIT) {
578+
return;
546579
}
547580

581+
/* TODO: We should handle a case where we are connecting, we should
582+
* await the connection and then schedule a disconnect. */
583+
584+
/* Already disconnecting, or disconnected. */
585+
if (this.#state >= ConsumerState.DISCONNECTING) {
586+
return;
587+
}
548588
if (this.#state >= ConsumerState.DISCONNECTING) {
549589
return;
550590
}

lib/kafkajs/_producer.js

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,15 @@ class Producer {
240240
* @returns {Promise<void>} Resolves when disconnect is complete, rejects on error.
241241
*/
242242
async disconnect() {
243+
/* Not yet connected - no error. */
244+
if (this.#state == ProducerState.INIT) {
245+
return;
246+
}
247+
248+
/* TODO: We should handle a case where we are connecting, we should
249+
* await the connection and then schedule a disconnect. */
250+
251+
/* Already disconnecting, or disconnected. */
243252
if (this.#state >= ProducerState.DISCONNECTING) {
244253
return;
245254
}
@@ -477,6 +486,44 @@ class Producer {
477486
}
478487
return ret;
479488
}
489+
490+
/**
491+
* sendBatch(batch: ProducerBatch): Promise<RecordMetadata[]>
492+
* @param {import('../../types/kafkajs').ProducerBatch} sendOptions - The record to send. The keys `acks`, `timeout`, and `compression` are not used, and should not be set, rather, they should be set in the global config.
493+
* @returns {Promise<import("../../types/kafkajs").RecordMetadata[]>} Resolves with the record metadata for the messages.
494+
*/
495+
async sendBatch(sendOptions) {
496+
if (this.#state !== ProducerState.CONNECTED) {
497+
throw new error.KafkaJSError("Cannot send without awaiting connect()", { code: error.ErrorCodes.ERR__STATE });
498+
}
499+
500+
if (sendOptions === null || !(sendOptions instanceof Object)) {
501+
throw new error.KafkaJSError("sendOptions must be set correctly", { code: error.ErrorCodes.ERR__INVALID_ARG });
502+
}
503+
504+
// Ignore all properties except topic and messages.
505+
if (Object.hasOwn(sendOptions, "acks") || Object.hasOwn(sendOptions, "timeout") || Object.hasOwn(sendOptions, "compression")) {
506+
throw new error.KafkaJSError("sendOptions must not contain acks, timeout, or compression", { code: error.ErrorCodes.ERR__INVALID_ARG });
507+
}
508+
509+
if (sendOptions.topicMessages !== null && !Array.isArray(sendOptions.topicMessages)) {
510+
throw new error.KafkaJSError("sendOptions.topicMessages must be an array if set", { code: error.ErrorCodes.ERR__INVALID_ARG });
511+
}
512+
513+
if (!sendOptions.topicMessages || sendOptions.topicMessages.length === 0) {
514+
return Promise.resolve([]);
515+
}
516+
517+
// Internally, we just use send() because the batching is handled by librdkafka.
518+
const sentPromises = [];
519+
520+
for (const topicMessage of sendOptions.topicMessages) {
521+
sentPromises.push(this.send(topicMessage));
522+
}
523+
524+
const records = await Promise.all(sentPromises);
525+
return records.flat();
526+
}
480527
}
481528

482529
module.exports = { Producer }

0 commit comments

Comments
 (0)