Skip to content

Commit 5b59283

Browse files
committed
Add display of group configuration
Signed-off-by: Michael Edgar <medgar@redhat.com>
1 parent 3489abd commit 5b59283

File tree

37 files changed

+603
-103
lines changed

37 files changed

+603
-103
lines changed

api/src/main/java/com/github/streamshub/console/api/GroupsResource.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ public CompletionStage<Response> listGroups(
9191
Group.Fields.OFFSETS,
9292
Group.Fields.AUTHORIZED_OPERATIONS,
9393
Group.Fields.COORDINATOR,
94-
Group.Fields.PARTITION_ASSIGNOR
94+
Group.Fields.PARTITION_ASSIGNOR,
95+
Group.Fields.CONFIGS,
9596
},
9697
payload = ErrorCategory.InvalidQueryParameter.class)
9798
@Parameter(
@@ -110,7 +111,8 @@ public CompletionStage<Response> listGroups(
110111
Group.Fields.OFFSETS,
111112
Group.Fields.AUTHORIZED_OPERATIONS,
112113
Group.Fields.COORDINATOR,
113-
Group.Fields.PARTITION_ASSIGNOR
114+
Group.Fields.PARTITION_ASSIGNOR,
115+
Group.Fields.CONFIGS,
114116
}))
115117
List<String> fields,
116118

@@ -168,7 +170,8 @@ public CompletionStage<Response> describeGroup(
168170
Group.Fields.OFFSETS,
169171
Group.Fields.AUTHORIZED_OPERATIONS,
170172
Group.Fields.COORDINATOR,
171-
Group.Fields.PARTITION_ASSIGNOR
173+
Group.Fields.PARTITION_ASSIGNOR,
174+
Group.Fields.CONFIGS,
172175
},
173176
payload = ErrorCategory.InvalidQueryParameter.class)
174177
@Parameter(
@@ -187,7 +190,8 @@ public CompletionStage<Response> describeGroup(
187190
Group.Fields.OFFSETS,
188191
Group.Fields.AUTHORIZED_OPERATIONS,
189192
Group.Fields.COORDINATOR,
190-
Group.Fields.PARTITION_ASSIGNOR
193+
Group.Fields.PARTITION_ASSIGNOR,
194+
Group.Fields.CONFIGS,
191195
}))
192196
List<String> fields) {
193197

api/src/main/java/com/github/streamshub/console/api/model/Group.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Collections;
88
import java.util.Comparator;
99
import java.util.List;
10+
import java.util.Locale;
1011
import java.util.Map;
1112
import java.util.Optional;
1213

@@ -15,6 +16,7 @@
1516
import jakarta.json.JsonObjectBuilder;
1617
import jakarta.validation.Valid;
1718

19+
import org.apache.kafka.clients.consumer.GroupProtocol;
1820
import org.apache.kafka.common.errors.GroupIdNotFoundException;
1921
import org.eclipse.microprofile.openapi.annotations.media.Schema;
2022

@@ -65,6 +67,7 @@ public static final class Fields {
6567
public static final String PARTITION_ASSIGNOR = "partitionAssignor";
6668
public static final String OFFSETS = "offsets";
6769
public static final String SIMPLE_CONSUMER_GROUP = "simpleConsumerGroup";
70+
public static final String CONFIGS = "configs";
6871

6972
static final Comparator<Group> ID_COMPARATOR =
7073
comparing(Group::groupId);
@@ -172,6 +175,10 @@ public static class Attributes {
172175
@JsonProperty
173176
List<String> authorizedOperations;
174177

178+
@JsonProperty
179+
@Schema(implementation = Object.class, oneOf = { ConfigEntry.ConfigEntryMap.class, JsonApiError.class })
180+
Either<Map<String, ConfigEntry>, JsonApiError> configs;
181+
175182
// Available via list offsets operation only
176183
@JsonProperty
177184
List<@Valid OffsetAndMetadata> offsets = Collections.emptyList();
@@ -267,6 +274,7 @@ private static Group fromKafkaModel(
267274
Optional.ofNullable(description.state()).map(Enum::name).orElse(null));
268275

269276
group.type(org.apache.kafka.common.GroupType.CLASSIC.toString());
277+
group.protocol(description.protocol());
270278
group.coordinator(Node.fromKafkaModel(description.coordinator()));
271279
group.members(description.members()
272280
.stream()
@@ -291,6 +299,7 @@ private static Group fromKafkaModel(
291299
Optional.ofNullable(description.groupState()).map(Enum::name).orElse(null));
292300

293301
group.type(description.type().toString());
302+
group.protocol(GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
294303
group.partitionAssignor(description.partitionAssignor());
295304
group.coordinator(Node.fromKafkaModel(description.coordinator()));
296305
group.members(description.members()
@@ -316,6 +325,7 @@ private static Group fromKafkaModel(
316325
Optional.ofNullable(description.groupState()).map(Enum::name).orElse(null));
317326

318327
group.type(org.apache.kafka.common.GroupType.SHARE.toString());
328+
group.protocol(org.apache.kafka.common.GroupType.SHARE.name().toLowerCase(Locale.ROOT));
319329
group.coordinator(Node.fromKafkaModel(description.coordinator()));
320330
group.members(description.members()
321331
.stream()
@@ -340,6 +350,7 @@ private static Group fromKafkaModel(
340350
Optional.ofNullable(description.groupState()).map(Enum::name).orElse(null));
341351

342352
group.type(org.apache.kafka.common.GroupType.STREAMS.toString());
353+
group.protocol(org.apache.kafka.common.GroupType.STREAMS.name().toLowerCase(Locale.ROOT));
343354
group.coordinator(Node.fromKafkaModel(description.coordinator()));
344355
group.members(description.members()
345356
.stream()
@@ -432,6 +443,12 @@ public void protocol(String protocol) {
432443
attributes.protocol = protocol;
433444
}
434445

446+
public void addConfigs(Either<Map<String, ConfigEntry>, Throwable> configs) {
447+
attributes.configs = configs.ifPrimaryOrElse(
448+
Either::of,
449+
thrown -> JsonApiError.forThrowable(thrown, "Unable to describe group configs"));
450+
}
451+
435452
/**
436453
* Constructs a "cursor" ConsumerGroup from the encoded string representation of the subset
437454
* of Topic fields used to compare entities for pagination/sorting.

api/src/main/java/com/github/streamshub/console/api/service/ConfigService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public CompletionStage<Void> alterConfigs(ConfigResource.Type type, String name,
6363
}
6464

6565
CompletionStage<Map<String, Either<Map<String, ConfigEntry>, Throwable>>> describeConfigs(Admin adminClient, List<ConfigResource> keys) {
66-
Map<String, Either<Map<String, ConfigEntry>, Throwable>> result = new LinkedHashMap<>(keys.size());
66+
Map<String, Either<Map<String, ConfigEntry>, Throwable>> result = LinkedHashMap.newLinkedHashMap(keys.size());
6767

6868
var pendingDescribes = adminClient.describeConfigs(keys)
6969
.values()

api/src/main/java/com/github/streamshub/console/api/service/GroupService.java

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.kafka.common.TopicCollection;
4747
import org.apache.kafka.common.TopicPartition;
4848
import org.apache.kafka.common.Uuid;
49+
import org.apache.kafka.common.config.ConfigResource;
4950
import org.apache.kafka.common.errors.GroupIdNotFoundException;
5051
import org.apache.kafka.common.errors.GroupNotEmptyException;
5152
import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -101,17 +102,20 @@ public class GroupService {
101102
@Inject
102103
PermissionService permissionService;
103104

105+
@Inject
106+
ConfigService configService;
107+
104108
@Inject
105109
TopicDescribeService topicService;
106110

107111
@Inject
108112
ValidationProxy validationService;
109113

110-
public CompletionStage<List<Group>> listGroups(List<String> includes, ListRequestContext<Group> listSupport) {
111-
return listGroups(Collections.emptyList(), includes, listSupport);
114+
public CompletionStage<List<Group>> listGroups(List<String> fields, ListRequestContext<Group> listSupport) {
115+
return listGroups(Collections.emptyList(), fields, listSupport);
112116
}
113117

114-
public CompletionStage<List<Group>> listGroups(String topicId, List<String> includes,
118+
public CompletionStage<List<Group>> listGroups(String topicId, List<String> fields,
115119
ListRequestContext<Group> listSupport) {
116120

117121
Admin adminClient = kafkaContext.admin();
@@ -131,14 +135,14 @@ public CompletionStage<List<Group>> listGroups(String topicId, List<String> incl
131135
}, asyncExec)
132136
.thenComposeAsync(topicGroups -> {
133137
if (topicGroups.containsKey(topicId)) {
134-
return listGroups(topicGroups.get(topicId), includes, listSupport);
138+
return listGroups(topicGroups.get(topicId), fields, listSupport);
135139
}
136140
return CompletableFuture.completedStage(Collections.emptyList());
137141
}, asyncExec);
138142
}
139143

140144
private CompletionStage<List<Group>> listGroups(List<String> groupIds,
141-
List<String> includes, ListRequestContext<Group> listSupport) {
145+
List<String> fields, ListRequestContext<Group> listSupport) {
142146

143147
Admin adminClient = kafkaContext.admin();
144148
Set<GroupType> types = extractFilter(listSupport, "filter[type]", GroupType::parse);
@@ -168,7 +172,7 @@ private CompletionStage<List<Group>> listGroups(List<String> groupIds,
168172
.toList(),
169173
threadContext.currentContextExecutor())
170174
.thenComposeAsync(
171-
groups -> augmentList(adminClient, groups, includes),
175+
groups -> augmentList(adminClient, groups, fields),
172176
threadContext.currentContextExecutor());
173177
}
174178

@@ -506,11 +510,11 @@ public CompletionStage<Void> deleteGroup(String requestGroupId) {
506510
.toCompletionStage();
507511
}
508512

509-
private CompletionStage<List<Group>> augmentList(Admin adminClient, List<Group> list, List<String> includes) {
513+
private CompletionStage<List<Group>> augmentList(Admin adminClient, List<Group> list, List<String> fields) {
510514
CompletableFuture<Void> describePromise;
511515

512-
if (REQUIRE_DESCRIBE.stream().anyMatch(includes::contains)) {
513-
describePromise = describeGroups(adminClient, list, includes)
516+
if (REQUIRE_DESCRIBE.stream().anyMatch(fields::contains)) {
517+
describePromise = describeGroups(adminClient, list, fields)
514518
.thenAccept(descriptions -> {
515519
Map<String, Group> groups = list.stream().collect(Collectors.toMap(Group::groupId, Function.identity()));
516520
descriptions.forEach((name, either) -> mergeDescriptions(groups.get(name), either));
@@ -546,7 +550,7 @@ private void mergeDescriptions(Group group, Either<Group, Throwable> description
546550
private CompletionStage<Map<String, Either<Group, Throwable>>> describeGroups(
547551
Admin adminClient,
548552
Collection<Group> groups,
549-
List<String> includes) {
553+
List<String> fields) {
550554

551555
Map<String, Either<Group, Throwable>> result = LinkedHashMap.newLinkedHashMap(groups.size());
552556

@@ -558,7 +562,7 @@ private CompletionStage<Map<String, Either<Group, Throwable>>> describeGroups(
558562
.<Map.Entry<String, KafkaFuture<?>>>mapMulti((group, next) -> describeGroups(adminClient,
559563
group.getKey(),
560564
group.getValue(),
561-
includes.contains(Group.Fields.AUTHORIZED_OPERATIONS),
565+
fields.contains(Group.Fields.AUTHORIZED_OPERATIONS),
562566
next))
563567
.map(entry ->
564568
entry.getValue()
@@ -587,14 +591,10 @@ private CompletionStage<Map<String, Either<Group, Throwable>>> describeGroups(
587591

588592
return CompletableFuture.allOf(pendingDescribes)
589593
.thenCompose(nothing -> pendingTopicsIds)
590-
.thenCompose(topicIds -> {
591-
if (includes.contains(Group.Fields.OFFSETS)) {
592-
return fetchOffsets(adminClient, availableGroups.get(), topicIds)
593-
.thenApply(nothing -> result);
594-
}
595-
596-
return CompletableFuture.completedFuture(result);
597-
});
594+
.thenCompose(topicIds -> CompletableFuture.allOf(
595+
maybeFetchOffsets(adminClient, availableGroups.get(), topicIds, fields),
596+
maybeDescribeConfigs(adminClient, availableGroups.get(), fields)))
597+
.thenApply(nothing -> result);
598598
}
599599

600600
private CompletableFuture<Map<String, String>> fetchTopicIdMap() {
@@ -661,7 +661,15 @@ private static String groupId(Object description) {
661661
}
662662
}
663663

664-
private CompletableFuture<Void> fetchOffsets(Admin adminClient, Map<String, Group> groups, Map<String, String> topicIds) {
664+
private CompletableFuture<Void> maybeFetchOffsets(
665+
Admin adminClient,
666+
Map<String, Group> groups, Map<String, String> topicIds,
667+
List<String> fields) {
668+
669+
if (!fields.contains(Group.Fields.OFFSETS)) {
670+
return CompletableFuture.completedFuture(null);
671+
}
672+
665673
Map<String, Either<Map<PartitionId, OffsetAndMetadata>, Throwable>> groupOffsets = new LinkedHashMap<>();
666674
Map<TopicPartition, Either<ListOffsetsResultInfo, Throwable>> topicOffsets = new LinkedHashMap<>();
667675

@@ -905,4 +913,29 @@ private void addOffsets(Group group,
905913
group.offsets(offsets);
906914
}
907915
}
916+
917+
CompletableFuture<Void> maybeDescribeConfigs(Admin adminClient, Map<String, Group> groups, List<String> fields) {
918+
if (fields.contains(Group.Fields.CONFIGS)) {
919+
List<ConfigResource> keys = groups.values()
920+
.stream()
921+
.map(Group::groupId)
922+
.filter(groupId -> {
923+
if (permissionService.permitted(ResourceTypes.Kafka.GROUP_CONFIGS, Privilege.GET, groupId)) {
924+
return true;
925+
}
926+
var error = permissionService.forbidden(ResourceTypes.Kafka.GROUP_CONFIGS, Privilege.GET, groupId);
927+
groups.get(groupId).addConfigs(Either.ofAlternate(error));
928+
return false;
929+
})
930+
.map(groupId -> new ConfigResource(ConfigResource.Type.GROUP, groupId))
931+
.toList();
932+
933+
return configService.describeConfigs(adminClient, keys)
934+
.thenAccept(configs ->
935+
configs.forEach((groupId, either) -> groups.get(groupId).addConfigs(either)))
936+
.toCompletableFuture();
937+
}
938+
939+
return CompletableFuture.completedFuture(null);
940+
}
908941
}

common/src/main/java/com/github/streamshub/console/config/security/ResourceTypes.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,20 @@ public boolean includes(String value) {
7474
return value().equals(value) || "consumerGroups".equals(value);
7575
}
7676
},
77-
NODE_CONFIGS("nodes/configs"),
77+
GROUP_CONFIGS("groups/configs"),
78+
//
7879
NODES("nodes"),
80+
NODE_CONFIGS("nodes/configs"),
7981
NODE_METRICS("nodes/metrics"),
82+
//
8083
REBALANCES("rebalances"),
84+
//
8185
TOPICS("topics"),
8286
TOPIC_RECORDS("topics/records"),
8387
TOPIC_METRICS("topics/metrics"),
88+
//
8489
USERS("users"),
90+
//
8591
ALL("*") {
8692
@Override
8793
public Set<ResourceType<Kafka>> expand() {

ui/api/groups/actions.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,19 @@ import { filterUndefinedFromObj } from "@/utils/filterUndefinedFromObj";
2020
export async function getConsumerGroup(
2121
kafkaId: string,
2222
groupId: string,
23+
params?: {
24+
fields?: string;
25+
},
2326
): Promise<ApiResponse<ConsumerGroup>> {
27+
const sp = new URLSearchParams(
28+
filterUndefinedFromObj({
29+
"fields[groups]": params?.fields,
30+
}),
31+
);
32+
2433
return fetchData(
2534
`/api/kafkas/${kafkaId}/groups/${groupId}`,
26-
"",
35+
sp,
2736
(rawData) => ConsumerGroupResponseSchema.parse(rawData).data,
2837
);
2938
}

ui/api/groups/schema.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,16 @@ const MemberDescriptionSchema = z.object({
4747
});
4848
export type MemberDescription = z.infer<typeof MemberDescriptionSchema>;
4949

50+
const ConfigSchema = z.object({
51+
value: z.string(),
52+
source: z.string(),
53+
sensitive: z.boolean(),
54+
readOnly: z.boolean(),
55+
type: z.string(),
56+
});
57+
const ConfigMapSchema = z.record(z.string(), ConfigSchema);
58+
export type ConfigMap = z.infer<typeof ConfigMapSchema>;
59+
5060
export const ConsumerGroupSchema = z.object({
5161
id: z.string(),
5262
type: z.literal("groups"),
@@ -66,6 +76,7 @@ export const ConsumerGroupSchema = z.object({
6676
partitionAssignor: z.string().nullable().optional(),
6777
coordinator: NodeSchema.nullable().optional(),
6878
authorizedOperations: z.array(z.string()).nullable().nullable().optional(),
79+
configs: z.union([ ConfigMapSchema, ApiErrorSchema ]).optional(),
6980
offsets: z.array(OffsetAndMetadataSchema).nullable().optional(),
7081
errors: z.array(ApiErrorSchema).optional(),
7182
}),

ui/app/[locale]/(authorized)/kafka/[kafkaId]/@activeBreadcrumb/groups/[groupId]/page.tsx renamed to ui/app/[locale]/(authorized)/kafka/[kafkaId]/@activeBreadcrumb/groups/[groupId]/GroupBreadcrumb.tsx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { getConsumerGroup } from "@/api/groups/actions";
2-
import { KafkaConsumerGroupMembersParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/groups/[groupId]/KafkaConsumerGroupMembers.params";
2+
import { GroupParams } from "@/app/[locale]/(authorized)/kafka/[kafkaId]/groups/[groupId]/Group.params";
33
import { BreadcrumbLink } from "@/components/Navigation/BreadcrumbLink";
44
import RichText from "@/components/RichText";
55
import { NoDataErrorState } from "@/components/NoDataErrorState";
@@ -12,10 +12,10 @@ import {
1212
import { HomeIcon } from "@/libs/patternfly/react-icons";
1313
import { getTranslations } from "next-intl/server";
1414

15-
export default async function ConsumerGroupsActiveBreadcrumb({
15+
export async function GroupBreadcrumb({
1616
params: { groupId, kafkaId },
1717
}: {
18-
params: KafkaConsumerGroupMembersParams;
18+
params: GroupParams;
1919
}) {
2020
const t = await getTranslations();
2121
const consumerGroup = (await getConsumerGroup(kafkaId, groupId));
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export {
2+
GroupBreadcrumb as default
3+
} from "@/app/[locale]/(authorized)/kafka/[kafkaId]/@activeBreadcrumb/groups/[groupId]/GroupBreadcrumb";
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export {
2+
GroupBreadcrumb as default
3+
} from "@/app/[locale]/(authorized)/kafka/[kafkaId]/@activeBreadcrumb/groups/[groupId]/GroupBreadcrumb";

0 commit comments

Comments
 (0)