Skip to content

Commit 17b06e2

Browse files
committed
Add logger support to configuration
1 parent 11aa5e4 commit 17b06e2

File tree

3 files changed

+141
-21
lines changed

3 files changed

+141
-21
lines changed

lib/kafkajs/_common.js

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,89 @@ const logLevel = Object.freeze({
99
DEBUG: 4,
1010
});
1111

12+
const severityToLogLevel = Object.freeze({
13+
0: logLevel.NOTHING,
14+
1: logLevel.ERROR,
15+
2: logLevel.ERROR,
16+
3: logLevel.ERROR,
17+
4: logLevel.WARN,
18+
5: logLevel.WARN,
19+
6: logLevel.INFO,
20+
7: logLevel.DEBUG,
21+
});
22+
23+
/**
24+
* Default logger implementation.
25+
* @type import("../../types/kafkajs").Logger
26+
*/
27+
class DefaultLogger {
28+
constructor() {
29+
this.logLevel = logLevel.INFO;
30+
}
31+
32+
setLogLevel(logLevel) {
33+
this.logLevel = logLevel;
34+
}
35+
36+
info(message, extra) {
37+
if (this.logLevel >= logLevel.INFO)
38+
console.info({ message, ...extra });
39+
}
40+
41+
error(message, extra) {
42+
if (this.logLevel >= logLevel.ERROR)
43+
console.error({ message, ...extra });
44+
}
45+
46+
warn(message, extra) {
47+
if (this.logLevel >= logLevel.WARN)
48+
console.warn({ message, ...extra });
49+
}
50+
51+
debug(message, extra) {
52+
if (this.logLevel >= logLevel.DEBUG)
53+
console.log({ message, ...extra });
54+
}
55+
56+
namespace() {
57+
return this;
58+
}
59+
}
60+
61+
/**
62+
* Trampoline for user defined logger, if any.
63+
* @param {{severity: number, fac: string, message: string}} msg
64+
*
65+
*/
66+
function loggerTrampoline(msg, logger) {
67+
if (!logger) {
68+
return;
69+
}
70+
71+
const level = severityToLogLevel[msg.severity];
72+
switch (level) {
73+
case logLevel.NOTHING:
74+
break;
75+
case logLevel.ERROR:
76+
logger.error(msg.message, { fac: msg.fac, timestamp: Date.now() });
77+
break;
78+
case logLevel.WARN:
79+
logger.warn(msg.message, { fac: msg.fac, timestamp: Date.now() });
80+
break;
81+
case logLevel.INFO:
82+
logger.info(msg.message, { fac: msg.fac, timestamp: Date.now() });
83+
break;
84+
case logLevel.DEBUG:
85+
logger.debug(msg.message, { fac: msg.fac, timestamp: Date.now() });
86+
break;
87+
default:
88+
throw new error.KafkaJSError("Invalid logLevel", {
89+
code: error.ErrorCodes.ERR__INVALID_ARG,
90+
});
91+
}
92+
93+
}
94+
1295
/**
1396
* Converts the common configuration from KafkaJS to a format that can be used by node-rdkafka.
1497
* @param {object} config
@@ -249,4 +332,6 @@ module.exports = {
249332
convertToRdKafkaHeaders,
250333
notImplemented,
251334
logLevel,
335+
loggerTrampoline,
336+
DefaultLogger,
252337
};

lib/kafkajs/_consumer.js

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ const {
55
kafkaJSToRdKafkaConfig,
66
topicPartitionOffsetToRdKafka,
77
createKafkaJsErrorFromLibRdKafkaError,
8-
notImplemented
8+
notImplemented,
9+
loggerTrampoline,
10+
DefaultLogger,
11+
logLevel,
912
} = require('./_common');
1013
const { Buffer } = require('buffer');
1114

@@ -79,6 +82,12 @@ class Consumer {
7982
*/
8083
#storedSubscriptions = [];
8184

85+
/**
86+
* A logger for the consumer.
87+
* @type {import("../../types/kafkajs").Logger}
88+
*/
89+
#logger = new DefaultLogger();
90+
8291
/**
8392
* @constructor
8493
* @param {import("../../types/kafkajs").ConsumerConfig} kJSConfig
@@ -109,16 +118,16 @@ class Consumer {
109118
case LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS:
110119
call = (this.#kJSConfig.rebalanceListener.onPartitionsAssigned ?
111120
this.#kJSConfig.rebalanceListener.onPartitionsAssigned(assignment) :
112-
Promise.resolve()).catch(console.error);
121+
Promise.resolve()).catch(this.#logger.error);
113122
break;
114123
case LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS:
115124
call = (this.#kJSConfig.rebalanceListener.onPartitionsRevoked ?
116125
this.#kJSConfig.rebalanceListener.onPartitionsRevoked(assignment) :
117-
Promise.resolve()).catch(console.error);
126+
Promise.resolve()).catch(this.#logger.error);
118127
break;
119128
default:
120129
call = Promise.reject(`Unexpected rebalanceListener error code ${err.code}`).catch((e) => {
121-
console.error(e);
130+
this.#logger.error(e);
122131
});
123132
break;
124133
}
@@ -166,6 +175,12 @@ class Consumer {
166175
/* This sets the common configuration options for the client. */
167176
const { globalConfig, topicConfig } = await kafkaJSToRdKafkaConfig(this.#kJSConfig);
168177

178+
/* Set the logger */
179+
if (Object.hasOwn(this.#kJSConfig, 'logger')) {
180+
this.#logger = this.#kJSConfig.logger;
181+
}
182+
this.#logger.setLogLevel(this.#kJSConfig.logLevel || logLevel.INFO);
183+
169184
/* Consumer specific configuration */
170185

171186
if (Object.hasOwn(this.#kJSConfig, 'groupId')) {
@@ -264,11 +279,7 @@ class Consumer {
264279
if (this.#state === ConsumerState.CONNECTING) {
265280
this.#connectPromiseFunc['reject'](err);
266281
} else {
267-
/* TODO: we should log the error returned here, depending on the log level.
268-
* Right now, we're just using console.err, but we should allow for a custom
269-
* logger, or at least make a function in _common.js that handles consumer
270-
* and producer. */
271-
console.error(err);
282+
this.#logger.error(err);
272283
}
273284
}
274285

@@ -377,7 +388,7 @@ class Consumer {
377388
this.#internalClient = new RdKafka.KafkaConsumer(globalConfig, topicConfig);
378389
this.#internalClient.on('ready', this.#readyCb.bind(this));
379390
this.#internalClient.on('event.error', this.#errorCb.bind(this));
380-
this.#internalClient.on('event.log', console.log);
391+
this.#internalClient.on('event.log', (msg) => loggerTrampoline(msg, this.#logger));
381392

382393
return new Promise((resolve, reject) => {
383394
this.#connectPromiseFunc = { resolve, reject };
@@ -636,7 +647,7 @@ class Consumer {
636647

637648
/* It's assumed that topicPartition is already assigned, and thus can be seeked to and committed to.
638649
* Errors are logged to detect bugs in the internal code. */
639-
this.#internalClient.seek(topicPartitionOffset, 0, err => err ? console.error(err) : null);
650+
this.#internalClient.seek(topicPartitionOffset, 0, err => err ? this.#logger.error(err) : null);
640651
offsetsToCommit.push({
641652
topic: topicPartition.topic,
642653
partition: topicPartition.partition,
@@ -800,8 +811,11 @@ class Consumer {
800811
notImplemented();
801812
}
802813

814+
/**
815+
* @returns {import("../../types/kafkajs").Logger} the logger associated to this consumer.
816+
*/
803817
logger() {
804-
notImplemented();
818+
return this.#logger;
805819
}
806820

807821
get events() {

lib/kafkajs/_producer.js

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
const RdKafka = require('../rdkafka');
2-
const { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka, createKafkaJsErrorFromLibRdKafkaError, convertToRdKafkaHeaders } = require('./_common');
2+
const { kafkaJSToRdKafkaConfig,
3+
topicPartitionOffsetToRdKafka,
4+
createKafkaJsErrorFromLibRdKafkaError,
5+
convertToRdKafkaHeaders,
6+
DefaultLogger,
7+
loggerTrampoline,
8+
logLevel, } = require('./_common');
39
const { Consumer } = require('./_consumer');
410
const error = require('./_error');
511
const { Buffer } = require('buffer');
@@ -51,6 +57,12 @@ class Producer {
5157
*/
5258
#ongoingTransaction = false;
5359

60+
/**
61+
* A logger for the producer.
62+
* @type {import("../../types/kafkajs").Logger}
63+
*/
64+
#logger = new DefaultLogger();
65+
5466
/**
5567
* @constructor
5668
* @param {import("../../types/kafkajs").ProducerConfig} kJSConfig
@@ -69,6 +81,12 @@ class Producer {
6981
/* This sets the common configuration options for the client. */
7082
const { globalConfig, topicConfig } = await kafkaJSToRdKafkaConfig(this.#kJSConfig);
7183

84+
/* Set the logger */
85+
if (Object.hasOwn(this.#kJSConfig, 'logger')) {
86+
this.#logger = this.#kJSConfig.logger;
87+
}
88+
this.#logger.setLogLevel(this.#kJSConfig.logLevel || logLevel.INFO);
89+
7290
/* Create producer specific configuration. */
7391
globalConfig.dr_cb = 'true';
7492

@@ -169,7 +187,7 @@ class Producer {
169187
throw new error.KafkaJSError(`Ready callback called in invalid state ${this.#state}`, { code: error.ErrorCodes.ERR__STATE });
170188
}
171189

172-
const {globalConfig} = await this.#config();
190+
const { globalConfig } = await this.#config();
173191
if (Object.hasOwn(globalConfig, 'transactional.id') && this.#state !== ProducerState.INITIALIZED_TRANSACTIONS) {
174192
this.#state = ProducerState.INITIALIZING_TRANSACTIONS;
175193
this.#internalClient.initTransactions(5000 /* default: 5s */, this.#readyTransactions.bind(this));
@@ -201,11 +219,7 @@ class Producer {
201219
if (this.#state === ProducerState.CONNECTING) {
202220
this.#connectPromiseFunc["reject"](err);
203221
} else {
204-
/* TODO: we should log the error returned here, depending on the log level.
205-
* Right now, we're just using console.err, but we should allow for a custom
206-
* logger, or at least make a function in _common.js that handles consumer
207-
* and producer. */
208-
console.error(err);
222+
this.#logger.error(err);
209223
}
210224
}
211225

@@ -224,7 +238,7 @@ class Producer {
224238
this.#internalClient = new RdKafka.Producer(globalConfig, topicConfig);
225239
this.#internalClient.on('ready', this.#readyCb.bind(this));
226240
this.#internalClient.on('event.error', this.#errorCb.bind(this));
227-
this.#internalClient.on('event.log', console.log);
241+
this.#internalClient.on('event.log', (msg) => loggerTrampoline(msg, this.#logger));
228242

229243
return new Promise((resolve, reject) => {
230244
this.#connectPromiseFunc = { resolve, reject };
@@ -452,7 +466,7 @@ class Producer {
452466
const opaque = { resolve, reject };
453467
try {
454468
this.#internalClient.produce(sendOptions.topic, msg.partition, msg.value, msg.key, msg.timestamp, opaque, msg.headers);
455-
} catch(err) {
469+
} catch (err) {
456470
reject(err);
457471
}
458472
}));
@@ -528,6 +542,13 @@ class Producer {
528542
const records = await Promise.all(sentPromises);
529543
return records.flat();
530544
}
545+
546+
/**
547+
* @returns {import("../../types/kafkajs").Logger} the logger associated to this producer.
548+
*/
549+
logger() {
550+
return this.#logger;
551+
}
531552
}
532553

533554
module.exports = { Producer }

0 commit comments

Comments
 (0)