Commit 3aaf9e5

Merge branch 'dev_producer_testing_and_fixes' into dev_early_access_development_branch

2 parents 00a4d03 + 51a8bfa
File tree: 6 files changed, +497 -144 lines changed

MIGRATION.md

Lines changed: 40 additions & 55 deletions
````diff
@@ -37,19 +37,18 @@
 * `clientId`: string for identifying this client.
 * **`connectionTimeout`** and **`authenticationTimeout`**:
   These timeouts (specified in milliseconds) are not enforced individually. Instead, the sum of these values is
-  enforced. The default value of the sum is 30000. It corresponds to librdkafka's `socket.connection.setup.timeout.ms`.
+  enforced. The default value of the sum is 11000, same as for KafkaJS.
+  It corresponds to librdkafka's `socket.connection.setup.timeout.ms`.
 * **`reauthenticationThreshold`**: no longer checked, librdkafka handles reauthentication on its own.
 * **`requestTimeout`**: number of milliseconds for a network request to timeout. The default value has been changed to 60000. It now corresponds to librdkafka's `socket.timeout.ms`.
 * **`enforceRequestTimeout`**: if this is set to false, `requestTimeout` is set to 5 minutes. The timeout cannot be disabled completely.
 * **`retry`** is partially supported. It must be an object, with the following (optional) properties
-  - `maxRetryTime`: maximum time to backoff a retry, in milliseconds. Corresponds to librdkafka's `retry.backoff.max.ms`. The default is 1000.
-  - `initialRetryTime`: minimum time to backoff a retry, in milliseconds. Corresponds to librdkafka's `retry.backoff.ms`. The default is 100.
-  - `retries`: maximum number of retries, *only* applicable to Produce messages. However, it's recommended to keep this unset.
-    Librdkafka handles the number of retries, and rather than capping the number of retries, caps the total time spent
-    while sending the message, controlled by `message.timeout.ms`.
-  - `factor` and `multiplier` cannot be changed from their defaults of 0.2 and 2.
-* **`restartOnFailure`**: this cannot be changed, and will always be true (the consumer recovers from errors on its own).
-* `logLevel` is mapped to the syslog(3) levels supported by librdkafka. `LOG_NOTHING` is not YET supported, as some panic situations are still logged.
+  - `maxRetryTime`: maximum time to backoff a retry, in milliseconds. Corresponds to librdkafka's `retry.backoff.max.ms`.
+  - `initialRetryTime`: minimum time to backoff a retry, in milliseconds. Corresponds to librdkafka's `retry.backoff.ms`.
+  - `retries`: maximum number of retries, *only* applicable to Produce messages.
+  - **`factor`** and **`multiplier`** cannot be changed from their defaults of 0.2 and 2.
+  - **`restartOnFailure`**: this cannot be changed, and will always be true (the consumer recovers from errors on its own).
+* `logLevel` is mapped automatically to the syslog(3) levels supported by librdkafka. `LOG_NOTHING` is not YET supported, as some panic situations are still logged.
 * **`socketFactory`** is no longer supported.
 
 #### Error Handling
````
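The timeout and `retry` properties described in this hunk can be pictured as a plain config fragment. This is a sketch only; the values below are illustrative and the `clientId` is made up, not taken from the commit:

```javascript
// Illustrative client config using only the supported `retry` properties.
// `factor` and `multiplier` are omitted, since they cannot be changed from
// their defaults of 0.2 and 2.
const clientConfig = {
  clientId: 'my-client',       // string identifying this client
  connectionTimeout: 5000,     // enforced only as a sum with the next value;
  authenticationTimeout: 6000, // the sum (11000) maps to socket.connection.setup.timeout.ms
  retry: {
    maxRetryTime: 1000,        // maps to librdkafka retry.backoff.max.ms
    initialRetryTime: 100,     // maps to librdkafka retry.backoff.ms
  },
};
```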
````diff
@@ -119,9 +118,9 @@
 There are several changes in the common configuration. Each config property is discussed.
 If there needs to be any change, the property is highlighted.
 
-* **`createPartitioner`**: this is not supported (YET). For behaviour identical to the Java client (the DefaultPartitioner),
-  use the `rdKafka` block, and set the property `partitioner` to `murmur2_random`. This is critical
-  when planning to produce to topics where messages with certain keys have been produced already.
+* **`createPartitioner`**: this is not supported yet. The default behaviour is identical to the DefaultPartitioner, and compatible with Java client's default
+  partitioner.
+  This corresponds to the librdkafka property `partitioner` and the value `murmur2_random`.
 * **`retry`**: See the section for retry above. The producer config `retry` takes precedence over the common config `retry`.
 * `metadataMaxAge`: Time in milliseconds after which to refresh metadata for known topics. The default value remains 5min. This
   corresponds to the librdkafka property `topic.metadata.refresh.interval.ms` (and not `metadata.max.age.ms`).
````
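Configs written against the older guidance (the removed lines above) set the partitioner explicitly through the `rdKafka` passthrough block. With the new default this is redundant, but a sketch of what such a config looked like may help when migrating; placing `partitioner` under `topicConfig` is our assumption, based on it being a topic-level librdkafka property:

```javascript
// Sketch of the older, explicit way to pin the Java-compatible partitioner.
// Per the updated text, `murmur2_random` is now the default behaviour anyway.
const producerConfig = {
  rdKafka: {
    topicConfig: {
      partitioner: 'murmur2_random', // DefaultPartitioner-compatible
    },
  },
};
```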
````diff
@@ -131,7 +130,7 @@
   Only applicable when `transactionalId` is set to true.
 * `idempotent`: if set to true, ensures that messages are delivered exactly once and in order. False by default.
   In case this is set to true, certain constraints must be respected for other properties, `maxInFlightRequests <= 5`, `retry.retries >= 0`.
-* **`maxInFlightRequests`**: Maximum number of in-flight requests *per broker connection*. If not set, a very high limit is used.
+* **`maxInFlightRequests`**: Maximum number of in-flight requests *per broker connection*. If not set, it is practically unbounded (same as KafkaJS).
 * `transactionalId`: if set, turns this into a transactional producer with this identifier. This also automatically sets `idempotent` to true.
 * An `rdKafka` block can be added to the config. It allows directly setting librdkafka properties.
   If you are starting to make the configuration anew, it is best to specify properties using
````
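The constraints listed for `idempotent` producers can be expressed as a small guard. The helper name and shape are ours, purely for illustration; it is not part of the client:

```javascript
// Sketch: check the documented constraints for an idempotent producer:
// maxInFlightRequests <= 5 and retry.retries >= 0 (when those fields are set).
function validateIdempotentConfig(config) {
  if (!config.idempotent) return true; // constraints only apply when idempotent
  if (config.maxInFlightRequests !== undefined && config.maxInFlightRequests > 5) {
    return false;
  }
  if (config.retry && config.retry.retries !== undefined && config.retry.retries < 0) {
    return false;
  }
  return true;
}
```

A config like `{ idempotent: true, maxInFlightRequests: 6 }` would be rejected by such a check, while `{ idempotent: true, maxInFlightRequests: 5 }` passes.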
````diff
@@ -140,7 +139,8 @@
 #### Semantic and Per-Method Changes
 
 * Changes to `send`:
-  * `acks`, `compression` and `timeout` are not set on a per-send basis. Rather, they must be configured in the configuration.
+  * `acks`, `compression` and `timeout` are not set on a per-send basis. Rather, they must be configured in the top-level configuration.
+    Additionally, there are several more compression types available by default besides GZIP.
   Before:
   ```javascript
   const kafka = new Kafka({/* ... */});
````
````diff
@@ -160,13 +160,9 @@
   ```javascript
   const kafka = new Kafka({/* ... */});
   const producer = kafka.producer({
-    rdKafka: {
-      topicConfig: {
-        "acks": "1",
-        "compression.codec": "gzip",
-        "message.timeout.ms": "30000",
-      },
-    }
+    acks: 1,
+    compression: CompressionTypes.GZIP|CompressionTypes.SNAPPY|CompressionTypes.LZ4|CompressionTypes.ZSTD|CompressionTypes.NONE,
+    timeout: 30000,
   });
   await producer.connect();
 
````
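The correspondence between the KafkaJS-style `CompressionTypes` named in the hunk above and librdkafka's `compression.codec` string values can be sketched as a lookup table. This mapping is our assumption based on librdkafka's documented codec names, not something stated in the commit:

```javascript
// Assumed mapping from CompressionTypes names to librdkafka
// compression.codec values (none, gzip, snappy, lz4, zstd).
const COMPRESSION_TO_CODEC = {
  NONE: 'none',
  GZIP: 'gzip',
  SNAPPY: 'snappy',
  LZ4: 'lz4',
  ZSTD: 'zstd',
};
```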
````diff
@@ -191,27 +187,28 @@
 If there needs to be any change, the property is highlighted. The change could be a change in
 the default values, some added/missing features, or a change in semantics.
 
-* **`partitionAssigners`**: The **default value** of this is changed to `[PartitionAssigners.range,PartitionAssigners.roundRobin]`. Support for range, roundRobin and cooperativeSticky
-  partition assignors is provided. The cooperative assignor cannot be used along with the other two, and there
-  is no support for custom assignors. An alias for these properties is also made available, `partitionAssignors` and `PartitionAssignors` to maintain
-  parlance with the Java client's terminology.
-* **`sessionTimeout`**: If no heartbeats are received by the broker for a group member within the session timeout, the broker will remove the consumer from
-  the group and trigger a rebalance. The **default value** is changed to 45000.
+* `partitionAssigners`: Support for range and roundRobin assignors is provided. Custom assignors are not supported.
+  The default value of this remains `[PartitionAssigners.roundRobin]`.
+  Support for cooperative-sticky assignor will be added soon.
+  An alias for these properties is also made available, `partitionAssignors` and `PartitionAssignors` to maintain
+  parlance with the Java client's terminology.
+* `sessionTimeout`: If no heartbeats are received by the broker for a group member within the session timeout, the broker will remove the consumer from
+  the group and trigger a rebalance.
 * **`rebalanceTimeout`**: The maximum allowed time for each member to join the group once a rebalance has begun. The **default value** is changed to 300000.
   Note, before changing: setting this value *also* changes the max poll interval. Message processing in `eachMessage` must not take more than this time.
 * `heartbeatInterval`: The expected time in milliseconds between heartbeats to the consumer coordinator. The default value remains 3000.
 * `metadataMaxAge`: Time in milliseconds after which to refresh metadata for known topics. The default value remains 5min. This
   corresponds to the librdkafka property `topic.metadata.refresh.interval.ms` (and not `metadata.max.age.ms`).
-* **`allowAutoTopicCreation`**: determines if a topic should be created if it doesn't exist while producing. The **default value** is changed to false.
+* `allowAutoTopicCreation`: determines if a topic should be created if it doesn't exist while producing.
 * **`maxBytesPerPartition`**: determines how many bytes can be fetched in one request from a single partition. The default value remains 1048576.
-  There is a slight change in semantics, this size grows dynamically if a single message larger than this is encountered,
+  There is a change in semantics, this size grows dynamically if a single message larger than this is encountered,
   and the client does not get stuck.
 * `minBytes`: Minimum number of bytes the broker responds with (or wait until `maxWaitTimeInMs`). The default remains 1.
-* **`maxBytes`**: Maximum number of bytes the broker responds with. The **default value** is changed to 52428800 (50MB).
-* **`maxWaitTimeInMs`**: Maximum time in milliseconds the broker waits for the `minBytes` to be fulfilled. The **default value** is changed to 500.
+* `maxBytes`: Maximum number of bytes the broker responds with.
+* `maxWaitTimeInMs`: Maximum time in milliseconds the broker waits for the `minBytes` to be fulfilled.
 * **`retry`**: See the section for retry above. The consumer config `retry` takes precedence over the common config `retry`.
 * `readUncommitted`: if true, consumer will read transactional messages which have not been committed. The default value remains false.
-* **`maxInFlightRequests`**: Maximum number of in-flight requests *per broker connection*. If not set, a very high limit is used.
+* **`maxInFlightRequests`**: Maximum number of in-flight requests *per broker connection*. If not set, it is practically unbounded (same as KafkaJS).
 * `rackId`: Can be set to an arbitrary string which will be used for fetch-from-follower if set up on the cluster.
 * An `rdKafka` block can be added to the config. It allows directly setting librdkafka properties.
   If you are starting to make the configuration anew, it is best to specify properties using
````
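Pulling the highlighted consumer properties together, a config that states the documented values explicitly might look like the sketch below. The `groupId` is illustrative, and writing out defaults is our choice for clarity, not a requirement:

```javascript
// Sketch: consumer config spelling out values documented in the hunk above.
const consumerConfig = {
  groupId: 'test-group',         // illustrative
  rebalanceTimeout: 300000,      // changed default; also sets the max poll interval
  heartbeatInterval: 3000,       // default unchanged
  maxBytesPerPartition: 1048576, // default unchanged; grows for oversized messages
  readUncommitted: false,        // default unchanged
};
```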
````diff
@@ -225,10 +222,9 @@
 * Subscribe must be called after `connect`.
 * An optional parameter, `replace` is provided. If set to true, the current subscription is replaced with the new one. If set to false, the new subscription is added to the current one.
   The default value is false.
-* While passing a list of topics to `subscribe`, the `fromBeginning` property is not supported. Instead, the property `auto.offset.reset` needs to be used.
+* While passing a list of topics to `subscribe`, the `fromBeginning` is not set on a per-subscribe basis. Rather, it must be configured in the top-level configuration.
   Before:
   ```javascript
-  const kafka = new Kafka({ /* ... */ });
   const consumer = kafka.consumer({
     groupId: 'test-group',
   });
````
````diff
@@ -237,23 +233,17 @@
   ```
   After:
   ```javascript
-  const kafka = new Kafka({ /* ... */ });
   const consumer = kafka.consumer({
     groupId: 'test-group',
-    rdKafka: {
-      topicConfig: {
-        'auto.offset.reset': 'earliest',
-      },
-    }
+    fromBeginning: true,
   });
   await consumer.connect();
   await consumer.subscribe({ topics: ["topic"] });
   ```
 
-* For auto-committing using a consumer, the properties on `run` are no longer used. Instead, corresponding rdKafka properties must be set.
-  * `autoCommit` corresponds to `enable.auto.commit`.
-  * `autoCommitInterval` corresponds to `auto.commit.interval.ms`.
-  * `autoCommitThreshold` is no longer supported.
+* For auto-committing using a consumer, the properties `autoCommit` and `autoCommitInterval` on `run` are not set on a per-subscribe basis.
+  Rather, they must be configured in the top-level configuration.
+  `autoCommitThreshold` is not supported.
 
   Before:
   ```javascript
````
````diff
@@ -273,12 +263,8 @@
   const kafka = new Kafka({ /* ... */ });
   const consumer = kafka.consumer({
     /* ... */,
-    rdKafka: {
-      globalConfig: {
-        "enable.auto.commit": "true",
-        "auto.commit.interval.ms": "5000",
-      }
-    },
+    autoCommit: true,
+    autoCommitInterval: 5000,
   });
   await consumer.connect();
   await consumer.subscribe({ topics: ["topic"] });
````
````diff
@@ -287,18 +273,17 @@
   });
   ```
 
+* The `partitionsConsumedConcurrently` property is not supported at the moment.
+* The `eachBatch` method is not supported.
 * For the `eachMessage` method while running the consumer:
   * The `heartbeat()` no longer needs to be called. Heartbeats are automatically managed by librdkafka.
-  * The `partitionsConsumedConcurrently` property is not supported (YET).
-  * The `eachBatch` method is not supported.
-  * `commitOffsets` does not (YET) support sending metadata for topic partitions being committed.
+  * `commitOffsets` does not yet support sending metadata for topic partitions being committed.
   * `paused()` is supported without any changes.
-  * Custom partition assignors are not supported.
 * Changes to `seek`:
   * The restriction to call seek only after `run` is removed. It can be called any time.
-  * Rather than the `autoCommit` property of `run` deciding if the offset is committed, the librdkafka property `enable.auto.commit` of the consumer config is used.
 * `pause` and `resume` MUST be called after the consumer group is joined. In practice, this means it can be called whenever `consumer.assignment()` has a non-zero size, or within the `eachMessage`
   callback.
+* `stop` is not yet supported, and the user must disconnect the consumer.
 
 ### Admin Client
 
````
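The rule that `pause` and `resume` may only be called once the group is joined ("whenever `consumer.assignment()` has a non-zero size") can be captured as a small guard. The helper is ours, for illustration only:

```javascript
// Sketch: only allow pause/resume once the consumer has a non-empty assignment,
// i.e. after the consumer group has been joined.
function canPauseOrResume(assignment) {
  return Array.isArray(assignment) && assignment.length > 0;
}
```

In practice one would pass in the result of `consumer.assignment()` and call `pause`/`resume` only when this returns true, or do so from within the `eachMessage` callback where the group is guaranteed to be joined.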
