
Commit 31abae4

Fix log level config in light of binding logs
* Also add rule for no trailing spaces and enforce it
* Fix log level config in light of binding logs
1 parent 2173850 commit 31abae4

11 files changed: +94 additions, -57 deletions


eslint.config.js

Lines changed: 1 addition & 0 deletions

@@ -21,6 +21,7 @@ const ckjsSpecificSettings = {
   "no-caller": "error",
   "no-new": "error",
   "no-eq-null": "error",
+  "no-trailing-spaces": "error",
   "no-constant-condition": "off",
   "semi": "error"
 }
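
The new no-trailing-spaces rule is what drives most of the whitespace-only hunks in the files below: it reports any line ending in stray spaces or tabs, and `eslint --fix` strips them automatically. A minimal sketch of the rule in action (the file and command here are illustrative, not part of this commit):

// sample.js (hypothetical)
/* eslint no-trailing-spaces: "error" */
const broker = 'localhost:9092';
// If the line above ended in a stray space, ESLint would report:
//   "Trailing spaces not allowed" (no-trailing-spaces)
// Running `npx eslint --fix sample.js` removes the offending whitespace.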

lib/kafka-consumer.js

Lines changed: 1 addition & 1 deletion

@@ -345,7 +345,7 @@ KafkaConsumer.prototype.assignments = function() {
  *
  * @note This method should only be called from within the rebalance callback
  * when partitions are revoked.
- *
+ *
  * @return {boolean} true if assignment was lost
  */

lib/kafkajs/_admin.js

Lines changed: 17 additions & 3 deletions

@@ -116,9 +116,23 @@ class Admin {
   #finalizedConfig() {
     let compatibleConfig = this.#kafkaJSToAdminConfig(this.#userConfig.kafkaJS);
 
-    /* Set the logger's level in case we're not in compatibility mode - just set it to DEBUG, the broadest
-     * log level, as librdkafka will control the granularity. */
-    if (!compatibleConfig || Object.keys(compatibleConfig).length === 0) {
+    /* There can be multiple different and conflicting config directives for setting the log level:
+     * 1. If there's a kafkaJS block:
+     *    a. If there's a logLevel directive in the kafkaJS block, set the logger level accordingly.
+     *    b. If there's no logLevel directive, set the logger level to INFO.
+     *    (both these are already handled in the conversion method above).
+     * 2. If there is a log_level or debug directive in the main config, set the logger level accordingly.
+     *    !This overrides any different value provided in the kafkaJS block!
+     *    a. If there's a log_level directive, set the logger level accordingly.
+     *    b. If there's a debug directive, set the logger level to DEBUG regardless of anything else. This is because
+     *       librdkafka ignores log_level if debug is set, and our behaviour should be identical.
+     * 3. There's nothing at all. Take no action in this case, let the logger use its default log level.
+     */
+    if (Object.hasOwn(this.#userConfig, 'log_level')) {
+      this.#logger.setLogLevel(severityToLogLevel[this.#userConfig.log_level]);
+    }
+
+    if (Object.hasOwn(this.#userConfig, 'debug')) {
       this.#logger.setLogLevel(logLevel.DEBUG);
     }
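
Condensed into standalone form, the precedence described by the new comment works out as below. This is a sketch for illustration only: resolveLogLevel is a made-up name, and severityToLogLevel, logLevel, and the logger stand in for the module's internals.

// Illustrative sketch of the log level precedence; not code from this commit.
function resolveLogLevel(userConfig, logger) {
  // 1. kafkaJS block: its logLevel directive (or INFO when absent) has already
  //    been applied by the kafkaJS-to-rdkafka conversion step.

  // 2a. A top-level log_level (a syslog severity, 0-7) overrides the kafkaJS value.
  if (Object.hasOwn(userConfig, 'log_level'))
    logger.setLogLevel(severityToLogLevel[userConfig.log_level]);

  // 2b. A top-level debug directive wins over everything, mirroring librdkafka,
  //     which ignores log_level whenever debug is set.
  if (Object.hasOwn(userConfig, 'debug'))
    logger.setLogLevel(logLevel.DEBUG);

  // 3. Neither present: the logger keeps whatever default level it started with.
}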

lib/kafkajs/_common.js

Lines changed: 8 additions & 9 deletions

@@ -443,7 +443,6 @@ function kafkaJSToRdKafkaConfig(config) {
       rdkafkaConfig["log_level"] = 6 /* LOG_INFO */;
       break;
     case logLevel.DEBUG:
-      rdkafkaConfig["debug"] = "all" /* Turn on debug logs for everything, otherwise this log level is not useful*/;
       rdkafkaConfig["log_level"] = 7 /* LOG_DEBUG */;
       break;
     default:
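
The deleted line is the substantive fix in this hunk: previously, requesting logLevel.DEBUG through the kafkaJS block silently injected debug: "all" into the librdkafka config, turning on every internal debug context. After this change, DEBUG maps only to log_level: 7, and users who want librdkafka's context logs opt in explicitly. A hypothetical config showing the distinction (the debug contexts listed are standard librdkafka ones; logLevel is assumed to be imported from the library's exports):

// Illustrative config, not from this commit.
const config = {
  kafkaJS: { logLevel: logLevel.DEBUG }, // now yields log_level: 7 only
  // Opt in to librdkafka's internal debug contexts separately if needed:
  // debug: 'broker,topic,msg',
};
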
@@ -639,7 +638,7 @@ class DeferredPromise extends Promise{
    * that takes the same parameter a normal Promise constructor does.
    * The DeferredPromise cannot be rejected to avoid unhandled rejections
    * entirely.
-   * @param {(resolve: (value: any) => void, reject: (error: Error) => void) => void} resolver
+   * @param {(resolve: (value: any) => void, reject: (error: Error) => void) => void} resolver
    */
   constructor(resolver) {
     let resolveF;

@@ -662,12 +661,12 @@
 /**
  * Utility class for time related functions
  */
-class Timer {
+class Timer {
   /**
    * Function that resolves when the given timeout is reached
    * or the passed promise resolves, when it's passed, clearing the timeout
    * in any case.
-   *
+   *
    * @param {number} timeoutMs The timeout in milliseconds.
    * @param {Promise|undefined} promise The promise to wait for,
    * alternatively to the timeout, or `undefined` to just wait for the timeout.
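
For reference, usage of Timer.withTimeout as described by this JSDoc (an illustrative snippet; somePromise is a stand-in for any promise, e.g. a shutdown signal):

// Plain sleep: resolves once 100 ms have elapsed.
await Timer.withTimeout(100);

// Sleep with early wake-up: resolves when somePromise settles or after
// 5000 ms, whichever comes first; the timeout is cleared in either case.
await Timer.withTimeout(5000, somePromise);
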
@@ -698,13 +697,13 @@
 class Lock {
   // Total number of readers, not increases when already holding a write lock
   #readers = 0;
-
+
   // Total number of writers, increased only by a single write and
   // its reentrant calls
   #writers = 0;
 
   #asyncLocalStorage = new AsyncLocalStorage();
-
+
   // Promise to resolve and recreate when there are no readers or writers
   // This is used to notify all waiting writers so at least one can proceed.
   // It's also used to notify all waiting readers so they can can check

@@ -789,7 +788,7 @@
     this.#notifyZeroReadersAndWriters();
   }
 
-  /**
+  /**
    * Acquire a write (exclusive) lock while executing
    * the given task.
    * @param {function} task The task to execute.

@@ -807,8 +806,8 @@
     await this.#runAsyncStack(1, withWriteLock);
   }
 
-
-  /**
+
+  /**
    * Acquire a read (shared) lock while executing
    * the given task.
    * @param {function} task The task to execute.

lib/kafkajs/_consumer.js

Lines changed: 25 additions & 16 deletions

@@ -336,7 +336,7 @@ class Consumer {
       else
         this.#addPendingOperation(() => this.#unassign(userAssignment));
     };
-
+
     try {
       err = LibrdKafkaError.create(err);
       const userSpecifiedRebalanceCb = this.#userConfig['rebalance_cb'];

@@ -549,13 +549,22 @@
     /* Creates an rdkafka config based off the kafkaJS block. Switches to compatibility mode if the block exists. */
     let compatibleConfig = this.#kafkaJSToConsumerConfig(this.#userConfig.kafkaJS);
 
-    /* Set the logger's level in case we're not in compatibility mode - just set it to DEBUG, the broadest
-     * log level, as librdkafka will control the granularity. */
-    if (!compatibleConfig || Object.keys(compatibleConfig).length === 0) {
-      this.#logger.setLogLevel(logLevel.DEBUG);
+    /* There can be multiple different and conflicting config directives for setting the log level:
+     * 1. If there's a kafkaJS block:
+     *    a. If there's a logLevel directive in the kafkaJS block, set the logger level accordingly.
+     *    b. If there's no logLevel directive, set the logger level to INFO.
+     *    (both these are already handled in the conversion method above).
+     * 2. If there is a log_level or debug directive in the main config, set the logger level accordingly.
+     *    !This overrides any different value provided in the kafkaJS block!
+     *    a. If there's a log_level directive, set the logger level accordingly.
+     *    b. If there's a debug directive, set the logger level to DEBUG regardless of anything else. This is because
+     *       librdkafka ignores log_level if debug is set, and our behaviour should be identical.
+     * 3. There's nothing at all. Take no action in this case, let the logger use its default log level.
+     */
+    if (Object.hasOwn(this.#userConfig, 'log_level')) {
+      this.#logger.setLogLevel(severityToLogLevel[this.#userConfig.log_level]);
     }
 
-    /* Even if we are in compability mode, setting a 'debug' in the main config must override the logger's level. */
     if (Object.hasOwn(this.#userConfig, 'debug')) {
       this.#logger.setLogLevel(logLevel.DEBUG);
     }
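
The severityToLogLevel table used in this hunk and in the matching _admin.js hunk is not part of this diff. A plausible shape, mapping syslog severities (what librdkafka's log_level carries) onto kafkaJS-style levels, is sketched below; treat the exact values as an assumption, since only the INFO (6) and DEBUG (7) cases are visible in this commit.

// Assumed mapping, for illustration only; the real table lives in _common.js.
const severityToLogLevel = {
  0: logLevel.NOTHING, // LOG_EMERG
  1: logLevel.ERROR,   // LOG_ALERT
  2: logLevel.ERROR,   // LOG_CRIT
  3: logLevel.ERROR,   // LOG_ERR
  4: logLevel.WARN,    // LOG_WARNING
  5: logLevel.WARN,    // LOG_NOTICE
  6: logLevel.INFO,    // LOG_INFO
  7: logLevel.DEBUG,   // LOG_DEBUG
};
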
@@ -568,8 +577,8 @@
     /* Certain properties that the user has set are overridden. We use trampolines to accommodate the user's callbacks.
      * TODO: add trampoline method for offset commit callback. */
     rdKafkaConfig['offset_commit_cb'] = true;
-    rdKafkaConfig['rebalance_cb'] = (err, assignment) => this.#rebalanceCallback(err, assignment).catch(e =>
-      {
+    rdKafkaConfig['rebalance_cb'] = (err, assignment) => this.#rebalanceCallback(err, assignment).catch(e =>
+      {
         if (this.#logger)
           this.#logger.error(`Error from rebalance callback: ${e.stack}`);
       });

@@ -1307,7 +1316,7 @@ class Consumer {
   }
 
   async #checkMaxPollIntervalNotExceeded(now) {
-    const maxPollExpiration = this.#lastFetchClockNs +
+    const maxPollExpiration = this.#lastFetchClockNs +
       BigInt((this.#cacheExpirationTimeoutMs + this.#maxPollIntervalMs)
         * 1e6);
 

@@ -1317,15 +1326,15 @@
     await Timer.withTimeout(interval,
       this.#maxPollIntervalRestart);
     now = hrtime.bigint();
-
+
     if (now > (maxPollExpiration - 1000000n)) {
       this.#markBatchPayloadsStale(this.assignment());
     }
   }
 
   /**
    * Clears the cache and resets the positions when
-   * the internal client hasn't been polled for more than
+   * the internal client hasn't been polled for more than
    * max poll interval since the last fetch.
    * After that it waits until barrier is reached or
    * max poll interval is reached. In the latter case it

@@ -1334,11 +1343,11 @@
   async #cacheExpirationLoop() {
     while (!this.#workerTerminationScheduled.resolved) {
       let now = hrtime.bigint();
-      const cacheExpiration = this.#lastFetchClockNs +
+      const cacheExpiration = this.#lastFetchClockNs +
         BigInt(this.#cacheExpirationTimeoutMs * 1e6);
 
       if (now > cacheExpiration) {
-        this.#addPendingOperation(() =>
+        this.#addPendingOperation(() =>
           this.#clearCacheAndResetPositions());
         await this.#checkMaxPollIntervalNotExceeded(now);
         break;

@@ -1558,7 +1567,7 @@
 
     // Uncomment to test an additional delay in seek
     // await Timer.withTimeout(1000);
-
+
     const seekedPartitions = [];
     const pendingSeeks = new Map();
     const assignmentSet = new Set();

@@ -1608,7 +1617,7 @@
     /* Offsets are committed on seek only when in compatibility mode. */
     if (offsetsToCommit.length !== 0 && this.#internalConfig['enable.auto.commit']) {
       await this.#commitOffsetsUntilNoStateErr(offsetsToCommit);
-    }
+    }
   }
 
   #markBatchPayloadsStale(topicPartitions) {

@@ -1828,7 +1837,7 @@
     }
     flattenedToppars.map(partitionKey).
       forEach(key => this.#pausedPartitions.delete(key));
-
+
     this.#addPendingOperation(() =>
       this.#resumeInternal(flattenedToppars));
   }

lib/kafkajs/_consumer_cache.js

Lines changed: 9 additions & 9 deletions

@@ -61,7 +61,7 @@ class PerPartitionMessageCache {
 
 /**
  * MessageCache defines a dynamically sized cache for messages.
- * Internally, it uses PerPartitionMessageCache to store messages for each partition.
+ * Internally, it uses PerPartitionMessageCache to store messages for each partition.
  */
 class MessageCache {
   #size;

@@ -90,7 +90,7 @@ class MessageCache {
 
   /**
    * Assign a new partition to the consumer, if available.
-   *
+   *
    * @returns {PerPartitionMessageCache} - the partition assigned to the consumer, or null if none available.
    */
   #assignNewPartition() {

@@ -105,7 +105,7 @@ class MessageCache {
 
   /**
    * Remove an empty partition from the cache.
-   *
+   *
    * @param {PerPartitionMessageCache} ppc The partition to remove from the cache.
    */
   #removeEmptyPartition(ppc) {

@@ -118,7 +118,7 @@ class MessageCache {
   /**
    * Add a single message to a PPC.
    * In case the PPC does not exist, it is created.
-   *
+   *
    * @param {Object} message - the message to add to the cache.
    */
   #add(message) {

@@ -172,7 +172,7 @@ class MessageCache {
   /**
    * Adds many messages into the cache, partitioning them as per their toppar.
    * Increases cache size by the number of messages added.
-   *
+   *
    * @param {Array} messages - the messages to add to the cache.
    */
   addMessages(messages) {

@@ -183,9 +183,9 @@ class MessageCache {
 
   /**
    * Allows returning the PPC without asking for another message.
-   *
+   *
    * @param {PerPartitionMessageCache} ppc - the partition to return.
-   *
+   *
    * @note this is a no-op if the PPC is not assigned.
    */
   return(ppc) {

@@ -203,7 +203,7 @@ class MessageCache {
    *
    * If the current PPC is exhausted, it moves to the next PPC.
    * If all PPCs are exhausted, it returns null.
-   *
+   *
    * @param {PerPartitionMessageCache} ppc - after a consumer has consumed a message, it must return the PPC back to us via this parameter.
    * otherwise, no messages from that topic partition will be consumed.
    * @returns {Array} - the next message in the cache, or null if none exists, and the corresponding PPC.

@@ -245,7 +245,7 @@ class MessageCache {
     if (!nextN.length)
       return this.nextN(null, size);
 
-    this.#size -= nextN.length;
+    this.#size -= nextN.length;
     return [nextN, ppc];
   }
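
Taken together, the JSDoc in these hunks describes a lease-like protocol: a consumer takes messages from a per-partition cache (PPC) and must hand the PPC back on the next call, or that partition stops being served. A hypothetical consume loop against that API; the [message, ppc] return shape is inferred from the @returns lines above, not confirmed by this diff:

// Illustrative only: assumes next(ppc) yields [message, ppc], with a null
// or missing message once every partition's cache is drained.
let ppc = null;
for (;;) {
  const result = cache.next(ppc);     // hand the previous PPC back each call
  if (!result || result[0] === null)
    break;                            // all PPCs exhausted
  const message = result[0];
  ppc = result[1];                    // keep the PPC to return on the next call
  await handleMessage(message);       // handleMessage is a stand-in
}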

lib/kafkajs/_linked-list.js

Lines changed: 12 additions & 12 deletions

@@ -62,8 +62,8 @@ class LinkedList {
   /**
    * Removes given node from the list,
    * if it is not already removed.
-   *
-   * @param {LinkedListNode} node
+   *
+   * @param {LinkedListNode} node
    */
   remove(node) {
     if (node._removed) {

@@ -89,7 +89,7 @@ class LinkedList {
   /**
    * Removes the first node from the list and returns it,
    * or null if the list is empty.
-   *
+   *
    * @returns {any} The value of the first node in the list or null.
    */
   removeFirst() {

@@ -105,7 +105,7 @@ class LinkedList {
   /**
    * Removes the last node from the list and returns its value,
    * or null if the list is empty.
-   *
+   *
    * @returns {any} The value of the last node in the list or null.
    */
   removeLast() {

@@ -120,31 +120,31 @@ class LinkedList {
 
   /**
    * Add a new node to the beginning of the list and returns it.
-   *
-   * @param {any} value
+   *
+   * @param {any} value
    * @returns {LinkedListNode} The new node.
    */
-  addFirst(value) {
+  addFirst(value) {
     const node = new LinkedListNode(value);
     return this.#insertInBetween(node, null,
       this._head);
   }
 
   /**
    * Add a new node to the end of the list and returns it.
-   *
+   *
    * @param {any} value Node value.
    * @returns {LinkedListNode} The new node.
    */
-  addLast(value) {
+  addLast(value) {
     const node = new LinkedListNode(value);
     return this.#insertInBetween(node, this._tail, null);
   }
 
   /**
    * Add a new node before the given node and returns it.
    * Given node must not be removed.
-   *
+   *
    * @param {LinkedListNode} node Reference node.
    * @param {any} value New node value.
    * @returns {LinkedListNode} The new node.

@@ -159,7 +159,7 @@ class LinkedList {
   /**
    * Add a new node after the given node and returns it.
    * Given node must not be removed.
-   *
+   *
    * @param {LinkedListNode} node Reference node.
    * @param {any} value New node value.
    * @returns {LinkedListNode} The new node.

@@ -173,7 +173,7 @@ class LinkedList {
 
   /**
    * Concatenates the given list to the end of this list.
-   *
+   *
    * @param {LinkedList} list List to concatenate.
   */
   concat(list) {
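
The API surface visible in these hunks is a conventional doubly linked list. A short usage sketch consistent with the JSDoc above (illustrative only; 'other' is a stand-in for a second list):

// Illustrative usage of the methods documented above.
const list = new LinkedList();
const node = list.addFirst('a');   // returns the new LinkedListNode
list.addLast('b');                 // append at the tail
list.remove(node);                 // no-op if the node was already removed
list.removeLast();                 // -> 'b' (the value), or null when empty
list.concat(other);                // append another LinkedList to this one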
