|
74 | 74 | import org.apache.kafka.common.errors.ClusterAuthorizationException; |
75 | 75 | import org.apache.kafka.common.errors.GroupIdNotFoundException; |
76 | 76 | import org.apache.kafka.common.errors.GroupNotEmptyException; |
| 77 | +import org.apache.kafka.common.errors.GroupSubscribedToTopicException; |
77 | 78 | import org.apache.kafka.common.errors.InvalidRequestException; |
78 | 79 | import org.apache.kafka.common.errors.SecurityDisabledException; |
79 | 80 | import org.apache.kafka.common.errors.TopicAuthorizationException; |
@@ -436,6 +437,27 @@ public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) { |
436 | 437 | th -> Mono.error(new IllegalEntityStateException("The group is not empty"))); |
437 | 438 | } |
438 | 439 |
|
| 440 | + public Mono<Void> deleteConsumerGroupOffsets(String groupId, String topicName) { |
| 441 | + return listConsumerGroupOffsets(List.of(groupId), null) |
| 442 | + .flatMap(table -> { |
| 443 | + // filter TopicPartitions by topicName |
| 444 | + Set<TopicPartition> partitions = table.row(groupId).keySet().stream() |
| 445 | + .filter(tp -> tp.topic().equals(topicName)) |
| 446 | + .collect(Collectors.toSet()); |
| 447 | + // check if partitions have no committed offsets |
| 448 | + return partitions.isEmpty() |
| 449 | + ? Mono.error(new NotFoundException("The topic or partition is unknown")) |
| 450 | + // call deleteConsumerGroupOffsets |
| 451 | + : toMono(client.deleteConsumerGroupOffsets(groupId, partitions).all()); |
| 452 | + }) |
| 453 | + .onErrorResume(GroupIdNotFoundException.class, |
| 454 | + th -> Mono.error(new NotFoundException("The group id does not exist"))) |
| 455 | + .onErrorResume(UnknownTopicOrPartitionException.class, |
| 456 | + th -> Mono.error(new NotFoundException("The topic or partition is unknown"))) |
| 457 | + .onErrorResume(GroupSubscribedToTopicException.class, |
| 458 | + th -> Mono.error(new IllegalEntityStateException("The group is not empty"))); |
| 459 | + } |
| 460 | + |
439 | 461 | public Mono<Void> createTopic(String name, |
440 | 462 | int numPartitions, |
441 | 463 | @Nullable Integer replicationFactor, |
|
0 commit comments