Skip to content

Commit a30b761

Browse files
authored
[KIP-848]: Implemented KIP 848 changes in ListGroups API (#328)
* Implemented KIP 848 changes * style format * Expect apt groups * updated list group tests * Requested changes * Changelog changes and minor fix
1 parent 206347c commit a30b761

File tree

14 files changed

+124
-12
lines changed

14 files changed

+124
-12
lines changed

.semaphore/semaphore.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ blocks:
171171
- export BUILD_LIBRDKAFKA=0
172172
- npm run install-from-source
173173
jobs:
174-
- name: "Performance Test (Classic Protocol)"
174+
- name: "Performance Test"
175175
commands:
176176
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
177177
- docker compose -f test/docker/docker-compose.yml up -d && sleep 30

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ v1.4.0 is a feature release. It is supported for all usage.
55
## Enhancements
66

77
1. References librdkafka v2.11.0. Refer to the [librdkafka v2.11.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0) for more information.
8-
2. [KIP-848] `describeGroups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to being undefined.
8+
2. [KIP-848] `describeGroups()` now supports KIP-848 introduced `consumer` groups. Two new fields for consumer group type and target assignment have also been added. Type defines whether this group is a `classic` or `consumer` group. Target assignment is only valid for the `consumer` protocol and its defaults to being undefined (#329)
9+
3. [KIP-848] Admin API for listing consumer groups now has an optional filter to return only groups of given types (#328)
910

1011
# confluent-kafka-javascript v1.3.2
1112

examples/kafkajs/admin/list-groups.js

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
2-
const { Kafka, ConsumerGroupStates } = require('@confluentinc/kafka-javascript').KafkaJS;
2+
const { Kafka, ConsumerGroupStates, ConsumerGroupTypes } = require('@confluentinc/kafka-javascript').KafkaJS;
33
const { parseArgs } = require('node:util');
44

55
async function adminStart() {
@@ -20,13 +20,20 @@ async function adminStart() {
2020
short: 's',
2121
multiple: true,
2222
default: [],
23-
}
23+
},
24+
'types': {
25+
type: 'string',
26+
short: 't',
27+
multiple: true,
28+
default: [],
29+
},
2430
},
2531
});
2632

2733
let {
2834
'bootstrap-servers': bootstrapServers,
2935
states: matchConsumerGroupStates,
36+
types: matchConsumerGroupTypes,
3037
timeout,
3138
} = args.values;
3239

@@ -36,6 +43,9 @@ async function adminStart() {
3643
matchConsumerGroupStates = matchConsumerGroupStates.map(
3744
state => ConsumerGroupStates[state]);
3845

46+
matchConsumerGroupTypes = matchConsumerGroupTypes.map(
47+
type => ConsumerGroupTypes[type]);
48+
3949
const kafka = new Kafka({
4050
kafkaJS: {
4151
brokers: [bootstrapServers],
@@ -48,13 +58,15 @@ async function adminStart() {
4858
try {
4959
const groupOverview = await admin.listGroups({
5060
timeout,
51-
matchConsumerGroupStates
61+
matchConsumerGroupStates,
62+
matchConsumerGroupTypes
5263
});
5364
for (const group of groupOverview.groups) {
5465
console.log(`Group id: ${group.groupId}`);
5566
console.log(`\tType: ${group.protocolType}`);
5667
console.log(`\tIs simple: ${group.isSimpleConsumerGroup}`);
5768
console.log(`\tState: ${group.state}`);
69+
console.log(`\tType: ${group.type}`);
5870
}
5971
} catch(err) {
6072
console.log('List topics failed', err);

lib/admin.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,8 @@ AdminClient.prototype.createPartitions = function (topic, totalPartitions, timeo
373373
* May be unset (default: 5000).
374374
* @param {Array<RdKafka.ConsumerGroupStates>?} options.matchConsumerGroupStates -
375375
* A list of consumer group states to match. May be unset, fetches all states (default: unset).
376+
* @param {Array<RdKafka.ConsumerGroupTypes>?} options.matchConsumerGroupTypes -
377+
* A list of consumer group types to match. May be unset, fetches all types (default: unset).
376378
* @param {function} cb - The callback to be executed when finished.
377379
* @example
378380
* // Valid ways to call this function:

lib/kafkajs/_admin.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,9 @@ class Admin {
387387
* May be unset (default: 5000).
388388
* @param {Array<KafkaJS.ConsumerGroupStates>?} options.matchConsumerGroupStates -
389389
* A list of consumer group states to match. May be unset, fetches all states (default: unset).
390-
* @returns {Promise<{ groups: Array<{groupId: string, protocolType: string, isSimpleConsumerGroup: boolean, state: KafkaJS.ConsumerGroupStates}>, errors: Array<RdKafka.LibrdKafkaError> }>}
390+
* @param {Array<KafkaJS.ConsumerGroupTypes>?} options.matchConsumerGroupTypes -
391+
* A list of consumer group types to match. May be unset, fetches all types (default: unset).
392+
* @returns {Promise<{ groups: Array<{groupId: string, protocolType: string, isSimpleConsumerGroup: boolean, state: KafkaJS.ConsumerGroupStates, type: KafkaJS.ConsumerGroupTypes}>, errors: Array<RdKafka.LibrdKafkaError> }>}
391393
* Resolves with the list of consumer groups, rejects on error.
392394
*/
393395
async listGroups(options = {}) {

src/admin.cc

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -483,7 +483,9 @@ Baton AdminClient::CreatePartitions(
483483

484484
Baton AdminClient::ListGroups(
485485
bool is_match_states_set,
486-
std::vector<rd_kafka_consumer_group_state_t> &match_states, int timeout_ms,
486+
std::vector<rd_kafka_consumer_group_state_t> &match_states,
487+
bool is_match_types_set,
488+
std::vector<rd_kafka_consumer_group_type_t> &match_types, int timeout_ms,
487489
/* out */ rd_kafka_event_t **event_response) {
488490
if (!IsConnected()) {
489491
return Baton(RdKafka::ERR__STATE);
@@ -515,6 +517,15 @@ Baton AdminClient::ListGroups(
515517
}
516518
}
517519

520+
if (is_match_types_set) {
521+
rd_kafka_error_t *error =
522+
rd_kafka_AdminOptions_set_match_consumer_group_types(
523+
options, &match_types[0], match_types.size());
524+
if (error) {
525+
return Baton::BatonFromErrorAndDestroy(error);
526+
}
527+
}
528+
518529
// Create queue just for this operation.
519530
rd_kafka_queue_t *rkqu = rd_kafka_queue_new(m_client->c_ptr());
520531

@@ -1195,9 +1206,26 @@ NAN_METHOD(AdminClient::NodeListGroups) {
11951206
}
11961207
}
11971208

1209+
std::vector<rd_kafka_consumer_group_type_t> match_types;
1210+
v8::Local<v8::String> match_consumer_group_types_key =
1211+
Nan::New("matchConsumerGroupTypes").ToLocalChecked();
1212+
bool is_match_types_set =
1213+
Nan::Has(config, match_consumer_group_types_key).FromMaybe(false);
1214+
v8::Local<v8::Array> match_types_array = Nan::New<v8::Array>();
1215+
1216+
if (is_match_types_set) {
1217+
match_types_array = GetParameter<v8::Local<v8::Array>>(
1218+
config, "matchConsumerGroupTypes", match_types_array);
1219+
if (match_types_array->Length()) {
1220+
match_types = Conversion::Admin::FromV8GroupTypeArray(
1221+
match_types_array);
1222+
}
1223+
}
1224+
11981225
// Queue the work.
11991226
Nan::AsyncQueueWorker(new Workers::AdminClientListGroups(
1200-
callback, client, is_match_states_set, match_states, timeout_ms));
1227+
callback, client, is_match_states_set, match_states, is_match_types_set,
1228+
match_types, timeout_ms));
12011229
}
12021230

12031231
/**

src/admin.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ class AdminClient : public Connection {
5454
// Baton DescribeConfig(rd_kafka_NewTopic_t* topic, int timeout_ms);
5555
Baton ListGroups(bool is_match_states_set,
5656
std::vector<rd_kafka_consumer_group_state_t>& match_states,
57+
bool is_match_types_set,
58+
std::vector<rd_kafka_consumer_group_type_t>& match_types,
5759
int timeout_ms,
5860
rd_kafka_event_t** event_response);
5961
Baton DescribeGroups(std::vector<std::string>& groups,

src/common.cc

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -908,6 +908,35 @@ std::vector<rd_kafka_consumer_group_state_t> FromV8GroupStateArray(
908908
return returnVec;
909909
}
910910

911+
/**
912+
* @brief Converts a v8 array of group types into a vector of
913+
* rd_kafka_consumer_group_type_t.
914+
*/
915+
std::vector<rd_kafka_consumer_group_type_t> FromV8GroupTypeArray(
916+
v8::Local<v8::Array> array) {
917+
v8::Local<v8::Array> parameter = array.As<v8::Array>();
918+
std::vector<rd_kafka_consumer_group_type_t> returnVec;
919+
if (parameter->Length() >= 1) {
920+
for (unsigned int i = 0; i < parameter->Length(); i++) {
921+
v8::Local<v8::Value> v;
922+
if (!Nan::Get(parameter, i).ToLocal(&v)) {
923+
continue;
924+
}
925+
Nan::Maybe<int64_t> maybeT = Nan::To<int64_t>(v);
926+
if (maybeT.IsNothing()) {
927+
continue;
928+
}
929+
int64_t type_number = maybeT.FromJust();
930+
if (type_number < 0 || type_number >= RD_KAFKA_CONSUMER_GROUP_TYPE__CNT) {
931+
continue;
932+
}
933+
returnVec.push_back(
934+
static_cast<rd_kafka_consumer_group_type_t>(type_number));
935+
}
936+
}
937+
return returnVec;
938+
}
939+
911940
/**
912941
* @brief Converts a rd_kafka_ListConsumerGroups_result_t* into a v8 object.
913942
*/
@@ -920,6 +949,7 @@ v8::Local<v8::Object> FromListConsumerGroupsResult(
920949
protocolType: string,
921950
isSimpleConsumerGroup: boolean,
922951
state: ConsumerGroupState (internally a number)
952+
type: ConsumerGroupType (internally a number)
923953
}[],
924954
errors: LibrdKafkaError[]
925955
}
@@ -957,6 +987,9 @@ v8::Local<v8::Object> FromListConsumerGroupsResult(
957987
Nan::Set(groupObject, Nan::New("state").ToLocalChecked(),
958988
Nan::New<v8::Number>(rd_kafka_ConsumerGroupListing_state(group)));
959989

990+
Nan::Set(groupObject, Nan::New("type").ToLocalChecked(),
991+
Nan::New<v8::Number>(rd_kafka_ConsumerGroupListing_type(group)));
992+
960993
Nan::Set(groups, i, groupObject);
961994
}
962995

src/common.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ rd_kafka_NewTopic_t **FromV8TopicObjectArray(v8::Local<v8::Array>);
116116
std::vector<rd_kafka_consumer_group_state_t> FromV8GroupStateArray(
117117
v8::Local<v8::Array>);
118118

119+
// ListGroups: request
120+
std::vector<rd_kafka_consumer_group_type_t> FromV8GroupTypeArray(
121+
v8::Local<v8::Array> array);
122+
119123
// ListGroups: response
120124
v8::Local<v8::Object> FromListConsumerGroupsResult(
121125
const rd_kafka_ListConsumerGroups_result_t *);

src/workers.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1317,11 +1317,15 @@ void AdminClientCreatePartitions::HandleErrorCallback() {
13171317
AdminClientListGroups::AdminClientListGroups(
13181318
Nan::Callback* callback, AdminClient* client, bool is_match_states_set,
13191319
std::vector<rd_kafka_consumer_group_state_t>& match_states,
1320+
bool is_match_types_set,
1321+
std::vector<rd_kafka_consumer_group_type_t>& match_types,
13201322
const int& timeout_ms)
13211323
: ErrorAwareWorker(callback),
13221324
m_client(client),
13231325
m_is_match_states_set(is_match_states_set),
13241326
m_match_states(match_states),
1327+
m_is_match_types_set(is_match_types_set),
1328+
m_match_types(match_types),
13251329
m_timeout_ms(timeout_ms) {}
13261330

13271331
AdminClientListGroups::~AdminClientListGroups() {
@@ -1332,6 +1336,7 @@ AdminClientListGroups::~AdminClientListGroups() {
13321336

13331337
void AdminClientListGroups::Execute() {
13341338
Baton b = m_client->ListGroups(m_is_match_states_set, m_match_states,
1339+
m_is_match_types_set, m_match_types,
13351340
m_timeout_ms, &m_event_response);
13361341
if (b.err() != RdKafka::ERR_NO_ERROR) {
13371342
SetErrorBaton(b);

0 commit comments

Comments
 (0)