diff --git a/src/main/java/kafdrop/config/ObjectMapperConfig.java b/src/main/java/kafdrop/config/ObjectMapperConfig.java new file mode 100644 index 00000000..e67f3933 --- /dev/null +++ b/src/main/java/kafdrop/config/ObjectMapperConfig.java @@ -0,0 +1,19 @@ +package kafdrop.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; + +import java.util.TimeZone; + +@Configuration +public class ObjectMapperConfig { + + @Bean + public ObjectMapper objectMapper(Jackson2ObjectMapperBuilder builder) { + return builder + .timeZone(TimeZone.getDefault()) + .build(); + } +} diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index d3542f7d..5aecbd16 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -32,7 +32,10 @@ import kafdrop.config.ProtobufDescriptorConfiguration.ProtobufDescriptorProperties; import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties; import kafdrop.form.SearchMessageForm; +import kafdrop.form.SearchMessageFormForJson; +import kafdrop.model.CreateMessageVO; import kafdrop.model.MessageVO; +import kafdrop.model.SearchResultsVO; import kafdrop.model.TopicPartitionVO; import kafdrop.model.TopicVO; import kafdrop.service.KafkaMonitor; @@ -52,15 +55,9 @@ import kafdrop.util.ProtobufMessageDeserializer; import kafdrop.util.ProtobufMessageSerializer; import kafdrop.util.ProtobufSchemaRegistryMessageDeserializer; - -import java.io.File; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; - - import kafdrop.util.Serializers; import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; @@ -68,13 +65,18 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.ModelAttribute; import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.server.ResponseStatusException; +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; import java.util.Date; - -import org.springframework.web.bind.annotation.PostMapping; -import kafdrop.model.CreateMessageVO; +import java.util.List; @Tag(name = "message-controller", description = "Message Controller") @Controller @@ -123,11 +125,45 @@ public String viewAllMessages(@PathVariable("name") String topicName, model.addAttribute("keyFormats", KeyFormat.values()); model.addAttribute("descFiles", protobufProperties.getDescFilesList()); + model.addAttribute("messages", getMessages(topicName, defaultKeyFormat, defaultFormat, topic, size)); + + return "topic-messages"; + } + + /** + * JSON array of reading all topic messages sorted by timestamp. + * + * @param topicName Name of topic + * @param count Count of messages + * @return JSON array for seeing all messages in a topic sorted by timestamp. + */ + @Operation(summary = "getAllMessages", description = "Get all messages from topic") + @ApiResponses(value = { + @ApiResponse(responseCode = "200", description = "Success"), + @ApiResponse(responseCode = "404", description = "Invalid topic name") + }) + @GetMapping(value = "/topic/{name:.+}/allmessages", produces = MediaType.APPLICATION_JSON_VALUE) + @ResponseBody + public List getAllMessages(@PathVariable("name") String topicName, + @RequestParam(name = "count", required = false) Integer count) { + final int size = (count != null ? count : 100); + final MessageFormat defaultFormat = messageFormatProperties.getFormat(); + final MessageFormat defaultKeyFormat = messageFormatProperties.getKeyFormat(); + final TopicVO topic = kafkaMonitor.getTopic(topicName) + .orElseThrow(() -> new TopicNotFoundException(topicName)); + + return getMessages(topicName, defaultKeyFormat, defaultFormat, topic, size); + } + + private @org.jetbrains.annotations.NotNull List getMessages(String topicName, + MessageFormat defaultKeyFormat, + MessageFormat defaultFormat, TopicVO topic, + int size) { final var deserializers = new Deserializers( getDeserializer(topicName, defaultKeyFormat, "", "", protobufProperties.getParseAnyProto()), getDeserializer(topicName, defaultFormat, "", "", protobufProperties.getParseAnyProto())); - final List messages = messageInspector.getMessages(topicName, size, deserializers); + final List messages = new ArrayList<>(); for (TopicPartitionVO partition : topic.getPartitions()) { messages.addAll(messageInspector.getMessages(topicName, @@ -138,9 +174,7 @@ public String viewAllMessages(@PathVariable("name") String topicName, } messages.sort(Comparator.comparing(MessageVO::getTimestamp)); - model.addAttribute("messages", messages); - - return "topic-messages"; + return messages; } /** @@ -303,23 +337,8 @@ public String searchMessageForm(@PathVariable("name") String topicName, model.addAttribute("descFiles", protobufProperties.getDescFilesList()); if (!searchMessageForm.isEmpty() && !errors.hasErrors()) { - - final var deserializers = new Deserializers( - getDeserializer(topicName, searchMessageForm.getKeyFormat(), searchMessageForm.getDescFile(), - searchMessageForm.getMsgTypeName(), - protobufProperties.getParseAnyProto()), - getDeserializer(topicName, searchMessageForm.getFormat(), searchMessageForm.getDescFile(), - searchMessageForm.getMsgTypeName(), - protobufProperties.getParseAnyProto()) - ); - - var searchResults = kafkaMonitor.searchMessages( - topicName, - searchMessageForm.getSearchText(), - searchMessageForm.getPartition(), - searchMessageForm.getMaximumCount(), - searchMessageForm.getStartTimestamp(), - deserializers); + final var searchResults = searchMessageVOs(topicName, searchMessageForm, searchMessageForm.getDescFile(), + searchMessageForm.getMsgTypeName()); model.addAttribute("messages", searchResults.getMessages()); model.addAttribute("details", searchResults.getCompletionDetails()); @@ -328,6 +347,86 @@ public String searchMessageForm(@PathVariable("name") String topicName, return "search-message"; } + private SearchResultsVO searchMessageVOs(String topicName, SearchMessageFormForJson searchMessageForm, + String descFile, + String msgTypeName) { + final var deserializers = new Deserializers( + getDeserializer(topicName, searchMessageForm.getKeyFormat(), descFile, + msgTypeName, + protobufProperties.getParseAnyProto()), + getDeserializer(topicName, searchMessageForm.getFormat(), descFile, + msgTypeName, + protobufProperties.getParseAnyProto()) + ); + + return kafkaMonitor.searchMessages( + topicName, + searchMessageForm.getSearchText(), + searchMessageForm.getPartition(), + searchMessageForm.getMaximumCount(), + searchMessageForm.getStartTimestamp(), + deserializers); + } + + /** + * Searches for messages in a specific topic based on criteria provided in the request body. + * This endpoint expects a POST request with a JSON payload. + * + * @param topicName The name of the topic to search within. + * @param searchMessageForm A JSON object in the request body containing the search criteria. All fields are optional. + *
    + *
  • searchText: Text to search for in the message payload. (Default: "")
  • + *
  • maximumCount: Maximum number of messages to return. (Default: 1000)
  • + *
  • partition: Specific partition to search in. (Default: -1 for all + * partitions)
  • + *
  • format: Deserialization format for the message value. (Default: DEFAULT) + *
  • + *
  • keyFormat: Deserialization format for the message key. (Default: + * DEFAULT)
  • + *
  • startTimestamp: Start timestamp in ISO 8601 UTC format. (Example: {@code + * 1970-01-01T00:00:00.000Z})
  • + *
  • keys: An array of message keys to filter by. (Example: {@code ["key1", + * "key2"]})
  • + *
+ * @param errors BindingResult for validation, automatically populated by Spring. + * @return A {@link SearchResultsVO} object containing the found messages (sorted by timestamp) and search + * completion details. + */ + + @Operation(summary = "searchMessages", description = "Search messages and return results as JSON") + @ApiResponses(value = { + @ApiResponse(responseCode = "200", description = "Success"), + @ApiResponse(responseCode = "400", description = "Body has validation errors"), + @ApiResponse(responseCode = "404", description = "Invalid topic name") + }) + @PostMapping(value = "/topic/{name:.+}/search-messages", + produces = MediaType.APPLICATION_JSON_VALUE, + consumes = MediaType.APPLICATION_JSON_VALUE) + @ResponseBody + public SearchResultsVO searchMessages(@PathVariable("name") String topicName, + @Valid @RequestBody SearchMessageFormForJson searchMessageForm, + BindingResult errors) { + + if (errors.hasErrors()) throw new ResponseStatusException(HttpStatus.BAD_REQUEST, errors.getAllErrors().toString()); + + kafkaMonitor.getTopic(topicName) + .orElseThrow(() -> new TopicNotFoundException(topicName)); + + final var searchResultsVO = searchMessageVOs(topicName, searchMessageForm, null, null); + + if (searchMessageForm.getKeys() != null) { + var filteredByKeyMessages = searchResultsVO.getMessages().stream() + .filter( + messageVO -> Arrays.asList(searchMessageForm.getKeys()).contains(messageVO.getKey())) + .sorted(Comparator.comparing(MessageVO::getTimestamp)) + .toList(); + + searchResultsVO.setMessages(filteredByKeyMessages); + } + + return searchResultsVO; + } + /** * Returns the selected message format based on the form submission * @@ -588,6 +687,5 @@ public Boolean getIsAnyProto() { public void setIsAnyProto(Boolean isAnyProto) { this.isAnyProto = isAnyProto; } - } } diff --git a/src/main/java/kafdrop/form/SearchMessageForm.java b/src/main/java/kafdrop/form/SearchMessageForm.java index 27564633..b127033b 100644 --- a/src/main/java/kafdrop/form/SearchMessageForm.java +++ b/src/main/java/kafdrop/form/SearchMessageForm.java @@ -10,40 +10,14 @@ import java.util.Date; -public class SearchMessageForm { - - @NotBlank - private String searchText; - - @NotNull - @Min(1) - @Max(1000) - private Integer maximumCount; - - private Integer partition; - - private MessageFormat format; - - private MessageFormat keyFormat; +public class SearchMessageForm extends SearchMessageFormForJson { private String descFile; private String msgTypeName; - @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS") - private Date startTimestamp; - public SearchMessageForm(String searchText, MessageFormat format) { - this.searchText = searchText; - this.format = format; - } - - public Date getStartTimestamp() { - return startTimestamp; - } - - public void setStartTimestamp(Date startTimestamp) { - this.startTimestamp = startTimestamp; + super (searchText, format); } public SearchMessageForm(String searchText) { @@ -53,43 +27,6 @@ public SearchMessageForm(String searchText) { public SearchMessageForm() { } - @JsonIgnore - public boolean isEmpty() { - return searchText == null || searchText.isEmpty(); - } - - public String getSearchText() { - return searchText; - } - - public void setSearchText(String searchText) { - this.searchText = searchText; - } - - public Integer getMaximumCount() { - return maximumCount; - } - - public void setMaximumCount(Integer maximumCount) { - this.maximumCount = maximumCount; - } - - public MessageFormat getKeyFormat() { - return keyFormat; - } - - public void setKeyFormat(MessageFormat keyFormat) { - this.keyFormat = keyFormat; - } - - public MessageFormat getFormat() { - return format; - } - - public void setFormat(MessageFormat format) { - this.format = format; - } - public String getDescFile() { return descFile; } @@ -106,11 +43,4 @@ public void setMsgTypeName(String msgTypeName) { this.msgTypeName = msgTypeName; } - public Integer getPartition() { - return partition; - } - - public void setPartition(Integer partition) { - this.partition = partition; - } } diff --git a/src/main/java/kafdrop/form/SearchMessageFormForJson.java b/src/main/java/kafdrop/form/SearchMessageFormForJson.java new file mode 100644 index 00000000..f9f99334 --- /dev/null +++ b/src/main/java/kafdrop/form/SearchMessageFormForJson.java @@ -0,0 +1,110 @@ +package kafdrop.form; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonIgnore; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotBlank; +import jakarta.validation.constraints.NotNull; +import kafdrop.util.MessageFormat; +import lombok.Data; +import org.springframework.format.annotation.DateTimeFormat; + +import java.util.Date; + +@Data +public class SearchMessageFormForJson { + @NotBlank + @Schema(example = "Some search text") + private String searchText; + @NotNull + @Min(1) + @Max(1000) + @Schema(example = "1000") + private Integer maximumCount; + @Schema(example = "-1") + private Integer partition; + @Schema(example = "DEFAULT") + private MessageFormat format; + @Schema(example = "DEFAULT") + private MessageFormat keyFormat; + @Schema(type = "string", example = "1970-01-01T00:00:00.000Z") + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS") + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", timezone = "UTC") + private Date startTimestamp; + @Schema(description = "Keys to filter messages", example = "[\"key1\", \"key2\"]") + private String[] keys; + + protected SearchMessageFormForJson() { + // Default constructor + } + + protected SearchMessageFormForJson(String searchText, MessageFormat format) { + this.searchText = searchText; + this.format = format; + } + + public SearchMessageFormForJson(String searchText, Integer maximumCount, Integer partition, MessageFormat format, + MessageFormat keyFormat, Date startTimestamp) { + this.searchText = (searchText == null) ? "" : searchText; + this.maximumCount = (maximumCount == null) ? 1000 : maximumCount; + this.partition = (partition == null) ? -1 : partition; + this.format = (format == null) ? MessageFormat.DEFAULT : format; + this.keyFormat = (keyFormat == null) ? MessageFormat.DEFAULT : keyFormat; + this.startTimestamp = (startTimestamp == null) ? new Date(0) : startTimestamp; + } + + @JsonIgnore + public boolean isEmpty() { + return searchText == null || searchText.isEmpty(); + } + + public String getSearchText() { + return searchText; + } + + public void setSearchText(String searchText) { + this.searchText = searchText; + } + + public Integer getMaximumCount() { + return maximumCount; + } + + public void setMaximumCount(Integer maximumCount) { + this.maximumCount = maximumCount; + } + + public Integer getPartition() { + return partition; + } + + public void setPartition(Integer partition) { + this.partition = partition; + } + + public MessageFormat getFormat() { + return format; + } + + public void setFormat(MessageFormat format) { + this.format = format; + } + + public MessageFormat getKeyFormat() { + return keyFormat; + } + + public void setKeyFormat(MessageFormat keyFormat) { + this.keyFormat = keyFormat; + } + + public Date getStartTimestamp() { + return startTimestamp; + } + + public void setStartTimestamp(Date startTimestamp) { + this.startTimestamp = startTimestamp; + } +} diff --git a/src/main/java/kafdrop/model/MessageVO.java b/src/main/java/kafdrop/model/MessageVO.java index be7f9436..4a630107 100644 --- a/src/main/java/kafdrop/model/MessageVO.java +++ b/src/main/java/kafdrop/model/MessageVO.java @@ -18,6 +18,8 @@ package kafdrop.model; +import com.fasterxml.jackson.annotation.JsonFormat; + import java.util.Date; import java.util.Map; import java.util.stream.Collectors; @@ -28,6 +30,7 @@ public final class MessageVO { private String message; private String key; private Map headers; + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", timezone = "UTC") private Date timestamp; public int getPartition() { diff --git a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java index f9697553..2501d0af 100644 --- a/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java +++ b/src/main/java/kafdrop/service/KafkaHighLevelConsumer.java @@ -219,7 +219,16 @@ synchronized SearchResults searchRecords(String topic, kafkaConsumer.assign(partitions); seekToTimestamp(partitions, startTimestamp); - return searchRecords(searchString, maximumCount, deserializers); + var records = searchRecords(searchString, maximumCount, deserializers); + var filteredByTimestampResults = records.getResults().stream() + .filter(result -> result.timestamp() >= startTimestamp.getTime()) + .toList(); + + return new SearchResults( + filteredByTimestampResults, + records.getCompletionReason(), + records.getFinalMessageTimestamp(), + records.getMessagesScannedCount()); } private void seekToTimestamp(List partitions, Date startTimestamp) {