Skip to content

Commit b140a48

Browse files
committed
Add partition level concurrency to faux-eachBatch
1 parent 4ea26f0 commit b140a48

File tree

1 file changed

+117
-100
lines changed

1 file changed

+117
-100
lines changed

lib/kafkajs/_consumer.js

Lines changed: 117 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,6 @@ class Consumer {
665665

666666
return new Promise((resolve, reject) => {
667667
this.#internalClient.consume(this.#messageCache.maxSize, (err, messages) => {
668-
669668
if (err) {
670669
reject(createKafkaJsErrorFromLibRdKafkaError(err));
671670
return;
@@ -882,6 +881,9 @@ class Consumer {
882881
* So - do nothing but a debug log, but at this point eachMessageProcessed is false.
883882
*/
884883
this.#logger.debug(`Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`);
884+
885+
/* TODO: log error if error type is not KafkaJSError and if no pause() has been called */
886+
this.#logger.error(`Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`);
885887
}
886888

887889
/* If the message is unprocessed, due to an error, or because the user has not resolved it, we seek back. */
@@ -920,6 +922,82 @@ class Consumer {
920922
return m.index;
921923
}
922924

925+
/**
926+
* Processes a batch message (a single message as of now).
927+
*
928+
* @param m Message as obtained from #consumeSingleCached.
929+
* @param config Config as passed to run().
930+
* @returns {Promise<number>} the cache index of the message that was processed.
931+
*/
932+
async #batchProcessor(m, config) {
933+
let eachMessageProcessed = false;
934+
const payload = this.#createBatchPayload(m);
935+
try {
936+
await config.eachBatch(payload);
937+
if (config.eachBatchAutoResolve) {
938+
eachMessageProcessed = true;
939+
} else {
940+
eachMessageProcessed = payload._messageResolved;
941+
}
942+
} catch (e) {
943+
/* It's not only possible, but expected that an error will be thrown by eachBatch.
944+
* This is especially true since the pattern of pause() followed by throwing an error
945+
* is encouraged. To meet the API contract, we seek one offset backward (which
946+
* means seeking to the message offset).
947+
* However, we don't do this inside the catch, but just outside it. This is because throwing an
948+
* error is not the only case where we might want to seek back. We might want to seek back
949+
* if the user has not called `resolveOffset` manually in case of using eachBatch without
950+
* eachBatchAutoResolve being set.
951+
*
952+
* So - do nothing but a debug log, but at this point eachMessageProcessed needs to be false unless
953+
* the user has explicitly marked it as true.
954+
*/
955+
this.#logger.debug(`Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`);
956+
957+
/* TODO: log error if error type is not KafkaJSError and if no pause() has been called */
958+
this.#logger.error(`Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`);
959+
960+
/* The value of eachBatchAutoResolve is not important. The only place where a message is marked processed
961+
* despite an error is if the user says so, and the user can use resolveOffsets for both the possible
962+
* values eachBatchAutoResolve can take. */
963+
if (config.eachBatch)
964+
eachMessageProcessed = payload._messageResolved
965+
}
966+
967+
/* If the message is unprocessed, due to an error, or because the user has not resolved it, we seek back. */
968+
if (!eachMessageProcessed) {
969+
await this.seek({
970+
topic: m.topic,
971+
partition: m.partition,
972+
offset: m.offset,
973+
});
974+
}
975+
976+
/* Store the offsets we need to store, or at least record them for cache invalidation reasons. */
977+
if (eachMessageProcessed) {
978+
try {
979+
if (!this.#userManagedStores) {
980+
this.#internalClient.offsetsStore([{
981+
topic: m.topic, partition: m.partition, offset: Number(m.offset) + 1, leaderEpoch: m.leaderEpoch
982+
}]);
983+
}
984+
this.#lastConsumedOffsets.set(partitionKey(m), Number(m.offset) + 1);
985+
} catch (e) {
986+
/* Not much we can do, except log the error. */
987+
if (this.#logger)
988+
this.#logger.error(`Consumer encountered error while storing offset. Error details: ${JSON.stringify(e)}`);
989+
}
990+
}
991+
992+
/* Force a immediate seek here. It's possible that there are no more messages to be passed to the user,
993+
* but the user seeked in the call to eachMessage, or else we encountered the error catch block.
994+
* In that case, the results of that seek will never be reflected unless we do this. */
995+
if (this.#checkPendingSeeks)
996+
await this.#seekInternal();
997+
998+
return m.index;
999+
}
1000+
9231001
/**
9241002
* Awaits the completion of a single message's processing.
9251003
*
@@ -1030,130 +1108,69 @@ class Consumer {
10301108
/* Internal polling loop.
10311109
* It accepts the same config object that `run` accepts, but config.eachBatch must be set. */
10321110
async #runInternalEachBatch(config) {
1033-
let savedIdx = -1;
1034-
while (this.#state === ConsumerState.CONNECTED) {
1111+
const concurrency = config.partitionsConsumedConcurrently;
1112+
let nextIdx = -1;
1113+
while (!(await acquireOrLog(this.#lock, this.#logger)));
10351114

1036-
/* We need to acquire a lock here, because we need to ensure that we don't
1037-
* disconnect while in the middle of processing a message. */
1038-
if (!(await acquireOrLog(this.#lock, this.#logger)))
1039-
continue;
1115+
while (this.#state === ConsumerState.CONNECTED) {
1116+
/* Release lock and cleanup if we intend to disconnect. */
1117+
if (this.#disconnectStarted) {
1118+
const indices = await this.waitAll();
1119+
indices.forEach(idx => this.#messageCache.return(idx));
1120+
if (nextIdx !== -1) {
1121+
this.#messageCache.return(nextIdx);
1122+
}
1123+
nextIdx = -1;
1124+
this.#lock.release();
1125+
break;
1126+
}
10401127

10411128
/* Invalidate the message cache if needed */
10421129
const locallyStale = this.#messageCache.popLocallyStale();
10431130
if (this.#messageCache.isStale()) { /* global staleness */
1131+
const indices = await this.waitAll();
1132+
indices.forEach(idx => this.#messageCache.return(idx));
1133+
if (nextIdx !== -1) {
1134+
this.#messageCache.return(nextIdx);
1135+
}
1136+
nextIdx = -1;
10441137
await this.#clearCacheAndResetPositions();
1045-
await this.#lock.release();
10461138
continue;
10471139
} else if (locallyStale.length !== 0) { /* local staleness */
1140+
// TODO: is it correct to await some concurrent promises for eachMessage here?
1141+
// to be safe we can do it, but I don't think we really need to do that for
1142+
// any correctness reason.
10481143
await this.#clearCacheAndResetPositions(locallyStale);
1049-
await this.#lock.release();
10501144
continue;
10511145
}
10521146

1053-
const m = await this.#consumeSingleCached(savedIdx).catch(e => {
1147+
const m = await this.#consumeSingleCached(nextIdx).catch(e => {
10541148
/* Since this error cannot be exposed to the user in the current situation, just log and retry.
10551149
* This is due to restartOnFailure being set to always true. */
10561150
if (this.#logger)
10571151
this.#logger.error(`Consumer encountered error while consuming. Retrying. Error details: ${JSON.stringify(e)}`);
10581152
});
10591153

1060-
if (!m) {
1061-
savedIdx = -1;
1062-
await this.#lock.release();
1063-
continue;
1064-
}
1065-
savedIdx = m.index;
1066-
1067-
/* TODO: add partitionsConsumedConcurrently-based concurrency here.
1068-
* If we maintain a map of topic partitions to promises, and a counter,
1069-
* we can probably achieve it with the correct guarantees of ordering
1070-
* though to maximize performance, we need to consume only from partitions for which
1071-
* an eachMessage call is not already going.
1072-
* It's risky to consume, and then store the message in something like an
1073-
* array/list until it can be processed, because librdkafka marks it as
1074-
* 'stored'... but anyway - we can implement something like this.
1075-
*/
1076-
1077-
/* Make pending seeks 'concrete'. */
1078-
if (this.#checkPendingSeeks) {
1079-
const invalidateMessage = await this.#seekInternal({ topic: m.topic, partition: m.partition });
1080-
if (invalidateMessage) {
1081-
/* Don't pass this message on to the user if this topic partition was seeked to. */
1082-
this.#lock.release();
1083-
continue;
1084-
}
1085-
}
1154+
nextIdx = -1;
10861155

1087-
let eachMessageProcessed = false;
1088-
const payload = this.#createBatchPayload(m);
1089-
try {
1090-
await config.eachBatch(payload);
1091-
if (config.eachBatchAutoResolve) {
1092-
eachMessageProcessed = true;
1093-
} else {
1094-
eachMessageProcessed = payload._messageResolved;
1156+
if (!m) {
1157+
// await any concurrency related promises right here if this is null, if any such promise exists.
1158+
// see note in consumeSingleCached
1159+
if (this.#runningPromises.length) {
1160+
nextIdx = await this.waitOne();
10951161
}
1096-
} catch (e) {
1097-
/* It's not only possible, but expected that an error will be thrown by eachBatch.
1098-
* This is especially true since the pattern of pause() followed by throwing an error
1099-
* is encouraged. To meet the API contract, we seek one offset backward (which
1100-
* means seeking to the message offset).
1101-
* However, we don't do this inside the catch, but just outside it. This is because throwing an
1102-
* error is not the only case where we might want to seek back. We might want to seek back
1103-
* if the user has not called `resolveOffset` manually in case of using eachBatch without
1104-
* eachBatchAutoResolve being set.
1105-
*
1106-
* So - do nothing but a debug log, but at this point eachMessageProcessed needs to be false unless
1107-
* the user has explicitly marked it as true.
1108-
*/
1109-
this.#logger.debug(`Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`);
1110-
1111-
/* The value of eachBatchAutoResolve is not important. The only place where a message is marked processed
1112-
* despite an error is if the user says so, and the user can use resolveOffsets for both the possible
1113-
* values eachBatchAutoResolve can take. */
1114-
if (config.eachBatch)
1115-
eachMessageProcessed = payload._messageResolved
1162+
continue;
11161163
}
11171164

1118-
/* If the message is unprocessed, due to an error, or because the user has not resolved it, we seek back. */
1119-
if (!eachMessageProcessed) {
1120-
await this.seek({
1121-
topic: m.topic,
1122-
partition: m.partition,
1123-
offset: m.offset,
1124-
});
1125-
}
1165+
const p = this.#batchProcessor(m, config);
1166+
this.#runningPromises.push(p);
1167+
this.#savedIndexToPromiseIndex.push(m.index);
11261168

1127-
/* Store the offsets we need to store, or at least record them for cache invalidation reasons. */
1128-
if (eachMessageProcessed) {
1129-
try {
1130-
if (!this.#userManagedStores) {
1131-
this.#internalClient.offsetsStore([{
1132-
topic: m.topic, partition: m.partition, offset: Number(m.offset) + 1, leaderEpoch: m.leaderEpoch
1133-
}]);
1134-
}
1135-
this.#lastConsumedOffsets.set(partitionKey(m), Number(m.offset) + 1);
1136-
} catch (e) {
1137-
/* Not much we can do, except log the error. */
1138-
if (this.#logger)
1139-
this.#logger.error(`Consumer encountered error while storing offset. Error details: ${JSON.stringify(e)}`);
1140-
}
1169+
if (this.#runningPromises.length < concurrency) {
1170+
continue;
11411171
}
11421172

1143-
/* Force a immediate seek here. It's possible that there are no more messages to be passed to the user,
1144-
* but the user seeked in the call to eachMessage, or else we encountered the error catch block.
1145-
* In that case, the results of that seek will never be reflected unless we do this. */
1146-
if (this.#checkPendingSeeks)
1147-
await this.#seekInternal();
1148-
1149-
/* TODO: another check we need to do here is to see how kafkaJS is handling
1150-
* commits. Are they commmitting after a message is _processed_?
1151-
* In that case we need to turn off librdkafka's auto-commit, and commit
1152-
* inside this function.
1153-
*/
1154-
1155-
/* Release the lock so that any pending disconnect can go through. */
1156-
await this.#lock.release();
1173+
nextIdx = await this.waitOne();
11571174
}
11581175
}
11591176

0 commit comments

Comments
 (0)