Skip to content

Commit 76f479c

Browse files
authored
DescribeTopics api implemented (#155)
* describeTopics api implemented * addressing comments * addressing lint errors * requested changes * requested changes * reverting space indent
1 parent 83e1a9f commit 76f479c

File tree

14 files changed

+869
-4
lines changed

14 files changed

+869
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ v0.5.0 is a limited availability feature release. It is supported for all usage.
77

88
1. Add support for an Admin API to delete records.(#141).
99
2. Fixes an issue with unresolved raced Promises leaking in the consumer (#151).
10+
3. Add support for an Admin API to describe topics.(#155).
1011

1112

1213
# confluent-kafka-javascript v0.4.0

MIGRATION.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,8 @@ The admin-client only has support for a limited subset of methods, with more to
335335
`requireStableOffsets` option but `resolveOffsets` option is not yet supported.
336336
* The `deleteTopicRecords` method is supported with additional `timeout`
337337
and `operationTimeout` option.
338+
* The `fetchTopicMetadata` method is supported with additional `timeout`
339+
and `includeAuthorizedOperations` option. Fetching for all topics is not advisable.
338340
339341
### Using the Schema Registry
340342
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
2+
const { parseArgs } = require('node:util');
3+
4+
async function fetchMetadata() {
5+
// Parse command-line arguments
6+
const args = parseArgs({
7+
allowPositionals: true,
8+
options: {
9+
'bootstrap-servers': {
10+
type: 'string',
11+
short: 'b',
12+
default: 'localhost:9092',
13+
},
14+
'timeout': {
15+
type: 'string',
16+
short: 't',
17+
default: '5000',
18+
},
19+
'include-authorized-operations': {
20+
type: 'boolean',
21+
short: 'a',
22+
default: false,
23+
},
24+
},
25+
});
26+
27+
const {
28+
'bootstrap-servers': bootstrapServers,
29+
timeout,
30+
'include-authorized-operations': includeAuthorizedOperations,
31+
} = args.values;
32+
33+
const [topicName] = args.positionals;
34+
35+
if (!topicName) {
36+
console.error('Topic name is required');
37+
process.exit(1);
38+
}
39+
40+
const kafka = new Kafka({
41+
kafkaJS: {
42+
brokers: [bootstrapServers],
43+
},
44+
});
45+
46+
const admin = kafka.admin();
47+
await admin.connect();
48+
49+
try {
50+
// Fetch the topic metadata with specified options
51+
const metadata = await admin.fetchTopicMetadata(
52+
{
53+
topics: [topicName],
54+
includeAuthorizedOperations: includeAuthorizedOperations,
55+
timeout: Number(timeout), // Convert timeout to a number
56+
});
57+
58+
console.log(`Metadata for topic "${topicName}":`, stringifyBigInt(metadata));
59+
} catch (err) {
60+
console.error('Error fetching topic metadata:', err);
61+
} finally {
62+
await admin.disconnect();
63+
}
64+
}
65+
66+
function stringifyBigInt(obj) {
67+
return JSON.stringify(
68+
obj,
69+
(key, value) =>
70+
typeof value === 'bigint'
71+
? value.toString() // Convert BigInt to string
72+
: value,
73+
2
74+
);
75+
}
76+
77+
fetchMetadata();

lib/admin.js

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,7 @@ AdminClient.prototype.listConsumerGroupOffsets = function (listGroupOffsets, opt
493493
*
494494
* @param {import("../../types/rdkafka").TopicPartitionOffset[]} delRecords - The list of topic partitions and
495495
* offsets to delete records up to.
496-
* @param {number} options.operationTimeout - The operation timeout in milliseconds.
496+
* @param {number?} options.operationTimeout - The operation timeout in milliseconds.
497497
* May be unset (default: 60000)
498498
* @param {number?} options.timeout - The request timeout in milliseconds.
499499
* May be unset (default: 5000)
@@ -534,3 +534,44 @@ AdminClient.prototype.deleteRecords = function (delRecords, options, cb) {
534534
}
535535
});
536536
};
537+
538+
/**
539+
* Describe Topics.
540+
*
541+
* @param {string[]} topics - The names of the topics to describe.
542+
* @param {number?} options.timeout - The request timeout in milliseconds.
543+
* May be unset (default: 5000)
544+
* @param {boolean?} options.includeAuthorizedOperations - If true, include operations allowed on the topic by the calling client.
545+
* (default: false)
546+
* @param {function} cb - The callback to be executed when finished.
547+
*/
548+
AdminClient.prototype.describeTopics = function (topics, options, cb) {
549+
if (!this._isConnected) {
550+
throw new Error('Client is disconnected');
551+
}
552+
553+
if(!options) {
554+
options = {};
555+
}
556+
557+
if(!Object.hasOwn(options, 'timeout')) {
558+
options.timeout = 5000;
559+
}
560+
561+
if(!Object.hasOwn(options, 'includeAuthorizedOperations')) {
562+
options.includeAuthorizedOperations = false;
563+
}
564+
565+
this._client.describeTopics(topics, options, function (err, descriptions) {
566+
if (err) {
567+
if (cb) {
568+
cb(LibrdKafkaError.create(err));
569+
}
570+
return;
571+
}
572+
573+
if (cb) {
574+
cb(null, descriptions);
575+
}
576+
});
577+
};

lib/kafkajs/_admin.js

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ class Admin {
541541
*
542542
* @param {string} options.topic - The topic to delete offsets for.
543543
* @param {import("../../types/kafkajs").SeekEntry[]} options.partitons - The partitions to delete offsets for.
544-
* @param {number} options.operationTimeout - The operation timeout in milliseconds.
544+
* @param {number?} options.operationTimeout - The operation timeout in milliseconds.
545545
* May be unset (default: 60000)
546546
* @param {number?} options.timeout - The request timeout in milliseconds.
547547
* May be unset (default: 5000)
@@ -589,6 +589,66 @@ class Admin {
589589
});
590590
});
591591
}
592+
593+
/**
594+
* Describe topics.
595+
*
596+
* @param {string[]} options.topics - The topics to describe.
597+
* If unset, all topics will be described.
598+
* @param {number?} options.timeout - The request timeout in milliseconds.
599+
* May be unset (default: 5000)
600+
* @param {boolean?} options.includeAuthorizedOperations - If true, include operations allowed on the topic
601+
* by the calling client (default: false).
602+
*
603+
* @returns {Promise<{ topics: Array<import('../../types/kafkajs').ITopicMetadata> }>}
604+
*/
605+
async fetchTopicMetadata(options = {}) {
606+
if (this.#state !== AdminState.CONNECTED) {
607+
throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
608+
}
609+
610+
let topics = options.topics;
611+
if (!Object.hasOwn(options, 'topics')) {
612+
try {
613+
topics = await this.listTopics();
614+
} catch (err) {
615+
throw createKafkaJsErrorFromLibRdKafkaError(err);
616+
}
617+
}
618+
619+
return new Promise((resolve, reject) => {
620+
this.#internalClient.describeTopics(topics, options, (err, metadata) => {
621+
if (err) {
622+
reject(createKafkaJsErrorFromLibRdKafkaError(err));
623+
} else {
624+
625+
let errs = metadata.filter(topic => topic.error);
626+
if (errs.length > 0) {
627+
reject(createKafkaJsErrorFromLibRdKafkaError(errs[0].error));
628+
return;
629+
}
630+
const convertedMetadata = metadata.map(topic => ({
631+
name: topic.name,
632+
topicId: topic.topicId,
633+
isInternal: topic.isInternal,
634+
partitions: topic.partitions.map(partition => ({
635+
partitionErrorCode: error.ErrorCodes.ERR_NO_ERROR,
636+
partitionId: partition.partition,
637+
leader: partition.leader.id,
638+
leaderNode: partition.leader,
639+
replicas: partition.replicas.map(replica => replica.id),
640+
replicaNodes: partition.replicas,
641+
isr: partition.isr.map(isrNode => isrNode.id),
642+
isrNodes: partition.isr
643+
})),
644+
authorizedOperations: topic.authorizedOperations
645+
}));
646+
647+
resolve(convertedMetadata);
648+
}
649+
});
650+
});
651+
}
592652
}
593653

594654
module.exports = {

src/admin.cc

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
116116
Nan::SetPrototypeMethod(tpl, "deleteTopic", NodeDeleteTopic);
117117
Nan::SetPrototypeMethod(tpl, "createPartitions", NodeCreatePartitions);
118118
Nan::SetPrototypeMethod(tpl, "deleteRecords", NodeDeleteRecords);
119+
Nan::SetPrototypeMethod(tpl, "describeTopics", NodeDescribeTopics);
119120

120121
// Consumer group related operations
121122
Nan::SetPrototypeMethod(tpl, "listGroups", NodeListGroups);
@@ -812,6 +813,76 @@ Baton AdminClient::DeleteRecords(rd_kafka_DeleteRecords_t **del_records,
812813
}
813814
}
814815

816+
Baton AdminClient::DescribeTopics(rd_kafka_TopicCollection_t *topics,
817+
bool include_authorized_operations,
818+
int timeout_ms,
819+
rd_kafka_event_t **event_response) {
820+
if (!IsConnected()) {
821+
return Baton(RdKafka::ERR__STATE);
822+
}
823+
824+
{
825+
scoped_shared_write_lock lock(m_connection_lock);
826+
if (!IsConnected()) {
827+
return Baton(RdKafka::ERR__STATE);
828+
}
829+
830+
// Make admin options to establish that we are describing topics
831+
rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new(
832+
m_client->c_ptr(), RD_KAFKA_ADMIN_OP_DESCRIBETOPICS);
833+
834+
if (include_authorized_operations) {
835+
rd_kafka_error_t *error =
836+
rd_kafka_AdminOptions_set_include_authorized_operations(
837+
options, include_authorized_operations);
838+
if (error) {
839+
return Baton::BatonFromErrorAndDestroy(error);
840+
}
841+
}
842+
843+
char errstr[512];
844+
rd_kafka_resp_err_t err = rd_kafka_AdminOptions_set_request_timeout(
845+
options, timeout_ms, errstr, sizeof(errstr));
846+
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
847+
return Baton(static_cast<RdKafka::ErrorCode>(err), errstr);
848+
}
849+
850+
// Create queue just for this operation.
851+
rd_kafka_queue_t *rkqu = rd_kafka_queue_new(m_client->c_ptr());
852+
853+
rd_kafka_DescribeTopics(m_client->c_ptr(), topics, options, rkqu);
854+
855+
// Poll for an event by type in that queue
856+
// DON'T destroy the event. It is the out parameter, and ownership is
857+
// the caller's.
858+
*event_response =
859+
PollForEvent(rkqu, RD_KAFKA_EVENT_DESCRIBETOPICS_RESULT, timeout_ms);
860+
861+
// Destroy the queue since we are done with it.
862+
rd_kafka_queue_destroy(rkqu);
863+
864+
// Destroy the options we just made because we polled already
865+
rd_kafka_AdminOptions_destroy(options);
866+
867+
// If we got no response from that operation, this is a failure
868+
// likely due to time out
869+
if (*event_response == NULL) {
870+
return Baton(RdKafka::ERR__TIMED_OUT);
871+
}
872+
873+
// Now we can get the error code from the event
874+
if (rd_kafka_event_error(*event_response)) {
875+
// If we had a special error code, get out of here with it
876+
const rd_kafka_resp_err_t errcode = rd_kafka_event_error(*event_response);
877+
return Baton(static_cast<RdKafka::ErrorCode>(errcode));
878+
}
879+
880+
// At this point, event_response contains the result, which needs
881+
// to be parsed/converted by the caller.
882+
return Baton(RdKafka::ERR_NO_ERROR);
883+
}
884+
}
885+
815886
void AdminClient::ActivateDispatchers() {
816887
// Listen to global config
817888
m_gconfig->listen();
@@ -1298,4 +1369,59 @@ NAN_METHOD(AdminClient::NodeDeleteRecords) {
12981369
callback, client, delete_records, 1, operation_timeout_ms, timeout_ms));
12991370
}
13001371

1372+
/**
1373+
* Describe Topics.
1374+
*/
1375+
NAN_METHOD(AdminClient::NodeDescribeTopics) {
1376+
Nan::HandleScope scope;
1377+
1378+
if (info.Length() < 3 || !info[2]->IsFunction()) {
1379+
return Nan::ThrowError("Need to specify a callback");
1380+
}
1381+
1382+
if (!info[0]->IsArray()) {
1383+
return Nan::ThrowError("Must provide an array of 'topicNames'");
1384+
}
1385+
1386+
v8::Local<v8::Array> topicNames = info[0].As<v8::Array>();
1387+
1388+
if (topicNames->Length() == 0) {
1389+
return Nan::ThrowError("'topicNames' cannot be empty");
1390+
}
1391+
1392+
std::vector<std::string> topicNamesVector = v8ArrayToStringVector(topicNames);
1393+
1394+
const char **topics = static_cast<const char **>(
1395+
malloc(sizeof(const char *) * topicNamesVector.size()));
1396+
1397+
for (size_t i = 0; i < topicNamesVector.size(); i++) {
1398+
topics[i] = topicNamesVector[i].c_str();
1399+
}
1400+
1401+
/**
1402+
* The ownership of this is taken by
1403+
* Workers::AdminClientDescribeTopics and freeing it is also handled
1404+
* by that class.
1405+
*/
1406+
rd_kafka_TopicCollection_t *topic_collection =
1407+
rd_kafka_TopicCollection_of_topic_names(topics, topicNamesVector.size());
1408+
1409+
free(topics);
1410+
1411+
v8::Local<v8::Object> options = info[1].As<v8::Object>();
1412+
1413+
bool include_authorised_operations =
1414+
GetParameter<bool>(options, "includeAuthorizedOperations", false);
1415+
1416+
int timeout_ms = GetParameter<int64_t>(options, "timeout", 5000);
1417+
1418+
v8::Local<v8::Function> cb = info[2].As<v8::Function>();
1419+
Nan::Callback *callback = new Nan::Callback(cb);
1420+
AdminClient *client = ObjectWrap::Unwrap<AdminClient>(info.This());
1421+
1422+
Nan::AsyncQueueWorker(new Workers::AdminClientDescribeTopics(
1423+
callback, client, topic_collection,
1424+
include_authorised_operations, timeout_ms));
1425+
}
1426+
13011427
} // namespace NodeKafka

src/admin.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ class AdminClient : public Connection {
6868
Baton DeleteRecords(rd_kafka_DeleteRecords_t** del_records,
6969
size_t del_records_cnt, int operation_timeout_ms,
7070
int timeout_ms, rd_kafka_event_t** event_response);
71+
Baton DescribeTopics(rd_kafka_TopicCollection_t* topics,
72+
bool include_authorized_operations, int timeout_ms,
73+
rd_kafka_event_t** event_response);
7174

7275
protected:
7376
static Nan::Persistent<v8::Function> constructor;
@@ -91,6 +94,7 @@ class AdminClient : public Connection {
9194
static NAN_METHOD(NodeDeleteGroups);
9295
static NAN_METHOD(NodeListConsumerGroupOffsets);
9396
static NAN_METHOD(NodeDeleteRecords);
97+
static NAN_METHOD(NodeDescribeTopics);
9498

9599
static NAN_METHOD(NodeConnect);
96100
static NAN_METHOD(NodeDisconnect);

0 commit comments

Comments
 (0)