Skip to content

Commit 52a6ccb

Browse files
committed
Remove store from promisified API
1 parent de76f03 commit 52a6ccb

File tree

3 files changed

+14
-274
lines changed

3 files changed

+14
-274
lines changed

deps/librdkafka

Submodule librdkafka updated 101 files

lib/kafkajs/_consumer.js

Lines changed: 13 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,6 @@ class Consumer {
125125
*/
126126
#messageCache = null;
127127

128-
/**
129-
* Whether the user has enabled manual offset management (stores).
130-
*/
131-
#userManagedStores = false;
132-
133128
/**
134129
* Whether the user has enabled manual offset management (commits).
135130
*/
@@ -536,20 +531,14 @@ class Consumer {
536531
rdKafkaConfig['offset_commit_cb'] = true;
537532
rdKafkaConfig['rebalance_cb'] = this.#rebalanceCallback.bind(this);
538533

539-
/* Offset management is different from case to case.
540-
* Case 1: User has changed value of enable.auto.offset.store. In this case, we respect that.
541-
* Case 2: automatic committing is on. In this case, we turn off auto.offset.store and store offsets manually.
542-
* this is necessary for cache invalidation and management, as we want to put things into the store
543-
* after eachMessage is called, and not on consume itself.
544-
* Case 3: automatic committing is off. In this case, we turn off auto.offset.store too. Since the user might
545-
* call an empty commit() and expect things to work properly (ie. the right offsets be stored).
546-
* All this works out a singular, simple condition.
547-
*/
548-
if (!Object.hasOwn(this.#userConfig, 'enable.auto.offset.store')) {
549-
rdKafkaConfig['enable.auto.offset.store'] = false;
550-
} else {
551-
this.#userManagedStores = !rdKafkaConfig['enable.auto.offset.store'];
534+
/* We handle offset storage within the promisified API by ourselves. Thus we don't allow the user to change this
535+
* setting and set it to false. */
536+
if (Object.hasOwn(this.#userConfig, 'enable.auto.offset.store')) {
537+
throw new error.KafkaJSError(
538+
"Changing 'enable.auto.offset.store' is unsupported while using the promisified API.",
539+
{ code: error.ErrorCodes.ERR__INVALID_ARG });
552540
}
541+
rdKafkaConfig['enable.auto.offset.store'] = false;
553542

554543
if (!Object.hasOwn(rdKafkaConfig, 'enable.auto.commit')) {
555544
this.#autoCommit = true; /* librdkafka default. */
@@ -663,13 +652,11 @@ class Consumer {
663652
payload._lastResolvedOffset = { offset, leaderEpoch };
664653

665654
try {
666-
if (!this.#userManagedStores) {
667-
this.#internalClient._offsetsStoreSingle(
668-
topic,
669-
partition,
670-
offset + 1,
671-
leaderEpoch);
672-
}
655+
this.#internalClient._offsetsStoreSingle(
656+
topic,
657+
partition,
658+
offset + 1,
659+
leaderEpoch);
673660
this.#lastConsumedOffsets.set(key, offset + 1);
674661
} catch (e) {
675662
/* Not much we can do, except log the error. */
@@ -1076,9 +1063,7 @@ class Consumer {
10761063
/* Store the offsets we need to store, or at least record them for cache invalidation reasons. */
10771064
if (eachMessageProcessed) {
10781065
try {
1079-
if (!this.#userManagedStores) {
1080-
this.#internalClient._offsetsStoreSingle(m.topic, m.partition, Number(m.offset) + 1, m.leaderEpoch);
1081-
}
1066+
this.#internalClient._offsetsStoreSingle(m.topic, m.partition, Number(m.offset) + 1, m.leaderEpoch);
10821067
this.#lastConsumedOffsets.set(partitionKey(m), Number(m.offset) + 1);
10831068
} catch (e) {
10841069
/* Not much we can do, except log the error. */
@@ -1299,30 +1284,6 @@ class Consumer {
12991284
// return m ?? null;
13001285
}
13011286

1302-
/**
1303-
* Store offsets for the given topic partitions.
1304-
*
1305-
* Stored offsets will be commited automatically at a later point if autoCommit is enabled.
1306-
* Otherwise, they will be committed when commitOffsets is called without arguments.
1307-
*
1308-
* enable.auto.offset.store must be set to false to use this API.
1309-
* @param {import("../../types/kafkajs").TopicPartitionOffset[]?} topicPartitions
1310-
*/
1311-
storeOffsets(topicPartitions) {
1312-
if (this.#state !== ConsumerState.CONNECTED) {
1313-
throw new error.KafkaJSError('Store can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
1314-
}
1315-
1316-
if (!this.#userManagedStores) {
1317-
throw new error.KafkaJSError(
1318-
'Store can only be called when enable.auto.offset.store is explicitly set to false.', { code: error.ErrorCodes.ERR__INVALID_ARG });
1319-
}
1320-
1321-
const topicPartitionsRdKafka = topicPartitions.map(
1322-
topicPartitionOffsetMetadataToRdKafka);
1323-
this.#internalClient.offsetsStore(topicPartitionsRdKafka);
1324-
}
1325-
13261287
async #commitOffsetsUntilNoStateErr(offsetsToCommit) {
13271288
let err = { code: error.ErrorCodes.ERR_NO_ERROR };
13281289
do {

test/promisified/consumer/store.spec.js

Lines changed: 0 additions & 221 deletions
This file was deleted.

0 commit comments

Comments
 (0)