Skip to content

Commit 61f6e20

Browse files
authored
Merge branch 'main' into main
2 parents b88e541 + d4bd299 commit 61f6e20

File tree

64 files changed

+870
-367
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+870
-367
lines changed

api/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# The tag is ignored when a sha is included but the reason to add it are:
22
# 1. Self Documentation: It is difficult to find out what the expected tag is given a sha alone
33
# 2. Helps dependabot during discovery of upgrades
4-
FROM azul/zulu-openjdk-alpine:21.0.8-jre-headless@sha256:9c7b4b7850bd4cdd78f91b369accc5b55beffa9a073b9a2bb94caa42606b9444
4+
FROM azul/zulu-openjdk-alpine:21.0.8-jre-headless@sha256:49992ca94d08736b6827ad3289114acc7934a0da543f3bff208c3df46ed1108c
55

66
RUN apk add --no-cache \
77
# snappy codec

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,11 +243,23 @@ public static class NgramProperties {
243243
@NoArgsConstructor
244244
@AllArgsConstructor
245245
public static class ClusterFtsProperties {
246-
boolean enabled = false;
246+
boolean enabled = true;
247+
boolean defaultEnabled = false;
247248
NgramProperties schemas = new NgramProperties(1, 4);
248249
NgramProperties consumers = new NgramProperties(1, 4);
249250
NgramProperties connect = new NgramProperties(1, 4);
250251
NgramProperties acl = new NgramProperties(1, 4);
252+
253+
public boolean use(Boolean request) {
254+
if (enabled) {
255+
if (Boolean.TRUE.equals(request)) {
256+
return true;
257+
} else if (request == null && defaultEnabled) {
258+
return true;
259+
}
260+
}
261+
return false;
262+
}
251263
}
252264

253265
@PostConstruct

api/src/main/java/io/kafbat/ui/controller/AclsController.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
@RestController
2727
@RequiredArgsConstructor
28-
public class AclsController extends AbstractController implements AclsApi, McpTool {
28+
public class AclsController extends AbstractController implements AclsApi, McpTool {
2929

3030
private final AclsService aclsService;
3131

@@ -69,6 +69,7 @@ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
6969
String resourceName,
7070
KafkaAclNamePatternTypeDTO namePatternTypeDto,
7171
String search,
72+
Boolean fts,
7273
ServerWebExchange exchange) {
7374
AccessContext context = AccessContext.builder()
7475
.cluster(clusterName)
@@ -89,7 +90,7 @@ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
8990
return validateAccess(context).then(
9091
Mono.just(
9192
ResponseEntity.ok(
92-
aclsService.listAcls(getCluster(clusterName), filter, search)
93+
aclsService.listAcls(getCluster(clusterName), filter, search, fts)
9394
.map(ClusterMapper::toKafkaAclDto)))
9495
).doOnEach(sig -> audit(context, sig));
9596
}

api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ public Mono<ResponseEntity<ConsumerGroupsPageResponseDTO>> getConsumerGroupsPage
128128
String search,
129129
ConsumerGroupOrderingDTO orderBy,
130130
SortOrderDTO sortOrderDto,
131+
Boolean fts,
131132
ServerWebExchange exchange) {
132133

133134
var context = AccessContext.builder()

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
129129
String search,
130130
ConnectorColumnsToSortDTO orderBy,
131131
SortOrderDTO sortOrder,
132+
Boolean fts,
132133
ServerWebExchange exchange
133134
) {
134135
var context = AccessContext.builder()
@@ -140,7 +141,7 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
140141
? getConnectorsComparator(orderBy)
141142
: getConnectorsComparator(orderBy).reversed();
142143

143-
Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search)
144+
Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search, fts)
144145
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
145146
.sort(comparator);
146147

api/src/main/java/io/kafbat/ui/controller/SchemasController.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,13 +217,15 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
217217
@Valid String search,
218218
SchemaColumnsToSortDTO orderBy,
219219
SortOrderDTO sortOrder,
220+
Boolean fts,
220221
ServerWebExchange serverWebExchange) {
221222
var context = AccessContext.builder()
222223
.cluster(clusterName)
223224
.operationName("getSchemas")
224225
.build();
225226

226-
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
227+
ClustersProperties.ClusterFtsProperties ftsProperties = clustersProperties.getFts();
228+
boolean useFts = ftsProperties.use(fts);
227229

228230
return schemaRegistryService
229231
.getAllSubjectNames(getCluster(clusterName))
@@ -234,7 +236,7 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
234236
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
235237
int subjectToSkip = ((pageNum != null && pageNum > 0 ? pageNum : 1) - 1) * pageSize;
236238

237-
SchemasFilter filter = new SchemasFilter(subjects, fts.isEnabled(), fts.getSchemas());
239+
SchemasFilter filter = new SchemasFilter(subjects, useFts, ftsProperties.getSchemas());
238240
List<String> filteredSubjects = new ArrayList<>(filter.find(search));
239241

240242
var totalPages = (filteredSubjects.size() / pageSize)

api/src/main/java/io/kafbat/ui/controller/TopicsController.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,20 +174,22 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
174174
@Valid String search,
175175
@Valid TopicColumnsToSortDTO orderBy,
176176
@Valid SortOrderDTO sortOrder,
177+
Boolean fts,
177178
ServerWebExchange exchange) {
178179

179180
AccessContext context = AccessContext.builder()
180181
.cluster(clusterName)
181182
.operationName("getTopics")
182183
.build();
183184

184-
return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal)
185+
return topicsService.getTopicsForPagination(getCluster(clusterName), search, showInternal, fts)
185186
.flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
186187
.flatMap(topics -> {
187188
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
188189
var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
189-
ClustersProperties.ClusterFtsProperties fts = clustersProperties.getFts();
190-
Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, fts.isEnabled());
190+
ClustersProperties.ClusterFtsProperties ftsProperties = clustersProperties.getFts();
191+
boolean useFts = ftsProperties.use(fts);
192+
Comparator<InternalTopic> comparatorForTopic = getComparatorForTopic(orderBy, useFts);
191193
var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
192194
? comparatorForTopic : comparatorForTopic.reversed();
193195

api/src/main/java/io/kafbat/ui/model/ClusterFeature.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,7 @@ public enum ClusterFeature {
88
KAFKA_ACL_VIEW,
99
KAFKA_ACL_EDIT,
1010
CLIENT_QUOTA_MANAGEMENT,
11-
GRAPHS_ENABLED
11+
GRAPHS_ENABLED,
12+
FTS_ENABLED,
13+
FTS_DEFAULT_ENABLED
1214
}

api/src/main/java/io/kafbat/ui/serdes/BuiltInSerde.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package io.kafbat.ui.serdes;
22

33
import io.kafbat.ui.serde.api.PropertyResolver;
4+
import io.kafbat.ui.serde.api.SchemaDescription;
45
import io.kafbat.ui.serde.api.Serde;
6+
import java.util.Optional;
57

68
public interface BuiltInSerde extends Serde {
79

@@ -24,4 +26,25 @@ default void configure(PropertyResolver serdeProperties,
2426
PropertyResolver kafkaClusterProperties,
2527
PropertyResolver globalProperties) {
2628
}
29+
30+
@Override
31+
default boolean canSerialize(String topic, Serde.Target type) {
32+
return false;
33+
}
34+
35+
@Override
36+
default Serde.Serializer serializer(String topic, Serde.Target type) {
37+
throw new UnsupportedOperationException();
38+
}
39+
40+
@Override
41+
default Optional<SchemaDescription> getSchema(String topic, Serde.Target type) {
42+
return Optional.empty();
43+
}
44+
45+
@Override
46+
default Optional<String> getDescription() {
47+
return Optional.empty();
48+
}
49+
2750
}

api/src/main/java/io/kafbat/ui/serdes/ClusterSerdes.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@ public Stream<SerdeInstance> all() {
6464

6565
public SerdeInstance suggestSerdeForSerialize(String topic, Serde.Target type) {
6666
return findSerdeByPatternsOrDefault(topic, type, s -> s.canSerialize(topic, type))
67-
.orElse(serdes.get(StringSerde.name()));
67+
.orElse(serdes.get(StringSerde.NAME));
6868
}
6969

7070
public SerdeInstance suggestSerdeForDeserialize(String topic, Serde.Target type) {
7171
return findSerdeByPatternsOrDefault(topic, type, s -> s.canDeserialize(topic, type))
72-
.orElse(serdes.get(StringSerde.name()));
72+
.orElse(serdes.get(StringSerde.NAME));
7373
}
7474

7575
@Override

0 commit comments

Comments
 (0)