
Commit 6a3f5ce

Exhaustive config compatibility
1 parent 9f5c734 commit 6a3f5ce

File tree: 7 files changed, +384 -53 lines changed


MIGRATION.md

Lines changed: 125 additions & 10 deletions
@@ -4,21 +4,70 @@

### Common

#### Configuration changes

```javascript
const kafka = new Kafka({ /* common configuration changes */ });
```

There are several changes in the common configuration. Each config property is discussed below;
properties that need a change on your part are highlighted in bold. A combined example follows the list.

* An `rdKafka` block can be added to the config. It allows directly setting librdkafka properties.
  If you are starting to make the configuration anew, it is best to specify properties using
  the `rdKafka` block. [Complete list of properties here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).

  Example:
  ```javascript
  const kafka = new Kafka({
    rdKafka: {
      globalConfig: { /* properties mentioned within the 'global config' section of the list */ },
      topicConfig: { /* properties mentioned within the 'topic config' section of the list */ },
    },
    /* ... */
  });
  ```
* **`brokers`**: a list of strings representing the bootstrap brokers.
  A function is no longer allowed as an argument for this.
* **`ssl`**: boolean, set to true if SSL needs to be enabled.
  In case additional properties, like CA, certificate, key, etc. need to be added, use the `rdKafka` block.
* **`sasl`**: omit if the brokers need no authentication; otherwise, an object of one of the following forms:
  - For SASL PLAIN or SASL SCRAM: `{ mechanism: 'plain'|'scram-sha-256'|'scram-sha-512', username: string, password: string }`
  - For SASL OAUTHBEARER: not supported yet.
  - For AWS IAM or custom mechanisms: not supported, with no planned support.
  - For GSSAPI/Kerberos: use the `rdKafka` configuration.
* `clientId`: string for identifying this client.
* **`connectionTimeout`** and **`authenticationTimeout`**:
  These timeouts (specified in milliseconds) are not enforced individually. Instead, their sum is
  enforced. The default value of the sum is 30000. It corresponds to librdkafka's `socket.connection.setup.timeout.ms`.
* **`reauthenticationThreshold`**: no longer checked; librdkafka handles reauthentication on its own.
* **`requestTimeout`**: number of milliseconds for a network request to time out. The default value has been changed to 60000. It now corresponds to librdkafka's `socket.timeout.ms`.
* **`enforceRequestTimeout`**: if this is set to false, `requestTimeout` is set to 5 minutes. The timeout cannot be disabled completely.
* **`retry`**: partially supported. It must be an object with the following (optional) properties:
  - `maxRetryTime`: maximum time to back off a retry, in milliseconds. Corresponds to librdkafka's `retry.backoff.max.ms`. The default is 1000.
  - `initialRetryTime`: minimum time to back off a retry, in milliseconds. Corresponds to librdkafka's `retry.backoff.ms`. The default is 100.
  - `retries`: maximum number of retries, *only* applicable to Produce messages. However, it's recommended to keep this unset.
    Librdkafka handles the number of retries itself and, rather than capping the number of retries, caps the total time spent
    while sending the message, controlled by `message.timeout.ms`.
  - `factor` and `multiplier` cannot be changed from their defaults of 0.2 and 2.
* **`restartOnFailure`**: cannot be changed, and will always be true (the consumer recovers from errors on its own).
* `logLevel`: mapped to the syslog(3) levels supported by librdkafka. `LOG_NOTHING` is not YET supported, as some panic situations are still logged.
* **`socketFactory`**: no longer supported.
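Putting the above together, a migrated common configuration might look like the following sketch (all values are illustrative; the librdkafka property under `globalConfig` is just one example from the linked list):

```javascript
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['broker1:9092', 'broker2:9092'],   // must be an array of strings, not a function
  ssl: true,                                   // extra TLS settings (CA, cert, key) go into rdKafka
  sasl: {
    mechanism: 'scram-sha-256',
    username: 'user',
    password: 'pass',
  },
  connectionTimeout: 5000,                     // only the sum of connectionTimeout and
  authenticationTimeout: 25000,                // authenticationTimeout (here 30000) is enforced
  retry: {
    initialRetryTime: 100,
    maxRetryTime: 1000,                        // factor/multiplier must stay at their defaults
  },
  rdKafka: {
    globalConfig: {
      'ssl.ca.location': '/path/to/ca.pem',    // librdkafka properties set directly
    },
    topicConfig: {},
  },
});
```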

#### Error Handling

Some possible subtypes of `KafkaJSError` have been removed,
and additional information has been added into `KafkaJSError`.
Fields have been added denoting if the error is fatal, retriable, or abortable (the latter two only relevant for a transactional producer).
Some error-specific fields have also been removed.

An exhaustive list of changes is at the bottom of this section.

For compatibility, as many error types as possible have been retained, but it is
better to switch to checking the `error.code`.

**Action**: Convert any checks based on `instanceof` and `error.name` to error
checks based on `error.code` or `error.type`.

**Example:**
```javascript
try {
  await producer.send(/* args */);
```
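A minimal sketch of the full pattern (the specific error-code constant, and that `ErrorCodes` is importable alongside the client, are illustrative assumptions rather than confirmed API):

```javascript
try {
  await producer.send(/* args */);
} catch (err) {
  /* Prefer the numeric code and the fatal/retriable/abortable flags over instanceof or err.name. */
  if (err.code === ErrorCodes.ERR_MSG_SIZE_TOO_LARGE) { /* assumed constant name */
    /* handle an over-sized message */
  } else if (err.retriable) {
    /* safe to retry the operation */
  } else {
    throw err;
  }
}
```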

@@ -61,6 +110,35 @@

### Producer

#### Configuration changes

```javascript
const producer = kafka.producer({ /* producer-specific configuration changes */ });
```

There are several changes in the producer configuration. Each config property is discussed below;
properties that need a change on your part are highlighted in bold. A combined example follows the list.

* **`createPartitioner`**: not supported (YET). For behaviour identical to the Java client (the DefaultPartitioner),
  use the `rdKafka` block and set the property `partitioner` to `murmur2_random`. This is critical
  when planning to produce to topics where messages with certain keys have already been produced.
* **`retry`**: see the section on retry above. The producer config `retry` takes precedence over the common config `retry`.
* `metadataMaxAge`: time in milliseconds after which to refresh metadata for known topics. The default value remains 5 minutes. This
  corresponds to the librdkafka property `topic.metadata.refresh.interval.ms` (and not `metadata.max.age.ms`).
* `allowAutoTopicCreation`: determines whether a topic should be created if it doesn't exist while producing. True by default.
* `transactionTimeout`: the maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction
  status update from the producer before proactively aborting the ongoing transaction. The default value remains 60000.
  Only applicable when `transactionalId` is set.
* `idempotent`: if set to true, ensures that messages are delivered exactly once and in order. False by default.
  If this is set to true, certain constraints must be respected for other properties: `maxInFlightRequests <= 5`, `retry.retries >= 0`.
* **`maxInFlightRequests`**: maximum number of in-flight requests *per broker connection*. If not set, a very high limit is used.
* `transactionalId`: if set, turns this into a transactional producer with this identifier. This also automatically sets `idempotent` to true.
* An `rdKafka` block can be added to the config. It allows directly setting librdkafka properties.
  If you are starting to make the configuration anew, it is best to specify properties using
  the `rdKafka` block. [Complete list of properties here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
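Putting the above together, a migrated producer configuration might look like the following sketch (values are illustrative):

```javascript
const producer = kafka.producer({
  idempotent: true,               // requires maxInFlightRequests <= 5 and retry.retries >= 0
  maxInFlightRequests: 5,
  allowAutoTopicCreation: false,
  metadataMaxAge: 300000,         // maps to topic.metadata.refresh.interval.ms
  rdKafka: {
    topicConfig: {
      'partitioner': 'murmur2_random',  // Java-client-compatible partitioning
    },
  },
});
```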

#### Semantic and Per-Method Changes

* `sendBatch` is not supported (YET). However, the actual batching semantics are handled by librdkafka.
* Changes to `send`:
  * `acks`, `compression` and `timeout` are not set on a per-send basis. Rather, they must be configured in the configuration.

@@ -103,6 +181,44 @@

### Consumer

#### Configuration changes

```javascript
const consumer = kafka.consumer({ /* consumer-specific configuration changes */ });
```

There are several changes in the consumer configuration. Each config property is discussed below;
properties that need a change on your part are highlighted in bold. The change could be to the default value,
to added/missing features, or to semantics. A combined example follows the list.

* **`partitionAssigners`**: the **default value** of this is changed to `[PartitionAssigners.range, PartitionAssigners.roundRobin]`. Support for the range, roundRobin and cooperativeSticky
  partition assignors is provided. The cooperative assignor cannot be used along with the other two, and there
  is no support for custom assignors. The aliases `partitionAssignors` and `PartitionAssignors` are also made available, to maintain
  parlance with the Java client's terminology.
* **`sessionTimeout`**: if no heartbeats are received by the broker for a group member within the session timeout, the broker will remove the consumer from
  the group and trigger a rebalance. The **default value** is changed to 45000.
* **`rebalanceTimeout`**: the maximum allowed time for each member to join the group once a rebalance has begun. The **default value** is changed to 300000.
  Note, before changing: setting this value *also* changes the max poll interval. Message processing in `eachMessage` must not take more than this time.
* `heartbeatInterval`: the expected time in milliseconds between heartbeats to the consumer coordinator. The default value remains 3000.
* `metadataMaxAge`: time in milliseconds after which to refresh metadata for known topics. The default value remains 5 minutes. This
  corresponds to the librdkafka property `topic.metadata.refresh.interval.ms` (and not `metadata.max.age.ms`).
* **`allowAutoTopicCreation`**: determines whether a topic should be created if it doesn't exist while consuming. The **default value** is changed to false.
* **`maxBytesPerPartition`**: determines how many bytes can be fetched in one request from a single partition. The default value remains 1048576.
  There is a slight change in semantics: this size grows dynamically if a single message larger than this is encountered,
  and the client does not get stuck.
* `minBytes`: minimum number of bytes the broker responds with (or waits until `maxWaitTimeInMs`). The default remains 1.
* **`maxBytes`**: maximum number of bytes the broker responds with. The **default value** is changed to 52428800 (50MB).
* **`maxWaitTimeInMs`**: maximum time in milliseconds the broker waits for `minBytes` to be fulfilled. The **default value** is changed to 500.
* **`retry`**: see the section on retry above. The consumer config `retry` takes precedence over the common config `retry`.
* `readUncommitted`: if true, the consumer will read transactional messages which have not been committed. The default value remains false.
* **`maxInFlightRequests`**: maximum number of in-flight requests *per broker connection*. If not set, a very high limit is used.
* `rackId`: can be set to an arbitrary string which will be used for fetch-from-follower if that is set up on the cluster.
* An `rdKafka` block can be added to the config. It allows directly setting librdkafka properties.
  If you are starting to make the configuration anew, it is best to specify properties using
  the `rdKafka` block. [Complete list of properties here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md).
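Putting the above together, a migrated consumer configuration might look like the following sketch (values are illustrative; `groupId` is assumed to carry over unchanged from KafkaJS, and the `cooperativeSticky` member name simply follows the naming used above):

```javascript
const consumer = kafka.consumer({
  groupId: 'my-group',
  sessionTimeout: 45000,          // new default
  rebalanceTimeout: 300000,       // also acts as the max poll interval
  maxWaitTimeInMs: 500,
  partitionAssignors: [PartitionAssignors.cooperativeSticky], // cannot be mixed with range/roundRobin
  rdKafka: {
    topicConfig: {
      'auto.offset.reset': 'earliest',  // replaces fromBeginning (see below)
    },
  },
});
```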

#### Semantic and Per-Method Changes

* While passing a list of topics to `subscribe`, the `fromBeginning` property is not supported. Instead, the property `auto.offset.reset` needs to be used.
  Before:
  ```javascript
@@ -129,7 +245,7 @@

  await consumer.subscribe({ topics: ["topic"] });
  ```

* For auto-committing using a consumer, the properties on `run` are no longer used. Instead, the corresponding rdKafka properties must be set (see the sketch after this list).
  * `autoCommit` corresponds to `enable.auto.commit`.
  * `autoCommitInterval` corresponds to `auto.commit.interval.ms`.
  * `autoCommitThreshold` is no longer supported.
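For instance, a sketch of enabling auto-commit through the `rdKafka` block (placing these keys under `globalConfig` is an assumption based on librdkafka's CONFIGURATION.md, where both are global properties; `groupId` is assumed to carry over unchanged from KafkaJS):

```javascript
const consumer = kafka.consumer({
  groupId: 'my-group',
  rdKafka: {
    globalConfig: {
      'enable.auto.commit': true,        // was autoCommit on run()
      'auto.commit.interval.ms': 5000,   // was autoCommitInterval on run()
    },
  },
});

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    /* autoCommit / autoCommitInterval / autoCommitThreshold are no longer passed here */
  },
});
```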

@@ -170,9 +286,8 @@

* The `heartbeat()` no longer needs to be called. Heartbeats are automatically managed by librdkafka.
* The `partitionsConsumedConcurrently` property is not supported (YET).
* The `eachBatch` method is not supported.
* `commitOffsets` does not (YET) support sending metadata for topic partitions being committed.
* `paused()` is not (YET) supported.
* Custom partition assignors are not supported.

## node-rdkafka

lib/kafkajs/_common.js

Lines changed: 128 additions & 15 deletions
@@ -1,39 +1,151 @@

```javascript
const error = require("./_error");
const process = require("process");

const logLevel = Object.freeze({
  NOTHING: 0,
  ERROR: 1,
  WARN: 2,
  INFO: 3,
  DEBUG: 4,
});

/**
 * Converts the common configuration from KafkaJS to a format that can be used by node-rdkafka.
 * @param {object} config
 * @returns {{globalConfig: import("../../types/config").ConsumerGlobalConfig|import("../../types/config").ProducerTopicConfig, topicConfig: import("../../types/config").ConsumerTopicConfig|import("../../types/config").ProducerTopicConfig}}
 * @throws {error.KafkaJSError} if the configuration is invalid.
 *         The error code will be ERR__INVALID_ARG in case of invalid arguments or features that are not supported.
 *         The error code will be ERR__NOT_IMPLEMENTED in case of features that are not yet implemented.
 */
async function kafkaJSToRdKafkaConfig(config) {
  const globalConfig = {};
  const topicConfig = {};

  if (!Array.isArray(config["brokers"])) {
    throw new error.KafkaJSError("brokers must be a list of strings", {
      code: error.ErrorCodes.ERR__INVALID_ARG,
    });
  }
  globalConfig["bootstrap.servers"] = config["brokers"].join(",");

  if (Object.hasOwn(config, "clientId")) {
    globalConfig["client.id"] = config.clientId;
  }

  let withSASL = false;

  if (Object.hasOwn(config, "sasl")) {
    const sasl = config.sasl;
    const mechanism = sasl.mechanism.toUpperCase();

    if (mechanism === 'OAUTHBEARER') {
      throw new error.KafkaJSError("OAUTHBEARER is not supported", {
        code: error.ErrorCodes.ERR__NOT_IMPLEMENTED,
      });
    }

    /* The mechanism must be PLAIN or SCRAM. */

    if (typeof sasl.username !== "string" || typeof sasl.password !== "string") {
      throw new error.KafkaJSError("username and password must be present and be strings", {
        code: error.ErrorCodes.ERR__INVALID_ARG,
      });
    }

    globalConfig["sasl.mechanism"] = mechanism;
    globalConfig["sasl.username"] = sasl.username;
    globalConfig["sasl.password"] = sasl.password;
    withSASL = true;
  }

  /* ssl must actually be true (not merely present) to select TLS on top of SASL. */
  if (Object.hasOwn(config, "ssl") && config.ssl && withSASL) {
    globalConfig["security.protocol"] = "sasl_ssl";
  } else if (withSASL) {
    globalConfig["security.protocol"] = "sasl_plaintext";
  }

  if (Object.hasOwn(config, "requestTimeout")) {
    globalConfig["socket.timeout.ms"] = config.requestTimeout;
  }

  /* enforceRequestTimeout = false stretches the request timeout to 5 minutes rather than disabling it. */
  if (Object.hasOwn(config, "enforceRequestTimeout") && !config.enforceRequestTimeout) {
    globalConfig["socket.timeout.ms"] = 300000;
  }

  const connectionTimeout = config.connectionTimeout ?? 0;
  const authenticationTimeout = config.authenticationTimeout ?? 0;
  let totalConnectionTimeout = Number(connectionTimeout) + Number(authenticationTimeout);

  /* The minimum value for socket.connection.setup.timeout.ms is 1000. */
  if (totalConnectionTimeout) {
    totalConnectionTimeout = Math.max(totalConnectionTimeout, 1000);
    globalConfig["socket.connection.setup.timeout.ms"] = totalConnectionTimeout;
  }

  if (Object.hasOwn(config, "retry")) {
    const { maxRetryTime, initialRetryTime, factor, multiplier, retries } = config.retry;

    if (maxRetryTime) {
      globalConfig["retry.backoff.max.ms"] = maxRetryTime;
    }

    if (initialRetryTime) {
      globalConfig["retry.backoff.ms"] = initialRetryTime;
    }

    if (retries) {
      globalConfig["retries"] = retries;
    }

    if (factor || multiplier) {
      throw new error.KafkaJSError("retry.factor and retry.multiplier are not supported", {
        code: error.ErrorCodes.ERR__INVALID_ARG,
      });
    }
  }

  if (Object.hasOwn(config, "restartOnFailure") && !config.restartOnFailure) {
    throw new error.KafkaJSError("restartOnFailure cannot be false, it must be true or unset", {
      code: error.ErrorCodes.ERR__INVALID_ARG,
    });
  }

  if (Object.hasOwn(config, "socketFactory")) {
    throw new error.KafkaJSError("socketFactory is not supported", {
      code: error.ErrorCodes.ERR__INVALID_ARG,
    });
  }

  if (Object.hasOwn(config, "logLevel")) {
    let setLevel = config.logLevel;

    if (process.env.KAFKAJS_LOG_LEVEL) {
      setLevel = logLevel[process.env.KAFKAJS_LOG_LEVEL.toUpperCase()];
    }

    switch (setLevel) {
      case logLevel.NOTHING:
        globalConfig["log_level"] = 0; /* LOG_EMERG - we don't have a true log-nothing level yet */
        break;
      case logLevel.ERROR:
        globalConfig["log_level"] = 3; /* LOG_ERR */
        break;
      case logLevel.WARN:
        globalConfig["log_level"] = 4; /* LOG_WARNING */
        break;
      case logLevel.INFO:
        globalConfig["log_level"] = 6; /* LOG_INFO */
        break;
      case logLevel.DEBUG:
        globalConfig["debug"] = "all"; /* this will set librdkafka log_level to 7 */
        break;
      default:
        throw new error.KafkaJSError("Invalid logLevel", {
          code: error.ErrorCodes.ERR__INVALID_ARG,
        });
    }
  }

  if (config.rdKafka) {
    if (config.rdKafka.constructor === Function) {
      await config.rdKafka(globalConfig, topicConfig);
```
@@ -136,4 +248,5 @@ module.exports = {

```javascript
  createKafkaJsErrorFromLibRdKafkaError,
  convertToRdKafkaHeaders,
  notImplemented,
  logLevel,
};
```
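For reference, a usage sketch of the converter (assuming `kafkaJSToRdKafkaConfig` is among this module's exports — the hunk above only shows the tail of `module.exports` — and that the return shape matches the JSDoc):

```javascript
const { kafkaJSToRdKafkaConfig } = require("./_common");

async function example() {
  const { globalConfig, topicConfig } = await kafkaJSToRdKafkaConfig({
    brokers: ["localhost:9092"],
    clientId: "my-app",
    requestTimeout: 30000,
    retry: { initialRetryTime: 100, maxRetryTime: 1000 },
  });
  // globalConfig now holds librdkafka keys such as "bootstrap.servers",
  // "client.id", "socket.timeout.ms", "retry.backoff.ms" and "retry.backoff.max.ms".
  console.log(globalConfig, topicConfig);
}
```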
