Skip to content

Commit e5f216d

Browse files
committed
Add functional changes for kafkaJs block
1 parent a5cbdfd commit e5f216d

File tree

5 files changed

+282
-205
lines changed

5 files changed

+282
-205
lines changed

lib/kafkajs/_common.js

Lines changed: 38 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -97,11 +97,11 @@ function createReplacementErrorMessage(cOrP, fnCall, property, propertyVal, repl
9797
}
9898
return `'${property}' is not supported as a property to '${fnCall}', but must be passed to the ${cOrP} during creation.\n` +
9999
`Before: \n` +
100-
`\tconst ${cOrP} = kafka.${cOrP}({ ... });\n` +
100+
`\tconst ${cOrP} = kafka.${cOrP}({ kafkaJs: { ... }, });\n` +
101101
`\tawait ${cOrP}.connect();\n` +
102102
`\t${cOrP}.${fnCall}({ ${propertyVal}, ... });\n` +
103103
`After: \n` +
104-
`\tconst ${cOrP} = kafka.${cOrP}({ ${replacementVal}, ... });\n` +
104+
`\tconst ${cOrP} = kafka.${cOrP}({ kafkaJs: { ${replacementVal}, ... }, });\n` +
105105
`\tawait ${cOrP}.connect();\n` +
106106
`\t${cOrP}.${fnCall}({ ... });\n` +
107107
(isLK ? `For more details on what can be used inside the rdKafka block, see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md\n` : '');
@@ -179,24 +179,28 @@ const CompatibilityErrorMessages = Object.freeze({
179179
/**
180180
* Converts the common configuration from KafkaJS to a format that can be used by node-rdkafka.
181181
* @param {object} config
182-
* @returns {{globalConfig: import("../../types/config").ConsumerGlobalConfig|import("../../types/config").ProducerTopicConfig, topicConfig: import("../../types/config").ConsumerTopicConfig|import("../../types/config").ProducerTopicConfig}}
182+
* @returns {import('../../types/config').ProducerGlobalConfig | import('../../types/config').ConsumerGlobalConfig} the converted configuration
183183
* @throws {error.KafkaJSError} if the configuration is invalid.
184184
* The error code will be ERR__INVALID_ARG in case of invalid arguments or features that are not supported.
185185
* The error code will be ERR__NOT_IMPLEMENTED in case of features that are not yet implemented.
186186
*/
187-
async function kafkaJSToRdKafkaConfig(config) {
188-
const globalConfig = {};
189-
const topicConfig = {};
190-
191-
if (!Array.isArray(config["brokers"])) {
192-
throw new error.KafkaJSError(CompatibilityErrorMessages.brokerString(), {
193-
code: error.ErrorCodes.ERR__INVALID_ARG,
194-
});
187+
function kafkaJSToRdKafkaConfig(config) {
188+
/* Since the kafkaJs block is specified, we operate in
189+
* kafkaJs compatibility mode. That means we change the defaults
190+
* match the kafkaJs defaults. */
191+
const rdkafkaConfig = {};
192+
193+
if (Object.hasOwn(config, "brokers")) {
194+
if (!Array.isArray(config["brokers"])) {
195+
throw new error.KafkaJSError(CompatibilityErrorMessages.brokerString(), {
196+
code: error.ErrorCodes.ERR__INVALID_ARG,
197+
});
198+
}
199+
rdkafkaConfig["bootstrap.servers"] = config["brokers"].join(",");
195200
}
196-
globalConfig["bootstrap.servers"] = config["brokers"].join(",");
197201

198202
if (Object.hasOwn(config, "clientId")) {
199-
globalConfig["client.id"] = config.clientId;
203+
rdkafkaConfig["client.id"] = config.clientId;
200204
}
201205

202206
let withSASL = false;
@@ -224,18 +228,18 @@ async function kafkaJSToRdKafkaConfig(config) {
224228
});
225229
}
226230

227-
globalConfig["sasl.mechanism"] = mechanism;
228-
globalConfig["sasl.username"] = sasl.username;
229-
globalConfig["sasl.password"] = sasl.password;
231+
rdkafkaConfig["sasl.mechanism"] = mechanism;
232+
rdkafkaConfig["sasl.username"] = sasl.username;
233+
rdkafkaConfig["sasl.password"] = sasl.password;
230234
withSASL = true;
231235
}
232236

233237
if (Object.hasOwn(config, "ssl") && config.ssl && withSASL) {
234-
globalConfig["security.protocol"] = "sasl_ssl";
238+
rdkafkaConfig["security.protocol"] = "sasl_ssl";
235239
} else if (withSASL) {
236-
globalConfig["security.protocol"] = "sasl_plaintext";
240+
rdkafkaConfig["security.protocol"] = "sasl_plaintext";
237241
} else if (Object.hasOwn(config, "ssl") && config.ssl) {
238-
globalConfig["security.protocol"] = "ssl";
242+
rdkafkaConfig["security.protocol"] = "ssl";
239243
}
240244

241245
/* TODO: add best-effort support for ssl besides just true/false */
@@ -246,14 +250,14 @@ async function kafkaJSToRdKafkaConfig(config) {
246250
}
247251

248252
if (Object.hasOwn(config, "requestTimeout")) {
249-
globalConfig["socket.timeout.ms"] = config.requestTimeout;
253+
rdkafkaConfig["socket.timeout.ms"] = config.requestTimeout;
250254
} else {
251255
/* KafkaJS default */
252-
globalConfig["socket.timeout.ms"] = 30000;
256+
rdkafkaConfig["socket.timeout.ms"] = 30000;
253257
}
254258

255259
if (Object.hasOwn(config, "enforceRequestTimeout") && !config.enforceRequestTimeout) {
256-
globalConfig["socket.timeout.ms"] = 300000;
260+
rdkafkaConfig["socket.timeout.ms"] = 300000;
257261
}
258262

259263
const connectionTimeout = config.connectionTimeout ?? 1000;
@@ -262,14 +266,14 @@ async function kafkaJSToRdKafkaConfig(config) {
262266

263267
/* The minimum value for socket.connection.setup.timeout.ms is 1000. */
264268
totalConnectionTimeout = Math.max(totalConnectionTimeout, 1000);
265-
globalConfig["socket.connection.setup.timeout.ms"] = totalConnectionTimeout;
269+
rdkafkaConfig["socket.connection.setup.timeout.ms"] = totalConnectionTimeout;
266270

267271
const retry = config.retry ?? {};
268272
const { maxRetryTime, initialRetryTime, factor, multiplier, retries, restartOnFailure } = retry;
269273

270-
globalConfig["retry.backoff.max.ms"] = maxRetryTime ?? 30000;
271-
globalConfig["retry.backoff.ms"] = initialRetryTime ?? 300;
272-
globalConfig["retries"] = retries ?? 5;
274+
rdkafkaConfig["retry.backoff.max.ms"] = maxRetryTime ?? 30000;
275+
rdkafkaConfig["retry.backoff.ms"] = initialRetryTime ?? 300;
276+
rdkafkaConfig["retries"] = retries ?? 5;
273277

274278
if ((typeof factor === 'number') || (typeof multiplier === 'number')) {
275279
throw new error.KafkaJSError(CompatibilityErrorMessages.retryFactorMultiplier(), {
@@ -295,6 +299,7 @@ async function kafkaJSToRdKafkaConfig(config) {
295299
});
296300
}
297301

302+
rdkafkaConfig["log_level"] = 6 /* LOG_INFO - default in KafkaJS compatibility mode. */;
298303
if (Object.hasOwn(config, "logLevel")) {
299304
let setLevel = config.logLevel;
300305

@@ -303,19 +308,20 @@ async function kafkaJSToRdKafkaConfig(config) {
303308
}
304309
switch (setLevel) {
305310
case logLevel.NOTHING:
306-
globalConfig["log_level"] = 0; /* LOG_EMERG - we don't have a true log nothing yet */
311+
rdkafkaConfig["log_level"] = 0; /* LOG_EMERG - we don't have a true log nothing yet */
307312
break;
308313
case logLevel.ERROR:
309-
globalConfig["log_level"] = 3 /* LOG_ERR */;
314+
rdkafkaConfig["log_level"] = 3 /* LOG_ERR */;
310315
break;
311316
case logLevel.WARN:
312-
globalConfig["log_level"] = 4 /* LOG_WARNING */;
317+
rdkafkaConfig["log_level"] = 4 /* LOG_WARNING */;
313318
break;
314319
case logLevel.INFO:
315-
globalConfig["log_level"] = 6 /* LOG_INFO */;
320+
rdkafkaConfig["log_level"] = 6 /* LOG_INFO */;
316321
break;
317322
case logLevel.DEBUG:
318-
globalConfig["debug"] = "all" /* this will set librdkafka log_level to 7 */;
323+
rdkafkaConfig["debug"] = "all" /* Turn on debug logs for everything, otherwise this log level is not useful*/;
324+
rdkafkaConfig["log_level"] = 7 /* LOG_DEBUG */;
319325
break;
320326
default:
321327
throw new error.KafkaJSError(CompatibilityErrorMessages.logLevelName(setLevel), {
@@ -324,20 +330,7 @@ async function kafkaJSToRdKafkaConfig(config) {
324330
}
325331
}
326332

327-
if (config.rdKafka) {
328-
if (config.rdKafka.constructor === Function) {
329-
await config.rdKafka(globalConfig, topicConfig);
330-
} else {
331-
Object.assign(globalConfig, config.rdKafka.globalConfig);
332-
Object.assign(topicConfig, config.rdKafka.topicConfig);
333-
}
334-
}
335-
336-
337-
if (!Object.hasOwn(globalConfig, 'log_level'))
338-
globalConfig['log_level'] = Object.hasOwn(globalConfig, 'debug') ? 7 /* LOG_DEBUG */ : 6 /* LOG_INFO */;
339-
340-
return { globalConfig, topicConfig };
333+
return rdkafkaConfig;
341334
}
342335

343336
function checkAllowedKeys(allowedKeysSpecific, config) {
@@ -421,7 +414,6 @@ function createKafkaJsErrorFromLibRdKafkaError(librdKafkaError) {
421414
err = new error.KafkaJSError(librdKafkaError, properties);
422415
}
423416

424-
console.log("Converted err = " + JSON.stringify(err, null, 2) + " librdkafka erro = " + JSON.stringify(librdKafkaError, null, 2));
425417
return err;
426418
}
427419

0 commit comments

Comments
 (0)