Skip to content

Commit 15755c1

Browse files
committed
Add error types to producer, and fix some errors
1 parent 446dee1 commit 15755c1

File tree

6 files changed

+285
-88
lines changed

6 files changed

+285
-88
lines changed

MIGRATION.md

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@
44

55
### Common
66

7-
1. Error Handling: Some possible subtypes of `KafkaJSError` have been removed,
8-
and additional information has been added into `KafkaJSError`.
7+
* Configuration changes
8+
9+
* Error Handling: Some possible subtypes of `KafkaJSError` have been removed,
10+
and additional information has been added into `KafkaJSError`.
911
Internally, fields have been added denoting if the error is fatal, retriable, or abortable (the latter two only relevant for a
1012
transactional producer).
1113
Some error-specific fields have also been removed. An exhaustive list is at the bottom of this section.
@@ -17,7 +19,7 @@
1719
checks based on `error.code` or `error.type`.
1820

1921
**Example:**
20-
```js
22+
```javascript
2123
try {
2224
await producer.send(/* args */);
2325
} catch (error) {
@@ -59,6 +61,46 @@
5961

6062
### Producer
6163

64+
* `sendBatch` is currently unsupported — support is planned (TODO). Note that the actual batching semantics are handled by librdkafka.
65+
* Changes to `send`:
66+
1. `acks`, `compression` and `timeout` are not set on a per-send basis. Rather, they must be set in the producer configuration.
67+
Before:
68+
```javascript
69+
const kafka = new Kafka({/* ... */});
70+
const producer = kafka.producer();
71+
await producer.connect();
72+
73+
await producer.send({
74+
topic: 'test',
75+
messages: [ /* ... */ ],
76+
acks: 1,
77+
compression: CompressionTypes.GZIP,
78+
timeout: 30000,
79+
});
80+
```
81+
82+
After:
83+
```javascript
84+
const kafka = new Kafka({/* ... */});
85+
const producer = kafka.producer({
86+
rdKafka: {
87+
topicConfig: {
88+
"acks": "1",
89+
"compression.codec": "gzip",
90+
"message.timeout.ms": "30000",
91+
},
92+
}
93+
});
94+
await producer.connect();
95+
96+
await producer.send({
97+
topic: 'test',
98+
messages: [ /* ... */ ],
99+
});
100+
```
101+
102+
* Error-handling for a failed `send` is stricter. When sending multiple messages, the method throws an error if any one of the messages fails to be sent.
103+
62104
### Consumer
63105

64-
## node-rdkafka
106+
## node-rdkafka

lib/kafkajs/_common.js

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
const error = require("./_error");
2-
const LibrdKafkaError = require('../error');
32

43
/**
54
* @function kafkaJSToRdKafkaConfig()
@@ -47,7 +46,13 @@ async function kafkaJSToRdKafkaConfig(config) {
4746
return { globalConfig, topicConfig };
4847
}
4948

49+
/**
50+
* Converts a topicPartitionOffset from KafkaJS to a format that can be used by node-rdkafka.
51+
* @param {import("../../types/kafkajs").TopicPartitionOffset} tpo
52+
* @returns {{topic: string, partition: number, offset: number}}
53+
*/
5054
function topicPartitionOffsetToRdKafka(tpo) {
55+
// TODO: do we need some checks for negative offsets and stuff? Or 'named' offsets?
5156
return {
5257
topic: tpo.topic,
5358
partition: tpo.partition,
@@ -57,8 +62,8 @@ function topicPartitionOffsetToRdKafka(tpo) {
5762

5863
/**
5964
* Convert a librdkafka error from node-rdkafka into a KafkaJSError.
60-
* @param {LibrdKafkaError} librdKafkaError to convert from.
61-
* @returns KafkaJSError
65+
* @param {import("../error")} librdKafkaError to convert from.
66+
* @returns {error.KafkaJSError} the converted error.
6267
*/
6368
function createKafkaJsErrorFromLibRdKafkaError(librdKafkaError) {
6469
const properties = {
@@ -72,34 +77,57 @@ function createKafkaJsErrorFromLibRdKafkaError(librdKafkaError) {
7277
let err = null;
7378

7479
if (properties.code === error.ErrorCodes.ERR_OFFSET_OUT_OF_RANGE) {
75-
err = new error.KafkaJSOffsetOutOfRange(e, properties);
80+
err = new error.KafkaJSOffsetOutOfRange(librdKafkaError, properties);
7681
} else if (properties.code === error.ErrorCodes.ERR_REQUEST_TIMED_OUT) {
77-
err = new error.KafkaJSRequestTimeoutError(e, properties);
82+
err = new error.KafkaJSRequestTimeoutError(librdKafkaError, properties);
7883
} else if (properties.code === error.ErrorCodes.ERR__PARTIAL) {
79-
err = new error.KafkaJSPartialMessageError(e, properties);
84+
err = new error.KafkaJSPartialMessageError(librdKafkaError, properties);
8085
} else if (properties.code === error.ErrorCodes.ERR__AUTHENTICATION) {
81-
err = new error.KafkaJSSASLAuthenticationError(e, properties);
86+
err = new error.KafkaJSSASLAuthenticationError(librdKafkaError, properties);
8287
} else if (properties.code === error.ErrorCodes.ERR_GROUP_COORDINATOR_NOT_AVAILABLE) {
83-
err = new error.KafkaJSGroupCoordinatorNotAvailableError(e, properties);
88+
err = new error.KafkaJSGroupCoordinatorNotAvailableError(librdKafkaError, properties);
8489
} else if (properties.code === error.ErrorCodes.ERR__NOT_IMPLEMENTED) {
85-
err = new error.KafkaJSNotImplemented(e, properties);
90+
err = new error.KafkaJSNotImplemented(librdKafkaError, properties);
8691
} else if (properties.code === error.ErrorCodes.ERR__TIMED_OUT) {
87-
err = new error.KafkaJSTimedOut(e, properties);
92+
err = new error.KafkaJSTimedOut(librdKafkaError, properties);
8893
} else if (properties.code === error.ErrorCodes.ERR__ALL_BROKERS_DOWN) {
89-
err = new error.KafkaJSNoBrokerAvailableError(e, properties);
94+
err = new error.KafkaJSNoBrokerAvailableError(librdKafkaError, properties);
9095
} else if (properties.code === error.ErrorCodes.ERR__TRANSPORT) {
91-
err = new error.KafkaJSConnectionError(e, properties);
96+
err = new error.KafkaJSConnectionError(librdKafkaError, properties);
9297
} else if (properties.code > 0) { /* Indicates a non-local error */
93-
err = new error.KafkaJSProtocolError(e, properties);
98+
err = new error.KafkaJSProtocolError(librdKafkaError, properties);
9499
} else {
95-
err = new error.KafkaJSError(e, properties);
100+
err = new error.KafkaJSError(librdKafkaError, properties);
96101
}
97102

103+
console.log("Converted err = " + JSON.stringify(err, null, 2) + " librdkafka erro = " + JSON.stringify(librdKafkaError, null, 2));
98104
return err;
99105
}
100106

107+
/**
108+
* Converts KafkaJS headers to a format that can be used by node-rdkafka.
109+
* @param {import("../../types/kafkajs").IHeaders|null} kafkaJSHeaders
110+
* @returns {import("../../").MessageHeader[]|null} the converted headers.
111+
*/
112+
function convertToRdKafkaHeaders(kafkaJSHeaders) {
113+
if (!kafkaJSHeaders) return null;
114+
115+
const headers = [];
116+
for (const [key, value] of Object.entries(kafkaJSHeaders)) {
117+
if (value.constructor === Array) {
118+
for (const v of value) {
119+
headers.push({ key, value: v });
120+
}
121+
} else {
122+
headers.push({ key, value });
123+
}
124+
}
125+
return headers;
126+
}
127+
101128
module.exports = {
102129
kafkaJSToRdKafkaConfig,
103130
topicPartitionOffsetToRdKafka,
104131
createKafkaJsErrorFromLibRdKafkaError,
132+
convertToRdKafkaHeaders,
105133
};

lib/kafkajs/_error.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class KafkaJSError extends Error {
3030

3131
const errTypes = Object
3232
.keys(LibrdKafkaError.codes)
33-
.filter(k => LibrdKafkaError.codes[k] === kjsErr.code);
33+
.filter(k => LibrdKafkaError.codes[k] === this.code);
3434

3535
if (errTypes.length !== 1) {
3636
this.type = LibrdKafkaError.codes.ERR_UNKNOWN;
@@ -184,4 +184,4 @@ module.exports = {
184184
isRebalancing,
185185
isKafkaJSError,
186186
ErrorCodes: LibrdKafkaError.codes,
187-
};
187+
};

lib/kafkajs/_kafka.js

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,29 @@ const error = require('./_error');
55
class Kafka {
66
#commonClientConfig = {};
77

8+
/**
9+
*
10+
* @param {import("../../types/kafkajs").KafkaConfig} config
11+
*/
812
constructor(config) {
913
this.#commonClientConfig = config;
1014
}
1115

16+
/**
17+
* Merge the producer/consumer specific configuration with the common configuration.
18+
* @param {import("../../types/kafkajs").ProducerConfig|import("../../types/kafkajs").ConsumerConfig} config
19+
* @returns
20+
*/
1221
#mergeConfiguration(config) {
1322
let baseConfig = Object.assign({}, this.#commonClientConfig);
1423
config = Object.assign({}, config);
1524

25+
// TODO: there's some confusion around this, as we currently allow
26+
// rdKafka to be a function, but here, we don't seem to treat it as such.
27+
// Correct this, so that only objects are allowed for `rdKafka`.
1628
let rdKafka = baseConfig.rdKafka;
1729
Object.assign(baseConfig, config);
18-
if (rdKafka && config.rdKafka) {
30+
if (typeof rdKafka === 'object' && typeof config.rdKafka === 'object') {
1931
baseConfig.rdKafka = {
2032
...rdKafka,
2133
...config.rdKafka,
@@ -24,10 +36,20 @@ class Kafka {
2436
return baseConfig;
2537
}
2638

39+
/**
40+
* Creates a new producer.
41+
* @param {import("../../types/kafkajs").ProducerConfig} config
42+
* @returns {Producer}
43+
*/
2744
producer(config) {
2845
return new Producer(this.#mergeConfiguration(config));
2946
}
3047

48+
/**
49+
* Creates a new consumer.
50+
* @param {import("../../types/kafkajs").Consumer} config
51+
* @returns {Consumer}
52+
*/
3153
consumer(config) {
3254
return new Consumer(this.#mergeConfiguration(config));
3355
}

0 commit comments

Comments
 (0)