Skip to content

Commit 5ec7b53

Browse files
committed
feat: implement deleteConsumerGroupOffsets
1 parent 92ce39c commit 5ec7b53

File tree

3 files changed

+45
-0
lines changed

3 files changed

+45
-0
lines changed

api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
import io.kafbat.ui.model.PartitionOffsetDTO;
1717
import io.kafbat.ui.model.SortOrderDTO;
1818
import io.kafbat.ui.model.rbac.AccessContext;
19+
import io.kafbat.ui.model.rbac.permission.ConnectAction;
1920
import io.kafbat.ui.model.rbac.permission.TopicAction;
2021
import io.kafbat.ui.service.ConsumerGroupService;
2122
import io.kafbat.ui.service.OffsetsResetService;
2223
import java.util.Map;
2324
import java.util.Optional;
2425
import java.util.function.Supplier;
26+
import java.util.stream.Collectors;
2527
import lombok.RequiredArgsConstructor;
2628
import lombok.extern.slf4j.Slf4j;
2729
import org.springframework.beans.factory.annotation.Value;
@@ -59,6 +61,22 @@ public Mono<ResponseEntity<Void>> deleteConsumerGroup(String clusterName,
5961
.thenReturn(ResponseEntity.ok().build());
6062
}
6163

64+
@Override
65+
public Mono<ResponseEntity<Void>> deleteConsumerGroupOffsets(String clusterName, String groupId, String topicName,
66+
ServerWebExchange exchange) {
67+
var context = AccessContext.builder()
68+
.cluster(clusterName)
69+
.consumerGroupActions(groupId, RESET_OFFSETS)
70+
.topicActions(topicName, TopicAction.VIEW)
71+
.operationName("deleteConsumerGroupOffsets")
72+
.build();
73+
74+
return validateAccess(context)
75+
.then(consumerGroupService.deleteConsumerGroupOffset(getCluster(clusterName), groupId, topicName))
76+
.doOnEach(sig -> audit(context, sig))
77+
.thenReturn(ResponseEntity.ok().build());
78+
}
79+
6280
@Override
6381
public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(String clusterName,
6482
String consumerGroupId,

api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.List;
1919
import java.util.Map;
2020
import java.util.Properties;
21+
import java.util.Set;
2122
import java.util.function.ToIntFunction;
2223
import java.util.stream.Collectors;
2324
import java.util.stream.Stream;
@@ -247,6 +248,13 @@ public Mono<Void> deleteConsumerGroupById(KafkaCluster cluster,
247248
.flatMap(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId)));
248249
}
249250

251+
public Mono<Void> deleteConsumerGroupOffset(KafkaCluster cluster,
252+
String groupId,
253+
String topicName) {
254+
return adminClientService.get(cluster)
255+
.flatMap(adminClient -> adminClient.deleteConsumerGroupOffsets(groupId, topicName));
256+
}
257+
250258
public EnhancedConsumer createConsumer(KafkaCluster cluster) {
251259
return createConsumer(cluster, Map.of());
252260
}

api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.kafka.clients.admin.DescribeClusterOptions;
5050
import org.apache.kafka.clients.admin.DescribeClusterResult;
5151
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
52+
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
5253
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
5354
import org.apache.kafka.clients.admin.ListOffsetsResult;
5455
import org.apache.kafka.clients.admin.ListTopicsOptions;
@@ -74,6 +75,7 @@
7475
import org.apache.kafka.common.errors.ClusterAuthorizationException;
7576
import org.apache.kafka.common.errors.GroupIdNotFoundException;
7677
import org.apache.kafka.common.errors.GroupNotEmptyException;
78+
import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
7779
import org.apache.kafka.common.errors.InvalidRequestException;
7880
import org.apache.kafka.common.errors.SecurityDisabledException;
7981
import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -436,6 +438,23 @@ public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
436438
th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
437439
}
438440

441+
public Mono<Void> deleteConsumerGroupOffsets(String groupId, String topicName) {
442+
return listConsumerGroupOffsets(List.of(groupId), null)
443+
.flatMap(table -> {
444+
// filter TopicPartitions by topicName
445+
Set<TopicPartition> partitions = table.row(groupId).keySet().stream()
446+
.filter(tp -> tp.topic().equals(topicName))
447+
.collect(Collectors.toSet());
448+
return toMono(client.deleteConsumerGroupOffsets(groupId, partitions).all());
449+
})
450+
.onErrorResume(GroupIdNotFoundException.class,
451+
th -> Mono.error(new NotFoundException("The group id does not exist")))
452+
.onErrorResume(UnknownTopicOrPartitionException.class,
453+
th -> Mono.error(new NotFoundException("The topic or partition is unknown")))
454+
.onErrorResume(GroupSubscribedToTopicException.class,
455+
th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
456+
}
457+
439458
public Mono<Void> createTopic(String name,
440459
int numPartitions,
441460
@Nullable Integer replicationFactor,

0 commit comments

Comments
 (0)