Skip to content

Commit d6a9638

Browse files
committed
Add AdminAPI for listTopics
1 parent 419d47f commit d6a9638

File tree

7 files changed

+138
-0
lines changed

7 files changed

+138
-0
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
# confluent-kafka-javascript vNext
2+
3+
v0.1.12-devel is a pre-production, early-access release.
4+
5+
## Features
6+
7+
1. Add support for `listTopics` in the Admin API.
8+
9+
110
# confluent-kafka-javascript v0.1.11-devel
211

312
v0.1.11-devel is a pre-production, early-access release.

index.d.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,9 @@ export interface IAdminClient {
416416
createPartitions(topic: string, desiredPartitions: number, cb?: (err: LibrdKafkaError) => void): void;
417417
createPartitions(topic: string, desiredPartitions: number, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;
418418

419+
listTopics(cb?: (err: LibrdKafkaError, topics: string[]) => any): void;
420+
listTopics(options?: { timeout?: number }, cb?: (err: LibrdKafkaError, topics: string[]) => any): void;
421+
419422
listGroups(cb?: (err: LibrdKafkaError, result: { groups: GroupOverview[], errors: LibrdKafkaError[] }) => any): void;
420423
listGroups(options?: { timeout?: number, matchConsumerGroupStates?: ConsumerGroupStates[] },
421424
cb?: (err: LibrdKafkaError, result: { groups: GroupOverview[], errors: LibrdKafkaError[] }) => any): void;

lib/admin.js

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,3 +365,59 @@ AdminClient.prototype.deleteGroups = function (groups, options, cb) {
365365
}
366366
});
367367
}
368+
369+
/**
370+
* List topics.
371+
*
372+
* @param {any?} options
373+
* @param {number?} options.timeout - The request timeout in milliseconds.
374+
* May be unset (default: 5000)
375+
* @param {function} cb - The callback to be executed when finished.
376+
*
377+
* Valid ways to call this function:
378+
* listTopics(cb)
379+
* listTopics(options, cb)
380+
*/
381+
AdminClient.prototype.listTopics = function (options, cb) {
382+
if (!this._isConnected) {
383+
throw new Error('Client is disconnected');
384+
}
385+
386+
if (typeof options === 'function') {
387+
cb = options;
388+
options = {};
389+
}
390+
391+
if (!options) {
392+
options = {};
393+
}
394+
395+
// Always set allTopics to true, since we need a list.
396+
options.allTopics = true;
397+
if (!Object.hasOwn(options, 'timeout')) {
398+
options.timeout = 5000;
399+
}
400+
401+
// This definitely isn't the fastest way to list topics as
402+
// this makes a pretty large metadata request. But for the sake
403+
// of AdminAPI, this is okay.
404+
this._client.getMetadata(options, function (err, metadata) {
405+
if (err) {
406+
if (cb) {
407+
cb(LibrdKafkaError.create(err));
408+
}
409+
return;
410+
}
411+
412+
const topics = []
413+
if (metadata.topics) {
414+
for (const topic of metadata.topics) {
415+
topics.push(topic.name);
416+
}
417+
}
418+
419+
if (cb) {
420+
cb(null, topics);
421+
}
422+
});
423+
}

lib/kafkajs/_admin.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,30 @@ class Admin {
307307
});
308308
});
309309
}
310+
311+
/**
312+
* List topics.
313+
*
314+
* @param {any?} options
315+
* @param {number?} options.timeout - The request timeout in milliseconds.
316+
* May be unset (default: 5000)
317+
* @returns {Promise<string[]>}
318+
*/
319+
async listTopics(options = {}) {
320+
if (this.#state !== AdminState.CONNECTED) {
321+
throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
322+
}
323+
324+
return new Promise((resolve, reject) => {
325+
this.#internalClient.listTopics(options, (err, topics) => {
326+
if (err) {
327+
reject(createKafkaJsErrorFromLibRdKafkaError(err));
328+
} else {
329+
resolve(topics);
330+
}
331+
});
332+
});
333+
}
310334
}
311335

312336
module.exports = {

src/admin.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
9595
Nan::SetPrototypeMethod(tpl, "connect", NodeConnect);
9696
Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
9797
Nan::SetPrototypeMethod(tpl, "setSaslCredentials", NodeSetSaslCredentials);
98+
Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata);
9899

99100
constructor.Reset(
100101
(tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked());
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
jest.setTimeout(30000);
2+
3+
const {
4+
secureRandom,
5+
createTopic,
6+
waitFor,
7+
createAdmin,
8+
} = require('../testhelpers');
9+
const { ConsumerGroupStates, ErrorCodes } = require('../../../lib').KafkaJS;
10+
11+
describe('Admin > listTopics', () => {
12+
let topicNames, admin;
13+
14+
beforeEach(async () => {
15+
topicNames = [`test-topic-${secureRandom()}`, `test-topic-${secureRandom()}`];
16+
17+
await createTopic({ topic: topicNames[0], });
18+
await createTopic({ topic: topicNames[1] });
19+
20+
admin = createAdmin({});
21+
});
22+
23+
afterEach(async () => {
24+
admin && (await admin.disconnect());
25+
});
26+
27+
it('should timeout', async () => {
28+
await admin.connect();
29+
30+
await expect(admin.listTopics({ timeout: 1 })).rejects.toHaveProperty(
31+
'code',
32+
ErrorCodes.ERR__TIMED_OUT
33+
);
34+
});
35+
36+
it('should list consumer topics', async () => {
37+
await admin.connect();
38+
const listTopicsResult = await admin.listTopics();
39+
expect(listTopicsResult).toEqual(
40+
expect.arrayContaining(topicNames)
41+
);
42+
});
43+
});
44+

types/kafkajs.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,7 @@ export type Admin = {
541541
topics: ITopicConfig[]
542542
}): Promise<boolean>
543543
deleteTopics(options: { topics: string[]; timeout?: number }): Promise<void>
544+
listTopics(options?: { timeout?: number }): Promise<string[]>
544545
listGroups(options?: {
545546
timeout?: number,
546547
matchConsumerGroupStates?: ConsumerGroupStates[]

0 commit comments

Comments
 (0)