Skip to content

Commit 6ad531d

Browse files
committed
Remove kafkaJsCompatibilityMode variable and assume true
1 parent 905ff9e commit 6ad531d

File tree

2 files changed

+35
-267
lines changed

2 files changed

+35
-267
lines changed

lib/kafkajs/_consumer.js

Lines changed: 35 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -213,18 +213,17 @@ class Consumer {
213213
*/
214214
#running = false;
215215

216-
/**
217-
* Whether the consumer is in KafkaJS compatibility mode.
218-
* @type {boolean}
219-
*/
220-
#kafkaJSCompatibilityMode = false;
221-
222216
/**
223217
* The message cache for KafkaJS compatibility mode.
224218
* @type {MessageCache|null}
225219
*/
226220
#messageCache = null;
227221

222+
/**
223+
* Whether the user has enabled manual offset management (stores).
224+
*/
225+
#userManagedStores = false;
226+
228227
/**
229228
* @constructor
230229
* @param {import("../../types/kafkajs").ConsumerConfig} kJSConfig
@@ -343,12 +342,13 @@ class Consumer {
343342
try {
344343
if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) {
345344

346-
if (this.#checkPendingSeeks && this.#kafkaJSCompatibilityMode)
345+
const checkPendingSeeks = this.#pendingSeeks.size !== 0;
346+
if (checkPendingSeeks)
347347
assignment = this.#assignAsPerSeekedOffsets(assignment);
348348

349349
this.#internalClient.assign(assignment);
350350

351-
if (this.#checkPendingSeeks) {
351+
if (checkPendingSeeks) {
352352
const offsetsToCommit = assignment
353353
.filter((topicPartition) => topicPartition.offset !== undefined)
354354
.map((topicPartition) => ({
@@ -491,11 +491,6 @@ class Consumer {
491491
throw new error.KafkaJSError(CompatibilityErrorMessages.runOptionsAutoCommitThreshold(), { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
492492
}
493493

494-
/* Offset storage management is manual in kafkaJS compatibility mode if auto-commit is turned on (ie by default). */
495-
if (rdKafkaConfig['enable.auto.commit']) {
496-
rdKafkaConfig['enable.auto.offset.store'] = false;
497-
}
498-
499494
/* Set the logger */
500495
if (Object.hasOwn(kjsConfig, 'logger')) {
501496
this.#logger = kjsConfig.logger;
@@ -517,8 +512,6 @@ class Consumer {
517512
* log level, as librdkafka will control the granularity. */
518513
if (!compatibleConfig || Object.keys(compatibleConfig).length === 0) {
519514
this.#logger.setLogLevel(logLevel.DEBUG);
520-
} else {
521-
this.#kafkaJSCompatibilityMode = true;
522515
}
523516

524517
/* Even if we are in compatibility mode, setting a 'debug' in the main config must override the logger's level. */
@@ -542,6 +535,21 @@ class Consumer {
542535
}
543536
rdKafkaConfig['rebalance_cb'] = this.#rebalanceCallback.bind(this);
544537

538+
/* Offset management is different from case to case.
539+
* Case 1: User has changed value of enable.auto.offset.store. In this case, we respect that.
540+
* Case 2: automatic committing is on. In this case, we turn off auto.offset.store and store offsets manually.
541+
* this is necessary for cache invalidation and management, as we want to put things into the store
542+
* after eachMessage is called, and not on consume itself.
543+
* Case 3: automatic committing is off. In this case, we turn off auto.offset.store too. Since the user might
544+
* call an empty commit() and expect things to work properly (ie. the right offsets be stored).
545+
* All this works out to a single, simple condition.
546+
*/
547+
if (!Object.hasOwn(this.#userConfig, 'enable.auto.offset.store')) {
548+
rdKafkaConfig['enable.auto.offset.store'] = false;
549+
} else {
550+
this.#userManagedStores = !rdKafkaConfig['enable.auto.offset.store'];
551+
}
552+
545553
return rdKafkaConfig;
546554
}
547555

@@ -699,10 +707,7 @@ class Consumer {
699707
}
700708

701709
const rdKafkaConfig = this.#config();
702-
703-
if (this.#kafkaJSCompatibilityMode) {
704-
this.#messageCache = new MessageCache(Math.floor(rdKafkaConfig['max.poll.interval.ms'] * 0.8));
705-
}
710+
this.#messageCache = new MessageCache(Math.floor(rdKafkaConfig['max.poll.interval.ms'] * 0.8));
706711

707712
this.#state = ConsumerState.CONNECTING;
708713
this.#internalClient = new RdKafka.KafkaConsumer(rdKafkaConfig);
@@ -804,10 +809,6 @@ class Consumer {
804809
throw new error.KafkaJSError(CompatibilityErrorMessages.runOptionsPartitionsConsumedConcurrently(), { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
805810
}
806811

807-
if (!this.#kafkaJSCompatibilityMode) {
808-
throw new error.KafkaJSError('run() can only be used in KafkaJS compatibility mode.', { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
809-
}
810-
811812
if (this.#running) {
812813
throw new error.KafkaJSError('Consumer is already running.', { code: error.ErrorCodes.ERR__STATE });
813814
}
@@ -829,7 +830,7 @@ class Consumer {
829830
/* Invalidate the message cache if needed. */
830831
if (this.#messageCache.isStale()) {
831832
await this.#clearCacheAndResetPositions(true);
832-
this.#lock.release();
833+
await this.#lock.release();
833834
continue;
834835
}
835836

@@ -841,7 +842,7 @@ class Consumer {
841842
});
842843

843844
if (!m) {
844-
this.#lock.release();
845+
await this.#lock.release();
845846
continue;
846847
}
847848

@@ -876,7 +877,7 @@ class Consumer {
876877
* This is especially true since the pattern of pause() followed by throwing an error
877878
* is encouraged. To meet the API contract, we seek one offset backward at this point (which
878879
* means seeking to the message offset). */
879-
this.seek({
880+
await this.seek({
880881
topic: m.topic,
881882
partition: m.partition,
882883
offset: m.offset,
@@ -886,7 +887,7 @@ class Consumer {
886887
/* Store the offsets we need to store, or at least record them for cache invalidation reasons. */
887888
if (eachMessageProcessed) {
888889
try {
889-
if (this.#internalConfig['enable.auto.commit']) {
890+
if (!this.#userManagedStores) {
890891
this.#internalClient.offsetsStore([{ topic: m.topic, partition: m.partition, offset: Number(m.offset) + 1 }]);
891892
}
892893
this.#lastConsumedOffsets.set(`${m.topic}|${m.partition}`, Number(m.offset) + 1);
@@ -916,14 +917,12 @@ class Consumer {
916917

917918
/**
918919
* Consumes a single message from the consumer within the given timeout.
920+
* THIS METHOD IS NOT IMPLEMENTED.
919921
* @note This method cannot be used with run(). Either that, or this must be used.
920922
*
921923
* @param {any} args
922924
* @param {number} args.timeout - the timeout in milliseconds, defaults to 1000.
923925
* @returns {import("../..").Message|null} a message, or null if the timeout was reached.
924-
*
925-
* @note This API is currently in an experimental stage and subject to change.
926-
* This should not be used in KafkaJS compatibility mode (ie with kafkaJS blocks in the config).
927926
*/
928927
async consume({ timeout } = { timeout: 1000 }) {
929928
if (this.#state !== ConsumerState.CONNECTED) {
@@ -934,10 +933,6 @@ class Consumer {
934933
throw new error.KafkaJSError('consume() and run() cannot be used together.', { code: error.ErrorCodes.ERR__CONFLICT });
935934
}
936935

937-
if (this.#kafkaJSCompatibilityMode) {
938-
throw new error.KafkaJSError('consume() cannot be used in KafkaJS compatibility mode.', { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
939-
}
940-
941936
this.#internalClient.setDefaultConsumeTimeout(timeout);
942937
let m = null;
943938

@@ -948,7 +943,8 @@ class Consumer {
948943
this.#internalClient.setDefaultConsumeTimeout(undefined);
949944
}
950945

951-
return m ?? null;
946+
throw new error.KafkaJSError('consume() is not implemented.' + m, { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
947+
// return m ?? null;
952948
}
953949

954950
async #commitOffsetsUntilNoStateErr(offsetsToCommit) {
@@ -1043,8 +1039,7 @@ class Consumer {
10431039
/* We need a complete reset of the cache if we're seeking to a different offset even for one partition.
10441040
* At a later point, this may be improved at the cost of added complexity of maintaining message generation,
10451041
* or else purging the cache of just those partitions which are seeked. */
1046-
if (this.#kafkaJSCompatibilityMode)
1047-
await this.#clearCacheAndResetPositions(true);
1042+
await this.#clearCacheAndResetPositions(true);
10481043

10491044
/* It's assumed that topicPartition is already assigned, and thus can be seeked to and committed to.
10501045
* Errors are logged to detect bugs in the internal code. */
@@ -1064,7 +1059,7 @@ class Consumer {
10641059
}
10651060

10661061
/* Offsets are committed on seek only when in compatibility mode. */
1067-
if (offsetsToCommit.length !== 0 && this.#internalConfig['enable.auto.commit'] && this.#kafkaJSCompatibilityMode) {
1062+
if (offsetsToCommit.length !== 0 && this.#internalConfig['enable.auto.commit']) {
10681063
await this.#commitOffsetsUntilNoStateErr(offsetsToCommit);
10691064
}
10701065

@@ -1078,7 +1073,7 @@ class Consumer {
10781073
* If at any time, the consumer is assigned the partition, the seek will be performed.
10791074
* Depending on the value of the librdkafka property 'enable.auto.commit', the consumer will commit the offset seeked to.
10801075
* @param {import("../../types/kafkajs").TopicPartitionOffset} topicPartitionOffset
1081-
* @returns {Promise<void>|null} a promise that resolves when the seek has been performed (only when not in compatibility mode), or null (when in compatibility mode)
1076+
* @returns {Promise<void>|null} a promise that resolves when the seek has been performed.
10821077
*/
10831078
seek(topicPartitionOffset) {
10841079
if (this.#state !== ConsumerState.CONNECTED) {
@@ -1098,12 +1093,6 @@ class Consumer {
10981093

10991094
this.#checkPendingSeeks = true;
11001095
this.#pendingSeeks.set(`${rdKafkaTopicPartitionOffset.topic}|${rdKafkaTopicPartitionOffset.partition}`, rdKafkaTopicPartitionOffset.offset);
1101-
1102-
/* Immediately realize the seek if we're not in compatibility mode. And clear pending seeks.
1103-
* We don't need them for rebalance. */
1104-
if (!this.#kafkaJSCompatibilityMode) {
1105-
return this.#seekInternal().then(() => this.#pendingSeeks.clear());
1106-
}
11071096
}
11081097

11091098
async describeGroup() {
@@ -1160,8 +1149,7 @@ class Consumer {
11601149
return;
11611150
}
11621151
this.#internalClient.pause(topics);
1163-
if (this.#kafkaJSCompatibilityMode)
1164-
this.#messageCache.stale = true;
1152+
this.#messageCache.stale = true;
11651153

11661154
topics.map(JSON.stringify).forEach(topicPartition => this.#pausedPartitions.add(topicPartition));
11671155

0 commit comments

Comments
 (0)