Skip to content

Commit 1b20ce6

Browse files
committed
Add errors for promisified API
1 parent f29bc15 commit 1b20ce6

File tree

6 files changed

+308
-5
lines changed

6 files changed

+308
-5
lines changed

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ You can add breakpoints and so on after that.
195195

196196
## Updating librdkafka version
197197

198-
The librdkafka should be periodically updated to the latest release in https://github.com/edenhill/librdkafka/releases
198+
The librdkafka should be periodically updated to the latest release in https://github.com/confluentinc/librdkafka/releases
199199

200200
Steps to update:
201201
1. Update the `librdkafka` property in [`package.json`](https://github.com/confluentinc/confluent-kafka-js/blob/master/package.json) to the desired version.

LICENSE.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
The MIT License (MIT)
22
Copyright (c) 2016-2023 Blizzard Entertainment
3+
2023 Confluent, Inc.
34

45
Permission is hereby granted, free of charge, to any person obtaining a copy of
56
this software and associated documentation files (the "Software"), to deal in

MIGRATION.md

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Migration Guide
2+
3+
## KafkaJS
4+
5+
### Common
6+
7+
1. Error Handling: Some possible subtypes of `KafkaJSError` have been removed,
8+
and additional information has been added into `KafkaJSError`.
9+
Internally, fields have been added denoting if the error is fatal, retriable, or abortable (the latter two only relevant for a
10+
transactional producer).
11+
Some error-specific fields have also been removed. An exhaustive list is at the bottom of this section.
12+
13+
For compability, as many error types as possible have been retained, but it is
14+
better to switch to checking the `error.code`.
15+
16+
**Action**: Convert any checks based on `instanceof` and `error.name` or to error
17+
checks based on `error.code` or `error.type`.
18+
19+
**Example:**:
20+
```js
21+
try {
22+
await producer.send(/* args */);
23+
} catch (error) {
24+
if (!Kafka.isKafkaJSError(error)) { /* unrelated err handling */ }
25+
else if (error.fatal) { /* fatal error, abandon producer */ }
26+
else if (error.code === Kafka.ErrorCode.ERR__QUEUE_FULL) { /*...*/ }
27+
else if (error.type === 'ERR_MSG_SIZE_TOO_LARGE') { /*...*/ }
28+
/* and so on for specific errors */
29+
}
30+
```
31+
32+
Exhaustive list of error types and error fields removed:
33+
1. `KafkaJSNonRetriableError`: retriable errors are automatically retried by librdkafka, so there's no need for this type.
34+
Note that `error.retriable` still exists, but it's applicable only for transactional producer,
35+
where users are expected to retry an action themselves.
36+
All error types using this as a superclass now use `KafkaJSError` as their superclass.
37+
2. `topic` and `partition` are removed from `KafkaJSOffsetOutOfRange`.
38+
3. `KafkaJSMemberIdRequired`: removed as automatically handled by librdkafka.
39+
4. `KafkaJSNumberOfRetriesExceeded`: removed as retries are handled by librdkafka.
40+
5. `broker, correlationId, createdAt, sentAt` and `pendingDuration` are removed from `KafkaJSNumberOfRetriesExceeded`.
41+
6. `KafkaJSMetadataNotLoaded`: removed as metadata is automatically reloaded by librdkafka.
42+
7. `KafkaJSTopicMetadataNotLoaded`: removed as topic metadata is automatically reloaded by librdkafka.
43+
8. `KafkaJSStaleTopicMetadataAssignment`: removed as it's automatically refreshed by librdkafka.
44+
9. `KafkaJSDeleteGroupsError`: removed, as the Admin Client doesn't have this yet. May be added back again, or changed.
45+
10. `KafkaJSServerDoesNotSupportApiKey`: removed, as this error isn't generally exposed to user in librdkafka. If raised,
46+
it is subsumed into `KafkaJSError` where `error.code === Kafka.ErrorCode.ERR_UNSUPPORTED_VERSION`.
47+
11. `KafkaJSBrokerNotFound`: removed, as this error isn't exposed directly to the user in librdkafka.
48+
12. `KafkaJSLockTimeout`: removed, as such an error is not applicable while using librdkafka.
49+
13. `KafkaJSUnsupportedMagicByteInMessageSet`: removed. It is subsumed into `KafkaJSError` where `error.code === Kafka.ErrorCode.ERR_UNSUPPORTED_VERSION`.
50+
14. `KafkaJSDeleteTopicRecordsError`: removed, as the Admin Client doesn't have this yet. May be added back again, or changed.
51+
15. `KafkaJSInvariantViolation`: removed, as it's not applicable to librdkafka. Errors in internal state are subsumed into `KafkaJSError` where `error.code === Kafka.ErrorCode.ERR__STATE`.
52+
16. `KafkaJSInvalidVarIntError`: removed, as it's not exposed to the user in librdkafka.
53+
17. `KafkaJSInvalidLongError`: removed, as it's not exposed to the user in librdkafka.
54+
18. `KafkaJSCreateTopicError`: removed, as the Admin Client doesn't have this yet. May be added back again, or changed.
55+
19. `KafkaJSAlterPartitionReassignmentsError`: removed, as the RPC is not used in librdkafka.
56+
20. `KafkaJSFetcherRebalanceError`: removed, it's not exposed to the user in librdkafka.
57+
21. `broker` is removed from `KafkaJSConnectionError`.
58+
22. `KafkaJSConnectionClosedError`: removed, and subsumed into `KafkaJSConnectionError` as librdkafka treats them equivalently.
59+
60+
### Producer
61+
62+
### Consumer
63+
64+
## node-rdkafka

lib/kafkajs/_common.js

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
const error = require("./_error");
2+
const LibrdKafkaError = require('../error');
3+
14
/**
25
* @function kafkaJSToRdKafkaConfig()
36
* @param {object} config
@@ -52,4 +55,51 @@ function topicPartitionOffsetToRdKafka(tpo) {
5255
};
5356
}
5457

55-
module.exports = { kafkaJSToRdKafkaConfig, topicPartitionOffsetToRdKafka };
58+
/**
59+
* Convert a librdkafka error from node-rdkafka into a KafkaJSError.
60+
* @param {LibrdKafkaError} librdKafkaError to convert from.
61+
* @returns KafkaJSError
62+
*/
63+
function createKafkaJsErrorFromLibRdKafkaError(librdKafkaError) {
64+
const properties = {
65+
retriable: librdKafkaError.retriable,
66+
fatal: librdKafkaError.fatal,
67+
abortable: librdKafkaError.abortable,
68+
stack: librdKafkaError.stack,
69+
code: librdKafkaError.code,
70+
};
71+
72+
let err = null;
73+
74+
if (properties.code === error.ErrorCodes.ERR_OFFSET_OUT_OF_RANGE) {
75+
err = new error.KafkaJSOffsetOutOfRange(e, properties);
76+
} else if (properties.code === error.ErrorCodes.ERR_REQUEST_TIMED_OUT) {
77+
err = new error.KafkaJSRequestTimeoutError(e, properties);
78+
} else if (properties.code === error.ErrorCodes.ERR__PARTIAL) {
79+
err = new error.KafkaJSPartialMessageError(e, properties);
80+
} else if (properties.code === error.ErrorCodes.ERR__AUTHENTICATION) {
81+
err = new error.KafkaJSSASLAuthenticationError(e, properties);
82+
} else if (properties.code === error.ErrorCodes.ERR_GROUP_COORDINATOR_NOT_AVAILABLE) {
83+
err = new error.KafkaJSGroupCoordinatorNotAvailableError(e, properties);
84+
} else if (properties.code === error.ErrorCodes.ERR__NOT_IMPLEMENTED) {
85+
err = new error.KafkaJSNotImplemented(e, properties);
86+
} else if (properties.code === error.ErrorCodes.ERR__TIMED_OUT) {
87+
err = new error.KafkaJSTimedOut(e, properties);
88+
} else if (properties.code === error.ErrorCodes.ERR__ALL_BROKERS_DOWN) {
89+
err = new error.KafkaJSNoBrokerAvailableError(e, properties);
90+
} else if (properties.code === error.ErrorCodes.ERR__TRANSPORT) {
91+
err = new error.KafkaJSConnectionError(e, properties);
92+
} else if (properties.code > 0) { /* Indicates a non-local error */
93+
err = new error.KafkaJSProtocolError(e, properties);
94+
} else {
95+
err = new error.KafkaJSError(e, properties);
96+
}
97+
98+
return err;
99+
}
100+
101+
module.exports = {
102+
kafkaJSToRdKafkaConfig,
103+
topicPartitionOffsetToRdKafka,
104+
createKafkaJsErrorFromLibRdKafkaError,
105+
};

lib/kafkajs/_error.js

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
const LibrdKafkaError = require('../error');
2+
3+
/**
4+
* @typedef {Object} KafkaJSError represents an error when using the promisified interface.
5+
*/
6+
class KafkaJSError extends Error {
7+
/**
8+
* @param {Error | string} error an Error or a string describing the error.
9+
* @param {object} properties a set of optional error properties.
10+
* @param {boolean} [properties.retriable=false] whether the error is retriable. Applies only to the transactional producer
11+
* @param {boolean} [properties.fatal=false] whether the error is fatal. Applies only to the transactional producer.
12+
* @param {boolean} [properties.abortable=false] whether the error is abortable. Applies only to the transactional producer.
13+
* @param {string} [properties.stack] the stack trace of the error.
14+
* @param {number} [properties.code=LibrdKafkaError.codes.ERR_UNKNOWN] the error code.
15+
*/
16+
constructor(e, { retriable = false, fatal = false, abortable = false, stack = null, code = LibrdKafkaError.codes.ERR_UNKNOWN } = {}) {
17+
super(e, {});
18+
this.name = 'KafkaJSError';
19+
this.message = e.message || e;
20+
this.retriable = retriable;
21+
this.fatal = fatal;
22+
this.abortable = abortable;
23+
this.code = code;
24+
25+
if (stack) {
26+
this.stack = stack;
27+
} else {
28+
Error.captureStackTrace(this, this.constructor);
29+
}
30+
31+
const errTypes = Object
32+
.keys(LibrdKafkaError.codes)
33+
.filter(k => LibrdKafkaError.codes[k] === kjsErr.code);
34+
35+
if (errTypes.length !== 1) {
36+
this.type = LibrdKafkaError.codes.ERR_UNKNOWN;
37+
} else {
38+
this.type = errTypes[0];
39+
}
40+
}
41+
}
42+
43+
/**
44+
* @typedef {Object} KafkaJSProtocolError represents an error that is caused when a Kafka Protocol RPC has an embedded error.
45+
*/
46+
class KafkaJSProtocolError extends KafkaJSError {
47+
constructor() {
48+
super(...arguments);
49+
this.name = 'KafkaJSProtocolError';
50+
}
51+
}
52+
53+
/**
54+
* @typedef {Object} KafkaJSOffsetOutOfRange represents the error raised when fetching from an offset out of range.
55+
*/
56+
class KafkaJSOffsetOutOfRange extends KafkaJSProtocolError {
57+
constructor() {
58+
super(...arguments);
59+
this.name = 'KafkaJSOffsetOutOfRange';
60+
}
61+
}
62+
63+
/**
64+
* @typedef {Object} KafkaJSConnectionError represents the error raised when a connection to a broker cannot be established or is broken unexpectedly.
65+
*/
66+
class KafkaJSConnectionError extends KafkaJSError {
67+
constructor() {
68+
super(...arguments);
69+
this.name = 'KafkaJSConnectionError';
70+
}
71+
}
72+
73+
/**
74+
* @typedef {Object} KafkaJSRequestTimeoutError represents the error raised on a timeout for one request.
75+
*/
76+
class KafkaJSRequestTimeoutError extends KafkaJSError {
77+
constructor() {
78+
super(...arguments);
79+
this.name = 'KafkaJSRequestTimeoutError';
80+
}
81+
}
82+
83+
/**
84+
* @typedef {Object} KafkaJSPartialMessageError represents the error raised when a response does not contain all expected information.
85+
*/
86+
class KafkaJSPartialMessageError extends KafkaJSError {
87+
constructor() {
88+
super(...arguments);
89+
this.name = 'KafkaJSPartialMessageError';
90+
}
91+
}
92+
93+
/**
94+
* @typedef {Object} KafkaJSSASLAuthenticationError represents an error raised when authentication fails.
95+
*/
96+
class KafkaJSSASLAuthenticationError extends KafkaJSError {
97+
constructor() {
98+
super(...arguments);
99+
this.name = 'KafkaJSSASLAuthenticationError';
100+
}
101+
}
102+
103+
/**
104+
* @typedef {Object} KafkaJSGroupCoordinatorNotFound represents an error raised when the group coordinator is not found.
105+
*/
106+
class KafkaJSGroupCoordinatorNotFound extends KafkaJSError {
107+
constructor() {
108+
super(...arguments);
109+
this.name = 'KafkaJSGroupCoordinatorNotFound';
110+
}
111+
}
112+
113+
/**
114+
* @typedef {Object} KafkaJSNotImplemented represents an error raised when a feature is not implemented for this particular client.
115+
*/
116+
class KafkaJSNotImplemented extends KafkaJSError {
117+
constructor() {
118+
super(...arguments);
119+
this.name = 'KafkaJSNotImplemented';
120+
}
121+
}
122+
123+
/**
124+
* @typedef {Object} KafkaJSTimeout represents an error raised when a timeout for an operation occurs (including retries).
125+
*/
126+
class KafkaJSTimeout extends KafkaJSError {
127+
constructor() {
128+
super(...arguments);
129+
this.name = 'KafkaJSTimeout';
130+
}
131+
}
132+
133+
/**
134+
* @typedef {Object} KafkaJSAggregateError represents an error raised when multiple errors occur at once.
135+
*/
136+
class KafkaJSAggregateError extends Error {
137+
constructor(message, errors) {
138+
super(message);
139+
this.errors = errors;
140+
this.name = 'KafkaJSAggregateError';
141+
}
142+
}
143+
144+
/**
145+
* @typedef {Object} KafkaJSNoBrokerAvailableError represents an error raised when no broker is available for the operation.
146+
*/
147+
class KafkaJSNoBrokerAvailableError extends KafkaJSError {
148+
constructor() {
149+
super(...arguments);
150+
this.name = 'KafkaJSNoBrokerAvailableError';
151+
}
152+
}
153+
154+
/**
155+
* @function isRebalancing
156+
* @param {KafkaJSError} e
157+
* @returns boolean representing whether the error is a rebalancing error.
158+
*/
159+
const isRebalancing = e =>
160+
e.type === 'REBALANCE_IN_PROGRESS' ||
161+
e.type === 'NOT_COORDINATOR_FOR_GROUP' ||
162+
e.type === 'ILLEGAL_GENERATION';
163+
164+
/**
165+
* @function isKafkaJSError
166+
* @param {any} e
167+
* @returns boolean representing whether the error is a KafkaJSError.
168+
*/
169+
const isKafkaJSError = e => e instanceof KafkaJSError;
170+
171+
module.exports = {
172+
KafkaJSError,
173+
KafkaJSPartialMessageError,
174+
KafkaJSProtocolError,
175+
KafkaJSConnectionError,
176+
KafkaJSRequestTimeoutError,
177+
KafkaJSSASLAuthenticationError,
178+
KafkaJSOffsetOutOfRange,
179+
KafkaJSGroupCoordinatorNotFound,
180+
KafkaJSNotImplemented,
181+
KafkaJSTimeout,
182+
KafkaJSAggregateError,
183+
KafkaJSNoBrokerAvailableError,
184+
isRebalancing,
185+
isKafkaJSError,
186+
ErrorCodes: LibrdKafkaError.codes,
187+
};

lib/kafkajs/_kafka.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
const { Producer } = require("./_producer");
2-
const { Consumer } = require("./_consumer");
1+
const { Producer } = require('./_producer');
2+
const { Consumer } = require('./_consumer');
3+
const error = require('./_error');
34

45
class Kafka {
56
#commonClientConfig = {};
@@ -32,4 +33,4 @@ class Kafka {
3233
}
3334
}
3435

35-
module.exports = { Kafka };
36+
module.exports = { Kafka, ...error };

0 commit comments

Comments
 (0)