Skip to content

Commit a69206d

Browse files
committed
Add AdminAPI for deleteGroups
1 parent 3ea556c commit a69206d

File tree

11 files changed

+438
-3
lines changed

11 files changed

+438
-3
lines changed

index.d.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,12 @@ export type GroupDescriptions = {
400400
groups: GroupDescription[],
401401
}
402402

403+
export type DeleteGroupsResult = {
404+
groupId: string
405+
errorCode?: number
406+
error?: LibrdKafkaError
407+
}
408+
403409
export interface IAdminClient {
404410
createTopic(topic: NewTopic, cb?: (err: LibrdKafkaError) => void): void;
405411
createTopic(topic: NewTopic, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;
@@ -419,6 +425,11 @@ export interface IAdminClient {
419425
options?: { timeout?: number, includeAuthorizedOperations?: boolean },
420426
cb?: (err: LibrdKafkaError, result: GroupDescriptions) => any): void;
421427

428+
deleteGroups(groupIds: string[], cb?: (err: LibrdKafkaError, result: DeleteGroupsResult[]) => any): void;
429+
deleteGroups(groupIds: string[],
430+
options?: { timeout?: number },
431+
cb?: (err: LibrdKafkaError, result: DeleteGroupsResult[]) => any): void;
432+
422433
disconnect(): void;
423434
}
424435

lib/admin.js

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,3 +325,43 @@ AdminClient.prototype.describeGroups = function (groups, options, cb) {
325325
}
326326
});
327327
}
328+
329+
/**
330+
* Delete consumer groups.
331+
* @param {string[]} groups - The names of the groups to delete.
332+
* @param {any?} options
333+
* @param {number?} options.timeout - The request timeout in milliseconds.
334+
* May be unset (default: 5000)
335+
* @param {function} cb - The callback to be executed when finished.
336+
*
337+
* Valid ways to call this function:
338+
* deleteGroups(groups, cb)
339+
* deleteGroups(groups, options, cb)
340+
*/
341+
AdminClient.prototype.deleteGroups = function (groups, options, cb) {
342+
if (!this._isConnected) {
343+
throw new Error('Client is disconnected');
344+
}
345+
346+
if (typeof options === 'function') {
347+
cb = options;
348+
options = {};
349+
}
350+
351+
if (!options) {
352+
options = {};
353+
}
354+
355+
this._client.deleteGroups(groups, options, function (err, reports) {
356+
if (err) {
357+
if (cb) {
358+
cb(LibrdKafkaError.create(err));
359+
}
360+
return;
361+
}
362+
363+
if (cb) {
364+
cb(null, reports);
365+
}
366+
});
367+
}

lib/kafkajs/_admin.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,30 @@ class Admin {
283283
});
284284
});
285285
}
286+
287+
/**
288+
* Delete consumer groups.
289+
* @param {string[]} groups - The names of the groups to delete.
290+
* @param {any?} options
291+
* @param {number?} options.timeout - The request timeout in milliseconds.
292+
* May be unset (default: 5000)
293+
* @returns {Promise<import("../../types/kafkajs").DeleteGroupsResult[]>}
294+
*/
295+
async deleteGroups(groups, options = {}) {
296+
if (this.#state !== AdminState.CONNECTED) {
297+
throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
298+
}
299+
300+
return new Promise((resolve, reject) => {
301+
this.#internalClient.deleteGroups(groups, options, (err, reports) => {
302+
if (err) {
303+
reject(createKafkaJsErrorFromLibRdKafkaError(err));
304+
} else {
305+
resolve(reports);
306+
}
307+
});
308+
});
309+
}
286310
}
287311

288312
module.exports = {

src/admin.cc

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {
9090
// Consumer group related operations
9191
Nan::SetPrototypeMethod(tpl, "listGroups", NodeListGroups);
9292
Nan::SetPrototypeMethod(tpl, "describeGroups", NodeDescribeGroups);
93+
Nan::SetPrototypeMethod(tpl, "deleteGroups", NodeDeleteGroups);
9394

9495
Nan::SetPrototypeMethod(tpl, "connect", NodeConnect);
9596
Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
@@ -446,6 +447,13 @@ Baton AdminClient::ListGroups(
446447
rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new(
447448
m_client->c_ptr(), RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS);
448449

450+
char errstr[512];
451+
rd_kafka_resp_err_t err = rd_kafka_AdminOptions_set_request_timeout(
452+
options, timeout_ms, errstr, sizeof(errstr));
453+
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
454+
return Baton(static_cast<RdKafka::ErrorCode>(err), errstr);
455+
}
456+
449457
if (is_match_states_set) {
450458
rd_kafka_error_t *error =
451459
rd_kafka_AdminOptions_set_match_consumer_group_states(
@@ -509,6 +517,13 @@ Baton AdminClient::DescribeGroups(std::vector<std::string> &groups,
509517
rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new(
510518
m_client->c_ptr(), RD_KAFKA_ADMIN_OP_DESCRIBECONSUMERGROUPS);
511519

520+
char errstr[512];
521+
rd_kafka_resp_err_t err = rd_kafka_AdminOptions_set_request_timeout(
522+
options, timeout_ms, errstr, sizeof(errstr));
523+
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
524+
return Baton(static_cast<RdKafka::ErrorCode>(err), errstr);
525+
}
526+
512527
if (include_authorized_operations) {
513528
rd_kafka_error_t *error =
514529
rd_kafka_AdminOptions_set_include_authorized_operations(
@@ -561,6 +576,67 @@ Baton AdminClient::DescribeGroups(std::vector<std::string> &groups,
561576
}
562577
}
563578

579+
Baton AdminClient::DeleteGroups(rd_kafka_DeleteGroup_t **group_list,
580+
size_t group_cnt, int timeout_ms,
581+
/* out */ rd_kafka_event_t **event_response) {
582+
if (!IsConnected()) {
583+
return Baton(RdKafka::ERR__STATE);
584+
}
585+
586+
{
587+
scoped_shared_write_lock lock(m_connection_lock);
588+
if (!IsConnected()) {
589+
return Baton(RdKafka::ERR__STATE);
590+
}
591+
592+
// Make admin options to establish that we are deleting groups
593+
rd_kafka_AdminOptions_t *options = rd_kafka_AdminOptions_new(
594+
m_client->c_ptr(), RD_KAFKA_ADMIN_OP_DELETEGROUPS);
595+
596+
char errstr[512];
597+
rd_kafka_resp_err_t err = rd_kafka_AdminOptions_set_request_timeout(
598+
options, timeout_ms, errstr, sizeof(errstr));
599+
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
600+
return Baton(static_cast<RdKafka::ErrorCode>(err), errstr);
601+
}
602+
603+
// Create queue just for this operation.
604+
rd_kafka_queue_t *rkqu = rd_kafka_queue_new(m_client->c_ptr());
605+
606+
rd_kafka_DeleteGroups(m_client->c_ptr(), group_list, group_cnt, options,
607+
rkqu);
608+
609+
// Poll for an event by type in that queue
610+
// DON'T destroy the event. It is the out parameter, and ownership is
611+
// the caller's.
612+
*event_response =
613+
PollForEvent(rkqu, RD_KAFKA_EVENT_DELETEGROUPS_RESULT, timeout_ms);
614+
615+
// Destroy the queue since we are done with it.
616+
rd_kafka_queue_destroy(rkqu);
617+
618+
// Destroy the options we just made because we polled already
619+
rd_kafka_AdminOptions_destroy(options);
620+
621+
// If we got no response from that operation, this is a failure
622+
// likely due to time out
623+
if (*event_response == NULL) {
624+
return Baton(RdKafka::ERR__TIMED_OUT);
625+
}
626+
627+
// Now we can get the error code from the event
628+
if (rd_kafka_event_error(*event_response)) {
629+
// If we had a special error code, get out of here with it
630+
const rd_kafka_resp_err_t errcode = rd_kafka_event_error(*event_response);
631+
return Baton(static_cast<RdKafka::ErrorCode>(errcode));
632+
}
633+
634+
// At this point, event_response contains the result, which needs
635+
// to be parsed/converted by the caller.
636+
return Baton(RdKafka::ERR_NO_ERROR);
637+
}
638+
}
639+
564640
void AdminClient::ActivateDispatchers() {
565641
// Listen to global config
566642
m_gconfig->listen();
@@ -831,4 +907,54 @@ NAN_METHOD(AdminClient::NodeDescribeGroups) {
831907
timeout_ms));
832908
}
833909

910+
/**
911+
* Delete Consumer Groups.
912+
*/
913+
NAN_METHOD(AdminClient::NodeDeleteGroups) {
914+
Nan::HandleScope scope;
915+
916+
if (info.Length() < 3 || !info[2]->IsFunction()) {
917+
// Just throw an exception
918+
return Nan::ThrowError("Need to specify a callback");
919+
}
920+
921+
if (!info[0]->IsArray()) {
922+
return Nan::ThrowError("Must provide group name array");
923+
}
924+
925+
if (!info[1]->IsObject()) {
926+
return Nan::ThrowError("Must provide options object");
927+
}
928+
929+
// Get list of group names to delete, and convert it into an
930+
// rd_kafka_DeleteGroup_t array.
931+
v8::Local<v8::Array> group_names = info[0].As<v8::Array>();
932+
if (group_names->Length() == 0) {
933+
return Nan::ThrowError("Must provide at least one group name");
934+
}
935+
std::vector<std::string> group_names_vector =
936+
v8ArrayToStringVector(group_names);
937+
938+
// The ownership of this array is transferred to the worker.
939+
rd_kafka_DeleteGroup_t **group_list = static_cast<rd_kafka_DeleteGroup_t **>(
940+
malloc(sizeof(rd_kafka_DeleteGroup_t *) * group_names_vector.size()));
941+
for (size_t i = 0; i < group_names_vector.size(); i++) {
942+
group_list[i] = rd_kafka_DeleteGroup_new(group_names_vector[i].c_str());
943+
}
944+
945+
v8::Local<v8::Object> config = info[1].As<v8::Object>();
946+
947+
// Get the timeout - default 5000.
948+
int timeout_ms = GetParameter<int64_t>(config, "timeout", 5000);
949+
950+
// Create the final callback object
951+
v8::Local<v8::Function> cb = info[2].As<v8::Function>();
952+
Nan::Callback *callback = new Nan::Callback(cb);
953+
AdminClient *client = ObjectWrap::Unwrap<AdminClient>(info.This());
954+
955+
// Queue the work.
956+
Nan::AsyncQueueWorker(new Workers::AdminClientDeleteGroups(
957+
callback, client, group_list, group_names_vector.size(), timeout_ms));
958+
}
959+
834960
} // namespace NodeKafka

src/admin.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ class AdminClient : public Connection {
5959
Baton DescribeGroups(std::vector<std::string>& groups,
6060
bool include_authorized_operations, int timeout_ms,
6161
rd_kafka_event_t** event_response);
62+
Baton DeleteGroups(rd_kafka_DeleteGroup_t** group_list, size_t group_cnt,
63+
int timeout_ms, rd_kafka_event_t** event_response);
6264

6365
protected:
6466
static Nan::Persistent<v8::Function> constructor;
@@ -79,6 +81,7 @@ class AdminClient : public Connection {
7981
// Consumer group operations
8082
static NAN_METHOD(NodeListGroups);
8183
static NAN_METHOD(NodeDescribeGroups);
84+
static NAN_METHOD(NodeDeleteGroups);
8285

8386
static NAN_METHOD(NodeConnect);
8487
static NAN_METHOD(NodeDisconnect);

src/common.cc

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -961,6 +961,51 @@ v8::Local<v8::Object> FromDescribeConsumerGroupsResult(
961961
return returnObject;
962962
}
963963

964+
/**
965+
* @brief Converts a rd_kafka_DeleteGroups_result_t* into a v8 array.
966+
*/
967+
v8::Local<v8::Array> FromDeleteGroupsResult(
968+
const rd_kafka_DeleteGroups_result_t* result) {
969+
/* Return object type:
970+
[{
971+
groupId: string
972+
errorCode?: number
973+
error?: LibrdKafkaError
974+
}]
975+
*/
976+
v8::Local<v8::Array> returnArray = Nan::New<v8::Array>();
977+
size_t result_cnt;
978+
const rd_kafka_group_result_t** results =
979+
rd_kafka_DeleteGroups_result_groups(result, &result_cnt);
980+
981+
for (size_t i = 0; i < result_cnt; i++) {
982+
const rd_kafka_group_result_t* group_result = results[i];
983+
v8::Local<v8::Object> group_object = Nan::New<v8::Object>();
984+
985+
Nan::Set(group_object, Nan::New("groupId").ToLocalChecked(),
986+
Nan::New<v8::String>(rd_kafka_group_result_name(group_result))
987+
.ToLocalChecked());
988+
989+
const rd_kafka_error_t* error = rd_kafka_group_result_error(group_result);
990+
if (!error) {
991+
Nan::Set(group_object, Nan::New("errorCode").ToLocalChecked(),
992+
Nan::New<v8::Number>(RD_KAFKA_RESP_ERR_NO_ERROR));
993+
} else {
994+
RdKafka::ErrorCode code =
995+
static_cast<RdKafka::ErrorCode>(rd_kafka_error_code(error));
996+
const char* msg = rd_kafka_error_string(error);
997+
998+
Nan::Set(group_object, Nan::New("errorCode").ToLocalChecked(),
999+
Nan::New<v8::Number>(code));
1000+
Nan::Set(group_object, Nan::New("error").ToLocalChecked(),
1001+
RdKafkaError(code, msg));
1002+
}
1003+
Nan::Set(returnArray, i, group_object);
1004+
}
1005+
1006+
return returnArray;
1007+
}
1008+
9641009
} // namespace Admin
9651010

9661011
} // namespace Conversion

src/common.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ v8::Local<v8::Object> FromConsumerGroupDescription(
124124
const rd_kafka_ConsumerGroupDescription_t *desc);
125125
v8::Local<v8::Object> FromDescribeConsumerGroupsResult(
126126
const rd_kafka_DescribeConsumerGroups_result_t *);
127+
128+
// DeleteGroups: Response
129+
v8::Local<v8::Array> FromDeleteGroupsResult(
130+
const rd_kafka_DeleteGroups_result_t *);
127131
} // namespace Admin
128132

129133
namespace TopicPartition {

0 commit comments

Comments
 (0)