Skip to content

Commit 959429b

Browse files
authored
Merge branch 'main' into issues/592
2 parents 8d41256 + 7be3325 commit 959429b

File tree

22 files changed

+418
-192
lines changed

22 files changed

+418
-192
lines changed

.github/workflows/build-public-image.yml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ on:
66
types: ['labeled']
77

88
permissions:
9+
id-token: write
910
contents: read
11+
pull-requests: write
1012

1113
jobs:
1214
build:
@@ -47,12 +49,11 @@ jobs:
4749
key: ${{ runner.os }}-buildx-${{ github.sha }}
4850
restore-keys: |
4951
${{ runner.os }}-buildx-
50-
- name: Configure AWS credentials for Kafka-UI account
52+
- name: Configure AWS Credentials
5153
uses: aws-actions/configure-aws-credentials@v4
5254
with:
53-
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
54-
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
5555
aws-region: us-east-1
56+
role-to-assume: ${{ secrets.AWS_ROLE }}
5657
- name: Login to Amazon ECR
5758
id: login-ecr
5859
uses: aws-actions/amazon-ecr-login@v2
@@ -65,7 +66,7 @@ jobs:
6566
builder: ${{ steps.buildx.outputs.name }}
6667
context: api
6768
push: true
68-
tags: public.ecr.aws/kafbat/kafka-ui-custom-build:${{ steps.extract_branch.outputs.tag }}
69+
tags: ${{ vars.ECR_REGISTRY }}/${{ github.repository }}:${{ steps.extract_branch.outputs.tag }}
6970
build-args: |
7071
JAR_FILE=api-${{ steps.build.outputs.version }}.jar
7172
cache-from: type=local,src=/tmp/.buildx-cache
@@ -75,6 +76,6 @@ jobs:
7576
with:
7677
issue-number: ${{ github.event.pull_request.number }}
7778
body: |
78-
Image published at public.ecr.aws/kafbat/kafka-ui-custom-build:${{ steps.extract_branch.outputs.tag }}
79+
Image published at ${{ vars.ECR_REGISTRY }}/${{ github.repository }}:${{ steps.extract_branch.outputs.tag }}
7980
outputs:
8081
tag: ${{ steps.extract_branch.outputs.tag }}

api/pom.xml

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@
9191
<dependency>
9292
<groupId>software.amazon.msk</groupId>
9393
<artifactId>aws-msk-iam-auth</artifactId>
94-
<version>2.1.0</version>
94+
<version>2.2.0</version>
9595
</dependency>
9696

9797
<dependency>
@@ -266,18 +266,6 @@
266266
<artifactId>cel</artifactId>
267267
</dependency>
268268
<!-- CVE fixes -->
269-
<dependency>
270-
<groupId>ch.qos.logback</groupId>
271-
<artifactId>logback-classic</artifactId>
272-
<version>1.4.12</version>
273-
</dependency>
274-
<!-- CVE fixes -->
275-
<dependency>
276-
<groupId>ch.qos.logback</groupId>
277-
<artifactId>logback-core</artifactId>
278-
<version>1.4.12</version>
279-
</dependency>
280-
<!-- CVE fixes -->
281269
<dependency>
282270
<groupId>com.squareup.okhttp3</groupId>
283271
<artifactId>logging-interceptor</artifactId>
@@ -289,7 +277,6 @@
289277
<artifactId>commons-compress</artifactId>
290278
<version>1.26.0</version>
291279
</dependency>
292-
293280
</dependencies>
294281

295282
<build>

api/src/main/java/io/kafbat/ui/config/ReadOnlyModeFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public class ReadOnlyModeFilter implements WebFilter {
2525
Pattern.compile("/api/clusters/(?<clusterName>[^/]++)");
2626

2727
private static final Set<Pattern> SAFE_ENDPOINTS = Set.of(
28-
Pattern.compile("/api/clusters/[^/]+/topics/[^/]+/(smartfilters)$")
28+
Pattern.compile("/api/clusters/[^/]+/topics/[^/]+/(smartfilters|analysis)$")
2929
);
3030

3131
private final ClustersStorage clustersStorage;

api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,24 @@ public Mono<ResponseEntity<Void>> deleteConsumerGroup(String clusterName,
5959
.thenReturn(ResponseEntity.ok().build());
6060
}
6161

62+
@Override
63+
public Mono<ResponseEntity<Void>> deleteConsumerGroupOffsets(String clusterName,
64+
String groupId,
65+
String topicName,
66+
ServerWebExchange exchange) {
67+
var context = AccessContext.builder()
68+
.cluster(clusterName)
69+
.consumerGroupActions(groupId, RESET_OFFSETS)
70+
.topicActions(topicName, TopicAction.VIEW)
71+
.operationName("deleteConsumerGroupOffsets")
72+
.build();
73+
74+
return validateAccess(context)
75+
.then(consumerGroupService.deleteConsumerGroupOffset(getCluster(clusterName), groupId, topicName))
76+
.doOnEach(sig -> audit(context, sig))
77+
.thenReturn(ResponseEntity.ok().build());
78+
}
79+
6280
@Override
6381
public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(String clusterName,
6482
String consumerGroupId,

api/src/main/java/io/kafbat/ui/model/InternalBrokerConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ public class InternalBrokerConfig {
1616
private final boolean isReadOnly;
1717
private final List<ConfigEntry.ConfigSynonym> synonyms;
1818

19-
public static InternalBrokerConfig from(ConfigEntry configEntry) {
19+
public static InternalBrokerConfig from(ConfigEntry configEntry, boolean readOnlyCluster) {
2020
InternalBrokerConfig.InternalBrokerConfigBuilder builder = InternalBrokerConfig.builder()
2121
.name(configEntry.name())
2222
.value(configEntry.value())
2323
.source(configEntry.source())
24-
.isReadOnly(configEntry.isReadOnly())
24+
.isReadOnly(readOnlyCluster || configEntry.isReadOnly())
2525
.isSensitive(configEntry.isSensitive())
2626
.synonyms(configEntry.synonyms());
2727
return builder.build();

api/src/main/java/io/kafbat/ui/service/BrokerService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ private Flux<InternalBrokerConfig> getBrokersConfig(KafkaCluster cluster, Intege
5959
}
6060
return loadBrokersConfig(cluster, brokerId)
6161
.map(list -> list.stream()
62-
.map(InternalBrokerConfig::from)
62+
.map(configEntry -> InternalBrokerConfig.from(configEntry, cluster.isReadOnly()))
6363
.collect(Collectors.toList()))
6464
.flatMapMany(Flux::fromIterable);
6565
}

api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -209,12 +209,13 @@ private Mono<List<ConsumerGroupDescription>> describeConsumerGroups(ReactiveAdmi
209209
}
210210

211211

212-
private Mono<List<ConsumerGroupDescription>> loadDescriptionsByInternalConsumerGroups(ReactiveAdminClient ac,
213-
List<ConsumerGroupListing> groups,
214-
Comparator<GroupWithDescr> comparator,
215-
int pageNum,
216-
int perPage,
217-
SortOrderDTO sortOrderDto) {
212+
private Mono<List<ConsumerGroupDescription>> loadDescriptionsByInternalConsumerGroups(
213+
ReactiveAdminClient ac,
214+
List<ConsumerGroupListing> groups,
215+
Comparator<GroupWithDescr> comparator,
216+
int pageNum,
217+
int perPage,
218+
SortOrderDTO sortOrderDto) {
218219
var groupNames = groups.stream().map(ConsumerGroupListing::groupId).toList();
219220

220221
return ac.describeConsumerGroups(groupNames)
@@ -247,6 +248,13 @@ public Mono<Void> deleteConsumerGroupById(KafkaCluster cluster,
247248
.flatMap(adminClient -> adminClient.deleteConsumerGroups(List.of(groupId)));
248249
}
249250

251+
public Mono<Void> deleteConsumerGroupOffset(KafkaCluster cluster,
252+
String groupId,
253+
String topicName) {
254+
return adminClientService.get(cluster)
255+
.flatMap(adminClient -> adminClient.deleteConsumerGroupOffsets(groupId, topicName));
256+
}
257+
250258
public EnhancedConsumer createConsumer(KafkaCluster cluster) {
251259
return createConsumer(cluster, Map.of());
252260
}

api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.apache.kafka.common.errors.ClusterAuthorizationException;
7575
import org.apache.kafka.common.errors.GroupIdNotFoundException;
7676
import org.apache.kafka.common.errors.GroupNotEmptyException;
77+
import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
7778
import org.apache.kafka.common.errors.InvalidRequestException;
7879
import org.apache.kafka.common.errors.SecurityDisabledException;
7980
import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -436,6 +437,27 @@ public Mono<Void> deleteConsumerGroups(Collection<String> groupIds) {
436437
th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
437438
}
438439

440+
public Mono<Void> deleteConsumerGroupOffsets(String groupId, String topicName) {
441+
return listConsumerGroupOffsets(List.of(groupId), null)
442+
.flatMap(table -> {
443+
// filter TopicPartitions by topicName
444+
Set<TopicPartition> partitions = table.row(groupId).keySet().stream()
445+
.filter(tp -> tp.topic().equals(topicName))
446+
.collect(Collectors.toSet());
447+
// check if partitions have no committed offsets
448+
return partitions.isEmpty()
449+
? Mono.error(new NotFoundException("The topic or partition is unknown"))
450+
// call deleteConsumerGroupOffsets
451+
: toMono(client.deleteConsumerGroupOffsets(groupId, partitions).all());
452+
})
453+
.onErrorResume(GroupIdNotFoundException.class,
454+
th -> Mono.error(new NotFoundException("The group id does not exist")))
455+
.onErrorResume(UnknownTopicOrPartitionException.class,
456+
th -> Mono.error(new NotFoundException("The topic or partition is unknown")))
457+
.onErrorResume(GroupSubscribedToTopicException.class,
458+
th -> Mono.error(new IllegalEntityStateException("The group is not empty")));
459+
}
460+
439461
public Mono<Void> createTopic(String name,
440462
int numPartitions,
441463
@Nullable Integer replicationFactor,

api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import io.kafbat.ui.model.ConsumerGroupDTO;
66
import io.kafbat.ui.model.ConsumerGroupsPageResponseDTO;
7+
import io.kafbat.ui.producer.KafkaTestProducer;
78
import java.io.Closeable;
89
import java.time.Duration;
910
import java.util.Comparator;
@@ -22,6 +23,8 @@
2223
import org.junit.jupiter.api.Test;
2324
import org.springframework.beans.factory.annotation.Autowired;
2425
import org.springframework.test.web.reactive.server.WebTestClient;
26+
import reactor.core.publisher.Flux;
27+
import reactor.core.publisher.Mono;
2528

2629
@Slf4j
2730
public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
@@ -31,12 +34,76 @@ public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
3134
@Test
3235
void shouldNotFoundWhenNoSuchConsumerGroupId() {
3336
String groupId = "groupA";
37+
String topicName = "topicX";
38+
3439
webTestClient
3540
.delete()
3641
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
3742
.exchange()
3843
.expectStatus()
3944
.isNotFound();
45+
46+
webTestClient
47+
.delete()
48+
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}", LOCAL, groupId, topicName)
49+
.exchange()
50+
.expectStatus()
51+
.isNotFound();
52+
}
53+
54+
@Test
55+
void shouldNotFoundWhenNoSuchTopic() {
56+
String topicName = createTopicWithRandomName();
57+
String topicNameUnSubscribed = "topicX";
58+
59+
//Create a consumer and subscribe to the topic
60+
String groupId = UUID.randomUUID().toString();
61+
try (val consumer = createTestConsumerWithGroupId(groupId)) {
62+
consumer.subscribe(List.of(topicName));
63+
consumer.poll(Duration.ofMillis(100));
64+
65+
webTestClient
66+
.delete()
67+
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}", LOCAL, groupId,
68+
topicNameUnSubscribed)
69+
.exchange()
70+
.expectStatus()
71+
.isNotFound();
72+
}
73+
}
74+
75+
@Test
76+
void shouldOkWhenConsumerGroupIsNotActiveAndPartitionOffsetExists() {
77+
String topicName = createTopicWithRandomName();
78+
79+
//Create a consumer and subscribe to the topic
80+
String groupId = UUID.randomUUID().toString();
81+
82+
try (KafkaTestProducer<String, String> producer = KafkaTestProducer.forKafka(kafka)) {
83+
Flux.fromStream(
84+
Stream.of("one", "two", "three", "four")
85+
.map(value -> Mono.fromFuture(producer.send(topicName, value)))
86+
).blockLast();
87+
} catch (Throwable e) {
88+
log.error("Error on sending", e);
89+
throw new RuntimeException(e);
90+
}
91+
92+
try (val consumer = createTestConsumerWithGroupId(groupId)) {
93+
consumer.subscribe(List.of(topicName));
94+
consumer.poll(Duration.ofMillis(100));
95+
96+
//Stop consumers to delete consumer offset from the topic
97+
consumer.pause(consumer.assignment());
98+
}
99+
100+
//Delete the consumer offset when it's INACTIVE and check
101+
webTestClient
102+
.delete()
103+
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}/topics/{topicName}", LOCAL, groupId, topicName)
104+
.exchange()
105+
.expectStatus()
106+
.isOk();
40107
}
41108

42109
@Test

api/src/test/java/io/kafbat/ui/serdes/PropertyResolverImplTest.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55

66
import java.util.List;
77
import java.util.Map;
8-
import lombok.AllArgsConstructor;
9-
import lombok.Data;
108
import org.junit.jupiter.api.Nested;
119
import org.junit.jupiter.api.Test;
1210
import org.springframework.boot.context.properties.bind.BindException;
@@ -21,11 +19,7 @@ class PropertyResolverImplTest {
2119

2220
private final MockEnvironment env = new MockEnvironment();
2321

24-
@Data
25-
@AllArgsConstructor
26-
public static class CustomPropertiesClass {
27-
private String f1;
28-
private Integer f2;
22+
public record CustomPropertiesClass(String f1, Integer f2) {
2923
}
3024

3125
@Test

0 commit comments

Comments
 (0)