Skip to content

Commit 82b57d5

Browse files
committed
Separate eachMessage and eachBatch internal consume loop
1 parent d147944 commit 82b57d5

File tree

1 file changed

+121
-17
lines changed

1 file changed

+121
-17
lines changed

lib/kafkajs/_consumer.js

Lines changed: 121 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -902,11 +902,16 @@ class Consumer {
902902
}
903903

904904
/* We deliberately don't await this. */
905-
this.#runInternal(config);
905+
if (config.eachMessage) {
906+
this.#runInternalEachMessage(config);
907+
} else {
908+
this.#runInternalEachBatch(config);
909+
}
906910
}
907911

908-
/* Internal polling loop. It accepts the same config object that `run` accepts. */
909-
async #runInternal(config) {
912+
/* Internal polling loop.
913+
* It accepts the same config object that `run` accepts, but config.eachMessage must be set. */
914+
async #runInternalEachMessage(config) {
910915
while (this.#state === ConsumerState.CONNECTED) {
911916

912917
/* We need to acquire a lock here, because we need to ensure that we don't
@@ -954,26 +959,125 @@ class Consumer {
954959
}
955960

956961
let eachMessageProcessed = false;
957-
let payload;
958-
if (config.eachMessage) {
959-
payload = this.#createPayload(m);
960-
} else {
961-
payload = this.#createBatchPayload(m);
962+
const payload = this.#createPayload(m);
963+
try {
964+
await config.eachMessage(payload);
965+
eachMessageProcessed = true;
966+
} catch (e) {
967+
/* It's not only possible, but expected that an error will be thrown by eachMessage.
968+
* This is especially true since the pattern of pause() followed by throwing an error
969+
* is encouraged. To meet the API contract, we seek one offset backward (which
970+
* means seeking to the message offset).
971+
* However, we don't do this inside the catch, but just outside it. This is because throwing an
972+
* error is not the only case where we might want to seek back.
973+
*
974+
* So - do nothing but a debug log, but at this point eachMessageProcessed is false.
975+
*/
976+
this.#logger.debug(`Consumer encountered error while processing message. Error details: ${JSON.stringify(e)}. The same message may be reprocessed.`);
977+
}
978+
979+
/* If the message is unprocessed, due to an error, or because the user has not resolved it, we seek back. */
980+
if (!eachMessageProcessed) {
981+
await this.seek({
982+
topic: m.topic,
983+
partition: m.partition,
984+
offset: m.offset,
985+
});
986+
}
987+
988+
/* Store the offsets we need to store, or at least record them for cache invalidation reasons. */
989+
if (eachMessageProcessed) {
990+
try {
991+
if (!this.#userManagedStores) {
992+
this.#internalClient.offsetsStore([{
993+
topic: m.topic, partition: m.partition, offset: Number(m.offset) + 1, leaderEpoch: m.leaderEpoch
994+
}]);
995+
}
996+
this.#lastConsumedOffsets.set(`${m.topic}|${m.partition}`, Number(m.offset) + 1);
997+
} catch (e) {
998+
/* Not much we can do, except log the error. */
999+
if (this.#logger)
1000+
this.#logger.error(`Consumer encountered error while storing offset. Error details: ${JSON.stringify(e)}`);
1001+
}
1002+
}
1003+
1004+
/* Force an immediate seek here. It's possible that there are no more messages to be passed to the user,
1005+
* but the user seeked in the call to eachMessage, or else we encountered the error catch block.
1006+
* In that case, the results of that seek will never be reflected unless we do this. */
1007+
if (this.#checkPendingSeeks)
1008+
await this.#seekInternal();
1009+
1010+
/* TODO: another check we need to do here is to see how kafkaJS is handling
1011+
* commits. Are they committing after a message is _processed_?
1012+
* In that case we need to turn off librdkafka's auto-commit, and commit
1013+
* inside this function.
1014+
*/
1015+
1016+
/* Release the lock so that any pending disconnect can go through. */
1017+
await this.#lock.release();
1018+
}
1019+
}
1020+
1021+
/* Internal polling loop.
1022+
* It accepts the same config object that `run` accepts, but config.eachBatch must be set. */
1023+
async #runInternalEachBatch(config) {
1024+
while (this.#state === ConsumerState.CONNECTED) {
1025+
1026+
/* We need to acquire a lock here, because we need to ensure that we don't
1027+
* disconnect while in the middle of processing a message. */
1028+
if (!(await acquireOrLog(this.#lock, this.#logger)))
1029+
continue;
1030+
1031+
/* Invalidate the message cache if needed. */
1032+
if (this.#messageCache.isStale()) {
1033+
await this.#clearCacheAndResetPositions(true);
1034+
await this.#lock.release();
1035+
continue;
1036+
}
1037+
1038+
const m = await this.#consumeSingleCached().catch(e => {
1039+
/* Since this error cannot be exposed to the user in the current situation, just log and retry.
1040+
* This is due to restartOnFailure being set to always true. */
1041+
if (this.#logger)
1042+
this.#logger.error(`Consumer encountered error while consuming. Retrying. Error details: ${JSON.stringify(e)}`);
1043+
});
1044+
1045+
if (!m) {
1046+
await this.#lock.release();
1047+
continue;
1048+
}
1049+
1050+
/* TODO: add partitionsConsumedConcurrently-based concurrency here.
1051+
* If we maintain a map of topic partitions to promises, and a counter,
1052+
* we can probably achieve it with the correct guarantees of ordering
1053+
* though to maximize performance, we need to consume only from partitions for which
1054+
* an eachMessage call is not already going.
1055+
* It's risky to consume, and then store the message in something like an
1056+
* array/list until it can be processed, because librdkafka marks it as
1057+
* 'stored'... but anyway - we can implement something like this.
1058+
*/
1059+
1060+
/* Make pending seeks 'concrete'. */
1061+
if (this.#checkPendingSeeks) {
1062+
const invalidateMessage = await this.#seekInternal({ topic: m.topic, partition: m.partition });
1063+
if (invalidateMessage) {
1064+
/* Don't pass this message on to the user if this topic partition was seeked to. */
1065+
this.#lock.release();
1066+
continue;
1067+
}
9621068
}
1069+
1070+
let eachMessageProcessed = false;
1071+
const payload = this.#createBatchPayload(m);
9631072
try {
964-
if (config.eachMessage) {
965-
await config.eachMessage(payload);
1073+
await config.eachBatch(payload);
1074+
if (config.eachBatchAutoResolve) {
9661075
eachMessageProcessed = true;
9671076
} else {
968-
await config.eachBatch(payload);
969-
if (config.eachBatchAutoResolve) {
970-
eachMessageProcessed = true;
971-
} else {
972-
eachMessageProcessed = payload._messageResolved;
973-
}
1077+
eachMessageProcessed = payload._messageResolved;
9741078
}
9751079
} catch (e) {
976-
/* It's not only possible, but expected that an error will be thrown by eachMessage or eachBatch.
1080+
/* It's not only possible, but expected that an error will be thrown by eachBatch.
9771081
* This is especially true since the pattern of pause() followed by throwing an error
9781082
* is encouraged. To meet the API contract, we seek one offset backward (which
9791083
* means seeking to the message offset).

0 commit comments

Comments
 (0)