Skip to content

Commit b2b546a

Browse files
committed
Add batch staleness, resolution, and offset management to eachBatch
1 parent 2f4bd76 commit b2b546a

File tree

1 file changed

+149
-40
lines changed

1 file changed

+149
-40
lines changed

lib/kafkajs/_consumer.js

Lines changed: 149 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,11 @@ class Consumer {
130130
*/
131131
#userManagedStores = false;
132132

133+
/**
134+
* Whether the user has enabled manual offset management (commits).
135+
*/
136+
#autoCommit = false;
137+
133138
/**
134139
* Signals an intent to disconnect the consumer.
135140
*/
@@ -161,6 +166,16 @@ class Consumer {
161166
*/
162167
#fetchInProgress = false;
163168

169+
/**
170+
* Maps topic-partition key to the batch payload for marking staleness.
171+
*
172+
* Only used with eachBatch.
173+
* NOTE: given that size of this map will never exceed #concurrency, a
174+
* linear search might actually be faster over what will generally be <10 elems.
175+
* But a map makes conceptual sense. Revise at a later point if needed.
176+
*/
177+
#topicPartitionToBatchPayload = new Map();
178+
164179
/**
165180
* TODO: remove this or make it a bit more reliable.
166181
* This is a debug property for this branch.
@@ -540,6 +555,12 @@ class Consumer {
540555
this.#userManagedStores = !rdKafkaConfig['enable.auto.offset.store'];
541556
}
542557

558+
if (!Object.hasOwn(rdKafkaConfig, 'enable.auto.commit')) {
559+
this.#autoCommit = true; /* librdkafka default. */
560+
} else {
561+
this.#autoCommit = rdKafkaConfig['enable.auto.commit'];
562+
}
563+
543564
return rdKafkaConfig;
544565
}
545566

@@ -616,6 +637,66 @@ class Consumer {
616637
};
617638
}
618639

640+
/**
641+
* Method used by #createBatchPayload to resolve offsets.
642+
* Resolution stores the offset into librdkafka if needed, and into the lastConsumedOffsets map
643+
* that we use for seeking to the last consumed offset when forced to clear cache.
644+
*
645+
* @param {*} payload The payload we're creating. This is a method attached to said object.
646+
* @param {*} offsetToResolve The offset to resolve.
647+
* @param {*} leaderEpoch The leader epoch of the message (optional). We expect users to provide it, but for API-compatibility reasons, it's optional.
648+
*/
649+
#eachBatchPayload_resolveOffsets(payload, offsetToResolve, leaderEpoch = -1) {
650+
const offset = +offsetToResolve;
651+
652+
if (isNaN(offset)) {
653+
/* Not much we can do but throw and log an error. */
654+
const e = new error.KafkaJSError(`Invalid offset to resolve: ${offsetToResolve}`, { code: error.ErrorCodes.ERR__INVALID_ARG });
655+
throw e;
656+
}
657+
658+
/* The user might resolve offset N (< M) after resolving offset M. Given that in librdkafka we can only
659+
* store one offset, store the last possible one. */
660+
if (offset <= payload._lastResolvedOffset.offset)
661+
return;
662+
663+
const topic = payload.batch.topic;
664+
const partition = payload.batch.partition;
665+
const key = partitionKey({ topic, partition });
666+
667+
payload._lastResolvedOffset = { offset, leaderEpoch };
668+
669+
try {
670+
if (!this.#userManagedStores) {
671+
this.#internalClient.offsetsStore([{
672+
topic,
673+
partition,
674+
offset: offset + 1,
675+
leaderEpoch: leaderEpoch,
676+
}]);
677+
}
678+
this.#lastConsumedOffsets.set(key, offset + 1);
679+
} catch (e) {
680+
/* Not much we can do, except log the error. */
681+
if (this.#logger)
682+
this.#logger.error(`Consumer encountered error while storing offset. Error details: ${e}:${e.stack}`);
683+
}
684+
}
685+
686+
/**
687+
* Method used by #createBatchPayload to commit offsets.
688+
*/
689+
async #eachBatchPayload_commitOffsetsIfNecessary() {
690+
if (this.#autoCommit) {
691+
/* librdkafka internally handles committing of whatever we store.
692+
* We don't worry about it here. */
693+
return;
694+
}
695+
/* If the offsets are being resolved by the user, they've already called resolveOffset() at this point
696+
* We just need to commit the offsets that we've stored. */
697+
await this.commitOffsets();
698+
}
699+
619700
/**
620701
* Converts a list of messages returned by node-rdkafka into a message that can be used by the eachBatch callback.
621702
* @param {import("../..").Message[]} messages - must not be empty. Must contain messages from the same topic and partition.
@@ -666,27 +747,31 @@ class Consumer {
666747
const batch = {
667748
topic,
668749
partition,
669-
highWatermark: '-1001', // Invalid - we don't fetch it
750+
highWatermark: '-1001', /* We don't fetch it yet. We can call committed() to fetch it but that might incur network calls. */
670751
messages: messagesConverted,
671752
isEmpty: () => false,
672-
firstOffset: () => messagesConverted[0].offset,
673-
lastOffset: () => messagesConverted[messagesConverted.length - 1].offset,
753+
firstOffset: () => (messagesConverted[0].offset).toString(),
754+
lastOffset: () => (messagesConverted[messagesConverted.length - 1].offset).toString(),
674755
offsetLag: () => notImplemented(),
675756
offsetLagLow: () => notImplemented(),
676757
};
677758

678759
const returnPayload = {
679760
batch,
680-
_messageResolved: false,
681-
resolveOffset: () => { returnPayload._messageResolved = true; },
761+
_stale: false,
762+
_lastResolvedOffset: { offset: -1, leaderEpoch: -1 },
682763
heartbeat: async () => { /* no op */ },
683764
pause: this.pause.bind(this, [{ topic, partitions: [partition] }]),
684-
commitOffsetsIfNecessary: async () => { /* no op */ },
685-
uncommittedOffsets: () => notImplemented(),
765+
commitOffsetsIfNecessary: this.#eachBatchPayload_commitOffsetsIfNecessary.bind(this),
686766
isRunning: () => this.#running,
687-
isStale: () => false,
767+
isStale: () => returnPayload._stale,
768+
/* NOTE: Probably never to be implemented. Not sure exactly how we'd compute this
769+
* inexpensively. */
770+
uncommittedOffsets: () => notImplemented(),
688771
};
689772

773+
returnPayload.resolveOffset = this.#eachBatchPayload_resolveOffsets.bind(this, returnPayload);
774+
690775
return returnPayload;
691776
}
692777

@@ -1020,20 +1105,33 @@ class Consumer {
10201105
/**
10211106
* Processes a batch of messages.
10221107
*
1023-
* @param ms Messages as obtained from #consumeCachedN.
1108+
* @param ms Messages as obtained from #consumeCachedN (ms.length !== 0).
10241109
* @param config Config as passed to run().
10251110
* @returns {Promise<number>} the cache index of the message that was processed.
10261111
*/
10271112
async #batchProcessor(ms, config) {
1028-
let eachMessageProcessed = false;
1113+
const key = partitionKey(ms[0]);
10291114
const payload = this.#createBatchPayload(ms);
1115+
1116+
this.#topicPartitionToBatchPayload.set(key, payload);
1117+
1118+
let lastOffsetProcessed = { offset: -1, leaderEpoch: -1 };
1119+
const lastOffset = +(ms[ms.length - 1].offset);
1120+
const lastLeaderEpoch = ms[ms.length - 1].leaderEpoch;
10301121
try {
10311122
await config.eachBatch(payload);
1032-
if (config.eachBatchAutoResolve) {
1033-
eachMessageProcessed = true;
1034-
} else {
1035-
eachMessageProcessed = payload._messageResolved;
1123+
1124+
/* If the user isn't resolving offsets, we resolve them here. It's significant here to call this method
1125+
* because besides updating `payload._lastResolvedOffset`, this method is also storing the offsets to
1126+
* librdkafka, and accounting for any cache invalidations.
1127+
* Don't bother resolving offsets if payload became stale at some point. We can't know when the payload
1128+
* became stale, so either the user has been nice enough to keep resolving messages, or we must seek to
1129+
* the first offset to ensure no message loss. */
1130+
if (config.eachBatchAutoResolve && !payload._stale) {
1131+
payload.resolveOffset(lastOffset, lastLeaderEpoch);
10361132
}
1133+
1134+
lastOffsetProcessed = payload._lastResolvedOffset;
10371135
} catch (e) {
10381136
/* It's not only possible, but expected that an error will be thrown by eachBatch.
10391137
* This is especially true since the pattern of pause() followed by throwing an error
@@ -1053,38 +1151,24 @@ class Consumer {
10531151
this.#logger.error(`Consumer encountered error while processing message. Error details: ${e}: ${e.stack}. The same message may be reprocessed.`);
10541152

10551153
/* The value of eachBatchAutoResolve is not important. The only place where a message is marked processed
1056-
* despite an error is if the user says so, and the user can use resolveOffsets for both the possible
1154+
* despite an error is if the user says so, and the user can use resolveOffset for both the possible
10571155
* values eachBatchAutoResolve can take. */
1058-
eachMessageProcessed = payload._messageResolved;
1156+
lastOffsetProcessed = payload._lastResolvedOffset;
10591157
}
10601158

1061-
/* If the message is unprocessed, due to an error, or because the user has not resolved it, we seek back. */
1062-
/* TODO: currently we're seeking to just the first offset. Fix this to take care of messages we are resolving. */
1063-
if (!eachMessageProcessed) {
1159+
this.#topicPartitionToBatchPayload.delete(key);
1160+
1161+
/* If any message is unprocessed, either due to an error or due to the user not marking it processed, we must seek
1162+
* back to get it so it can be reprocessed. */
1163+
if (lastOffsetProcessed.offset !== lastOffset) {
1164+
const offsetToSeekTo = lastOffsetProcessed.offset === -1 ? ms[0].offset : (lastOffsetProcessed.offset + 1);
10641165
await this.seek({
10651166
topic: ms[0].topic,
10661167
partition: ms[0].partition,
1067-
offset: ms[0].offset,
1168+
offset: offsetToSeekTo,
10681169
});
10691170
}
10701171

1071-
/* Store the offsets we need to store, or at least record them for cache invalidation reasons. */
1072-
/* TODO: currently we just store the last offset of the batch. Fix it to store the last resolved one + 1. */
1073-
if (eachMessageProcessed) {
1074-
try {
1075-
if (!this.#userManagedStores) {
1076-
this.#internalClient.offsetsStore([{
1077-
topic: ms[ms.length - 1].topic, partition: ms[ms.length - 1].partition, offset: Number(ms[ms.length - 1].offset) + 1, leaderEpoch: ms[ms.length - 1].leaderEpoch
1078-
}]);
1079-
}
1080-
this.#lastConsumedOffsets.set(partitionKey(ms[ms.length - 1]), Number(ms[ms.length - 1].offset) + 1);
1081-
} catch (e) {
1082-
/* Not much we can do, except log the error. */
1083-
if (this.#logger)
1084-
this.#logger.error(`Consumer encountered error while storing offset. Error details: ${JSON.stringify(e)}`);
1085-
}
1086-
}
1087-
10881172
/* Force a immediate seek here. It's possible that there are no more messages to be passed to the user,
10891173
* but the user seeked in the call to eachMessage, or else we encountered the error catch block.
10901174
* In that case, the results of that seek will never be reflected unless we do this. */
@@ -1164,8 +1248,18 @@ class Consumer {
11641248
while (!this.#disconnectStarted) {
11651249
this.#workerTerminationScheduled = false;
11661250
const workersToSpawn = Math.max(1, Math.min(this.#concurrency, this.#partitionCount));
1167-
this.#workers = Array(workersToSpawn).fill().map((_, i) => this.#worker(config, perMessageProcessor.bind(this), fetcher.bind(this), i));
1168-
await Promise.all(this.#workers);
1251+
this.#workers =
1252+
Array(workersToSpawn)
1253+
.fill()
1254+
.map((_, i) =>
1255+
this.#worker(config, perMessageProcessor.bind(this), fetcher.bind(this), i)
1256+
.catch(e => {
1257+
if (this.#logger)
1258+
this.#logger.error(`Worker ${i} encountered an error: ${e}:${e.stack}`);
1259+
}));
1260+
1261+
/* Best we can do is log errors on worker issues - handled by the catch block above. */
1262+
await Promise.allSettled(this.#workers)
11691263

11701264
/* One of the possible reasons for the workers to end is that the cache is globally stale.
11711265
* We need to take care of expiring it. */
@@ -1416,7 +1510,17 @@ class Consumer {
14161510
}
14171511

14181512
this.#checkPendingSeeks = true;
1419-
this.#pendingSeeks.set(partitionKey(rdKafkaTopicPartitionOffset), rdKafkaTopicPartitionOffset.offset);
1513+
const key = partitionKey(rdKafkaTopicPartitionOffset)
1514+
this.#pendingSeeks.set(key, rdKafkaTopicPartitionOffset.offset);
1515+
1516+
/* Only for eachBatch:
1517+
* Immediately mark the batch it's associated with as stale, even if we don't
1518+
* do the actual 'seekInternal' at this time. This is because we need read-after-write
1519+
* consistency for eachBatch, and calling seek(toppar) from within eachBatch(toppar)
1520+
* should change the result of batch.isStale() immediately. */
1521+
if (this.#topicPartitionToBatchPayload.has(key)) {
1522+
this.#topicPartitionToBatchPayload.get(key)._stale = true;
1523+
}
14201524
}
14211525

14221526
async describeGroup() {
@@ -1490,6 +1594,11 @@ class Consumer {
14901594
* making it unusable. */
14911595
this.#messageCache.markStale(topics);
14921596

1597+
/* If anyone's using eachBatch, mark the batch as stale. */
1598+
topics.map(partitionKey)
1599+
.filter(key => this.#topicPartitionToBatchPayload.has(key))
1600+
.forEach(key => this.#topicPartitionToBatchPayload.get(key)._stale = true);
1601+
14931602
topics.map(JSON.stringify).forEach(topicPartition => this.#pausedPartitions.add(topicPartition));
14941603

14951604
return () => this.resume(topics);

0 commit comments

Comments
 (0)