Skip to content

Commit 184e124

Browse files
committed
Add test for consumer.run().
1 parent 818ce7b commit 184e124

File tree

8 files changed

+997
-25
lines changed

8 files changed

+997
-25
lines changed

MIGRATION.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@
178178

179179
* Error-handling for a failed `send` is stricter. While sending multiple messages, even if one of the messages fails, the method throws an error.
180180
* `sendBatch` is supported. However, the actual batching semantics are handled by librdkafka, and it just acts as a wrapper around `send` (See `send` for changes).
181+
* A transactional producer (with a `transactionId`) set, can only send messages after calling `producer.transaction()`.
181182

182183
### Consumer
183184

examples/kafkajs/eos.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@ async function eosStart() {
6666
{
6767
topic,
6868
partitions: [
69-
{ partition, offset: message.offset },
69+
/* The message.offset indicates current offset, so we need to add 1 to it, since committed offset denotes
70+
* the next offset to consume. */
71+
{ partition, offset: message.offset + 1 },
7072
],
7173
}
7274
],

lib/kafkajs/_consumer.js

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,14 @@ class Consumer {
449449

450450
while (this.#state === ConsumerState.CONNECTED) {
451451
const m = await this.#consumeSingle();
452+
453+
/* It's possible for the state to change as we await something.
454+
* Unfortunately, we have no alternative but to break and drop the message, if it exists.
455+
* TODO: fix this, possibly with a flag in disconnect that waits until the run loop winds down.
456+
*/
457+
if (this.#state !== ConsumerState.CONNECTED)
458+
break;
459+
452460
if (!m) {
453461
continue;
454462
}
@@ -470,35 +478,57 @@ class Consumer {
470478
/* Don't pass this message on to the user if this topic partition was seeked to. */
471479
continue;
472480
}
481+
482+
/* It's possible for the state to change as we await something.
483+
* Unfortunately, we have no alternative but to break and drop the message.
484+
* TODO: fix this, possibly with a flag in disconnect that waits until the run loop winds down.
485+
*/
486+
if (this.#state !== ConsumerState.CONNECTED)
487+
break;
473488
}
474489

475490
try {
476491
await config.eachMessage(
477492
this.#createPayload(m)
478493
)
494+
495+
/* It's possible for the state to change as we await something.
496+
* Unfortunately, we have no alternative but to break without taking any action that the user might need.
497+
*/
498+
if (this.#state !== ConsumerState.CONNECTED)
499+
break;
479500
} catch (e) {
480501
/* It's not only possible, but expected that an error will be thrown by eachMessage.
481502
* This is especially true since the pattern of pause() followed by throwing an error
482503
* is encouraged. To meet the API contract, we seek one offset backward at this point (which
483504
* means seeking to the message offset). */
484-
this.seek({
485-
topic: m.topic,
486-
partition: m.partition,
487-
offset: m.offset,
488-
});
505+
if (this.#state === ConsumerState.CONNECTED)
506+
this.seek({
507+
topic: m.topic,
508+
partition: m.partition,
509+
offset: m.offset,
510+
});
489511
}
490512

491513
/* Force a immediate seek here. It's possible that there are no more messages to be passed to the user,
492514
* but the user seeked in the call to eachMessage, or else we encountered the error catch block.
493515
* In that case, the results of that seek will never be reflected unless we do this. */
494-
if (this.#checkPendingSeeks)
516+
if (this.#checkPendingSeeks && this.#state === ConsumerState.CONNECTED)
495517
await this.#seekInternal();
496518

497519
/* TODO: another check we need to do here is to see how kafkaJS is handling
498520
* commits. Are they commmitting after a message is _processed_?
499521
* In that case we need to turn off librdkafka's auto-commit, and commit
500522
* inside this function.
501523
*/
524+
525+
/* Yield for a bit to allow other scheduled tasks on the event loop to run.
526+
* For instance, if disconnect() is called during/after we await eachMessage, and
527+
* we don't await anything else after that, this loop will run despite needing to
528+
* disconnect.
529+
* It's better than any pending tasks be processed here, while we've processed one
530+
* message completely, rather than between message processing. */
531+
await new Promise((resolve) => setTimeout(resolve, 0));
502532
}
503533
}
504534

@@ -573,7 +603,6 @@ class Consumer {
573603
*/
574604
async #seekInternal(messageTopicPartition) {
575605
this.#checkPendingSeeks = false;
576-
577606
const assignment = this.assignment();
578607
const offsetsToCommit = [];
579608
let invalidateMessage = false;

lib/kafkajs/_producer.js

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,8 @@ class Producer {
169169
throw new error.KafkaJSError(`Ready callback called in invalid state ${this.#state}`, { code: error.ErrorCodes.ERR__STATE });
170170
}
171171

172-
const config = await this.#config();
173-
if (Object.hasOwn(config, 'transactional.id') && this.#state !== ProducerState.INITIALIZED_TRANSACTIONS) {
172+
const {globalConfig} = await this.#config();
173+
if (Object.hasOwn(globalConfig, 'transactional.id') && this.#state !== ProducerState.INITIALIZED_TRANSACTIONS) {
174174
this.#state = ProducerState.INITIALIZING_TRANSACTIONS;
175175
this.#internalClient.initTransactions(5000 /* default: 5s */, this.#readyTransactions.bind(this));
176176
return;
@@ -450,7 +450,11 @@ class Producer {
450450

451451
msgPromises.push(new Promise((resolve, reject) => {
452452
const opaque = { resolve, reject };
453-
this.#internalClient.produce(sendOptions.topic, msg.partition, msg.value, msg.key, msg.timestamp, opaque, msg.headers);
453+
try {
454+
this.#internalClient.produce(sendOptions.topic, msg.partition, msg.value, msg.key, msg.timestamp, opaque, msg.headers);
455+
} catch(err) {
456+
reject(err);
457+
}
454458
}));
455459

456460
}

0 commit comments

Comments
 (0)