 import lombok.extern.log4j.Log4j2;
 
 import java.time.Duration;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Bytes;
 import org.springframework.stereotype.Service;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.provectus.kafka.ui.cluster.deserialization.DeserializationService;
 import com.provectus.kafka.ui.cluster.deserialization.RecordDeserializer;
 import com.provectus.kafka.ui.cluster.model.ConsumerPosition;
@@ -36,12 +40,12 @@ public class ConsumingService {
 
   private static final int MAX_RECORD_LIMIT = 100;
   private static final int DEFAULT_RECORD_LIMIT = 20;
-  private static final int MAX_POLLS_COUNT = 30;
 
   private final KafkaService kafkaService;
   private final DeserializationService deserializationService;
+  private final ObjectMapper objectMapper = new ObjectMapper();
 
-  public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, Integer limit) {
+  public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic, ConsumerPosition consumerPosition, String query, Integer limit) {
     int recordsLimit = Optional.ofNullable(limit)
         .map(s -> Math.min(s, MAX_RECORD_LIMIT))
         .orElse(DEFAULT_RECORD_LIMIT);
@@ -50,12 +54,44 @@ public Flux<TopicMessage> loadMessages(KafkaCluster cluster, String topic, Consu
     return Flux.create(emitter::emit)
         .subscribeOn(Schedulers.boundedElastic())
         .map(r -> ClusterUtil.mapToTopicMessage(r, recordDeserializer))
+        .filter(m -> filterTopicMessage(m, query))
         .limitRequest(recordsLimit);
   }
 
+  private boolean filterTopicMessage(TopicMessage message, String query) {
+    if (StringUtils.isEmpty(query)) {
+      return true;
+    }
+
+    Object content = message.getContent();
+    JsonNode tree = objectMapper.valueToTree(content);
+    return treeContainsValue(tree, query);
+  }
+
+  private boolean treeContainsValue(JsonNode tree, String query) {
+    LinkedList<JsonNode> nodesForSearch = new LinkedList<>();
+    nodesForSearch.add(tree);
+
+    while (!nodesForSearch.isEmpty()) {
+      JsonNode node = nodesForSearch.removeFirst();
+
+      if (node.isContainerNode()) {
+        node.elements().forEachRemaining(nodesForSearch::add);
+        continue;
+      }
+
+      String nodeValue = node.asText();
+      if (nodeValue.contains(query)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
   @RequiredArgsConstructor
   private static class RecordEmitter {
-
+    private static final int MAX_POLLS_COUNT = 30;
     private static final Duration POLL_TIMEOUT_MS = Duration.ofMillis(1000L);
 
     private final KafkaService kafkaService;
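
Note: the sketch below is a standalone illustration of the change above, not part of the commit. It shows how the new query filter in loadMessages behaves: a message passes only if some scalar value in its JSON content contains the query as a substring. The class name and sample payload are hypothetical.

import java.util.LinkedList;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Standalone sketch mirroring the traversal in the new treeContainsValue method:
// walk the JSON tree breadth-first, descend into objects and arrays, and test
// every scalar value for the query substring.
public class TreeSearchSketch {

  public static void main(String[] args) throws Exception {
    ObjectMapper objectMapper = new ObjectMapper();
    // Hypothetical message content, as it might look after deserialization.
    JsonNode tree = objectMapper.readTree(
        "{\"user\":{\"name\":\"alice\",\"tags\":[\"admin\",\"billing\"]},\"amount\":42}");

    System.out.println(containsValue(tree, "bill"));   // true  -> message would pass the filter
    System.out.println(containsValue(tree, "refund")); // false -> message would be dropped
  }

  private static boolean containsValue(JsonNode tree, String query) {
    LinkedList<JsonNode> nodesForSearch = new LinkedList<>();
    nodesForSearch.add(tree);

    while (!nodesForSearch.isEmpty()) {
      JsonNode node = nodesForSearch.removeFirst();

      // Container nodes (objects, arrays) contribute their child values to the queue.
      if (node.isContainerNode()) {
        node.elements().forEachRemaining(nodesForSearch::add);
        continue;
      }

      // Scalar nodes are compared as text, so numbers and booleans are searched too.
      if (node.asText().contains(query)) {
        return true;
      }
    }

    return false;
  }
}

One consequence of iterating elements() is that only values are searched; a query that matches only a field name will not select the message.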