Skip to content

Commit 3ea556c

Browse files
committed
Add AdminAPI for describeGroups
1 parent 1777f4d commit 3ea556c

File tree

13 files changed

+795
-47
lines changed

13 files changed

+795
-47
lines changed

index.d.ts

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ export interface WatermarkOffsets{
7272
export interface TopicPartition {
7373
topic: string;
7474
partition: number;
75+
error?: LibrdKafkaError;
7576
}
7677

7778
export interface TopicPartitionOffset extends TopicPartition{
@@ -349,6 +350,56 @@ export interface GroupOverview {
349350
state: ConsumerGroupStates;
350351
}
351352

353+
export enum AclOperationTypes {
354+
UNKNOWN = 0,
355+
ANY = 1,
356+
ALL = 2,
357+
READ = 3,
358+
WRITE = 4,
359+
CREATE = 5,
360+
DELETE = 6,
361+
ALTER = 7,
362+
DESCRIBE = 8,
363+
CLUSTER_ACTION = 9,
364+
DESCRIBE_CONFIGS = 10,
365+
ALTER_CONFIGS = 11,
366+
IDEMPOTENT_WRITE = 12,
367+
}
368+
369+
export type MemberDescription = {
370+
clientHost: string
371+
clientId: string
372+
memberId: string
373+
memberAssignment: Buffer
374+
memberMetadata: Buffer
375+
groupInstanceId?: string,
376+
assignment: TopicPartition[]
377+
}
378+
379+
export type Node = {
380+
id: number
381+
host: string
382+
port: number
383+
rack?: string
384+
}
385+
386+
export type GroupDescription = {
387+
groupId: string
388+
error?: LibrdKafkaError
389+
members: MemberDescription[]
390+
protocol: string
391+
isSimpleConsumerGroup: boolean;
392+
protocolType: string
393+
partitionAssignor: string
394+
state: ConsumerGroupStates
395+
coordinator: Node
396+
authorizedOperations?: AclOperationTypes[]
397+
}
398+
399+
export type GroupDescriptions = {
400+
groups: GroupDescription[],
401+
}
402+
352403
export interface IAdminClient {
353404
createTopic(topic: NewTopic, cb?: (err: LibrdKafkaError) => void): void;
354405
createTopic(topic: NewTopic, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;
@@ -360,9 +411,14 @@ export interface IAdminClient {
360411
createPartitions(topic: string, desiredPartitions: number, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;
361412

362413
listGroups(cb?: (err: LibrdKafkaError, result: { groups: GroupOverview[], errors: LibrdKafkaError[] }) => any): void;
363-
listGroups(options: { timeout?: number, matchConsumerGroupStates?: ConsumerGroupStates[] },
414+
listGroups(options?: { timeout?: number, matchConsumerGroupStates?: ConsumerGroupStates[] },
364415
cb?: (err: LibrdKafkaError, result: { groups: GroupOverview[], errors: LibrdKafkaError[] }) => any): void;
365416

417+
describeGroups(groupIds: string[], cb?: (err: LibrdKafkaError, result: GroupDescriptions) => any): void;
418+
describeGroups(groupIds: string[],
419+
options?: { timeout?: number, includeAuthorizedOperations?: boolean },
420+
cb?: (err: LibrdKafkaError, result: GroupDescriptions) => any): void;
421+
366422
disconnect(): void;
367423
}
368424

lib/admin.js

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,28 @@ const ConsumerGroupStates = Object.seal({
2020
STABLE: 3,
2121
DEAD: 4,
2222
EMPTY: 5,
23-
})
23+
});
2424

25+
const AclOperationTypes = Object.seal({
26+
UNKNOWN: 0,
27+
ANY: 1,
28+
ALL: 2,
29+
READ: 3,
30+
WRITE: 4,
31+
CREATE: 5,
32+
DELETE: 6,
33+
ALTER: 7,
34+
DESCRIBE: 8,
35+
CLUSTER_ACTION: 9,
36+
DESCRIBE_CONFIGS: 10,
37+
ALTER_CONFIGS: 11,
38+
IDEMPOTENT_WRITE: 12,
39+
});
2540

2641
module.exports = {
2742
create: createAdminClient,
2843
ConsumerGroupStates,
44+
AclOperationTypes,
2945
};
3046

3147
var Client = require('./client');
@@ -267,4 +283,45 @@ AdminClient.prototype.listGroups = function (options, cb) {
267283
cb(null, groups);
268284
}
269285
});
270-
}
286+
}
287+
288+
/**
289+
* Describe consumer groups.
290+
* @param {string[]} groups - The names of the groups to describe.
291+
* @param {any?} options
292+
* @param {number?} options.timeout - The request timeout in milliseconds.
293+
* May be unset (default: 5000)
294+
* @param {boolean?} options.includeAuthorizedOperations - If true, include operations allowed on the group by the calling client (default: false).
295+
* @param {function} cb - The callback to be executed when finished.
296+
*
297+
* Valid ways to call this function:
298+
* describeGroups(groups, cb)
299+
* describeGroups(groups, options, cb)
300+
*/
301+
AdminClient.prototype.describeGroups = function (groups, options, cb) {
302+
if (!this._isConnected) {
303+
throw new Error('Client is disconnected');
304+
}
305+
306+
if (typeof options === 'function') {
307+
cb = options;
308+
options = {};
309+
}
310+
311+
if (!options) {
312+
options = {};
313+
}
314+
315+
this._client.describeGroups(groups, options, function (err, descriptions) {
316+
if (err) {
317+
if (cb) {
318+
cb(LibrdKafkaError.create(err));
319+
}
320+
return;
321+
}
322+
323+
if (cb) {
324+
cb(null, descriptions);
325+
}
326+
});
327+
}

lib/kafkajs/_admin.js

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ class Admin {
106106

107107
this.#state = AdminState.CONNECTING;
108108

109-
const config = this.#config();
109+
const config = this.#config();
110110

111111
return new Promise((resolve, reject) => {
112112
try {
@@ -258,6 +258,35 @@ class Admin {
258258
});
259259
}
260260

261+
/**
262+
* Describe consumer groups.
263+
*
264+
* @param {string[]} groups - The names of the groups to describe.
265+
* @param {object?} options
266+
* @param {number?} options.timeout - The request timeout in milliseconds.
267+
* May be unset (default: 5000)
268+
* @param {boolean?} options.includeAuthorizedOperations - If true, include operations allowed on the group by the calling client (default: false).
269+
* @returns {Promise<import("../../types/kafkajs").GroupDescriptions>}
270+
*/
271+
async describeGroups(groups, options = {}) {
272+
if (this.#state !== AdminState.CONNECTED) {
273+
throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
274+
}
275+
276+
return new Promise((resolve, reject) => {
277+
this.#internalClient.describeGroups(groups, options, (err, descriptions) => {
278+
if (err) {
279+
reject(createKafkaJsErrorFromLibRdKafkaError(err));
280+
} else {
281+
resolve(descriptions);
282+
}
283+
});
284+
});
285+
}
261286
}
262287

263-
module.exports = { Admin, ConsumerGroupStates: RdKafka.AdminClient.ConsumerGroupStates }
288+
module.exports = {
289+
Admin,
290+
ConsumerGroupStates: RdKafka.AdminClient.ConsumerGroupStates,
291+
AclOperationTypes: RdKafka.AdminClient.AclOperationTypes
292+
}

lib/kafkajs/_kafka.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
const { Producer, CompressionTypes } = require('./_producer');
22
const { Consumer, PartitionAssigners } = require('./_consumer');
3-
const { Admin, ConsumerGroupStates } = require('./_admin');
3+
const { Admin, ConsumerGroupStates, AclOperationTypes } = require('./_admin');
44
const error = require('./_error');
55
const { logLevel, checkIfKafkaJsKeysPresent, CompatibilityErrorMessages } = require('./_common');
66

@@ -86,4 +86,5 @@ module.exports = {
8686
PartitionAssigners,
8787
PartitionAssignors: PartitionAssigners,
8888
CompressionTypes,
89-
ConsumerGroupStates };
89+
ConsumerGroupStates,
90+
AclOperationTypes };

src/admin.cc

Lines changed: 124 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
8989

9090
// Consumer group related operations
9191
Nan::SetPrototypeMethod(tpl, "listGroups", NodeListGroups);
92+
Nan::SetPrototypeMethod(tpl, "describeGroups", NodeDescribeGroups);
9293

9394
Nan::SetPrototypeMethod(tpl, "connect", NodeConnect);
9495
Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
@@ -455,18 +456,88 @@ Baton AdminClient::ListGroups(
455456
}
456457

457458
// Create queue just for this operation.
458-
rd_kafka_queue_t *topic_rkqu = rd_kafka_queue_new(m_client->c_ptr());
459+
rd_kafka_queue_t *rkqu = rd_kafka_queue_new(m_client->c_ptr());
459460

460-
rd_kafka_ListConsumerGroups(m_client->c_ptr(), options, topic_rkqu);
461+
rd_kafka_ListConsumerGroups(m_client->c_ptr(), options, rkqu);
461462

462463
// Poll for an event by type in that queue
463464
// DON'T destroy the event. It is the out parameter, and ownership is
464465
// the caller's.
465466
*event_response = PollForEvent(
466-
topic_rkqu, RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT, timeout_ms);
467+
rkqu, RD_KAFKA_EVENT_LISTCONSUMERGROUPS_RESULT, timeout_ms);
467468

468469
// Destroy the queue since we are done with it.
469-
rd_kafka_queue_destroy(topic_rkqu);
470+
rd_kafka_queue_destroy(rkqu);
471+
472+
// Destroy the options we just made because we polled already
473+
rd_kafka_AdminOptions_destroy(options);
474+
475+
// If we got no response from that operation, this is a failure
476+
// likely due to time out
477+
if (*event_response == NULL) {
478+
return Baton(RdKafka::ERR__TIMED_OUT);
479+
}
480+
481+
// Now we can get the error code from the event
482+
if (rd_kafka_event_error(*event_response)) {
483+
// If we had a special error code, get out of here with it
484+
const rd_kafka_resp_err_t errcode = rd_kafka_event_error(*event_response);
485+
return Baton(static_cast<RdKafka::ErrorCode>(errcode));
486+
}
487+
488+
// At this point, event_response contains the result, which needs
489+
// to be parsed/converted by the caller.
490+
return Baton(RdKafka::ERR_NO_ERROR);
491+
}
492+
}
493+
494+
Baton AdminClient::DescribeGroups(std::vector<std::string> &groups,
495+
bool include_authorized_operations,
496+
int timeout_ms,
497+
/* out */ rd_kafka_event_t **event_response) {
498+
if (!IsConnected()) {
499+
return Baton(RdKafka::ERR__STATE);
500+
}
501+
502+
{
503+
scoped_shared_write_lock lock(m_connection_lock);
504+
if (!IsConnected()) {
505+
return Baton(RdKafka::ERR__STATE);
506+
}
507+
508+
// Make admin options to establish that we are describing groups
509+
rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new(
510+
m_client->c_ptr(), RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS);
511+
512+
if (include_authorized_operations) {
513+
rd_kafka_error_t *error =
514+
rd_kafka_AdminOptions_set_include_authorized_operations(
515+
options, include_authorized_operations);
516+
if (error) {
517+
return Baton::BatonFromErrorAndDestroy(error);
518+
}
519+
}
520+
521+
// Create queue just for this operation.
522+
rd_kafka_queue_t *rkqu = rd_kafka_queue_new(m_client->c_ptr());
523+
524+
// Construct a char** to pass to librdkafka. Avoid too many allocations.
525+
std::vector<const char *> c_groups(groups.size());
526+
for (size_t i = 0; i < groups.size(); i++) {
527+
c_groups[i] = groups[i].c_str();
528+
}
529+
530+
rd_kafka_DescribeConsumerGroups(m_client->c_ptr(), &c_groups[0],
531+
groups.size(), options, rkqu);
532+
533+
// Poll for an event by type in that queue
534+
// DON'T destroy the event. It is the out parameter, and ownership is
535+
// the caller's.
536+
*event_response = PollForEvent(
537+
rkqu, RD_KAFKA_EVENT_DESCRIBECONSUMERGROUPS_RESULT, timeout_ms);
538+
539+
// Destroy the queue since we are done with it.
540+
rd_kafka_queue_destroy(rkqu);
470541

471542
// Destroy the options we just made because we polled already
472543
rd_kafka_AdminOptions_destroy(options);
@@ -696,10 +767,10 @@ NAN_METHOD(AdminClient::NodeListGroups) {
696767

697768
// Get the match states, or not if they are unset.
698769
std::vector<rd_kafka_consumer_group_state_t> match_states;
699-
v8::Local<v8::String> matchConsumerGroupStatesKey =
770+
v8::Local<v8::String> match_consumer_group_states_key =
700771
Nan::New("matchConsumerGroupStates").ToLocalChecked();
701772
bool is_match_states_set =
702-
Nan::Has(config, matchConsumerGroupStatesKey).FromMaybe(false);
773+
Nan::Has(config, match_consumer_group_states_key).FromMaybe(false);
703774
v8::Local<v8::Array> match_states_array;
704775

705776
if (is_match_states_set) {
@@ -713,4 +784,51 @@ NAN_METHOD(AdminClient::NodeListGroups) {
713784
callback, client, is_match_states_set, match_states, timeout_ms));
714785
}
715786

787+
/**
788+
* Describe Consumer Groups.
789+
*/
790+
NAN_METHOD(AdminClient::NodeDescribeGroups) {
791+
Nan::HandleScope scope;
792+
793+
if (info.Length() < 3 || !info[2]->IsFunction()) {
794+
// Just throw an exception
795+
return Nan::ThrowError("Need to specify a callback");
796+
}
797+
798+
if (!info[0]->IsArray()) {
799+
return Nan::ThrowError("Must provide group name array");
800+
}
801+
802+
if (!info[1]->IsObject()) {
803+
return Nan::ThrowError("Must provide options object");
804+
}
805+
806+
// Get list of group names to describe.
807+
v8::Local<v8::Array> group_names = info[0].As<v8::Array>();
808+
if (group_names->Length() == 0) {
809+
return Nan::ThrowError("Must provide at least one group name");
810+
}
811+
std::vector<std::string> group_names_vector =
812+
v8ArrayToStringVector(group_names);
813+
814+
v8::Local<v8::Object> config = info[1].As<v8::Object>();
815+
816+
// Get the timeout - default 5000.
817+
int timeout_ms = GetParameter<int64_t>(config, "timeout", 5000);
818+
819+
// Get whether to include authorized operations - default false.
820+
bool include_authorized_operations =
821+
GetParameter<bool>(config, "includeAuthorizedOperations", false);
822+
823+
// Create the final callback object
824+
v8::Local<v8::Function> cb = info[2].As<v8::Function>();
825+
Nan::Callback *callback = new Nan::Callback(cb);
826+
AdminClient *client = ObjectWrap::Unwrap<AdminClient>(info.This());
827+
828+
// Queue the work.
829+
Nan::AsyncQueueWorker(new Workers::AdminClientDescribeGroups(
830+
callback, client, group_names_vector, include_authorized_operations,
831+
timeout_ms));
832+
}
833+
716834
} // namespace NodeKafka

0 commit comments

Comments
 (0)