Commit 9f5c734

Add consumer errors and clean up consumer code

1 parent 15755c1

File tree

4 files changed: +356 −95 lines

MIGRATION.md

Lines changed: 75 additions & 3 deletions
@@ -61,9 +61,9 @@
### Producer

* `sendBatch` is not yet supported. However, the actual batching semantics are handled by librdkafka.
* Changes to `send`:
  * `acks`, `compression` and `timeout` are not set on a per-send basis. Rather, they must be set in the producer configuration.
Before:

```javascript
const kafka = new Kafka({ /* ... */ });
@@ -99,8 +99,80 @@
});
```

* Error-handling for a failed `send` is stricter. While sending multiple messages, even if one of the messages fails, the method throws an error.
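Given the stricter semantics, a catch-all around `send` is advisable. A minimal sketch of the resulting calling pattern; `sendStrict` here is a local stub standing in for `producer.send`, and the failure condition is simulated, so the snippet runs without a broker:

```javascript
// Stub standing in for producer.send() under the stricter semantics,
// where one failing message rejects the whole call.
async function sendStrict(messages) {
  if (messages.some((m) => m.value == null)) {
    throw new Error('delivery failed for at least one message');
  }
  return messages.map((_, i) => ({ partition: 0, baseOffset: String(i) }));
}

async function produceAll(messages) {
  try {
    return await sendStrict(messages);
  } catch (err) {
    // Even a single bad message fails the entire send; handle it here.
    return { error: err.message };
  }
}
```

Because the promise rejects as a whole, callers can no longer inspect per-message results on partial failure; retrying or splitting the batch is the caller's responsibility.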
### Consumer
* While passing a list of topics to `subscribe`, the `fromBeginning` property is not supported. Instead, the property `auto.offset.reset` needs to be used.

Before:

```javascript
const kafka = new Kafka({ /* ... */ });
const consumer = kafka.consumer({
  groupId: 'test-group',
});
await consumer.connect();
await consumer.subscribe({ topics: ["topic"], fromBeginning: true });
```

After:

```javascript
const kafka = new Kafka({ /* ... */ });
const consumer = kafka.consumer({
  groupId: 'test-group',
  rdKafka: {
    topicConfig: {
      'auto.offset.reset': 'earliest',
    },
  },
});
await consumer.connect();
await consumer.subscribe({ topics: ["topic"] });
```
* For auto-committing using a consumer, the properties on `run` are no longer used. Instead, the corresponding rdKafka properties must be set.
  * `autoCommit` corresponds to `enable.auto.commit`.
  * `autoCommitInterval` corresponds to `auto.commit.interval.ms`.
  * `autoCommitThreshold` is no longer supported.

Before:

```javascript
const kafka = new Kafka({ /* ... */ });
const consumer = kafka.consumer({ /* ... */ });
await consumer.connect();
await consumer.subscribe({ topics: ["topic"] });
consumer.run({
  eachMessage: someFunc,
  autoCommit: true,
  autoCommitThreshold: 5000,
});
```

After:

```javascript
const kafka = new Kafka({ /* ... */ });
const consumer = kafka.consumer({
  /* ... */
  rdKafka: {
    globalConfig: {
      "enable.auto.commit": "true",
      "auto.commit.interval.ms": "5000",
    },
  },
});
await consumer.connect();
await consumer.subscribe({ topics: ["topic"] });
consumer.run({
  eachMessage: someFunc,
});
```
* For the `eachMessage` callback while running the consumer:
  * `heartbeat()` no longer needs to be called. Heartbeats are automatically managed by librdkafka.
  * The `partitionsConsumedConcurrently` property is not yet supported.
* The `eachBatch` method is not supported.
* `commitOffsets` does not yet support sending metadata for the topic partitions being committed.
* `paused()` is not yet supported.
* Custom partition assignors are not supported.
## node-rdkafka

lib/kafkajs/_common.js

Lines changed: 6 additions & 0 deletions
@@ -125,9 +125,15 @@ function convertToRdKafkaHeaders(kafkaJSHeaders) {
  return headers;
}

function notImplemented(msg = 'Not implemented') {
  throw new error.KafkaJSError(msg, { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
}

module.exports = {
  kafkaJSToRdKafkaConfig,
  topicPartitionOffsetToRdKafka,
  createKafkaJsErrorFromLibRdKafkaError,
  convertToRdKafkaHeaders,
  notImplemented,
};
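Call sites can use `notImplemented` as a one-line guard in stubbed methods. A self-contained sketch: the `error` object below is a local stand-in for the module's real error import (its numeric code is illustrative), and `eachBatch` is a hypothetical stub, not the library's actual implementation:

```javascript
// Local stand-ins so the sketch runs without the real error module.
const error = {
  KafkaJSError: class KafkaJSError extends Error {
    constructor(msg, { code } = {}) {
      super(msg);
      this.code = code;
    }
  },
  ErrorCodes: { ERR__NOT_IMPLEMENTED: -170 }, // illustrative value
};

function notImplemented(msg = 'Not implemented') {
  throw new error.KafkaJSError(msg, { code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
}

// Hypothetical stub: an unsupported method guards itself in one line.
function eachBatch() {
  notImplemented('eachBatch() is not implemented');
}
```

This keeps every unsupported entry point throwing a typed, uniformly-coded error instead of failing in ad-hoc ways.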

0 commit comments