Skip to content

Commit c27b32d

Browse files
committed
Added lucene
1 parent 0474fd7 commit c27b32d

File tree

11 files changed

+456
-2
lines changed

11 files changed

+456
-2
lines changed

api/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ dependencies {
5454
antlr libs.antlr
5555
implementation libs.antlr.runtime
5656

57+
implementation libs.lucene
58+
implementation libs.lucene.queryparser
59+
implementation libs.lucene.analysis.common
60+
5761
implementation libs.opendatadiscovery.oddrn
5862
implementation(libs.opendatadiscovery.client) {
5963
exclude group: 'org.springframework.boot', module: 'spring-boot-starter-webflux'

api/src/main/java/io/kafbat/ui/config/ClustersProperties.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class ClustersProperties {
4141
MetricsStorage defaultMetricsStorage = new MetricsStorage();
4242

4343
CacheProperties cache = new CacheProperties();
44+
FtsProperties fts = new FtsProperties();
4445

4546
@Data
4647
public static class Cluster {
@@ -217,6 +218,17 @@ public static class CacheProperties {
217218
Duration connectClusterCacheExpiry = Duration.ofHours(24);
218219
}
219220

221+
/**
 * Settings group held in the enclosing properties class's {@code fts} field.
 *
 * <p>All fields carry defaults, so an instance created with the no-args constructor is usable
 * as-is. NOTE(review): "fts" presumably stands for full-text search (the Lucene-backed topic and
 * consumer-group lookup) - confirm against the code that reads these values.
 */
@Data
222+
@NoArgsConstructor
223+
@AllArgsConstructor
224+
public static class FtsProperties {
225+
// Defaults to true. Presumably the master switch for the feature - TODO confirm where it is read.
boolean enabled = true;
226+
// Defaults to 3; same value the single-argument TopicsIndex constructor uses as its minimum n-gram.
int topicsMinNGram = 3;
227+
// Defaults to 5; same value the single-argument TopicsIndex constructor uses as its maximum n-gram.
int topicsMaxNGram = 5;
228+
// Defaults to 1; same value NgramFilter passes as the minimum n-gram of its analyzer.
int filterMinNGram = 1;
229+
// Defaults to 4; same value NgramFilter passes as the maximum n-gram of its analyzer.
int filterMaxNGram = 4;
230+
}
231+
220232
@PostConstruct
221233
public void validateAndSetDefaults() {
222234
if (clusters != null) {

api/src/main/java/io/kafbat/ui/model/Statistics.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
@Value
1313
@Builder(toBuilder = true)
14-
public class Statistics {
14+
public class Statistics implements AutoCloseable {
1515
ServerStatusDTO status;
1616
Throwable lastKafkaException;
1717
String version;
@@ -46,4 +46,11 @@ public Stream<TopicDescription> topicDescriptions() {
4646
public Statistics withClusterState(UnaryOperator<ScrapedClusterState> stateUpdate) {
4747
return toBuilder().clusterState(stateUpdate.apply(clusterState)).build();
4848
}
49+
50+
@Override
51+
public void close() throws Exception {
52+
if (clusterState != null) {
53+
clusterState.close();
54+
}
55+
}
4956
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.kafbat.ui.service.index;
2+
3+
import io.kafbat.ui.model.InternalConsumerGroup;
4+
import java.util.List;
5+
import reactor.util.function.Tuple2;
6+
import reactor.util.function.Tuples;
7+
8+
public class ConsumerGroupFilter extends NgramFilter<InternalConsumerGroup> {
9+
private final List<Tuple2<String, InternalConsumerGroup>> groups;
10+
11+
public ConsumerGroupFilter(List<InternalConsumerGroup> groups) {
12+
this.groups = groups.stream().map(g -> Tuples.of(g.getGroupId(), g)).toList();
13+
}
14+
15+
@Override
16+
protected List<Tuple2<String, InternalConsumerGroup>> getItems() {
17+
return this.groups;
18+
}
19+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package io.kafbat.ui.service.index;
2+
3+
import java.io.IOException;
4+
import java.util.ArrayList;
5+
import java.util.HashMap;
6+
import java.util.HashSet;
7+
import java.util.List;
8+
import java.util.Map;
9+
import java.util.concurrent.ConcurrentHashMap;
10+
import lombok.SneakyThrows;
11+
import lombok.extern.slf4j.Slf4j;
12+
import org.apache.lucene.analysis.Analyzer;
13+
import org.apache.lucene.analysis.TokenStream;
14+
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
15+
import reactor.util.function.Tuple2;
16+
17+
@Slf4j
18+
public abstract class NgramFilter<T> {
19+
private final Analyzer analyzer = new ShortWordNGramAnalyzer(1, 4, false);
20+
21+
protected abstract List<Tuple2<String, T>> getItems();
22+
private static Map<String, List<String>> cache = new ConcurrentHashMap<>();
23+
24+
public List<T> find(String search) {
25+
try {
26+
List<SearchResult<T>> result = new ArrayList<>();
27+
List<String> queryTokens = tokenizeString(analyzer, search);
28+
Map<String, Integer> queryFreq = termFreq(queryTokens);
29+
30+
for (Tuple2<String, T> item : getItems()) {
31+
List<String> itemTokens = tokenizeString(analyzer, item.getT1());
32+
HashSet<String> itemTokensSet = new HashSet<>(itemTokens);
33+
if (itemTokensSet.containsAll(queryTokens)) {
34+
double score = cosineSimilarity(queryFreq, itemTokens);
35+
result.add(new SearchResult<T>(item.getT2(), score));
36+
// result.add(new SearchResult<T>(item.getT2(), 1));
37+
}
38+
}
39+
result.sort((o1, o2) -> Double.compare(o2.score, o1.score));
40+
return result.stream().map(r -> r.item).toList();
41+
} catch (Exception e) {
42+
throw new RuntimeException(e);
43+
}
44+
}
45+
46+
private record SearchResult<T>(T item, double score) { }
47+
48+
49+
public static List<String> tokenizeString(Analyzer analyzer, String text) throws IOException {
50+
return cache.computeIfAbsent(text, (t) -> tokenizeStringSimple(analyzer, text));
51+
}
52+
53+
@SneakyThrows
54+
public static List<String> tokenizeStringSimple(Analyzer analyzer, String text) {
55+
List<String> tokens = new ArrayList<>();
56+
try (TokenStream tokenStream = analyzer.tokenStream(null, text)) {
57+
CharTermAttribute attr = tokenStream.addAttribute(CharTermAttribute.class);
58+
tokenStream.reset();
59+
while (tokenStream.incrementToken()) {
60+
tokens.add(attr.toString());
61+
}
62+
tokenStream.end();
63+
}
64+
return tokens;
65+
}
66+
67+
private static double cosineSimilarity(Map<String, Integer> queryFreq, List<String> itemTokens) {
68+
// Build frequency maps
69+
Map<String, Integer> terms = termFreq(itemTokens);
70+
71+
double dot = 0.0;
72+
double mag1 = 0.0;
73+
double mag2 = 0.0;
74+
75+
for (String term : terms.keySet()) {
76+
int f1 = queryFreq.getOrDefault(term, 0);
77+
int f2 = terms.getOrDefault(term, 0);
78+
dot += f1 * f2;
79+
mag1 += f1 * f1;
80+
mag2 += f2 * f2;
81+
}
82+
83+
return (mag1 == 0 || mag2 == 0) ? 0.0 : dot / (Math.sqrt(mag1) * Math.sqrt(mag2));
84+
}
85+
86+
private static Map<String, Integer> termFreq(List<String> tokens) {
87+
Map<String, Integer> freq = new HashMap<>();
88+
for (String token : tokens) {
89+
freq.put(token, freq.getOrDefault(token, 0) + 1);
90+
}
91+
return freq;
92+
}
93+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.kafbat.ui.service.index;
2+
3+
import org.apache.lucene.analysis.Analyzer;
4+
import org.apache.lucene.analysis.LowerCaseFilter;
5+
import org.apache.lucene.analysis.TokenStream;
6+
import org.apache.lucene.analysis.Tokenizer;
7+
import org.apache.lucene.analysis.miscellaneous.WordDelimiterGraphFilter;
8+
import org.apache.lucene.analysis.ngram.NGramTokenFilter;
9+
import org.apache.lucene.analysis.standard.StandardTokenizer;
10+
11+
public class ShortWordNGramAnalyzer extends Analyzer {
12+
private final int minGram;
13+
private final int maxGram;
14+
private final boolean preserveOriginal;
15+
16+
public ShortWordNGramAnalyzer(int minGram, int maxGram) {
17+
this(minGram, maxGram, true);
18+
}
19+
20+
public ShortWordNGramAnalyzer(int minGram, int maxGram, boolean preserveOriginal) {
21+
this.minGram = minGram;
22+
this.maxGram = maxGram;
23+
this.preserveOriginal = preserveOriginal;
24+
}
25+
26+
27+
28+
@Override
29+
protected TokenStreamComponents createComponents(String fieldName) {
30+
Tokenizer tokenizer = new StandardTokenizer();
31+
32+
TokenStream tokenStream = new WordDelimiterGraphFilter(
33+
tokenizer,
34+
WordDelimiterGraphFilter.GENERATE_WORD_PARTS |
35+
WordDelimiterGraphFilter.SPLIT_ON_CASE_CHANGE |
36+
//WordDelimiterGraphFilter.SPLIT_ON_NUMERICS |
37+
WordDelimiterGraphFilter.STEM_ENGLISH_POSSESSIVE,
38+
null
39+
);
40+
41+
tokenStream = new LowerCaseFilter(tokenStream);
42+
43+
// Add n-gram generation from characters (min=2, max=4)
44+
tokenStream = new NGramTokenFilter(tokenStream, minGram, maxGram, this.preserveOriginal);
45+
46+
return new TokenStreamComponents(tokenizer, tokenStream);
47+
}
48+
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package io.kafbat.ui.service.index;
2+
3+
import io.kafbat.ui.model.InternalTopic;
4+
import io.kafbat.ui.model.InternalTopicConfig;
5+
import java.io.IOException;
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
import org.apache.lucene.analysis.Analyzer;
9+
import org.apache.lucene.document.Document;
10+
import org.apache.lucene.document.Field;
11+
import org.apache.lucene.document.IntPoint;
12+
import org.apache.lucene.document.LongPoint;
13+
import org.apache.lucene.document.StringField;
14+
import org.apache.lucene.document.TextField;
15+
import org.apache.lucene.index.DirectoryReader;
16+
import org.apache.lucene.index.IndexWriter;
17+
import org.apache.lucene.index.IndexWriterConfig;
18+
import org.apache.lucene.index.Term;
19+
import org.apache.lucene.queryparser.classic.ParseException;
20+
import org.apache.lucene.queryparser.classic.QueryParser;
21+
import org.apache.lucene.search.BooleanClause;
22+
import org.apache.lucene.search.BooleanQuery;
23+
import org.apache.lucene.search.IndexSearcher;
24+
import org.apache.lucene.search.Query;
25+
import org.apache.lucene.search.ScoreDoc;
26+
import org.apache.lucene.search.Sort;
27+
import org.apache.lucene.search.SortField;
28+
import org.apache.lucene.search.TermQuery;
29+
import org.apache.lucene.search.TopDocs;
30+
import org.apache.lucene.store.ByteBuffersDirectory;
31+
import org.apache.lucene.store.Directory;
32+
33+
public class TopicsIndex implements AutoCloseable {
34+
public static final String FIELD_NAME_RAW = "name_raw";
35+
public static final String FIELD_NAME = "name";
36+
public static final String FIELD_INTERNAL = "internal";
37+
public static final String FIELD_PARTITIONS = "partitions";
38+
public static final String FIELD_REPLICATION = "replication";
39+
public static final String FIELD_SIZE = "size";
40+
public static final String FIELD_CONFIG_PREFIX = "config";
41+
42+
private final Directory directory;
43+
private final DirectoryReader indexReader;
44+
private final IndexSearcher indexSearcher;
45+
private final Analyzer analyzer;
46+
47+
public TopicsIndex(List<InternalTopic> topics) throws IOException {
48+
this(topics, 3,5);
49+
}
50+
51+
public TopicsIndex(List<InternalTopic> topics, int minNgram, int maxNgram) throws IOException {
52+
this.analyzer = new ShortWordNGramAnalyzer(minNgram, maxNgram);
53+
this.directory = build(topics);
54+
this.indexReader = DirectoryReader.open(directory);
55+
this.indexSearcher = new IndexSearcher(indexReader);
56+
}
57+
58+
private Directory build(List<InternalTopic> topics) {
59+
Directory directory = new ByteBuffersDirectory();
60+
try(IndexWriter directoryWriter = new IndexWriter(directory, new IndexWriterConfig(this.analyzer))) {
61+
for (InternalTopic topic : topics) {
62+
Document doc = new Document();
63+
doc.add(new StringField(FIELD_NAME_RAW, topic.getName(), Field.Store.YES));
64+
doc.add(new TextField(FIELD_NAME, topic.getName(), Field.Store.NO));
65+
doc.add(new IntPoint(FIELD_PARTITIONS, topic.getPartitionCount()));
66+
doc.add(new IntPoint(FIELD_REPLICATION, topic.getReplicationFactor()));
67+
doc.add(new LongPoint(FIELD_SIZE, topic.getSegmentSize()));
68+
if (topic.getTopicConfigs() != null && !topic.getTopicConfigs().isEmpty()) {
69+
for (InternalTopicConfig topicConfig : topic.getTopicConfigs()) {
70+
doc.add(new StringField(FIELD_CONFIG_PREFIX+"_"+topicConfig.getName(), topicConfig.getValue(), Field.Store.NO));
71+
}
72+
}
73+
doc.add(new StringField(FIELD_INTERNAL, String.valueOf(topic.isInternal()), Field.Store.NO));
74+
directoryWriter.addDocument(doc);
75+
}
76+
} catch (IOException e) {
77+
throw new RuntimeException(e);
78+
}
79+
return directory;
80+
}
81+
82+
@Override
83+
public void close() throws Exception {
84+
if (indexReader != null) {
85+
this.indexReader.close();
86+
}
87+
if (this.directory != null) {
88+
this.directory.close();
89+
}
90+
}
91+
92+
public List<String> find(String search, Boolean showInternal, int count) throws IOException {
93+
return find(search, showInternal, FIELD_NAME, count, 0.0f, 2);
94+
}
95+
96+
public List<String> find(String search, Boolean showInternal, String sort, int count) throws IOException {
97+
return find(search, showInternal, sort, count, 0.0f, 2);
98+
}
99+
100+
public List<String> find(String search, Boolean showInternal, String sortField, int count, float minScore, int maxEdits) throws IOException {
101+
QueryParser queryParser = new QueryParser(FIELD_NAME, this.analyzer);
102+
queryParser.setDefaultOperator(QueryParser.Operator.AND);
103+
Query nameQuery = null;
104+
try {
105+
nameQuery = queryParser.parse(search);
106+
} catch (ParseException e) {
107+
throw new RuntimeException(e);
108+
}
109+
110+
Query internalFilter = new TermQuery(new Term(FIELD_INTERNAL, "true"));
111+
112+
BooleanQuery.Builder queryBuilder = new BooleanQuery.Builder();
113+
queryBuilder.add(nameQuery, BooleanClause.Occur.MUST);
114+
if (showInternal == null || !showInternal) {
115+
queryBuilder.add(internalFilter, BooleanClause.Occur.MUST_NOT);
116+
}
117+
118+
List<SortField> sortFields = new ArrayList<>();
119+
sortFields.add(SortField.FIELD_SCORE);
120+
if (!sortField.equals(FIELD_NAME)) {
121+
sortFields.add(new SortField(sortField, SortField.Type.INT, true));
122+
}
123+
124+
Sort sort = new Sort(sortFields.toArray(new SortField[0]));
125+
126+
TopDocs result = this.indexSearcher.search(queryBuilder.build(), count);
127+
128+
List<String> topics = new ArrayList<>();
129+
for (ScoreDoc scoreDoc : result.scoreDocs) {
130+
if (scoreDoc.score > minScore) {
131+
Document document = this.indexSearcher.storedFields().document(scoreDoc.doc);
132+
topics.add(document.get(FIELD_NAME_RAW));
133+
}
134+
}
135+
return topics;
136+
}
137+
}

api/src/main/java/io/kafbat/ui/service/metrics/scrape/ScrapedClusterState.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
import io.kafbat.ui.model.InternalLogDirStats;
99
import io.kafbat.ui.model.InternalPartitionsOffsets;
1010
import io.kafbat.ui.service.ReactiveAdminClient;
11+
import io.kafbat.ui.service.index.TopicsIndex;
1112
import jakarta.annotation.Nullable;
13+
import java.io.Closeable;
1214
import java.time.Instant;
1315
import java.util.HashMap;
1416
import java.util.List;
@@ -31,12 +33,20 @@
3133
@Builder(toBuilder = true)
3234
@RequiredArgsConstructor
3335
@Value
34-
public class ScrapedClusterState {
36+
public class ScrapedClusterState implements AutoCloseable {
3537

3638
Instant scrapeFinishedAt;
3739
Map<Integer, NodeState> nodesStates;
3840
Map<String, TopicState> topicStates;
3941
Map<String, ConsumerGroupState> consumerGroupsStates;
42+
TopicsIndex topicsIndex;
43+
44+
@Override
45+
public void close() throws Exception {
46+
if (this.topicsIndex != null) {
47+
this.topicsIndex.close();
48+
}
49+
}
4050

4151
public record NodeState(int id,
4252
Node node,

0 commit comments

Comments
 (0)