Skip to content

Commit 3cd0526

Browse files
authored
Add proper errors to promisified admin api (#159)
* Add topic creation specific errors and test * Add group deletion specific errors and change test * Add topic record deletion specific errors and change test * Include properties in error * Add CHANGELOG.md entry * Update test because there seems to be a change in error message depending on broker version
1 parent 2471567 commit 3cd0526

File tree

7 files changed

+266
-24
lines changed

7 files changed

+266
-24
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ v1.0.0 is a feature release. It is supported for all usage.
66

77
1. Add support for an Admin API to fetch topic offsets (#156).
88
2. Add support for Node v23 pre-built binaries (#158).
9+
3. Add KafkaJS-compatible errors to promisified Admin API (createTopics, deleteGroups, deleteTopicRecords) (#159).
910

1011
## Fixes
1112

MIGRATION.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ The admin-client only has support for a limited subset of methods, with more to
331331
* The `describeGroups` method is supported with additional `timeout` and `includeAuthorizedOperations` options.
332332
A number of additional properties have been added to the returned groups.
333333
* The `deleteGroups` method is supported with an additional `timeout` option.
334-
* The `fetchOffsets` method is supported with additional `timeout` and
334+
* The `fetchOffsets` method is supported with additional `timeout` and
335335
`requireStableOffsets` option but `resolveOffsets` option is not yet supported.
336336
* The `deleteTopicRecords` method is supported with additional `timeout`
337337
and `operationTimeout` option.
@@ -374,8 +374,11 @@ An example is made available [here](./examples/kafkajs/sr.js).
374374
For compatibility, as many error types as possible have been retained, but it is
375375
better to switch to checking the `error.code`.
376376
377+
Note that `KafkaJSAggregateError` remains as before. Check the `.errors` array
378+
for the individual errors when checking the error code.
377379
378380
Exhaustive list of error types and error fields removed:
381+
379382
| Error | Change |
380383
|-------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
381384
| `KafkaJSNonRetriableError` | Removed. Retriable errors are automatically retried by librdkafka, so there's no need for this type. Note that `error.retriable` still exists, but it's applicable only for transactional producer, where users are expected to retry an action themselves. All error types using this as a superclass now use `KafkaJSError` as their superclass. |
@@ -386,16 +389,13 @@ An example is made available [here](./examples/kafkajs/sr.js).
386389
| `KafkaJSMetadataNotLoaded` | Removed. Metadata is automatically reloaded by librdkafka. |
387390
| `KafkaJSTopicMetadataNotLoaded` | Removed. Topic metadata is automatically reloaded by librdkafka. |
388391
| `KafkaJSStaleTopicMetadataAssignment` | removed as it's automatically refreshed by librdkafka. |
389-
| `KafkaJSDeleteGroupsError` | Removed, as the Admin Client doesn't have this yet. May be added back again, or changed. |
390392
| `KafkaJSServerDoesNotSupportApiKey` | Removed, as this error isn't generally exposed to user in librdkafka. If raised, it is subsumed into `KafkaJSError` where `error.code === Kafka.ErrorCode.ERR_UNSUPPORTED_VERSION`. |
391393
| `KafkaJSBrokerNotFound` | Removed. This error isn't exposed directly to the user in librdkafka. |
392394
| `KafkaJSLockTimeout` | Removed. This error is not applicable while using librdkafka. |
393395
| `KafkaJSUnsupportedMagicByteInMessageSet` | Removed. It is subsumed into `KafkaJSError` where `error.code === Kafka.ErrorCode.ERR_UNSUPPORTED_VERSION`. |
394-
| `KafkaJSDeleteTopicRecordsError` | Removed, as the Admin Client doesn't have this yet. May be added back again, or changed. |
395396
| `KafkaJSInvariantViolation` | Removed, as it's not applicable to librdkafka. Errors in internal state are subsumed into `KafkaJSError` where `error.code === Kafka.ErrorCode.ERR__STATE`. |
396397
| `KafkaJSInvalidVarIntError` | Removed. This error isn't exposed directly to the user in librdkafka. |
397398
| `KafkaJSInvalidLongError` | Removed. This error isn't exposed directly to the user in librdkafka. |
398-
| `KafkaJSCreateTopicError` | Removed, as the Admin Client doesn't have this yet. May be added back again, or changed.. |
399399
| `KafkaJSAlterPartitionReassignmentsError` | removed, as the RPC is not used in librdkafka. |
400400
| `KafkaJSFetcherRebalanceError` | Removed. This error isn't exposed directly to the user in librdkafka. |
401401
| `KafkaJSConnectionError` | `broker` is removed from this object. |

lib/kafkajs/_admin.js

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -325,25 +325,33 @@ class Admin {
325325

326326
/* Convert each topic to a format suitable for node-rdkafka, and dispatch the call. */
327327
let allTopicsCreated = true;
328+
const errors = [];
328329
const ret =
329330
options.topics
330331
.map(this.#topicConfigToRdKafka)
331-
.map(topicConfig => new Promise((resolve, reject) => {
332+
.map(topicConfig => new Promise(resolve => {
332333
this.#internalClient.createTopic(topicConfig, options.timeout ?? 5000, (err) => {
333334
if (err) {
334335
if (err.code === error.ErrorCodes.ERR_TOPIC_ALREADY_EXISTS) {
335336
allTopicsCreated = false;
336337
resolve();
337338
return;
338339
}
339-
reject(createKafkaJsErrorFromLibRdKafkaError(err));
340+
const e = createKafkaJsErrorFromLibRdKafkaError(err);
341+
const createTopicError = new error.KafkaJSCreateTopicError(e, topicConfig.topic, e /* includes the properties */);
342+
errors.push(createTopicError);
343+
resolve(); // Don't reject this promise, instead, look at the errors array later.
340344
} else {
341345
resolve();
342346
}
343347
});
344348
}));
345349

346-
return Promise.all(ret).then(() => allTopicsCreated);
350+
await Promise.allSettled(ret);
351+
if (errors.length > 0) {
352+
throw new error.KafkaJSAggregateError("Topic creation errors", errors);
353+
}
354+
return allTopicsCreated;
347355
}
348356

349357
/**
@@ -442,9 +450,24 @@ class Admin {
442450
this.#internalClient.deleteGroups(groups, options, (err, reports) => {
443451
if (err) {
444452
reject(createKafkaJsErrorFromLibRdKafkaError(err));
445-
} else {
446-
resolve(reports);
453+
return;
454+
}
455+
456+
/* Convert group-level errors to KafkaJS errors if required. */
457+
let errorsPresent = false;
458+
reports = reports.map(groupReport => {
459+
if (groupReport.error) {
460+
errorsPresent = true;
461+
groupReport.error = createKafkaJsErrorFromLibRdKafkaError(groupReport.error);
462+
}
463+
return groupReport;
464+
});
465+
466+
if (errorsPresent) {
467+
reject(new error.KafkaJSDeleteGroupsError('Error in DeleteGroups', reports));
468+
return;
447469
}
470+
resolve(reports);
448471
});
449472
});
450473
}
@@ -645,9 +668,35 @@ class Admin {
645668
this.#internalClient.deleteRecords(topicPartitionOffsets, options, (err, results) => {
646669
if (err) {
647670
reject(createKafkaJsErrorFromLibRdKafkaError(err));
648-
} else {
649-
resolve(results);
671+
return;
672+
}
673+
674+
let errorsPresent = false;
675+
results = results.map(result => {
676+
if (result.error) {
677+
errorsPresent = true;
678+
result.error = createKafkaJsErrorFromLibRdKafkaError(result.error);
679+
}
680+
return result;
681+
});
682+
683+
if (errorsPresent) {
684+
const partitionsWithError =
685+
{
686+
/* Note that, for API compatibility, we must filter out partitions
687+
* without errors, even though it is more useful to return all of
688+
* them so the user can check offsets. */
689+
partitions:
690+
results.filter(result => result.error).map(result => ({
691+
partition: result.partition,
692+
offset: String(result.lowWatermark),
693+
error: result.error,
694+
}))
695+
};
696+
reject(new error.KafkaJSDeleteTopicRecordsError(partitionsWithError));
697+
return;
650698
}
699+
resolve(results);
651700
});
652701
});
653702
}

lib/kafkajs/_error.js

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,37 @@ class KafkaJSLockTimeout extends KafkaJSTimeout {
188188
}
189189
}
190190

191+
class KafkaJSCreateTopicError extends KafkaJSProtocolError {
192+
constructor(e, topicName, properties) {
193+
super(e, properties);
194+
this.topic = topicName;
195+
this.name = 'KafkaJSCreateTopicError';
196+
}
197+
}
198+
199+
class KafkaJSDeleteGroupsError extends KafkaJSError {
200+
constructor(e, groups) {
201+
super(e);
202+
this.groups = groups || [];
203+
this.name = 'KafkaJSDeleteGroupsError';
204+
}
205+
}
206+
207+
class KafkaJSDeleteTopicRecordsError extends KafkaJSError {
208+
constructor({ partitions }) {
209+
/*
210+
* This error is retriable if all the errors were retriable
211+
*/
212+
const retriable = partitions
213+
.filter(({ error }) => error !== null)
214+
.every(({ error }) => error.retriable === true);
215+
216+
super('Error while deleting records', { retriable });
217+
this.name = 'KafkaJSDeleteTopicRecordsError';
218+
this.partitions = partitions;
219+
}
220+
}
221+
191222
/**
192223
* KafkaJSAggregateError represents an error raised when multiple errors occur at once.
193224
* @extends Error
@@ -248,6 +279,9 @@ module.exports = {
248279
KafkaJSNotImplemented,
249280
KafkaJSTimeout,
250281
KafkaJSLockTimeout,
282+
KafkaJSCreateTopicError,
283+
KafkaJSDeleteGroupsError,
284+
KafkaJSDeleteTopicRecordsError,
251285
KafkaJSAggregateError,
252286
KafkaJSNoBrokerAvailableError,
253287
isRebalancing,
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
jest.setTimeout(30000);
2+
3+
const {
4+
secureRandom,
5+
createAdmin,
6+
sleep,
7+
} = require('../testhelpers');
8+
const { KafkaJSAggregateError, KafkaJSCreateTopicError, ErrorCodes } = require('../../../lib').KafkaJS;
9+
10+
describe('Admin > createTopics', () => {
11+
let topicNames, admin;
12+
13+
beforeEach(async () => {
14+
topicNames = [`test-topic-${secureRandom()}`, `test-topic-${secureRandom()}`];
15+
admin = createAdmin({});
16+
});
17+
18+
afterEach(async () => {
19+
admin && (await admin.disconnect());
20+
});
21+
22+
it('should create topics', async () => {
23+
await admin.connect();
24+
await expect(admin.createTopics({
25+
topics: [{
26+
topic: topicNames[0]
27+
}, {
28+
topic: topicNames[1],
29+
replicationFactor: 1,
30+
}]
31+
})).resolves.toEqual(true);
32+
await sleep(1000); /* wait for metadata propagation */
33+
34+
const listTopicsResult = await admin.listTopics();
35+
expect(listTopicsResult).toEqual(
36+
expect.arrayContaining(topicNames)
37+
);
38+
});
39+
40+
it('should indicate if topics were already created', async () => {
41+
await admin.connect();
42+
await expect(admin.createTopics({
43+
topics: [{
44+
topic: topicNames[0]
45+
}]
46+
})).resolves.toEqual(true);
47+
await sleep(1000); /* wait for metadata propagation */
48+
49+
await expect(admin.createTopics({
50+
topics: [{
51+
topic: topicNames[0]
52+
}]
53+
})).resolves.toEqual(false); /* topic already exists */
54+
await sleep(1000); /* wait for metadata propagation */
55+
56+
57+
await expect(admin.createTopics({
58+
topics: [{
59+
topic: topicNames[0]
60+
}, {
61+
topic: topicNames[1]
62+
}]
63+
})).resolves.toEqual(false); /* Even of one topic already exists */
64+
});
65+
66+
it('should throw topic errors', async () => {
67+
await admin.connect();
68+
69+
let storedErr;
70+
await expect(admin.createTopics({
71+
topics: [{
72+
topic: topicNames[0] + '-invalid',
73+
replicationFactor: 9090, /* unlikely that anyone has this many brokers in test env */
74+
},
75+
{
76+
topic: topicNames[1] + '-invalid',
77+
numPartitions: 0, /* 0 partitions is invalid */
78+
}, {
79+
topic: topicNames[0]
80+
}]
81+
}).catch(err => {
82+
/* Store the error for checking contents later. */
83+
storedErr = err;
84+
throw err;
85+
})).rejects.toThrow(KafkaJSAggregateError);
86+
await sleep(1000); /* wait for metadata propagation */
87+
88+
expect(storedErr.message).toMatch(/Topic creation errors/);
89+
expect(storedErr.errors).toHaveLength(2);
90+
91+
const replicationErr = storedErr.errors.find(e => e.topic === topicNames[0] + '-invalid');
92+
expect(replicationErr).toBeInstanceOf(KafkaJSCreateTopicError);
93+
expect(replicationErr.code).toEqual(ErrorCodes.ERR_INVALID_REPLICATION_FACTOR);
94+
95+
const partitionsErr = storedErr.errors.find(e => e.topic === topicNames[1]+ '-invalid');
96+
expect(partitionsErr).toBeInstanceOf(KafkaJSCreateTopicError);
97+
expect(partitionsErr.code).toEqual(ErrorCodes.ERR_INVALID_PARTITIONS);
98+
99+
/* Despite errors the valid topic should still be created. */
100+
await expect(admin.listTopics()).resolves.toEqual(expect.arrayContaining([topicNames[0]]));
101+
});
102+
});
103+

0 commit comments

Comments
 (0)