Skip to content

Commit c26a0a4

Browse files
authored
Add display of group configuration (#2376)
* Add display of group configuration Signed-off-by: Michael Edgar <medgar@redhat.com> * Add test cases dealing with group configurations Signed-off-by: Michael Edgar <medgar@redhat.com> * Align conditions to display groupId as link or plain text Signed-off-by: Michael Edgar <medgar@redhat.com> --------- Signed-off-by: Michael Edgar <medgar@redhat.com>
1 parent f99feb3 commit c26a0a4

File tree

41 files changed

+835
-123
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+835
-123
lines changed

api/src/main/java/com/github/streamshub/console/api/GroupsResource.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ public CompletionStage<Response> listGroups(
9191
Group.Fields.OFFSETS,
9292
Group.Fields.AUTHORIZED_OPERATIONS,
9393
Group.Fields.COORDINATOR,
94-
Group.Fields.PARTITION_ASSIGNOR
94+
Group.Fields.PARTITION_ASSIGNOR,
95+
Group.Fields.CONFIGS,
9596
},
9697
payload = ErrorCategory.InvalidQueryParameter.class)
9798
@Parameter(
@@ -110,7 +111,8 @@ public CompletionStage<Response> listGroups(
110111
Group.Fields.OFFSETS,
111112
Group.Fields.AUTHORIZED_OPERATIONS,
112113
Group.Fields.COORDINATOR,
113-
Group.Fields.PARTITION_ASSIGNOR
114+
Group.Fields.PARTITION_ASSIGNOR,
115+
Group.Fields.CONFIGS,
114116
}))
115117
List<String> fields,
116118

@@ -168,7 +170,8 @@ public CompletionStage<Response> describeGroup(
168170
Group.Fields.OFFSETS,
169171
Group.Fields.AUTHORIZED_OPERATIONS,
170172
Group.Fields.COORDINATOR,
171-
Group.Fields.PARTITION_ASSIGNOR
173+
Group.Fields.PARTITION_ASSIGNOR,
174+
Group.Fields.CONFIGS,
172175
},
173176
payload = ErrorCategory.InvalidQueryParameter.class)
174177
@Parameter(
@@ -187,7 +190,8 @@ public CompletionStage<Response> describeGroup(
187190
Group.Fields.OFFSETS,
188191
Group.Fields.AUTHORIZED_OPERATIONS,
189192
Group.Fields.COORDINATOR,
190-
Group.Fields.PARTITION_ASSIGNOR
193+
Group.Fields.PARTITION_ASSIGNOR,
194+
Group.Fields.CONFIGS,
191195
}))
192196
List<String> fields) {
193197

api/src/main/java/com/github/streamshub/console/api/model/Group.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Collections;
88
import java.util.Comparator;
99
import java.util.List;
10+
import java.util.Locale;
1011
import java.util.Map;
1112
import java.util.Optional;
1213

@@ -15,6 +16,7 @@
1516
import jakarta.json.JsonObjectBuilder;
1617
import jakarta.validation.Valid;
1718

19+
import org.apache.kafka.clients.consumer.GroupProtocol;
1820
import org.apache.kafka.common.errors.GroupIdNotFoundException;
1921
import org.eclipse.microprofile.openapi.annotations.media.Schema;
2022

@@ -65,6 +67,7 @@ public static final class Fields {
6567
public static final String PARTITION_ASSIGNOR = "partitionAssignor";
6668
public static final String OFFSETS = "offsets";
6769
public static final String SIMPLE_CONSUMER_GROUP = "simpleConsumerGroup";
70+
public static final String CONFIGS = "configs";
6871

6972
static final Comparator<Group> ID_COMPARATOR =
7073
comparing(Group::groupId);
@@ -91,8 +94,8 @@ STATE, nullsLast(comparing(Group::state)),
9194
", " + STATE +
9295
", " + MEMBERS +
9396
", " + COORDINATOR +
94-
", " + OFFSETS
95-
+ ", " + SIMPLE_CONSUMER_GROUP;
97+
", " + OFFSETS +
98+
", " + SIMPLE_CONSUMER_GROUP;
9699

97100
private Fields() {
98101
// Prevent instances
@@ -172,6 +175,10 @@ public static class Attributes {
172175
@JsonProperty
173176
List<String> authorizedOperations;
174177

178+
@JsonProperty
179+
@Schema(implementation = Object.class, oneOf = { ConfigEntry.ConfigEntryMap.class, JsonApiError.class })
180+
Either<Map<String, ConfigEntry>, JsonApiError> configs;
181+
175182
// Available via list offsets operation only
176183
@JsonProperty
177184
List<@Valid OffsetAndMetadata> offsets = Collections.emptyList();
@@ -267,6 +274,7 @@ private static Group fromKafkaModel(
267274
Optional.ofNullable(description.state()).map(Enum::name).orElse(null));
268275

269276
group.type(org.apache.kafka.common.GroupType.CLASSIC.toString());
277+
group.protocol(description.protocol());
270278
group.coordinator(Node.fromKafkaModel(description.coordinator()));
271279
group.members(description.members()
272280
.stream()
@@ -291,6 +299,7 @@ private static Group fromKafkaModel(
291299
Optional.ofNullable(description.groupState()).map(Enum::name).orElse(null));
292300

293301
group.type(description.type().toString());
302+
group.protocol(GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
294303
group.partitionAssignor(description.partitionAssignor());
295304
group.coordinator(Node.fromKafkaModel(description.coordinator()));
296305
group.members(description.members()
@@ -316,6 +325,7 @@ private static Group fromKafkaModel(
316325
Optional.ofNullable(description.groupState()).map(Enum::name).orElse(null));
317326

318327
group.type(org.apache.kafka.common.GroupType.SHARE.toString());
328+
group.protocol(org.apache.kafka.common.GroupType.SHARE.name().toLowerCase(Locale.ROOT));
319329
group.coordinator(Node.fromKafkaModel(description.coordinator()));
320330
group.members(description.members()
321331
.stream()
@@ -340,6 +350,7 @@ private static Group fromKafkaModel(
340350
Optional.ofNullable(description.groupState()).map(Enum::name).orElse(null));
341351

342352
group.type(org.apache.kafka.common.GroupType.STREAMS.toString());
353+
group.protocol(org.apache.kafka.common.GroupType.STREAMS.name().toLowerCase(Locale.ROOT));
343354
group.coordinator(Node.fromKafkaModel(description.coordinator()));
344355
group.members(description.members()
345356
.stream()
@@ -432,6 +443,12 @@ public void protocol(String protocol) {
432443
attributes.protocol = protocol;
433444
}
434445

446+
public void addConfigs(Either<Map<String, ConfigEntry>, Throwable> configs) {
447+
attributes.configs = configs.ifPrimaryOrElse(
448+
Either::of,
449+
thrown -> JsonApiError.forThrowable(thrown, "Unable to describe group configs"));
450+
}
451+
435452
/**
436453
* Constructs a "cursor" ConsumerGroup from the encoded string representation of the subset
437454
* of Topic fields used to compare entities for pagination/sorting.

api/src/main/java/com/github/streamshub/console/api/service/ConfigService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public CompletionStage<Void> alterConfigs(ConfigResource.Type type, String name,
6363
}
6464

6565
CompletionStage<Map<String, Either<Map<String, ConfigEntry>, Throwable>>> describeConfigs(Admin adminClient, List<ConfigResource> keys) {
66-
Map<String, Either<Map<String, ConfigEntry>, Throwable>> result = new LinkedHashMap<>(keys.size());
66+
Map<String, Either<Map<String, ConfigEntry>, Throwable>> result = LinkedHashMap.newLinkedHashMap(keys.size());
6767

6868
var pendingDescribes = adminClient.describeConfigs(keys)
6969
.values()

api/src/main/java/com/github/streamshub/console/api/service/GroupService.java

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.kafka.common.TopicCollection;
4747
import org.apache.kafka.common.TopicPartition;
4848
import org.apache.kafka.common.Uuid;
49+
import org.apache.kafka.common.config.ConfigResource;
4950
import org.apache.kafka.common.errors.GroupIdNotFoundException;
5051
import org.apache.kafka.common.errors.GroupNotEmptyException;
5152
import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -101,17 +102,20 @@ public class GroupService {
101102
@Inject
102103
PermissionService permissionService;
103104

105+
@Inject
106+
ConfigService configService;
107+
104108
@Inject
105109
TopicDescribeService topicService;
106110

107111
@Inject
108112
ValidationProxy validationService;
109113

110-
public CompletionStage<List<Group>> listGroups(List<String> includes, ListRequestContext<Group> listSupport) {
111-
return listGroups(Collections.emptyList(), includes, listSupport);
114+
public CompletionStage<List<Group>> listGroups(List<String> fields, ListRequestContext<Group> listSupport) {
115+
return listGroups(Collections.emptyList(), fields, listSupport);
112116
}
113117

114-
public CompletionStage<List<Group>> listGroups(String topicId, List<String> includes,
118+
public CompletionStage<List<Group>> listGroups(String topicId, List<String> fields,
115119
ListRequestContext<Group> listSupport) {
116120

117121
Admin adminClient = kafkaContext.admin();
@@ -131,14 +135,14 @@ public CompletionStage<List<Group>> listGroups(String topicId, List<String> incl
131135
}, asyncExec)
132136
.thenComposeAsync(topicGroups -> {
133137
if (topicGroups.containsKey(topicId)) {
134-
return listGroups(topicGroups.get(topicId), includes, listSupport);
138+
return listGroups(topicGroups.get(topicId), fields, listSupport);
135139
}
136140
return CompletableFuture.completedStage(Collections.emptyList());
137141
}, asyncExec);
138142
}
139143

140144
private CompletionStage<List<Group>> listGroups(List<String> groupIds,
141-
List<String> includes, ListRequestContext<Group> listSupport) {
145+
List<String> fields, ListRequestContext<Group> listSupport) {
142146

143147
Admin adminClient = kafkaContext.admin();
144148
Set<GroupType> types = extractFilter(listSupport, "filter[type]", GroupType::parse);
@@ -168,7 +172,7 @@ private CompletionStage<List<Group>> listGroups(List<String> groupIds,
168172
.toList(),
169173
threadContext.currentContextExecutor())
170174
.thenComposeAsync(
171-
groups -> augmentList(adminClient, groups, includes),
175+
groups -> augmentList(adminClient, groups, fields),
172176
threadContext.currentContextExecutor());
173177
}
174178

@@ -506,11 +510,11 @@ public CompletionStage<Void> deleteGroup(String requestGroupId) {
506510
.toCompletionStage();
507511
}
508512

509-
private CompletionStage<List<Group>> augmentList(Admin adminClient, List<Group> list, List<String> includes) {
513+
private CompletionStage<List<Group>> augmentList(Admin adminClient, List<Group> list, List<String> fields) {
510514
CompletableFuture<Void> describePromise;
511515

512-
if (REQUIRE_DESCRIBE.stream().anyMatch(includes::contains)) {
513-
describePromise = describeGroups(adminClient, list, includes)
516+
if (REQUIRE_DESCRIBE.stream().anyMatch(fields::contains)) {
517+
describePromise = describeGroups(adminClient, list, fields)
514518
.thenAccept(descriptions -> {
515519
Map<String, Group> groups = list.stream().collect(Collectors.toMap(Group::groupId, Function.identity()));
516520
descriptions.forEach((name, either) -> mergeDescriptions(groups.get(name), either));
@@ -546,7 +550,7 @@ private void mergeDescriptions(Group group, Either<Group, Throwable> description
546550
private CompletionStage<Map<String, Either<Group, Throwable>>> describeGroups(
547551
Admin adminClient,
548552
Collection<Group> groups,
549-
List<String> includes) {
553+
List<String> fields) {
550554

551555
Map<String, Either<Group, Throwable>> result = LinkedHashMap.newLinkedHashMap(groups.size());
552556

@@ -558,7 +562,7 @@ private CompletionStage<Map<String, Either<Group, Throwable>>> describeGroups(
558562
.<Map.Entry<String, KafkaFuture<?>>>mapMulti((group, next) -> describeGroups(adminClient,
559563
group.getKey(),
560564
group.getValue(),
561-
includes.contains(Group.Fields.AUTHORIZED_OPERATIONS),
565+
fields.contains(Group.Fields.AUTHORIZED_OPERATIONS),
562566
next))
563567
.map(entry ->
564568
entry.getValue()
@@ -587,14 +591,10 @@ private CompletionStage<Map<String, Either<Group, Throwable>>> describeGroups(
587591

588592
return CompletableFuture.allOf(pendingDescribes)
589593
.thenCompose(nothing -> pendingTopicsIds)
590-
.thenCompose(topicIds -> {
591-
if (includes.contains(Group.Fields.OFFSETS)) {
592-
return fetchOffsets(adminClient, availableGroups.get(), topicIds)
593-
.thenApply(nothing -> result);
594-
}
595-
596-
return CompletableFuture.completedFuture(result);
597-
});
594+
.thenCompose(topicIds -> CompletableFuture.allOf(
595+
maybeFetchOffsets(adminClient, availableGroups.get(), topicIds, fields),
596+
maybeDescribeConfigs(adminClient, availableGroups.get(), fields)))
597+
.thenApply(nothing -> result);
598598
}
599599

600600
private CompletableFuture<Map<String, String>> fetchTopicIdMap() {
@@ -661,7 +661,15 @@ private static String groupId(Object description) {
661661
}
662662
}
663663

664-
private CompletableFuture<Void> fetchOffsets(Admin adminClient, Map<String, Group> groups, Map<String, String> topicIds) {
664+
private CompletableFuture<Void> maybeFetchOffsets(
665+
Admin adminClient,
666+
Map<String, Group> groups, Map<String, String> topicIds,
667+
List<String> fields) {
668+
669+
if (!fields.contains(Group.Fields.OFFSETS)) {
670+
return CompletableFuture.completedFuture(null);
671+
}
672+
665673
Map<String, Either<Map<PartitionId, OffsetAndMetadata>, Throwable>> groupOffsets = new LinkedHashMap<>();
666674
Map<TopicPartition, Either<ListOffsetsResultInfo, Throwable>> topicOffsets = new LinkedHashMap<>();
667675

@@ -905,4 +913,29 @@ private void addOffsets(Group group,
905913
group.offsets(offsets);
906914
}
907915
}
916+
917+
CompletableFuture<Void> maybeDescribeConfigs(Admin adminClient, Map<String, Group> groups, List<String> fields) {
918+
if (!fields.contains(Group.Fields.CONFIGS)) {
919+
return CompletableFuture.completedFuture(null);
920+
}
921+
922+
List<ConfigResource> keys = groups.values()
923+
.stream()
924+
.map(Group::groupId)
925+
.filter(groupId -> {
926+
if (permissionService.permitted(ResourceTypes.Kafka.GROUP_CONFIGS, Privilege.GET, groupId)) {
927+
return true;
928+
}
929+
var error = permissionService.forbidden(ResourceTypes.Kafka.GROUP_CONFIGS, Privilege.GET, groupId);
930+
groups.get(groupId).addConfigs(Either.ofAlternate(error));
931+
return false;
932+
})
933+
.map(groupId -> new ConfigResource(ConfigResource.Type.GROUP, groupId))
934+
.toList();
935+
936+
return configService.describeConfigs(adminClient, keys)
937+
.thenAccept(configs ->
938+
configs.forEach((groupId, either) -> groups.get(groupId).addConfigs(either)))
939+
.toCompletableFuture();
940+
}
908941
}

0 commit comments

Comments
 (0)