Skip to content

Commit 9f507f3

Browse files
authored
Merge branch 'main' into feature/connector-failed-modal
2 parents a04d6c8 + 3a7b70f commit 9f507f3

File tree

59 files changed

+1036
-159
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+1036
-159
lines changed

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ public static class CacheProperties {
236236
public static class NgramProperties {
237237
int ngramMin = 1;
238238
int ngramMax = 4;
239+
boolean distanceScore = true;
239240
}
240241

241242
@Data
@@ -244,10 +245,10 @@ public static class NgramProperties {
244245
public static class ClusterFtsProperties {
245246
boolean enabled = true;
246247
boolean defaultEnabled = false;
247-
NgramProperties schemas = new NgramProperties(1, 4);
248-
NgramProperties consumers = new NgramProperties(1, 4);
249-
NgramProperties connect = new NgramProperties(1, 4);
250-
NgramProperties acl = new NgramProperties(1, 4);
248+
NgramProperties schemas = new NgramProperties(1, 4, true);
249+
NgramProperties consumers = new NgramProperties(1, 4, true);
250+
NgramProperties connect = new NgramProperties(1, 4, true);
251+
NgramProperties acl = new NgramProperties(1, 4, true);
251252

252253
public boolean use(Boolean request) {
253254
if (enabled) {

api/src/main/java/io/kafbat/ui/controller/KafkaConnectController.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.kafbat.ui.service.mcp.McpTool;
2424
import java.util.Comparator;
2525
import java.util.Map;
26+
import java.util.Optional;
2627
import java.util.Set;
2728
import javax.validation.Valid;
2829
import lombok.RequiredArgsConstructor;
@@ -141,15 +142,18 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
141142
.operationName("getAllConnectors")
142143
.build();
143144

145+
var maybeComparator = Optional.ofNullable(orderBy).map(this::getConnectorsComparator);
146+
144147
var comparator = sortOrder == null || sortOrder.equals(SortOrderDTO.ASC)
145-
? getConnectorsComparator(orderBy)
146-
: getConnectorsComparator(orderBy).reversed();
148+
? maybeComparator
149+
: maybeComparator.map(Comparator::reversed);
150+
151+
Flux<FullConnectorInfoDTO> connectors = kafkaConnectService.getAllConnectors(getCluster(clusterName), search, fts)
152+
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName));
147153

148-
Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search, fts)
149-
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
150-
.sort(comparator);
154+
Flux<FullConnectorInfoDTO> sorted = comparator.map(connectors::sort).orElse(connectors);
151155

152-
return Mono.just(ResponseEntity.ok(job))
156+
return Mono.just(ResponseEntity.ok(sorted))
153157
.doOnEach(sig -> audit(context, sig));
154158
}
155159

@@ -284,9 +288,7 @@ private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumn
284288
FullConnectorInfoDTO::getName,
285289
Comparator.nullsFirst(Comparator.naturalOrder())
286290
);
287-
if (orderBy == null) {
288-
return defaultComparator;
289-
}
291+
290292
return switch (orderBy) {
291293
case CONNECT -> Comparator.comparing(
292294
FullConnectorInfoDTO::getConnect,

api/src/main/java/io/kafbat/ui/controller/SchemasController.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Comparator;
2626
import java.util.List;
2727
import java.util.Map;
28+
import java.util.Optional;
2829
import javax.validation.Valid;
2930
import lombok.RequiredArgsConstructor;
3031
import lombok.extern.slf4j.Slf4j;
@@ -244,11 +245,15 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
244245

245246
List<String> subjectsToRetrieve;
246247
boolean paginate = true;
247-
var schemaComparator = getComparatorForSchema(orderBy);
248-
final Comparator<SubjectWithCompatibilityLevel> comparator =
249-
sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
250-
? schemaComparator : schemaComparator.reversed();
248+
249+
var schemaComparator = Optional.ofNullable(orderBy).map(this::getComparatorForSchema);
250+
var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
251+
? schemaComparator : schemaComparator.map(Comparator::reversed);
252+
251253
if (orderBy == null || SchemaColumnsToSortDTO.SUBJECT.equals(orderBy)) {
254+
if (orderBy != null) {
255+
filteredSubjects.sort(Comparator.nullsFirst(Comparator.naturalOrder()));
256+
}
252257
if (SortOrderDTO.DESC.equals(sortOrder)) {
253258
filteredSubjects.sort(Comparator.nullsFirst(Comparator.reverseOrder()));
254259
}
@@ -274,11 +279,13 @@ public Mono<ResponseEntity<SchemaSubjectsResponseDTO>> getSchemas(String cluster
274279

275280
private List<SubjectWithCompatibilityLevel> paginateSchemas(
276281
List<SubjectWithCompatibilityLevel> subjects,
277-
Comparator<SubjectWithCompatibilityLevel> comparator,
282+
Optional<Comparator<SubjectWithCompatibilityLevel>> comparator,
278283
boolean paginate,
279284
int pageSize,
280285
int subjectToSkip) {
281-
subjects.sort(comparator);
286+
287+
comparator.ifPresent(subjects::sort);
288+
282289
if (paginate) {
283290
return subjects.subList(subjectToSkip, Math.min(subjectToSkip + pageSize, subjects.size()));
284291
} else {

api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
import io.kafbat.ui.config.ClustersProperties;
44
import io.kafbat.ui.connect.model.ClusterInfo;
55
import io.kafbat.ui.connect.model.Connector;
6+
import io.kafbat.ui.connect.model.ConnectorStatus;
67
import io.kafbat.ui.connect.model.ConnectorStatusConnector;
78
import io.kafbat.ui.connect.model.ConnectorTask;
89
import io.kafbat.ui.connect.model.ConnectorTopics;
910
import io.kafbat.ui.connect.model.ExpandedConnector;
1011
import io.kafbat.ui.connect.model.NewConnector;
12+
import io.kafbat.ui.connect.model.TaskStatus;
1113
import io.kafbat.ui.model.ConnectDTO;
1214
import io.kafbat.ui.model.ConnectorDTO;
1315
import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO;
@@ -42,14 +44,36 @@ default ClusterInfo toClient(KafkaConnectState state) {
4244

4345
@Mapping(target = "status", ignore = true)
4446
@Mapping(target = "connect", ignore = true)
45-
ConnectorDTO fromClient(io.kafbat.ui.connect.model.Connector connector);
46-
47-
default ConnectorDTO fromClient(Connector connector, ConnectorTopics topics) {
48-
ConnectorDTO connectorDto = this.fromClient(connector);
47+
ConnectorDTO fromClient(Connector connector);
48+
49+
default ConnectorDTO fromClient(Connector connector,
50+
String connect,
51+
ConnectorTopics topics,
52+
Map<String, Object> sanitizedConfigs,
53+
ConnectorStatus status) {
54+
ConnectorDTO result = this.fromClient(connector);
55+
result.connect(connect);
4956
if (topics != null) {
50-
return connectorDto.topics(topics.getTopics());
57+
result = result.topics(topics.getTopics());
58+
}
59+
if (sanitizedConfigs != null) {
60+
result = result.config(sanitizedConfigs);
61+
}
62+
if (status != null && status.getConnector() != null) {
63+
result = result.status(fromClient(status.getConnector()));
64+
65+
if (status.getTasks() != null) {
66+
boolean isAnyTaskFailed = status.getTasks().stream()
67+
.map(TaskStatus::getState)
68+
.anyMatch(TaskStatus.StateEnum.FAILED::equals);
69+
70+
if (isAnyTaskFailed) {
71+
result.getStatus().state(ConnectorStateDTO.TASK_FAILED);
72+
}
73+
}
5174
}
52-
return connectorDto;
75+
76+
return result;
5377
}
5478

5579
ConnectorStatusDTO fromClient(ConnectorStatusConnector connectorStatus);

api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -227,35 +227,20 @@ public Mono<ConnectorDTO> getConnector(KafkaCluster cluster, String connectName,
227227
String connectorName) {
228228
return api(cluster, connectName)
229229
.mono(client ->
230-
Mono.zip(client.getConnector(connectorName), getConnectorTopics(cluster, connectName, connectorName))
231-
.map(t -> kafkaConnectMapper.fromClient(t.getT1(), t.getT2()))
232-
.flatMap(connector ->
233-
client.getConnectorStatus(connector.getName())
234-
// status request can return 404 if tasks not assigned yet
235-
.onErrorResume(WebClientResponseException.NotFound.class,
230+
Mono.zip(
231+
client.getConnector(connectorName),
232+
getConnectorTopics(cluster, connectName, connectorName),
233+
client.getConnectorStatus(connectorName).onErrorResume(WebClientResponseException.NotFound.class,
236234
e -> emptyStatus(connectorName))
237-
.map(connectorStatus -> {
238-
var status = connectorStatus.getConnector();
239-
var sanitizedConfig = kafkaConfigSanitizer.sanitizeConnectorConfig(connector.getConfig());
240-
ConnectorDTO result = new ConnectorDTO()
241-
.connect(connectName)
242-
.status(kafkaConnectMapper.fromClient(status))
243-
.type(connector.getType())
244-
.tasks(connector.getTasks())
245-
.name(connector.getName())
246-
.config(sanitizedConfig);
247-
248-
if (connectorStatus.getTasks() != null) {
249-
boolean isAnyTaskFailed = connectorStatus.getTasks().stream()
250-
.map(TaskStatus::getState)
251-
.anyMatch(TaskStatus.StateEnum.FAILED::equals);
252-
253-
if (isAnyTaskFailed) {
254-
result.getStatus().state(ConnectorStateDTO.TASK_FAILED);
255-
}
256-
}
257-
return result;
258-
})
235+
)
236+
.map(t ->
237+
kafkaConnectMapper.fromClient(
238+
t.getT1(),
239+
connectName,
240+
t.getT2(),
241+
kafkaConfigSanitizer.sanitizeConnectorConfig(t.getT1().getConfig()),
242+
t.getT3()
243+
)
259244
)
260245
);
261246
}
@@ -281,7 +266,7 @@ public Mono<ConnectorDTO> setConnectorConfig(KafkaCluster cluster, String connec
281266
.mono(c ->
282267
requestBody
283268
.flatMap(body -> c.setConnectorConfig(connectorName, body))
284-
.map(kafkaConnectMapper::fromClient));
269+
.map(connector -> kafkaConnectMapper.fromClient(connector)));
285270
}
286271

287272
public Mono<Void> deleteConnector(

api/src/main/java/io/kafbat/ui/service/acl/AclsService.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,7 @@ public Flux<AclBinding> listAcls(KafkaCluster cluster, ResourcePatternFilter fil
7575
Boolean fts) {
7676
return adminClientService.get(cluster)
7777
.flatMap(c -> c.listAcls(filter))
78-
.flatMapIterable(acls -> acls)
79-
.filter(acl -> principalSearch == null || acl.entry().principal().contains(principalSearch))
80-
.collectList()
81-
.map(lst -> filter(lst, principalSearch, fts))
78+
.map(lst -> filter(new ArrayList<>(lst), principalSearch, fts))
8279
.flatMapMany(Flux::fromIterable)
8380
.sort(Comparator.comparing(AclBinding::toString)); //sorting to keep stable order on different calls
8481
}

api/src/main/java/io/kafbat/ui/service/index/AclBindingNgramFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public class AclBindingNgramFilter extends NgramFilter<AclBinding> {
1111
private final List<Tuple2<List<String>, AclBinding>> bindings;
1212

1313
public AclBindingNgramFilter(Collection<AclBinding> bindings) {
14-
this(bindings, true, new ClustersProperties.NgramProperties(1, 4));
14+
this(bindings, true, new ClustersProperties.NgramProperties(1, 4, true));
1515
}
1616

1717
public AclBindingNgramFilter(

api/src/main/java/io/kafbat/ui/service/index/ConsumerGroupFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public class ConsumerGroupFilter extends NgramFilter<ConsumerGroupListing> {
1111
private final List<Tuple2<List<String>, ConsumerGroupListing>> groups;
1212

1313
public ConsumerGroupFilter(Collection<ConsumerGroupListing> groups) {
14-
this(groups, true, new ClustersProperties.NgramProperties(1, 4));
14+
this(groups, true, new ClustersProperties.NgramProperties(1, 4, true));
1515
}
1616

1717
public ConsumerGroupFilter(

api/src/main/java/io/kafbat/ui/service/index/KafkaConnectNgramFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public class KafkaConnectNgramFilter extends NgramFilter<FullConnectorInfoDTO> {
1111
private final List<Tuple2<List<String>, FullConnectorInfoDTO>> connectors;
1212

1313
public KafkaConnectNgramFilter(Collection<FullConnectorInfoDTO> connectors) {
14-
this(connectors, true, new ClustersProperties.NgramProperties(1, 4));
14+
this(connectors, true, new ClustersProperties.NgramProperties(1, 4, true));
1515
}
1616

1717
public KafkaConnectNgramFilter(

api/src/main/java/io/kafbat/ui/service/index/LuceneTopicsIndex.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
import io.kafbat.ui.model.InternalTopic;
44
import io.kafbat.ui.model.InternalTopicConfig;
5+
import io.kafbat.ui.service.index.lucene.IndexedTextField;
6+
import io.kafbat.ui.service.index.lucene.NameDistanceScoringFunction;
7+
import io.kafbat.ui.service.index.lucene.ShortWordAnalyzer;
58
import java.io.IOException;
69
import java.io.UncheckedIOException;
710
import java.util.ArrayList;
@@ -18,11 +21,11 @@
1821
import org.apache.lucene.document.IntPoint;
1922
import org.apache.lucene.document.LongPoint;
2023
import org.apache.lucene.document.StringField;
21-
import org.apache.lucene.document.TextField;
2224
import org.apache.lucene.index.DirectoryReader;
2325
import org.apache.lucene.index.IndexWriter;
2426
import org.apache.lucene.index.IndexWriterConfig;
2527
import org.apache.lucene.index.Term;
28+
import org.apache.lucene.queries.function.FunctionScoreQuery;
2629
import org.apache.lucene.queryparser.classic.ParseException;
2730
import org.apache.lucene.queryparser.classic.QueryParser;
2831
import org.apache.lucene.search.BooleanClause;
@@ -59,18 +62,22 @@ public LuceneTopicsIndex(List<InternalTopic> topics) throws IOException {
5962

6063
private Directory build(List<InternalTopic> topics) {
6164
Directory directory = new ByteBuffersDirectory();
65+
6266
try (IndexWriter directoryWriter = new IndexWriter(directory, new IndexWriterConfig(this.analyzer))) {
6367
for (InternalTopic topic : topics) {
6468
Document doc = new Document();
69+
6570
doc.add(new StringField(FIELD_NAME_RAW, topic.getName(), Field.Store.YES));
66-
doc.add(new TextField(FIELD_NAME, topic.getName(), Field.Store.NO));
71+
doc.add(new IndexedTextField(FIELD_NAME, topic.getName(), Field.Store.YES));
6772
doc.add(new IntPoint(FIELD_PARTITIONS, topic.getPartitionCount()));
6873
doc.add(new IntPoint(FIELD_REPLICATION, topic.getReplicationFactor()));
6974
doc.add(new LongPoint(FIELD_SIZE, topic.getSegmentSize()));
7075
if (topic.getTopicConfigs() != null && !topic.getTopicConfigs().isEmpty()) {
7176
for (InternalTopicConfig topicConfig : topic.getTopicConfigs()) {
72-
doc.add(new StringField(FIELD_CONFIG_PREFIX + "_" + topicConfig.getName(), topicConfig.getValue(),
73-
Field.Store.NO));
77+
if (topicConfig.getName() != null || topicConfig.getValue() != null) {
78+
doc.add(new StringField(FIELD_CONFIG_PREFIX + "_" + topicConfig.getName(), topicConfig.getValue(),
79+
Field.Store.NO));
80+
}
7481
}
7582
}
7683
doc.add(new StringField(FIELD_INTERNAL, String.valueOf(topic.isInternal()), Field.Store.NO));
@@ -117,9 +124,9 @@ public List<InternalTopic> find(String search, Boolean showInternal,
117124
closeLock.readLock().lock();
118125
try {
119126

120-
QueryParser queryParser = new PrefixQueryParser(FIELD_NAME, this.analyzer);
127+
PrefixQueryParser queryParser = new PrefixQueryParser(FIELD_NAME, this.analyzer);
121128
queryParser.setDefaultOperator(QueryParser.Operator.AND);
122-
Query nameQuery = queryParser.parse(search);;
129+
Query nameQuery = queryParser.parse(search);
123130

124131
Query internalFilter = new TermQuery(new Term(FIELD_INTERNAL, "true"));
125132

@@ -129,6 +136,12 @@ public List<InternalTopic> find(String search, Boolean showInternal,
129136
queryBuilder.add(internalFilter, BooleanClause.Occur.MUST_NOT);
130137
}
131138

139+
BooleanQuery combined = queryBuilder.build();
140+
Query wrapped = new FunctionScoreQuery(
141+
combined,
142+
new NameDistanceScoringFunction(FIELD_NAME, queryParser.getPrefixes())
143+
);
144+
132145
List<SortField> sortFields = new ArrayList<>();
133146
sortFields.add(SortField.FIELD_SCORE);
134147
if (!sortField.equals(FIELD_NAME)) {
@@ -137,7 +150,7 @@ public List<InternalTopic> find(String search, Boolean showInternal,
137150

138151
Sort sort = new Sort(sortFields.toArray(new SortField[0]));
139152

140-
TopDocs result = this.indexSearcher.search(queryBuilder.build(), count != null ? count : this.maxSize, sort);
153+
TopDocs result = this.indexSearcher.search(wrapped, count != null ? count : this.maxSize, sort);
141154

142155
List<String> topics = new ArrayList<>();
143156
for (ScoreDoc scoreDoc : result.scoreDocs) {

0 commit comments

Comments
 (0)