Skip to content
This repository was archived by the owner on Oct 21, 2025. It is now read-only.

Commit 9871378

Browse files
author
tnewman-at-gm
authored
Merge branch 'kafbat:main' into Feature/Azure-entra
2 parents 3324341 + 273e64c commit 9871378

File tree

16 files changed

+120
-93
lines changed

16 files changed

+120
-93
lines changed

.github/workflows/e2e-run.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ jobs:
103103
run: |
104104
mkdir -p ./e2e-tests/target/selenoid-results/video
105105
mkdir -p ./e2e-tests/target/selenoid-results/logs
106-
docker-compose -f ./e2e-tests/selenoid/selenoid-ci.yaml up -d
107-
docker-compose -f ./documentation/compose/e2e-tests.yaml up -d
106+
docker compose -f ./e2e-tests/selenoid/selenoid-ci.yaml up -d
107+
docker compose -f ./documentation/compose/e2e-tests.yaml up -d
108108
109109
- name: Dump Docker logs on failure
110110
if: failure()

api/src/main/java/io/kafbat/ui/controller/AccessController.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -45,33 +45,34 @@ public Mono<ResponseEntity<AuthenticationInfoDTO>> getUserAuthInfo(ServerWebExch
4545
.map(SecurityContext::getAuthentication)
4646
.map(Principal::getName);
4747

48+
var builder = AuthenticationInfoDTO.builder()
49+
.rbacEnabled(accessControlService.isRbacEnabled());
50+
4851
return userName
4952
.zipWith(permissions)
50-
.map(data -> {
51-
var dto = new AuthenticationInfoDTO(accessControlService.isRbacEnabled());
52-
dto.setUserInfo(new UserInfoDTO(data.getT1(), data.getT2()));
53-
return dto;
54-
})
55-
.switchIfEmpty(Mono.just(new AuthenticationInfoDTO(accessControlService.isRbacEnabled())))
53+
.map(data -> (AuthenticationInfoDTO) builder
54+
.userInfo(new UserInfoDTO(data.getT1(), data.getT2()))
55+
.build()
56+
)
57+
.switchIfEmpty(Mono.just(builder.build()))
5658
.map(ResponseEntity::ok);
5759
}
5860

5961
private List<UserPermissionDTO> mapPermissions(List<Permission> permissions, List<String> clusters) {
6062
return permissions
6163
.stream()
62-
.map(permission -> {
63-
UserPermissionDTO dto = new UserPermissionDTO();
64-
dto.setClusters(clusters);
65-
dto.setResource(ResourceTypeDTO.fromValue(permission.getResource().toString().toUpperCase()));
66-
dto.setValue(permission.getValue());
67-
dto.setActions(permission.getParsedActions()
68-
.stream()
69-
.map(p -> p.name().toUpperCase())
70-
.map(this::mapAction)
71-
.filter(Objects::nonNull)
72-
.toList());
73-
return dto;
74-
})
64+
.map(permission -> (UserPermissionDTO) UserPermissionDTO.builder()
65+
.clusters(clusters)
66+
.resource(ResourceTypeDTO.fromValue(permission.getResource().toString().toUpperCase()))
67+
.value(permission.getValue())
68+
.actions(permission.getParsedActions()
69+
.stream()
70+
.map(p -> p.name().toUpperCase())
71+
.map(this::mapAction)
72+
.filter(Objects::nonNull)
73+
.toList())
74+
.build()
75+
)
7576
.toList();
7677
}
7778

api/src/main/java/io/kafbat/ui/controller/KsqlController.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String cluster
5353
}
5454

5555
@Override
56+
@SuppressWarnings("unchecked")
5657
public Mono<ResponseEntity<Flux<KsqlResponseDTO>>> openKsqlResponsePipe(String clusterName,
5758
String pipeId,
5859
ServerWebExchange exchange) {

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package io.kafbat.ui.controller;
22

3+
import static io.kafbat.ui.model.rbac.permission.TopicAction.ANALYSIS_RUN;
4+
import static io.kafbat.ui.model.rbac.permission.TopicAction.ANALYSIS_VIEW;
35
import static io.kafbat.ui.model.rbac.permission.TopicAction.CREATE;
46
import static io.kafbat.ui.model.rbac.permission.TopicAction.DELETE;
57
import static io.kafbat.ui.model.rbac.permission.TopicAction.EDIT;
6-
import static io.kafbat.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
78
import static io.kafbat.ui.model.rbac.permission.TopicAction.VIEW;
89
import static java.util.stream.Collectors.toList;
910

@@ -272,7 +273,7 @@ public Mono<ResponseEntity<Void>> analyzeTopic(String clusterName, String topicN
272273

273274
var context = AccessContext.builder()
274275
.cluster(clusterName)
275-
.topicActions(topicName, MESSAGES_READ)
276+
.topicActions(topicName, ANALYSIS_RUN)
276277
.operationName("analyzeTopic")
277278
.build();
278279

@@ -288,7 +289,7 @@ public Mono<ResponseEntity<Void>> cancelTopicAnalysis(String clusterName, String
288289
ServerWebExchange exchange) {
289290
var context = AccessContext.builder()
290291
.cluster(clusterName)
291-
.topicActions(topicName, MESSAGES_READ)
292+
.topicActions(topicName, ANALYSIS_RUN)
292293
.operationName("cancelTopicAnalysis")
293294
.build();
294295

@@ -306,7 +307,7 @@ public Mono<ResponseEntity<TopicAnalysisDTO>> getTopicAnalysis(String clusterNam
306307

307308
var context = AccessContext.builder()
308309
.cluster(clusterName)
309-
.topicActions(topicName, MESSAGES_READ)
310+
.topicActions(topicName, ANALYSIS_VIEW)
310311
.operationName("getTopicAnalysis")
311312
.build();
312313

@@ -350,18 +351,12 @@ private Comparator<InternalTopic> getComparatorForTopic(
350351
if (orderBy == null) {
351352
return defaultComparator;
352353
}
353-
switch (orderBy) {
354-
case TOTAL_PARTITIONS:
355-
return Comparator.comparing(InternalTopic::getPartitionCount);
356-
case OUT_OF_SYNC_REPLICAS:
357-
return Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
358-
case REPLICATION_FACTOR:
359-
return Comparator.comparing(InternalTopic::getReplicationFactor);
360-
case SIZE:
361-
return Comparator.comparing(InternalTopic::getSegmentSize);
362-
case NAME:
363-
default:
364-
return defaultComparator;
365-
}
354+
return switch (orderBy) {
355+
case TOTAL_PARTITIONS -> Comparator.comparing(InternalTopic::getPartitionCount);
356+
case OUT_OF_SYNC_REPLICAS -> Comparator.comparing(t -> t.getReplicas() - t.getInSyncReplicas());
357+
case REPLICATION_FACTOR -> Comparator.comparing(InternalTopic::getReplicationFactor);
358+
case SIZE -> Comparator.comparing(InternalTopic::getSegmentSize);
359+
default -> defaultComparator;
360+
};
366361
}
367362
}

api/src/main/java/io/kafbat/ui/model/rbac/permission/TopicAction.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ public enum TopicAction implements PermissibleAction {
1313
MESSAGES_READ(VIEW),
1414
MESSAGES_PRODUCE(VIEW),
1515
MESSAGES_DELETE(VIEW, EDIT),
16+
ANALYSIS_VIEW(VIEW),
17+
ANALYSIS_RUN(VIEW, ANALYSIS_VIEW),
1618

1719
;
1820

api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import java.util.List;
1111
import java.util.Properties;
1212
import java.util.UUID;
13-
import java.util.stream.Collectors;
1413
import java.util.stream.Stream;
1514
import lombok.extern.slf4j.Slf4j;
1615
import lombok.val;
@@ -32,7 +31,6 @@ public class KafkaConsumerGroupTests extends AbstractIntegrationTest {
3231
@Test
3332
void shouldNotFoundWhenNoSuchConsumerGroupId() {
3433
String groupId = "groupA";
35-
String expError = "The group id does not exist";
3634
webTestClient
3735
.delete()
3836
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
@@ -47,12 +45,13 @@ void shouldOkWhenConsumerGroupIsNotActive() {
4745

4846
//Create a consumer and subscribe to the topic
4947
String groupId = UUID.randomUUID().toString();
50-
val consumer = createTestConsumerWithGroupId(groupId);
51-
consumer.subscribe(List.of(topicName));
52-
consumer.poll(Duration.ofMillis(100));
48+
try (val consumer = createTestConsumerWithGroupId(groupId)) {
49+
consumer.subscribe(List.of(topicName));
50+
consumer.poll(Duration.ofMillis(100));
5351

54-
//Unsubscribe from all topics to be able to delete this consumer
55-
consumer.unsubscribe();
52+
//Unsubscribe from all topics to be able to delete this consumer
53+
consumer.unsubscribe();
54+
}
5655

5756
//Delete the consumer when it's INACTIVE and check
5857
webTestClient
@@ -69,24 +68,24 @@ void shouldBeBadRequestWhenConsumerGroupIsActive() {
6968

7069
//Create a consumer and subscribe to the topic
7170
String groupId = UUID.randomUUID().toString();
72-
val consumer = createTestConsumerWithGroupId(groupId);
73-
consumer.subscribe(List.of(topicName));
74-
consumer.poll(Duration.ofMillis(100));
71+
try (val consumer = createTestConsumerWithGroupId(groupId)) {
72+
consumer.subscribe(List.of(topicName));
73+
consumer.poll(Duration.ofMillis(100));
7574

76-
//Try to delete the consumer when it's ACTIVE
77-
String expError = "The group is not empty";
78-
webTestClient
79-
.delete()
80-
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
81-
.exchange()
82-
.expectStatus()
83-
.isBadRequest();
75+
//Try to delete the consumer when it's ACTIVE
76+
webTestClient
77+
.delete()
78+
.uri("/api/clusters/{clusterName}/consumer-groups/{groupId}", LOCAL, groupId)
79+
.exchange()
80+
.expectStatus()
81+
.isBadRequest();
82+
}
8483
}
8584

8685
@Test
8786
void shouldReturnConsumerGroupsWithPagination() throws Exception {
88-
try (var groups1 = startConsumerGroups(3, "cgPageTest1");
89-
var groups2 = startConsumerGroups(2, "cgPageTest2")) {
87+
try (var ignored = startConsumerGroups(3, "cgPageTest1");
88+
var ignored1 = startConsumerGroups(2, "cgPageTest2")) {
9089
webTestClient
9190
.get()
9291
.uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=3&search=cgPageTest", LOCAL)
@@ -114,19 +113,19 @@ void shouldReturnConsumerGroupsWithPagination() throws Exception {
114113
});
115114

116115
webTestClient
117-
.get()
118-
.uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=10&&search"
119-
+ "=cgPageTest&orderBy=NAME&sortOrder=DESC", LOCAL)
120-
.exchange()
121-
.expectStatus()
122-
.isOk()
123-
.expectBody(ConsumerGroupsPageResponseDTO.class)
124-
.value(page -> {
125-
assertThat(page.getPageCount()).isEqualTo(1);
126-
assertThat(page.getConsumerGroups().size()).isEqualTo(5);
127-
assertThat(page.getConsumerGroups())
128-
.isSortedAccordingTo(Comparator.comparing(ConsumerGroupDTO::getGroupId).reversed());
129-
});
116+
.get()
117+
.uri("/api/clusters/{clusterName}/consumer-groups/paged?perPage=10&&search"
118+
+ "=cgPageTest&orderBy=NAME&sortOrder=DESC", LOCAL)
119+
.exchange()
120+
.expectStatus()
121+
.isOk()
122+
.expectBody(ConsumerGroupsPageResponseDTO.class)
123+
.value(page -> {
124+
assertThat(page.getPageCount()).isEqualTo(1);
125+
assertThat(page.getConsumerGroups().size()).isEqualTo(5);
126+
assertThat(page.getConsumerGroups())
127+
.isSortedAccordingTo(Comparator.comparing(ConsumerGroupDTO::getGroupId).reversed());
128+
});
130129

131130
webTestClient
132131
.get()
@@ -156,7 +155,7 @@ private Closeable startConsumerGroups(int count, String consumerGroupPrefix) {
156155
return consumer;
157156
})
158157
.limit(count)
159-
.collect(Collectors.toList());
158+
.toList();
160159
return () -> {
161160
consumers.forEach(KafkaConsumer::close);
162161
deleteTopic(topicName);

api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,10 @@ void testBase64DecodingWorks() {
199199
}
200200

201201
private TopicMessageDTO msg() {
202-
return new TopicMessageDTO(1, -1L, OffsetDateTime.now());
202+
return TopicMessageDTO.builder()
203+
.partition(1)
204+
.offset(-1L)
205+
.timestamp(OffsetDateTime.now())
206+
.build();
203207
}
204208
}

api/src/test/java/io/kafbat/ui/service/acl/AclCsvTest.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import io.kafbat.ui.exception.ValidationException;
77
import java.util.Collection;
88
import java.util.List;
9+
import java.util.stream.Stream;
910
import org.apache.kafka.common.acl.AccessControlEntry;
1011
import org.apache.kafka.common.acl.AclBinding;
1112
import org.apache.kafka.common.acl.AclOperation;
@@ -15,6 +16,8 @@
1516
import org.apache.kafka.common.resource.ResourceType;
1617
import org.junit.jupiter.api.Test;
1718
import org.junit.jupiter.params.ParameterizedTest;
19+
import org.junit.jupiter.params.provider.Arguments;
20+
import org.junit.jupiter.params.provider.MethodSource;
1821
import org.junit.jupiter.params.provider.ValueSource;
1922

2023
class AclCsvTest {
@@ -29,22 +32,26 @@ class AclCsvTest {
2932
);
3033

3134
@ParameterizedTest
32-
@ValueSource(strings = {
33-
"Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n"
34-
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
35-
+ "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost",
36-
37-
//without header
38-
"User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
39-
+ "\n"
40-
+ "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost"
41-
+ "\n"
42-
})
35+
@MethodSource
4336
void parsesValidInputCsv(String csvString) {
4437
Collection<AclBinding> parsed = AclCsv.parseCsv(csvString);
4538
assertThat(parsed).containsExactlyInAnyOrderElementsOf(TEST_BINDINGS);
4639
}
4740

41+
private static Stream<Arguments> parsesValidInputCsv() {
42+
return Stream.of(
43+
Arguments.of(
44+
"Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host" + System.lineSeparator()
45+
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*" + System.lineSeparator()
46+
+ "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost"),
47+
Arguments.of(
48+
//without header
49+
"User:test1,TOPIC,LITERAL,*,READ,ALLOW,*" + System.lineSeparator()
50+
+ System.lineSeparator()
51+
+ "User:test2,GROUP,PREFIXED,group1,DESCRIBE,DENY,localhost"
52+
+ System.lineSeparator()));
53+
}
54+
4855
@ParameterizedTest
4956
@ValueSource(strings = {
5057
// columns > 7

api/src/test/java/io/kafbat/ui/service/acl/AclsServiceTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ void testSyncAclWithAclCsv() {
6868

6969
aclsService.syncAclWithAclCsv(
7070
CLUSTER,
71-
"Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host\n"
72-
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*\n"
71+
"Principal,ResourceType, PatternType, ResourceName,Operation,PermissionType,Host" + System.lineSeparator()
72+
+ "User:test1,TOPIC,LITERAL,*,READ,ALLOW,*" + System.lineSeparator()
7373
+ "User:test3,GROUP,PREFIXED,groupNew,DESCRIBE,DENY,localhost"
7474
).block();
7575

contract/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@
4646
<artifactId>javax.annotation-api</artifactId>
4747
<version>1.3.2</version>
4848
</dependency>
49+
<dependency>
50+
<groupId>org.projectlombok</groupId>
51+
<artifactId>lombok</artifactId>
52+
<version>${org.projectlombok.version}</version>
53+
</dependency>
4954
</dependencies>
5055

5156
<build>
@@ -100,6 +105,12 @@
100105
<useTags>true</useTags>
101106
<useSpringBoot3>true</useSpringBoot3>
102107
<dateLibrary>java8</dateLibrary>
108+
<generatedConstructorWithRequiredArgs>false</generatedConstructorWithRequiredArgs>
109+
<additionalModelTypeAnnotations>
110+
@lombok.experimental.SuperBuilder
111+
@lombok.NoArgsConstructor
112+
@lombok.AllArgsConstructor
113+
</additionalModelTypeAnnotations>
103114
</configOptions>
104115
</configuration>
105116
</execution>

0 commit comments

Comments
 (0)