Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public CompletionStage<Response> listGroups(
Group.Fields.OFFSETS,
Group.Fields.AUTHORIZED_OPERATIONS,
Group.Fields.COORDINATOR,
Group.Fields.PARTITION_ASSIGNOR
Group.Fields.PARTITION_ASSIGNOR,
Group.Fields.CONFIGS,
},
payload = ErrorCategory.InvalidQueryParameter.class)
@Parameter(
Expand All @@ -110,7 +111,8 @@ public CompletionStage<Response> listGroups(
Group.Fields.OFFSETS,
Group.Fields.AUTHORIZED_OPERATIONS,
Group.Fields.COORDINATOR,
Group.Fields.PARTITION_ASSIGNOR
Group.Fields.PARTITION_ASSIGNOR,
Group.Fields.CONFIGS,
}))
List<String> fields,

Expand Down Expand Up @@ -168,7 +170,8 @@ public CompletionStage<Response> describeGroup(
Group.Fields.OFFSETS,
Group.Fields.AUTHORIZED_OPERATIONS,
Group.Fields.COORDINATOR,
Group.Fields.PARTITION_ASSIGNOR
Group.Fields.PARTITION_ASSIGNOR,
Group.Fields.CONFIGS,
},
payload = ErrorCategory.InvalidQueryParameter.class)
@Parameter(
Expand All @@ -187,7 +190,8 @@ public CompletionStage<Response> describeGroup(
Group.Fields.OFFSETS,
Group.Fields.AUTHORIZED_OPERATIONS,
Group.Fields.COORDINATOR,
Group.Fields.PARTITION_ASSIGNOR
Group.Fields.PARTITION_ASSIGNOR,
Group.Fields.CONFIGS,
}))
List<String> fields) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

Expand All @@ -15,6 +16,7 @@
import jakarta.json.JsonObjectBuilder;
import jakarta.validation.Valid;

import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.eclipse.microprofile.openapi.annotations.media.Schema;

Expand Down Expand Up @@ -65,6 +67,7 @@ public static final class Fields {
public static final String PARTITION_ASSIGNOR = "partitionAssignor";
public static final String OFFSETS = "offsets";
public static final String SIMPLE_CONSUMER_GROUP = "simpleConsumerGroup";
public static final String CONFIGS = "configs";

static final Comparator<Group> ID_COMPARATOR =
comparing(Group::groupId);
Expand All @@ -91,8 +94,8 @@ STATE, nullsLast(comparing(Group::state)),
", " + STATE +
", " + MEMBERS +
", " + COORDINATOR +
", " + OFFSETS
+ ", " + SIMPLE_CONSUMER_GROUP;
", " + OFFSETS +
", " + SIMPLE_CONSUMER_GROUP;

private Fields() {
// Prevent instances
Expand Down Expand Up @@ -172,6 +175,10 @@ public static class Attributes {
@JsonProperty
List<String> authorizedOperations;

@JsonProperty
@Schema(implementation = Object.class, oneOf = { ConfigEntry.ConfigEntryMap.class, JsonApiError.class })
Either<Map<String, ConfigEntry>, JsonApiError> configs;

// Available via list offsets operation only
@JsonProperty
List<@Valid OffsetAndMetadata> offsets = Collections.emptyList();
Expand Down Expand Up @@ -267,6 +274,7 @@ private static Group fromKafkaModel(
Optional.ofNullable(description.state()).map(Enum::name).orElse(null));

group.type(org.apache.kafka.common.GroupType.CLASSIC.toString());
group.protocol(description.protocol());
group.coordinator(Node.fromKafkaModel(description.coordinator()));
group.members(description.members()
.stream()
Expand All @@ -291,6 +299,7 @@ private static Group fromKafkaModel(
Optional.ofNullable(description.groupState()).map(Enum::name).orElse(null));

group.type(description.type().toString());
group.protocol(GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
group.partitionAssignor(description.partitionAssignor());
group.coordinator(Node.fromKafkaModel(description.coordinator()));
group.members(description.members()
Expand All @@ -316,6 +325,7 @@ private static Group fromKafkaModel(
Optional.ofNullable(description.groupState()).map(Enum::name).orElse(null));

group.type(org.apache.kafka.common.GroupType.SHARE.toString());
group.protocol(org.apache.kafka.common.GroupType.SHARE.name().toLowerCase(Locale.ROOT));
group.coordinator(Node.fromKafkaModel(description.coordinator()));
group.members(description.members()
.stream()
Expand All @@ -340,6 +350,7 @@ private static Group fromKafkaModel(
Optional.ofNullable(description.groupState()).map(Enum::name).orElse(null));

group.type(org.apache.kafka.common.GroupType.STREAMS.toString());
group.protocol(org.apache.kafka.common.GroupType.STREAMS.name().toLowerCase(Locale.ROOT));
group.coordinator(Node.fromKafkaModel(description.coordinator()));
group.members(description.members()
.stream()
Expand Down Expand Up @@ -432,6 +443,12 @@ public void protocol(String protocol) {
attributes.protocol = protocol;
}

/**
 * Attaches the outcome of a group-config describe operation to this group's
 * attributes. A successful describe keeps the config map; a failure is
 * converted into a JSON:API error payload ("Unable to describe group configs")
 * instead of letting the exception propagate to the caller.
 *
 * NOTE(review): exact dispatch semantics depend on the project type
 * {@code Either.ifPrimaryOrElse} — presumed to apply the first function to the
 * primary (config map) side and the second to the alternate (Throwable) side;
 * confirm against the Either implementation.
 *
 * @param configs either the described config map or the failure that occurred
 */
public void addConfigs(Either<Map<String, ConfigEntry>, Throwable> configs) {
    attributes.configs = configs.ifPrimaryOrElse(
            Either::of,
            thrown -> JsonApiError.forThrowable(thrown, "Unable to describe group configs"));
}

/**
* Constructs a "cursor" ConsumerGroup from the encoded string representation of the subset
* of Topic fields used to compare entities for pagination/sorting.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public CompletionStage<Void> alterConfigs(ConfigResource.Type type, String name,
}

CompletionStage<Map<String, Either<Map<String, ConfigEntry>, Throwable>>> describeConfigs(Admin adminClient, List<ConfigResource> keys) {
Map<String, Either<Map<String, ConfigEntry>, Throwable>> result = new LinkedHashMap<>(keys.size());
Map<String, Either<Map<String, ConfigEntry>, Throwable>> result = LinkedHashMap.newLinkedHashMap(keys.size());

var pendingDescribes = adminClient.describeConfigs(keys)
.values()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
Expand Down Expand Up @@ -101,17 +102,20 @@ public class GroupService {
@Inject
PermissionService permissionService;

@Inject
ConfigService configService;

@Inject
TopicDescribeService topicService;

@Inject
ValidationProxy validationService;

public CompletionStage<List<Group>> listGroups(List<String> includes, ListRequestContext<Group> listSupport) {
return listGroups(Collections.emptyList(), includes, listSupport);
public CompletionStage<List<Group>> listGroups(List<String> fields, ListRequestContext<Group> listSupport) {
return listGroups(Collections.emptyList(), fields, listSupport);
}

public CompletionStage<List<Group>> listGroups(String topicId, List<String> includes,
public CompletionStage<List<Group>> listGroups(String topicId, List<String> fields,
ListRequestContext<Group> listSupport) {

Admin adminClient = kafkaContext.admin();
Expand All @@ -131,14 +135,14 @@ public CompletionStage<List<Group>> listGroups(String topicId, List<String> incl
}, asyncExec)
.thenComposeAsync(topicGroups -> {
if (topicGroups.containsKey(topicId)) {
return listGroups(topicGroups.get(topicId), includes, listSupport);
return listGroups(topicGroups.get(topicId), fields, listSupport);
}
return CompletableFuture.completedStage(Collections.emptyList());
}, asyncExec);
}

private CompletionStage<List<Group>> listGroups(List<String> groupIds,
List<String> includes, ListRequestContext<Group> listSupport) {
List<String> fields, ListRequestContext<Group> listSupport) {

Admin adminClient = kafkaContext.admin();
Set<GroupType> types = extractFilter(listSupport, "filter[type]", GroupType::parse);
Expand Down Expand Up @@ -168,7 +172,7 @@ private CompletionStage<List<Group>> listGroups(List<String> groupIds,
.toList(),
threadContext.currentContextExecutor())
.thenComposeAsync(
groups -> augmentList(adminClient, groups, includes),
groups -> augmentList(adminClient, groups, fields),
threadContext.currentContextExecutor());
}

Expand Down Expand Up @@ -506,11 +510,11 @@ public CompletionStage<Void> deleteGroup(String requestGroupId) {
.toCompletionStage();
}

private CompletionStage<List<Group>> augmentList(Admin adminClient, List<Group> list, List<String> includes) {
private CompletionStage<List<Group>> augmentList(Admin adminClient, List<Group> list, List<String> fields) {
CompletableFuture<Void> describePromise;

if (REQUIRE_DESCRIBE.stream().anyMatch(includes::contains)) {
describePromise = describeGroups(adminClient, list, includes)
if (REQUIRE_DESCRIBE.stream().anyMatch(fields::contains)) {
describePromise = describeGroups(adminClient, list, fields)
.thenAccept(descriptions -> {
Map<String, Group> groups = list.stream().collect(Collectors.toMap(Group::groupId, Function.identity()));
descriptions.forEach((name, either) -> mergeDescriptions(groups.get(name), either));
Expand Down Expand Up @@ -546,7 +550,7 @@ private void mergeDescriptions(Group group, Either<Group, Throwable> description
private CompletionStage<Map<String, Either<Group, Throwable>>> describeGroups(
Admin adminClient,
Collection<Group> groups,
List<String> includes) {
List<String> fields) {

Map<String, Either<Group, Throwable>> result = LinkedHashMap.newLinkedHashMap(groups.size());

Expand All @@ -558,7 +562,7 @@ private CompletionStage<Map<String, Either<Group, Throwable>>> describeGroups(
.<Map.Entry<String, KafkaFuture<?>>>mapMulti((group, next) -> describeGroups(adminClient,
group.getKey(),
group.getValue(),
includes.contains(Group.Fields.AUTHORIZED_OPERATIONS),
fields.contains(Group.Fields.AUTHORIZED_OPERATIONS),
next))
.map(entry ->
entry.getValue()
Expand Down Expand Up @@ -587,14 +591,10 @@ private CompletionStage<Map<String, Either<Group, Throwable>>> describeGroups(

return CompletableFuture.allOf(pendingDescribes)
.thenCompose(nothing -> pendingTopicsIds)
.thenCompose(topicIds -> {
if (includes.contains(Group.Fields.OFFSETS)) {
return fetchOffsets(adminClient, availableGroups.get(), topicIds)
.thenApply(nothing -> result);
}

return CompletableFuture.completedFuture(result);
});
.thenCompose(topicIds -> CompletableFuture.allOf(
maybeFetchOffsets(adminClient, availableGroups.get(), topicIds, fields),
maybeDescribeConfigs(adminClient, availableGroups.get(), fields)))
.thenApply(nothing -> result);
}

private CompletableFuture<Map<String, String>> fetchTopicIdMap() {
Expand Down Expand Up @@ -661,7 +661,15 @@ private static String groupId(Object description) {
}
}

private CompletableFuture<Void> fetchOffsets(Admin adminClient, Map<String, Group> groups, Map<String, String> topicIds) {
private CompletableFuture<Void> maybeFetchOffsets(
Admin adminClient,
Map<String, Group> groups, Map<String, String> topicIds,
List<String> fields) {

if (!fields.contains(Group.Fields.OFFSETS)) {
return CompletableFuture.completedFuture(null);
}

Map<String, Either<Map<PartitionId, OffsetAndMetadata>, Throwable>> groupOffsets = new LinkedHashMap<>();
Map<TopicPartition, Either<ListOffsetsResultInfo, Throwable>> topicOffsets = new LinkedHashMap<>();

Expand Down Expand Up @@ -905,4 +913,29 @@ private void addOffsets(Group group,
group.offsets(offsets);
}
}

/**
 * Describes the Kafka configurations for the given groups when the request's
 * {@code fields} list includes {@code configs}; otherwise completes
 * immediately without touching the admin client.
 *
 * <p>Groups the caller is not permitted to read ({@code GROUP_CONFIGS} /
 * {@code GET}) are excluded from the describe call; each such group instead
 * receives a per-group "forbidden" error entry so one denial does not fail
 * the whole request.
 *
 * @param adminClient Kafka admin client used for the describe operation
 * @param groups      groups to augment, keyed by group ID
 * @param fields      requested field names; gates whether configs are fetched
 * @return future completing once every group has its configs (or error) attached
 */
CompletableFuture<Void> maybeDescribeConfigs(Admin adminClient, Map<String, Group> groups, List<String> fields) {
    if (!fields.contains(Group.Fields.CONFIGS)) {
        // Configs were not requested; skip the describe entirely.
        return CompletableFuture.completedFuture(null);
    }

    // The map is keyed by group ID (this method looks entries up with
    // groups.get(groupId) below), so stream the key set directly instead of
    // re-deriving each ID from the Group values.
    List<ConfigResource> keys = groups.keySet()
            .stream()
            .filter(groupId -> {
                if (permissionService.permitted(ResourceTypes.Kafka.GROUP_CONFIGS, Privilege.GET, groupId)) {
                    return true;
                }
                // Denied groups get an inline error rather than failing the request.
                var error = permissionService.forbidden(ResourceTypes.Kafka.GROUP_CONFIGS, Privilege.GET, groupId);
                groups.get(groupId).addConfigs(Either.ofAlternate(error));
                return false;
            })
            .map(groupId -> new ConfigResource(ConfigResource.Type.GROUP, groupId))
            .toList();

    return configService.describeConfigs(adminClient, keys)
            .thenAccept(configs ->
                configs.forEach((groupId, either) -> groups.get(groupId).addConfigs(either)))
            .toCompletableFuture();
}
}
Loading