From 32559d423a4bc015c823dfb3051cd53abba36bcb Mon Sep 17 00:00:00 2001 From: ndanilin Date: Thu, 26 Dec 2024 21:29:35 +0300 Subject: [PATCH 01/14] fixed swagger --- pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pom.xml b/pom.xml index c21b8885..162110e6 100644 --- a/pom.xml +++ b/pom.xml @@ -97,6 +97,10 @@ org.slf4j slf4j-log4j12 + + io.swagger.core.v3 + swagger-annotations + From cbfb3c03a06f245795714013ea1963a21dcfa939 Mon Sep 17 00:00:00 2001 From: ndanilin Date: Mon, 30 Dec 2024 21:31:13 +0300 Subject: [PATCH 02/14] 1. fixed duplicated messages for viewAllMessages 2. added getAllMessages (json api for viewAllMessages) --- .../kafdrop/controller/MessageController.java | 76 +++++++++++-------- 1 file changed, 44 insertions(+), 32 deletions(-) diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index d3542f7d..8bc4864f 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -32,49 +32,28 @@ import kafdrop.config.ProtobufDescriptorConfiguration.ProtobufDescriptorProperties; import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties; import kafdrop.form.SearchMessageForm; +import kafdrop.model.CreateMessageVO; import kafdrop.model.MessageVO; import kafdrop.model.TopicPartitionVO; import kafdrop.model.TopicVO; import kafdrop.service.KafkaMonitor; import kafdrop.service.MessageInspector; import kafdrop.service.TopicNotFoundException; -import kafdrop.util.AvroMessageDeserializer; -import kafdrop.util.AvroMessageSerializer; -import kafdrop.util.DefaultMessageDeserializer; -import kafdrop.util.DefaultMessageSerializer; -import kafdrop.util.Deserializers; -import kafdrop.util.KeyFormat; -import kafdrop.util.MessageDeserializer; -import kafdrop.util.MessageFormat; -import kafdrop.util.MessageSerializer; -import kafdrop.util.MsgPackMessageDeserializer; -import kafdrop.util.MsgPackMessageSerializer; -import kafdrop.util.ProtobufMessageDeserializer; -import kafdrop.util.ProtobufMessageSerializer; -import kafdrop.util.ProtobufSchemaRegistryMessageDeserializer; - -import java.io.File; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; - - -import kafdrop.util.Serializers; +import kafdrop.util.*; import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.validation.BindingResult; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.ModelAttribute; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.server.ResponseStatusException; +import java.io.File; +import java.util.ArrayList; +import java.util.Comparator; import java.util.Date; - -import org.springframework.web.bind.annotation.PostMapping; -import kafdrop.model.CreateMessageVO; +import java.util.List; @Tag(name = "message-controller", description = "Message Controller") @Controller @@ -127,7 +106,7 @@ public String viewAllMessages(@PathVariable("name") String topicName, getDeserializer(topicName, defaultKeyFormat, "", "", protobufProperties.getParseAnyProto()), getDeserializer(topicName, defaultFormat, "", "", protobufProperties.getParseAnyProto())); - final List messages = messageInspector.getMessages(topicName, size, deserializers); + final List messages = new ArrayList<>(); for (TopicPartitionVO partition : topic.getPartitions()) { messages.addAll(messageInspector.getMessages(topicName, @@ -143,6 +122,40 @@ public String viewAllMessages(@PathVariable("name") String topicName, return "topic-messages"; } + @Operation(summary = "getAllMessages", description = "Get all messages from topic") + @ApiResponses(value = { + @ApiResponse(responseCode = "200", description = "Success"), + @ApiResponse(responseCode = "404", description = "Invalid topic name") + }) + @GetMapping(value = "/topic/{name:.+}/allmessages", produces = MediaType.APPLICATION_JSON_VALUE) + @ResponseBody + public List getAllMessages(@PathVariable("name") String topicName, + @RequestParam(name = "count", required = false) Integer count) { + final int size = (count != null ? count : 100); + final MessageFormat defaultFormat = messageFormatProperties.getFormat(); + final MessageFormat defaultKeyFormat = messageFormatProperties.getKeyFormat(); + final TopicVO topic = kafkaMonitor.getTopic(topicName) + .orElseThrow(() -> new TopicNotFoundException(topicName)); + + final var deserializers = new Deserializers( + getDeserializer(topicName, defaultKeyFormat, "", "", protobufProperties.getParseAnyProto()), + getDeserializer(topicName, defaultFormat, "", "", protobufProperties.getParseAnyProto())); + + final List messages = new ArrayList<>(); + + for (TopicPartitionVO partition : topic.getPartitions()) { + messages.addAll(messageInspector.getMessages(topicName, + partition.getId(), + partition.getFirstOffset(), + size, + deserializers)); + } + + messages.sort(Comparator.comparing(MessageVO::getTimestamp)); + + return messages; + } + /** * Human friendly view of reading messages. * @@ -588,6 +601,5 @@ public Boolean getIsAnyProto() { public void setIsAnyProto(Boolean isAnyProto) { this.isAnyProto = isAnyProto; } - } } From 2eeaf27c1040603702114ec0da7dab45387f6fb1 Mon Sep 17 00:00:00 2001 From: ndanilin Date: Fri, 10 Jan 2025 13:25:34 +0300 Subject: [PATCH 03/14] 1. fixed search by timestamp 2. added json api for messages search --- .../kafdrop/controller/MessageController.java | 80 +++++++++++++++++-- .../form/SearchMessageFormForJson.java | 41 ++++++++++ .../service/KafkaHighLevelConsumer.java | 11 ++- 3 files changed, 123 insertions(+), 9 deletions(-) create mode 100644 src/main/java/kafdrop/form/SearchMessageFormForJson.java diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 8bc4864f..44eb721f 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -21,6 +21,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.enums.Explode; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.responses.ApiResponses; import io.swagger.v3.oas.annotations.tags.Tag; @@ -32,10 +34,8 @@ import kafdrop.config.ProtobufDescriptorConfiguration.ProtobufDescriptorProperties; import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties; import kafdrop.form.SearchMessageForm; -import kafdrop.model.CreateMessageVO; -import kafdrop.model.MessageVO; -import kafdrop.model.TopicPartitionVO; -import kafdrop.model.TopicVO; +import kafdrop.form.SearchMessageFormForJson; +import kafdrop.model.*; import kafdrop.service.KafkaMonitor; import kafdrop.service.MessageInspector; import kafdrop.service.TopicNotFoundException; @@ -50,10 +50,7 @@ import org.springframework.web.server.ResponseStatusException; import java.io.File; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.Date; -import java.util.List; +import java.util.*; @Tag(name = "message-controller", description = "Message Controller") @Controller @@ -122,6 +119,13 @@ public String viewAllMessages(@PathVariable("name") String topicName, return "topic-messages"; } + /** + * JSON array of reading all topic messages sorted by timestamp. + * + * @param topicName Name of topic + * @param count Count of messages + * @return JSON array for seeing all messages in a topic sorted by timestamp. + */ @Operation(summary = "getAllMessages", description = "Get all messages from topic") @ApiResponses(value = { @ApiResponse(responseCode = "200", description = "Success"), @@ -341,6 +345,66 @@ public String searchMessageForm(@PathVariable("name") String topicName, return "search-message"; } + /** + * + * @param topicName Name of topic + * @param searchMessageForm Message form for submitting requests to search messages (all fields are not required):
+ *  * searchText (default value = "")
+ *  * maximumCount default value = "1000")
+ *  * partition (default value = "-1")
+ *  * format (default value = "DEFAULT")
+ *  * keyFormat (default value = "DEFAULT")
+ *  * startTimestamp (default value = "1970-01-01 00:00:00.000") + * @param keys Keys to filter messages (not required) + * @param errors + * @return JSON array of found messages and completionDetails about search results + */ + @Operation(summary = "searchMessages", description = "Search messages and return results as JSON") + @ApiResponses(value = { + @ApiResponse(responseCode = "200", description = "Success"), + @ApiResponse(responseCode = "400", description = "Body has validation errors"), + @ApiResponse(responseCode = "404", description = "Invalid topic name") + }) + @GetMapping(value = "/topic/{name:.+}/search-messages", produces = MediaType.APPLICATION_JSON_VALUE) + @ResponseBody + public SearchResultsVO searchMessages(@PathVariable("name") String topicName, + @Valid @ModelAttribute SearchMessageFormForJson searchMessageForm, + @Parameter(description = "Keys to filter messages", explode = Explode.TRUE) + @RequestParam(name = "keys", required = false) String[] keys, + BindingResult errors) { + + if (errors.hasErrors()) throw new ResponseStatusException(HttpStatus.BAD_REQUEST, errors.getAllErrors().toString()); + + kafkaMonitor.getTopic(topicName) + .orElseThrow(() -> new TopicNotFoundException(topicName)); + + final var deserializers = new Deserializers( + getDeserializer(topicName, searchMessageForm.getKeyFormat(), null, null, + protobufProperties.getParseAnyProto()), + getDeserializer(topicName, searchMessageForm.getFormat(), null, null, + protobufProperties.getParseAnyProto()) + ); + + var searchResultsVO = kafkaMonitor.searchMessages( + topicName, + searchMessageForm.getSearchText(), + searchMessageForm.getPartition(), + searchMessageForm.getMaximumCount(), + searchMessageForm.getStartTimestamp(), + deserializers); + + if (keys != null) { + var filteredByKeyMessages = searchResultsVO.getMessages().stream() + .filter( + messageVO -> Arrays.asList(keys).contains(messageVO.getKey())) + .toList(); + + searchResultsVO.setMessages(filteredByKeyMessages); + } + + return searchResultsVO; + } + /** * Returns the selected message format based on the form submission * diff --git a/src/main/java/kafdrop/form/SearchMessageFormForJson.java b/src/main/java/kafdrop/form/SearchMessageFormForJson.java new file mode 100644 index 00000000..baf9aeae --- /dev/null +++ b/src/main/java/kafdrop/form/SearchMessageFormForJson.java @@ -0,0 +1,41 @@ +package kafdrop.form; + +import com.fasterxml.jackson.annotation.JsonFormat; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; +import kafdrop.util.MessageFormat; +import lombok.Data; +import org.springframework.format.annotation.DateTimeFormat; + +import java.util.Date; + +@Data +public class SearchMessageFormForJson { + @Schema(example = "Some search text") + private String searchText; + @Min(1) + @Max(1000) + @Schema(example = "1000") + private Integer maximumCount; + @Schema(example = "-1") + private Integer partition; + @Schema(example = "DEFAULT") + private MessageFormat format; + @Schema(example = "DEFAULT") + private MessageFormat keyFormat; + @Schema(type = "string", example = "1970-01-01 03:00:00.000") + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS") + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss.SSS") + private Date startTimestamp; + + public SearchMessageFormForJson(String searchText, Integer maximumCount, Integer partition, MessageFormat format, + MessageFormat keyFormat, Date startTimestamp) { + this.searchText = (searchText == null) ? "" : searchText; + this.maximumCount = (maximumCount == null) ? 1000 : maximumCount; + this.partition = (partition == null) ? -1 : partition; + this.format = (format == null) ? MessageFormat.DEFAULT : format; + this.keyFormat = (keyFormat == null) ? MessageFormat.DEFAULT : keyFormat; + this.startTimestamp = (startTimestamp == null) ? new Date(0) : startTimestamp; + } +} diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index f9697553..2501d0af 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -219,7 +219,16 @@ synchronized SearchResults searchRecords(String topic, kafkaConsumer.assign(partitions); seekToTimestamp(partitions, startTimestamp); - return searchRecords(searchString, maximumCount, deserializers); + var records = searchRecords(searchString, maximumCount, deserializers); + var filteredByTimestampResults = records.getResults().stream() + .filter(result -> result.timestamp() >= startTimestamp.getTime()) + .toList(); + + return new SearchResults( + filteredByTimestampResults, + records.getCompletionReason(), + records.getFinalMessageTimestamp(), + records.getMessagesScannedCount()); } private void seekToTimestamp(List partitions, Date startTimestamp) { From ce5990aa9ca644cc14dbec359d95dbbf0dadd6fd Mon Sep 17 00:00:00 2001 From: ndanilin Date: Tue, 14 Jan 2025 10:54:05 +0300 Subject: [PATCH 04/14] 1. Added pattern to timestamp for json api (like in topic-messages.ftlh) 2. sorted messages in searchMessages --- .../kafdrop/config/ObjectMapperConfig.java | 19 +++++++++++++++++++ .../kafdrop/controller/MessageController.java | 4 +++- src/main/java/kafdrop/model/MessageVO.java | 3 +++ 3 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 src/main/java/kafdrop/config/ObjectMapperConfig.java diff --git a/src/main/java/kafdrop/config/ObjectMapperConfig.java b/src/main/java/kafdrop/config/ObjectMapperConfig.java new file mode 100644 index 00000000..e67f3933 --- /dev/null +++ b/src/main/java/kafdrop/config/ObjectMapperConfig.java @@ -0,0 +1,19 @@ +package kafdrop.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; + +import java.util.TimeZone; + +@Configuration +public class ObjectMapperConfig { + + @Bean + public ObjectMapper objectMapper(Jackson2ObjectMapperBuilder builder) { + return builder + .timeZone(TimeZone.getDefault()) + .build(); + } +} diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 44eb721f..6776a3a4 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -357,7 +357,7 @@ public String searchMessageForm(@PathVariable("name") String topicName, *  * startTimestamp (default value = "1970-01-01 00:00:00.000") * @param keys Keys to filter messages (not required) * @param errors - * @return JSON array of found messages and completionDetails about search results + * @return JSON array of found messages (sorted by timestamp) and completionDetails about search results */ @Operation(summary = "searchMessages", description = "Search messages and return results as JSON") @ApiResponses(value = { @@ -402,6 +402,8 @@ public SearchResultsVO searchMessages(@PathVariable("name") String topicName, searchResultsVO.setMessages(filteredByKeyMessages); } + searchResultsVO.getMessages().sort(Comparator.comparing(MessageVO::getTimestamp)); + return searchResultsVO; } diff --git a/src/main/java/kafdrop/model/MessageVO.java b/src/main/java/kafdrop/model/MessageVO.java index be7f9436..cca4f0eb 100644 --- a/src/main/java/kafdrop/model/MessageVO.java +++ b/src/main/java/kafdrop/model/MessageVO.java @@ -18,6 +18,8 @@ package kafdrop.model; +import com.fasterxml.jackson.annotation.JsonFormat; + import java.util.Date; import java.util.Map; import java.util.stream.Collectors; @@ -28,6 +30,7 @@ public final class MessageVO { private String message; private String key; private Map headers; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS") private Date timestamp; public int getPartition() { From 3f472442a39cd489c8208ecc4fd3c83a8ec0719f Mon Sep 17 00:00:00 2001 From: ndanilin Date: Sun, 26 Jan 2025 21:33:30 +0300 Subject: [PATCH 05/14] fixed sort by keys (for searchMessages endpoint) --- src/main/java/kafdrop/controller/MessageController.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 6776a3a4..22150e6a 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -397,13 +397,12 @@ public SearchResultsVO searchMessages(@PathVariable("name") String topicName, var filteredByKeyMessages = searchResultsVO.getMessages().stream() .filter( messageVO -> Arrays.asList(keys).contains(messageVO.getKey())) + .sorted(Comparator.comparing(MessageVO::getTimestamp)) .toList(); searchResultsVO.setMessages(filteredByKeyMessages); } - searchResultsVO.getMessages().sort(Comparator.comparing(MessageVO::getTimestamp)); - return searchResultsVO; } From 3416223650810164d6cecd892a1eedae5615be98 Mon Sep 17 00:00:00 2001 From: ndanilin Date: Sat, 7 Jun 2025 22:17:38 +0300 Subject: [PATCH 06/14] fixed imports --- .../kafdrop/controller/MessageController.java | 35 ++++++++++++++++--- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 22150e6a..f9cde479 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -35,22 +35,49 @@ import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties; import kafdrop.form.SearchMessageForm; import kafdrop.form.SearchMessageFormForJson; -import kafdrop.model.*; +import kafdrop.model.CreateMessageVO; +import kafdrop.model.MessageVO; +import kafdrop.model.SearchResultsVO; +import kafdrop.model.TopicPartitionVO; +import kafdrop.model.TopicVO; import kafdrop.service.KafkaMonitor; import kafdrop.service.MessageInspector; import kafdrop.service.TopicNotFoundException; -import kafdrop.util.*; +import kafdrop.util.AvroMessageDeserializer; +import kafdrop.util.AvroMessageSerializer; +import kafdrop.util.DefaultMessageDeserializer; +import kafdrop.util.DefaultMessageSerializer; +import kafdrop.util.Deserializers; +import kafdrop.util.KeyFormat; +import kafdrop.util.MessageDeserializer; +import kafdrop.util.MessageFormat; +import kafdrop.util.MessageSerializer; +import kafdrop.util.MsgPackMessageDeserializer; +import kafdrop.util.MsgPackMessageSerializer; +import kafdrop.util.ProtobufMessageDeserializer; +import kafdrop.util.ProtobufMessageSerializer; +import kafdrop.util.ProtobufSchemaRegistryMessageDeserializer; +import kafdrop.util.Serializers; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.validation.BindingResult; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.ModelAttribute; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.server.ResponseStatusException; import java.io.File; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Date; +import java.util.List; @Tag(name = "message-controller", description = "Message Controller") @Controller From 8d4bad336748f7570805a710c1b2aac703fc4564 Mon Sep 17 00:00:00 2001 From: Bert Roos Date: Thu, 3 Jul 2025 21:53:38 +0200 Subject: [PATCH 07/14] Deduplicated SearchMessagForm related code --- .../kafdrop/controller/MessageController.java | 76 +++++++------------ .../java/kafdrop/form/SearchMessageForm.java | 74 +----------------- .../form/SearchMessageFormForJson.java | 66 ++++++++++++++++ 3 files changed, 96 insertions(+), 120 deletions(-) diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index f9cde479..9ac53837 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -126,22 +126,7 @@ public String viewAllMessages(@PathVariable("name") String topicName, model.addAttribute("keyFormats", KeyFormat.values()); model.addAttribute("descFiles", protobufProperties.getDescFilesList()); - final var deserializers = new Deserializers( - getDeserializer(topicName, defaultKeyFormat, "", "", protobufProperties.getParseAnyProto()), - getDeserializer(topicName, defaultFormat, "", "", protobufProperties.getParseAnyProto())); - - final List messages = new ArrayList<>(); - - for (TopicPartitionVO partition : topic.getPartitions()) { - messages.addAll(messageInspector.getMessages(topicName, - partition.getId(), - partition.getFirstOffset(), - size, - deserializers)); - } - - messages.sort(Comparator.comparing(MessageVO::getTimestamp)); - model.addAttribute("messages", messages); + model.addAttribute("messages", getMessages(topicName, defaultKeyFormat, defaultFormat, topic, size)); return "topic-messages"; } @@ -168,6 +153,10 @@ public List getAllMessages(@PathVariable("name") String topicName, final TopicVO topic = kafkaMonitor.getTopic(topicName) .orElseThrow(() -> new TopicNotFoundException(topicName)); + return getMessages(topicName, defaultKeyFormat, defaultFormat, topic, size); + } + + private @org.jetbrains.annotations.NotNull List getMessages(String topicName, MessageFormat defaultKeyFormat, MessageFormat defaultFormat, TopicVO topic, int size) { final var deserializers = new Deserializers( getDeserializer(topicName, defaultKeyFormat, "", "", protobufProperties.getParseAnyProto()), getDeserializer(topicName, defaultFormat, "", "", protobufProperties.getParseAnyProto())); @@ -183,7 +172,6 @@ public List getAllMessages(@PathVariable("name") String topicName, } messages.sort(Comparator.comparing(MessageVO::getTimestamp)); - return messages; } @@ -347,23 +335,8 @@ public String searchMessageForm(@PathVariable("name") String topicName, model.addAttribute("descFiles", protobufProperties.getDescFilesList()); if (!searchMessageForm.isEmpty() && !errors.hasErrors()) { - - final var deserializers = new Deserializers( - getDeserializer(topicName, searchMessageForm.getKeyFormat(), searchMessageForm.getDescFile(), - searchMessageForm.getMsgTypeName(), - protobufProperties.getParseAnyProto()), - getDeserializer(topicName, searchMessageForm.getFormat(), searchMessageForm.getDescFile(), - searchMessageForm.getMsgTypeName(), - protobufProperties.getParseAnyProto()) - ); - - var searchResults = kafkaMonitor.searchMessages( - topicName, - searchMessageForm.getSearchText(), - searchMessageForm.getPartition(), - searchMessageForm.getMaximumCount(), - searchMessageForm.getStartTimestamp(), - deserializers); + final var searchResults = searchMessageVOs(topicName, searchMessageForm, searchMessageForm.getDescFile(), + searchMessageForm.getMsgTypeName()); model.addAttribute("messages", searchResults.getMessages()); model.addAttribute("details", searchResults.getCompletionDetails()); @@ -372,6 +345,26 @@ public String searchMessageForm(@PathVariable("name") String topicName, return "search-message"; } + private SearchResultsVO searchMessageVOs(String topicName, SearchMessageFormForJson searchMessageForm, String descFile, + String msgTypeName) { + final var deserializers = new Deserializers( + getDeserializer(topicName, searchMessageForm.getKeyFormat(), descFile, + msgTypeName, + protobufProperties.getParseAnyProto()), + getDeserializer(topicName, searchMessageForm.getFormat(), descFile, + msgTypeName, + protobufProperties.getParseAnyProto()) + ); + + return kafkaMonitor.searchMessages( + topicName, + searchMessageForm.getSearchText(), + searchMessageForm.getPartition(), + searchMessageForm.getMaximumCount(), + searchMessageForm.getStartTimestamp(), + deserializers); + } + /** * * @param topicName Name of topic @@ -405,20 +398,7 @@ public SearchResultsVO searchMessages(@PathVariable("name") String topicName, kafkaMonitor.getTopic(topicName) .orElseThrow(() -> new TopicNotFoundException(topicName)); - final var deserializers = new Deserializers( - getDeserializer(topicName, searchMessageForm.getKeyFormat(), null, null, - protobufProperties.getParseAnyProto()), - getDeserializer(topicName, searchMessageForm.getFormat(), null, null, - protobufProperties.getParseAnyProto()) - ); - - var searchResultsVO = kafkaMonitor.searchMessages( - topicName, - searchMessageForm.getSearchText(), - searchMessageForm.getPartition(), - searchMessageForm.getMaximumCount(), - searchMessageForm.getStartTimestamp(), - deserializers); + final var searchResultsVO = searchMessageVOs(topicName, searchMessageForm, null, null); if (keys != null) { var filteredByKeyMessages = searchResultsVO.getMessages().stream() diff --git a/src/main/java/kafdrop/form/SearchMessageForm.java b/src/main/java/kafdrop/form/SearchMessageForm.java index 27564633..b127033b 100644 --- a/src/main/java/kafdrop/form/SearchMessageForm.java +++ b/src/main/java/kafdrop/form/SearchMessageForm.java @@ -10,40 +10,14 @@ import java.util.Date; -public class SearchMessageForm { - - @NotBlank - private String searchText; - - @NotNull - @Min(1) - @Max(1000) - private Integer maximumCount; - - private Integer partition; - - private MessageFormat format; - - private MessageFormat keyFormat; +public class SearchMessageForm extends SearchMessageFormForJson { private String descFile; private String msgTypeName; - @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS") - private Date startTimestamp; - public SearchMessageForm(String searchText, MessageFormat format) { - this.searchText = searchText; - this.format = format; - } - - public Date getStartTimestamp() { - return startTimestamp; - } - - public void setStartTimestamp(Date startTimestamp) { - this.startTimestamp = startTimestamp; + super (searchText, format); } public SearchMessageForm(String searchText) { @@ -53,43 +27,6 @@ public SearchMessageForm(String searchText) { public SearchMessageForm() { } - @JsonIgnore - public boolean isEmpty() { - return searchText == null || searchText.isEmpty(); - } - - public String getSearchText() { - return searchText; - } - - public void setSearchText(String searchText) { - this.searchText = searchText; - } - - public Integer getMaximumCount() { - return maximumCount; - } - - public void setMaximumCount(Integer maximumCount) { - this.maximumCount = maximumCount; - } - - public MessageFormat getKeyFormat() { - return keyFormat; - } - - public void setKeyFormat(MessageFormat keyFormat) { - this.keyFormat = keyFormat; - } - - public MessageFormat getFormat() { - return format; - } - - public void setFormat(MessageFormat format) { - this.format = format; - } - public String getDescFile() { return descFile; } @@ -106,11 +43,4 @@ public void setMsgTypeName(String msgTypeName) { this.msgTypeName = msgTypeName; } - public Integer getPartition() { - return partition; - } - - public void setPartition(Integer partition) { - this.partition = partition; - } } diff --git a/src/main/java/kafdrop/form/SearchMessageFormForJson.java b/src/main/java/kafdrop/form/SearchMessageFormForJson.java index baf9aeae..ac991ea4 100644 --- a/src/main/java/kafdrop/form/SearchMessageFormForJson.java +++ b/src/main/java/kafdrop/form/SearchMessageFormForJson.java @@ -1,9 +1,12 @@ package kafdrop.form; import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonIgnore; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.Max; import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; import kafdrop.util.MessageFormat; import lombok.Data; import org.springframework.format.annotation.DateTimeFormat; @@ -12,8 +15,10 @@ @Data public class SearchMessageFormForJson { + @NotBlank @Schema(example = "Some search text") private String searchText; + @NotNull @Min(1) @Max(1000) @Schema(example = "1000") @@ -29,6 +34,14 @@ public class SearchMessageFormForJson { @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss.SSS") private Date startTimestamp; + protected SearchMessageFormForJson() { + // Default constructor + } + protected SearchMessageFormForJson(String searchText, MessageFormat format) { + this.searchText = searchText; + this.format = format; + } + public SearchMessageFormForJson(String searchText, Integer maximumCount, Integer partition, MessageFormat format, MessageFormat keyFormat, Date startTimestamp) { this.searchText = (searchText == null) ? "" : searchText; @@ -38,4 +51,57 @@ public SearchMessageFormForJson(String searchText, Integer maximumCount, Integer this.keyFormat = (keyFormat == null) ? MessageFormat.DEFAULT : keyFormat; this.startTimestamp = (startTimestamp == null) ? new Date(0) : startTimestamp; } + + @JsonIgnore + public boolean isEmpty() { + return searchText == null || searchText.isEmpty(); + } + + public String getSearchText() { + return searchText; + } + + public void setSearchText(String searchText) { + this.searchText = searchText; + } + + public Integer getMaximumCount() { + return maximumCount; + } + + public void setMaximumCount(Integer maximumCount) { + this.maximumCount = maximumCount; + } + + public Integer getPartition() { + return partition; + } + + public void setPartition(Integer partition) { + this.partition = partition; + } + + public MessageFormat getFormat() { + return format; + } + + public void setFormat(MessageFormat format) { + this.format = format; + } + + public MessageFormat getKeyFormat() { + return keyFormat; + } + + public void setKeyFormat(MessageFormat keyFormat) { + this.keyFormat = keyFormat; + } + + public Date getStartTimestamp() { + return startTimestamp; + } + + public void setStartTimestamp(Date startTimestamp) { + this.startTimestamp = startTimestamp; + } } From 72334b92cdf880da7e8cc6416ce9c0388a5ef9df Mon Sep 17 00:00:00 2001 From: Bert Roos Date: Thu, 3 Jul 2025 21:57:42 +0200 Subject: [PATCH 08/14] Shortened too long lines --- .../java/kafdrop/controller/MessageController.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 9ac53837..fd2b3440 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -156,7 +156,10 @@ public List getAllMessages(@PathVariable("name") String topicName, return getMessages(topicName, defaultKeyFormat, defaultFormat, topic, size); } - private @org.jetbrains.annotations.NotNull List getMessages(String topicName, MessageFormat defaultKeyFormat, MessageFormat defaultFormat, TopicVO topic, int size) { + private @org.jetbrains.annotations.NotNull List getMessages(String topicName, + MessageFormat defaultKeyFormat, + MessageFormat defaultFormat, TopicVO topic, + int size) { final var deserializers = new Deserializers( getDeserializer(topicName, defaultKeyFormat, "", "", protobufProperties.getParseAnyProto()), getDeserializer(topicName, defaultFormat, "", "", protobufProperties.getParseAnyProto())); @@ -345,8 +348,9 @@ public String searchMessageForm(@PathVariable("name") String topicName, return "search-message"; } - private SearchResultsVO searchMessageVOs(String topicName, SearchMessageFormForJson searchMessageForm, String descFile, - String msgTypeName) { + private SearchResultsVO searchMessageVOs(String topicName, SearchMessageFormForJson searchMessageForm, + String descFile, + String msgTypeName) { final var deserializers = new Deserializers( getDeserializer(topicName, searchMessageForm.getKeyFormat(), descFile, msgTypeName, From a2ff1e9d2f46db1a744cb1c47466cdc67720480a Mon Sep 17 00:00:00 2001 From: ndanilin Date: Thu, 10 Jul 2025 16:08:08 +0300 Subject: [PATCH 09/14] fixed date format --- .../java/kafdrop/form/SearchMessageForm.java | 28 +++++++++++++------ .../form/SearchMessageFormForJson.java | 7 ++--- src/main/java/kafdrop/model/MessageVO.java | 3 -- .../resources/templates/search-message.ftlh | 4 +-- 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/src/main/java/kafdrop/form/SearchMessageForm.java b/src/main/java/kafdrop/form/SearchMessageForm.java index b127033b..b5c0674e 100644 --- a/src/main/java/kafdrop/form/SearchMessageForm.java +++ b/src/main/java/kafdrop/form/SearchMessageForm.java @@ -1,23 +1,18 @@ package kafdrop.form; -import com.fasterxml.jackson.annotation.JsonIgnore; -import jakarta.validation.constraints.Max; -import jakarta.validation.constraints.Min; -import jakarta.validation.constraints.NotBlank; -import jakarta.validation.constraints.NotNull; import kafdrop.util.MessageFormat; -import org.springframework.format.annotation.DateTimeFormat; -import java.util.Date; +import java.text.ParseException; +import java.text.SimpleDateFormat; public class SearchMessageForm extends SearchMessageFormForJson { private String descFile; - private String msgTypeName; + private final SimpleDateFormat UI_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); public SearchMessageForm(String searchText, MessageFormat format) { - super (searchText, format); + super(searchText, format); } public SearchMessageForm(String searchText) { @@ -43,4 +38,19 @@ public void setMsgTypeName(String msgTypeName) { this.msgTypeName = msgTypeName; } + public String getStartTimestampUi() { + if (super.getStartTimestamp() == null) { + return ""; + } + return UI_DATE_FORMAT.format(super.getStartTimestamp()); + } + + public void setStartTimestampUi(String value) throws ParseException { + if (value == null || value.trim().isEmpty()) { + super.setStartTimestamp(null); + } else { + super.setStartTimestamp(UI_DATE_FORMAT.parse(value)); + } + } + } diff --git a/src/main/java/kafdrop/form/SearchMessageFormForJson.java b/src/main/java/kafdrop/form/SearchMessageFormForJson.java index ac991ea4..776e9fdc 100644 --- a/src/main/java/kafdrop/form/SearchMessageFormForJson.java +++ b/src/main/java/kafdrop/form/SearchMessageFormForJson.java @@ -1,6 +1,5 @@ package kafdrop.form; -import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonIgnore; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.Max; @@ -29,14 +28,14 @@ public class SearchMessageFormForJson { private MessageFormat format; @Schema(example = "DEFAULT") private MessageFormat keyFormat; - @Schema(type = "string", example = "1970-01-01 03:00:00.000") - @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS") - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss.SSS") + @Schema(type = "string", example = "1970-01-01T00:00:00.000+00:00") + @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) private Date startTimestamp; protected SearchMessageFormForJson() { // Default constructor } + protected SearchMessageFormForJson(String searchText, MessageFormat format) { this.searchText = searchText; this.format = format; diff --git a/src/main/java/kafdrop/model/MessageVO.java b/src/main/java/kafdrop/model/MessageVO.java index cca4f0eb..be7f9436 100644 --- a/src/main/java/kafdrop/model/MessageVO.java +++ b/src/main/java/kafdrop/model/MessageVO.java @@ -18,8 +18,6 @@ package kafdrop.model; -import com.fasterxml.jackson.annotation.JsonFormat; - import java.util.Date; import java.util.Map; import java.util.stream.Collectors; @@ -30,7 +28,6 @@ public final class MessageVO { private String message; private String key; private Map headers; - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS") private Date timestamp; public int getPartition() { diff --git a/src/main/resources/templates/search-message.ftlh b/src/main/resources/templates/search-message.ftlh index 909960e1..fd7fc69b 100644 --- a/src/main/resources/templates/search-message.ftlh +++ b/src/main/resources/templates/search-message.ftlh @@ -87,10 +87,10 @@ - <@spring.bind path="searchMessageForm.startTimestamp"/> + <@spring.bind path="searchMessageForm.startTimestampUi"/>
- <@spring.formInput path="searchMessageForm.startTimestamp" attributes='pattern="[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]+" class="form-control ${spring.status.error?string("has-error", "")}" size="30"'/> + <@spring.formInput path="searchMessageForm.startTimestampUi" attributes='pattern="[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]+" class="form-control ${spring.status.error?string("has-error", "")}" size="30"'/> yyyy-MM-dd HH:mm:ss.SSS <#if spring.status.error> <@spring.showErrors "
"/>
From c03037a42fcdc540e64bc85f2bca3d5c07a803f4 Mon Sep 17 00:00:00 2001 From: ndanilin Date: Fri, 11 Jul 2025 11:35:39 +0300 Subject: [PATCH 10/14] Revert "fixed date format" This reverts commit a2ff1e9d2f46db1a744cb1c47466cdc67720480a. --- .../java/kafdrop/form/SearchMessageForm.java | 28 ++++++------------- .../form/SearchMessageFormForJson.java | 7 +++-- src/main/java/kafdrop/model/MessageVO.java | 3 ++ .../resources/templates/search-message.ftlh | 4 +-- 4 files changed, 18 insertions(+), 24 deletions(-) diff --git a/src/main/java/kafdrop/form/SearchMessageForm.java b/src/main/java/kafdrop/form/SearchMessageForm.java index b5c0674e..b127033b 100644 --- a/src/main/java/kafdrop/form/SearchMessageForm.java +++ b/src/main/java/kafdrop/form/SearchMessageForm.java @@ -1,18 +1,23 @@ package kafdrop.form; +import com.fasterxml.jackson.annotation.JsonIgnore; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; import kafdrop.util.MessageFormat; +import org.springframework.format.annotation.DateTimeFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; +import java.util.Date; public class SearchMessageForm extends SearchMessageFormForJson { private String descFile; + private String msgTypeName; - private final SimpleDateFormat UI_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); public SearchMessageForm(String searchText, MessageFormat format) { - super(searchText, format); + super (searchText, format); } public SearchMessageForm(String searchText) { @@ -38,19 +43,4 @@ public void setMsgTypeName(String msgTypeName) { this.msgTypeName = msgTypeName; } - public String getStartTimestampUi() { - if (super.getStartTimestamp() == null) { - return ""; - } - return UI_DATE_FORMAT.format(super.getStartTimestamp()); - } - - public void setStartTimestampUi(String value) throws ParseException { - if (value == null || value.trim().isEmpty()) { - super.setStartTimestamp(null); - } else { - super.setStartTimestamp(UI_DATE_FORMAT.parse(value)); - } - } - } diff --git a/src/main/java/kafdrop/form/SearchMessageFormForJson.java b/src/main/java/kafdrop/form/SearchMessageFormForJson.java index 776e9fdc..ac991ea4 100644 --- a/src/main/java/kafdrop/form/SearchMessageFormForJson.java +++ b/src/main/java/kafdrop/form/SearchMessageFormForJson.java @@ -1,5 +1,6 @@ package kafdrop.form; +import com.fasterxml.jackson.annotation.JsonFormat; import com.fasterxml.jackson.annotation.JsonIgnore; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.Max; @@ -28,14 +29,14 @@ public class SearchMessageFormForJson { private MessageFormat format; @Schema(example = "DEFAULT") private MessageFormat keyFormat; - @Schema(type = "string", example = "1970-01-01T00:00:00.000+00:00") - @DateTimeFormat(iso = DateTimeFormat.ISO.DATE_TIME) + @Schema(type = "string", example = "1970-01-01 03:00:00.000") + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS") + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss.SSS") private Date startTimestamp; protected SearchMessageFormForJson() { // Default constructor } - protected SearchMessageFormForJson(String searchText, MessageFormat format) { this.searchText = searchText; this.format = format; diff --git a/src/main/java/kafdrop/model/MessageVO.java b/src/main/java/kafdrop/model/MessageVO.java index be7f9436..cca4f0eb 100644 --- a/src/main/java/kafdrop/model/MessageVO.java +++ b/src/main/java/kafdrop/model/MessageVO.java @@ -18,6 +18,8 @@ package kafdrop.model; +import com.fasterxml.jackson.annotation.JsonFormat; + import java.util.Date; import java.util.Map; import java.util.stream.Collectors; @@ -28,6 +30,7 @@ public final class MessageVO { private String message; private String key; private Map headers; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS") private Date timestamp; public int getPartition() { diff --git a/src/main/resources/templates/search-message.ftlh b/src/main/resources/templates/search-message.ftlh index fd7fc69b..909960e1 100644 --- a/src/main/resources/templates/search-message.ftlh +++ b/src/main/resources/templates/search-message.ftlh @@ -87,10 +87,10 @@
- <@spring.bind path="searchMessageForm.startTimestampUi"/> + <@spring.bind path="searchMessageForm.startTimestamp"/>
- <@spring.formInput path="searchMessageForm.startTimestampUi" attributes='pattern="[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]+" class="form-control ${spring.status.error?string("has-error", "")}" size="30"'/> + <@spring.formInput path="searchMessageForm.startTimestamp" attributes='pattern="[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}.[0-9]+" class="form-control ${spring.status.error?string("has-error", "")}" size="30"'/> yyyy-MM-dd HH:mm:ss.SSS <#if spring.status.error> <@spring.showErrors "
"/>
From f0a3f5bd4de391839690ab2088b5de9ef874ee2b Mon Sep 17 00:00:00 2001 From: ndanilin Date: Fri, 11 Jul 2025 14:31:28 +0300 Subject: [PATCH 11/14] fix date format by @JsonFormat --- .../kafdrop/controller/MessageController.java | 76 +++++++------------ .../form/SearchMessageFormForJson.java | 7 +- src/main/java/kafdrop/model/MessageVO.java | 2 +- 3 files changed, 33 insertions(+), 52 deletions(-) diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index fd2b3440..9582b511 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -35,49 +35,22 @@ import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties; import kafdrop.form.SearchMessageForm; import kafdrop.form.SearchMessageFormForJson; -import kafdrop.model.CreateMessageVO; -import kafdrop.model.MessageVO; -import kafdrop.model.SearchResultsVO; -import kafdrop.model.TopicPartitionVO; -import kafdrop.model.TopicVO; +import kafdrop.model.*; import kafdrop.service.KafkaMonitor; import kafdrop.service.MessageInspector; import kafdrop.service.TopicNotFoundException; -import kafdrop.util.AvroMessageDeserializer; -import kafdrop.util.AvroMessageSerializer; -import kafdrop.util.DefaultMessageDeserializer; -import kafdrop.util.DefaultMessageSerializer; -import kafdrop.util.Deserializers; -import kafdrop.util.KeyFormat; -import kafdrop.util.MessageDeserializer; -import kafdrop.util.MessageFormat; -import kafdrop.util.MessageSerializer; -import kafdrop.util.MsgPackMessageDeserializer; -import kafdrop.util.MsgPackMessageSerializer; -import kafdrop.util.ProtobufMessageDeserializer; -import kafdrop.util.ProtobufMessageSerializer; -import kafdrop.util.ProtobufSchemaRegistryMessageDeserializer; -import kafdrop.util.Serializers; +import kafdrop.util.*; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.validation.BindingResult; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.ModelAttribute; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.*; import org.springframework.web.server.ResponseStatusException; import java.io.File; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.Date; -import java.util.List; +import java.util.*; @Tag(name = "message-controller", description = "Message Controller") @Controller @@ -135,7 +108,7 @@ public String viewAllMessages(@PathVariable("name") String topicName, * JSON array of reading all topic messages sorted by timestamp. * * @param topicName Name of topic - * @param count Count of messages + * @param count Count of messages * @return JSON array for seeing all messages in a topic sorted by timestamp. */ @Operation(summary = "getAllMessages", description = "Get all messages from topic") @@ -370,31 +343,36 @@ private SearchResultsVO searchMessageVOs(String topicName, SearchMessageFormForJ } /** + * Searches for messages in a specific topic based on criteria provided in the request body. + * This endpoint expects a POST request with a JSON payload. * - * @param topicName Name of topic - * @param searchMessageForm Message form for submitting requests to search messages (all fields are not required):
- *  * searchText (default value = "")
- *  * maximumCount default value = "1000")
- *  * partition (default value = "-1")
- *  * format (default value = "DEFAULT")
- *  * keyFormat (default value = "DEFAULT")
- *  * startTimestamp (default value = "1970-01-01 00:00:00.000") - * @param keys Keys to filter messages (not required) - * @param errors - * @return JSON array of found messages (sorted by timestamp) and completionDetails about search results + * @param topicName The name of the topic to search within. + * @param searchMessageForm A JSON object in the request body containing the search criteria. All fields are optional. + *
    + *
  • searchText: Text to search for in the message payload. (Default: "")
  • + *
  • maximumCount: Maximum number of messages to return. (Default: 1000)
  • + *
  • partition: Specific partition to search in. (Default: -1 for all partitions)
  • + *
  • format: Deserialization format for the message value. (Default: DEFAULT)
  • + *
  • keyFormat: Deserialization format for the message key. (Default: DEFAULT)
  • + *
  • startTimestamp: Start timestamp in ISO 8601 UTC format. (Example: {@code 1970-01-01T00:00:00.000Z})
  • + *
  • keys: An array of message keys to filter by. (Example: {@code ["key1", "key2"]})
  • + *
+ * @param errors BindingResult for validation, automatically populated by Spring. + * @return A {@link SearchResultsVO} object containing the found messages (sorted by timestamp) and search completion details. */ + @Operation(summary = "searchMessages", description = "Search messages and return results as JSON") @ApiResponses(value = { @ApiResponse(responseCode = "200", description = "Success"), @ApiResponse(responseCode = "400", description = "Body has validation errors"), @ApiResponse(responseCode = "404", description = "Invalid topic name") }) - @GetMapping(value = "/topic/{name:.+}/search-messages", produces = MediaType.APPLICATION_JSON_VALUE) + @PostMapping(value = "/topic/{name:.+}/search-messages", + produces = MediaType.APPLICATION_JSON_VALUE, + consumes = MediaType.APPLICATION_JSON_VALUE) @ResponseBody public SearchResultsVO searchMessages(@PathVariable("name") String topicName, - @Valid @ModelAttribute SearchMessageFormForJson searchMessageForm, - @Parameter(description = "Keys to filter messages", explode = Explode.TRUE) - @RequestParam(name = "keys", required = false) String[] keys, + @Valid @RequestBody SearchMessageFormForJson searchMessageForm, BindingResult errors) { if (errors.hasErrors()) throw new ResponseStatusException(HttpStatus.BAD_REQUEST, errors.getAllErrors().toString()); @@ -404,10 +382,10 @@ public SearchResultsVO searchMessages(@PathVariable("name") String topicName, final var searchResultsVO = searchMessageVOs(topicName, searchMessageForm, null, null); - if (keys != null) { + if (searchMessageForm.getKeys() != null) { var filteredByKeyMessages = searchResultsVO.getMessages().stream() .filter( - messageVO -> Arrays.asList(keys).contains(messageVO.getKey())) + messageVO -> Arrays.asList(searchMessageForm.getKeys()).contains(messageVO.getKey())) .sorted(Comparator.comparing(MessageVO::getTimestamp)) .toList(); diff --git a/src/main/java/kafdrop/form/SearchMessageFormForJson.java b/src/main/java/kafdrop/form/SearchMessageFormForJson.java index ac991ea4..f9f99334 100644 --- a/src/main/java/kafdrop/form/SearchMessageFormForJson.java +++ b/src/main/java/kafdrop/form/SearchMessageFormForJson.java @@ -29,14 +29,17 @@ public class SearchMessageFormForJson { private MessageFormat format; @Schema(example = "DEFAULT") private MessageFormat keyFormat; - @Schema(type = "string", example = "1970-01-01 03:00:00.000") + @Schema(type = "string", example = "1970-01-01T00:00:00.000Z") @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS") - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss.SSS") + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", timezone = "UTC") private Date startTimestamp; + @Schema(description = "Keys to filter messages", example = "[\"key1\", \"key2\"]") + private String[] keys; protected SearchMessageFormForJson() { // Default constructor } + protected SearchMessageFormForJson(String searchText, MessageFormat format) { this.searchText = searchText; this.format = format; diff --git a/src/main/java/kafdrop/model/MessageVO.java b/src/main/java/kafdrop/model/MessageVO.java index cca4f0eb..4a630107 100644 --- a/src/main/java/kafdrop/model/MessageVO.java +++ b/src/main/java/kafdrop/model/MessageVO.java @@ -30,7 +30,7 @@ public final class MessageVO { private String message; private String key; private Map headers; - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS") + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", timezone = "UTC") private Date timestamp; public int getPartition() { From b9953caeaae42aa6d481a2a027a35369e3d056a5 Mon Sep 17 00:00:00 2001 From: Bert Roos Date: Sat, 12 Jul 2025 10:18:23 +0200 Subject: [PATCH 12/14] Corrected imports --- .../kafdrop/controller/MessageController.java | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 9582b511..f7942ee0 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -21,8 +21,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import io.swagger.v3.oas.annotations.Operation; -import io.swagger.v3.oas.annotations.Parameter; -import io.swagger.v3.oas.annotations.enums.Explode; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.responses.ApiResponses; import io.swagger.v3.oas.annotations.tags.Tag; @@ -35,22 +33,50 @@ import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties; import kafdrop.form.SearchMessageForm; import kafdrop.form.SearchMessageFormForJson; -import kafdrop.model.*; +import kafdrop.model.CreateMessageVO; +import kafdrop.model.MessageVO; +import kafdrop.model.SearchResultsVO; +import kafdrop.model.TopicPartitionVO; +import kafdrop.model.TopicVO; import kafdrop.service.KafkaMonitor; import kafdrop.service.MessageInspector; import kafdrop.service.TopicNotFoundException; -import kafdrop.util.*; +import kafdrop.util.AvroMessageDeserializer; +import kafdrop.util.AvroMessageSerializer; +import kafdrop.util.DefaultMessageDeserializer; +import kafdrop.util.DefaultMessageSerializer; +import kafdrop.util.Deserializers; +import kafdrop.util.KeyFormat; +import kafdrop.util.MessageDeserializer; +import kafdrop.util.MessageFormat; +import kafdrop.util.MessageSerializer; +import kafdrop.util.MsgPackMessageDeserializer; +import kafdrop.util.MsgPackMessageSerializer; +import kafdrop.util.ProtobufMessageDeserializer; +import kafdrop.util.ProtobufMessageSerializer; +import kafdrop.util.ProtobufSchemaRegistryMessageDeserializer; +import kafdrop.util.Serializers; import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.validation.BindingResult; -import org.springframework.web.bind.annotation.*; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.ModelAttribute; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.server.ResponseStatusException; import java.io.File; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Date; +import java.util.List; @Tag(name = "message-controller", description = "Message Controller") @Controller From 360594044fd9d0f10e54cf6889f1b91388f7c42e Mon Sep 17 00:00:00 2001 From: Bert Roos Date: Sat, 12 Jul 2025 10:24:22 +0200 Subject: [PATCH 13/14] Wrap long lines to comply with linter constraints --- .../kafdrop/controller/MessageController.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index f7942ee0..d69b8928 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -377,11 +377,16 @@ private SearchResultsVO searchMessageVOs(String topicName, SearchMessageFormForJ *
    *
  • searchText: Text to search for in the message payload. (Default: "")
  • *
  • maximumCount: Maximum number of messages to return. (Default: 1000)
  • - *
  • partition: Specific partition to search in. (Default: -1 for all partitions)
  • - *
  • format: Deserialization format for the message value. (Default: DEFAULT)
  • - *
  • keyFormat: Deserialization format for the message key. (Default: DEFAULT)
  • - *
  • startTimestamp: Start timestamp in ISO 8601 UTC format. (Example: {@code 1970-01-01T00:00:00.000Z})
  • - *
  • keys: An array of message keys to filter by. (Example: {@code ["key1", "key2"]})
  • + *
  • partition: Specific partition to search in. (Default: -1 for all + * partitions)
  • + *
  • format: Deserialization format for the message value. (Default: DEFAULT) + *
  • + *
  • keyFormat: Deserialization format for the message key. (Default: + * DEFAULT)
  • + *
  • startTimestamp: Start timestamp in ISO 8601 UTC format. (Example: {@code + * 1970-01-01T00:00:00.000Z})
  • + *
  • keys: An array of message keys to filter by. (Example: {@code ["key1", + * "key2"]})
  • *
* @param errors BindingResult for validation, automatically populated by Spring. * @return A {@link SearchResultsVO} object containing the found messages (sorted by timestamp) and search completion details. From ae6456ef83c3005d4263c25816c0c4ae874d57a2 Mon Sep 17 00:00:00 2001 From: Bert Roos Date: Sat, 12 Jul 2025 10:28:48 +0200 Subject: [PATCH 14/14] Wrap long line to comply with linter constraints --- src/main/java/kafdrop/controller/MessageController.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index d69b8928..5aecbd16 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -389,7 +389,8 @@ private SearchResultsVO searchMessageVOs(String topicName, SearchMessageFormForJ * "key2"]}) * * @param errors BindingResult for validation, automatically populated by Spring. - * @return A {@link SearchResultsVO} object containing the found messages (sorted by timestamp) and search completion details. + * @return A {@link SearchResultsVO} object containing the found messages (sorted by timestamp) and search + * completion details. */ @Operation(summary = "searchMessages", description = "Search messages and return results as JSON")