Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/main/java/kafdrop/config/ObjectMapperConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package kafdrop.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;

import java.util.TimeZone;

@Configuration
public class ObjectMapperConfig {

@Bean
public ObjectMapper objectMapper(Jackson2ObjectMapperBuilder builder) {
return builder
.timeZone(TimeZone.getDefault())
.build();
}
}
152 changes: 120 additions & 32 deletions src/main/java/kafdrop/controller/MessageController.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.Explode;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.tags.Tag;
Expand All @@ -32,7 +34,10 @@
import kafdrop.config.ProtobufDescriptorConfiguration.ProtobufDescriptorProperties;
import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties;
import kafdrop.form.SearchMessageForm;
import kafdrop.form.SearchMessageFormForJson;
import kafdrop.model.CreateMessageVO;
import kafdrop.model.MessageVO;
import kafdrop.model.SearchResultsVO;
import kafdrop.model.TopicPartitionVO;
import kafdrop.model.TopicVO;
import kafdrop.service.KafkaMonitor;
Expand All @@ -52,29 +57,27 @@
import kafdrop.util.ProtobufMessageDeserializer;
import kafdrop.util.ProtobufMessageSerializer;
import kafdrop.util.ProtobufSchemaRegistryMessageDeserializer;

import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;


import kafdrop.util.Serializers;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ModelAttribute;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.server.ResponseStatusException;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;

import org.springframework.web.bind.annotation.PostMapping;
import kafdrop.model.CreateMessageVO;
import java.util.List;

@Tag(name = "message-controller", description = "Message Controller")
@Controller
Expand Down Expand Up @@ -123,11 +126,45 @@ public String viewAllMessages(@PathVariable("name") String topicName,
model.addAttribute("keyFormats", KeyFormat.values());
model.addAttribute("descFiles", protobufProperties.getDescFilesList());

model.addAttribute("messages", getMessages(topicName, defaultKeyFormat, defaultFormat, topic, size));

return "topic-messages";
}

/**
* JSON array of reading all topic messages sorted by timestamp.
*
* @param topicName Name of topic
* @param count Count of messages
* @return JSON array for seeing all messages in a topic sorted by timestamp.
*/
@Operation(summary = "getAllMessages", description = "Get all messages from topic")
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = "Success"),
@ApiResponse(responseCode = "404", description = "Invalid topic name")
})
@GetMapping(value = "/topic/{name:.+}/allmessages", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public List<MessageVO> getAllMessages(@PathVariable("name") String topicName,
@RequestParam(name = "count", required = false) Integer count) {
final int size = (count != null ? count : 100);
final MessageFormat defaultFormat = messageFormatProperties.getFormat();
final MessageFormat defaultKeyFormat = messageFormatProperties.getKeyFormat();
final TopicVO topic = kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));

return getMessages(topicName, defaultKeyFormat, defaultFormat, topic, size);
}

private @org.jetbrains.annotations.NotNull List<MessageVO> getMessages(String topicName,
MessageFormat defaultKeyFormat,
MessageFormat defaultFormat, TopicVO topic,
int size) {
final var deserializers = new Deserializers(
getDeserializer(topicName, defaultKeyFormat, "", "", protobufProperties.getParseAnyProto()),
getDeserializer(topicName, defaultFormat, "", "", protobufProperties.getParseAnyProto()));

final List<MessageVO> messages = messageInspector.getMessages(topicName, size, deserializers);
final List<MessageVO> messages = new ArrayList<>();

for (TopicPartitionVO partition : topic.getPartitions()) {
messages.addAll(messageInspector.getMessages(topicName,
Expand All @@ -138,9 +175,7 @@ public String viewAllMessages(@PathVariable("name") String topicName,
}

messages.sort(Comparator.comparing(MessageVO::getTimestamp));
Comment on lines +144 to +176
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This duplicates the lines 102-116 of viewAllMessages. Extract these into a private method and call that method here instead of duplicating them.

model.addAttribute("messages", messages);

return "topic-messages";
return messages;
}

/**
Expand Down Expand Up @@ -303,23 +338,8 @@ public String searchMessageForm(@PathVariable("name") String topicName,
model.addAttribute("descFiles", protobufProperties.getDescFilesList());

if (!searchMessageForm.isEmpty() && !errors.hasErrors()) {

final var deserializers = new Deserializers(
getDeserializer(topicName, searchMessageForm.getKeyFormat(), searchMessageForm.getDescFile(),
searchMessageForm.getMsgTypeName(),
protobufProperties.getParseAnyProto()),
getDeserializer(topicName, searchMessageForm.getFormat(), searchMessageForm.getDescFile(),
searchMessageForm.getMsgTypeName(),
protobufProperties.getParseAnyProto())
);

var searchResults = kafkaMonitor.searchMessages(
topicName,
searchMessageForm.getSearchText(),
searchMessageForm.getPartition(),
searchMessageForm.getMaximumCount(),
searchMessageForm.getStartTimestamp(),
deserializers);
final var searchResults = searchMessageVOs(topicName, searchMessageForm, searchMessageForm.getDescFile(),
searchMessageForm.getMsgTypeName());

model.addAttribute("messages", searchResults.getMessages());
model.addAttribute("details", searchResults.getCompletionDetails());
Expand All @@ -328,6 +348,75 @@ public String searchMessageForm(@PathVariable("name") String topicName,
return "search-message";
}

private SearchResultsVO searchMessageVOs(String topicName, SearchMessageFormForJson searchMessageForm,
String descFile,
String msgTypeName) {
final var deserializers = new Deserializers(
getDeserializer(topicName, searchMessageForm.getKeyFormat(), descFile,
msgTypeName,
protobufProperties.getParseAnyProto()),
getDeserializer(topicName, searchMessageForm.getFormat(), descFile,
msgTypeName,
protobufProperties.getParseAnyProto())
);

return kafkaMonitor.searchMessages(
topicName,
searchMessageForm.getSearchText(),
searchMessageForm.getPartition(),
searchMessageForm.getMaximumCount(),
searchMessageForm.getStartTimestamp(),
deserializers);
}

/**
*
* @param topicName Name of topic
* @param searchMessageForm Message form for submitting requests to search messages (all fields are not required):<br>
* &nbsp;* searchText (default value = "")<br>
* &nbsp;* maximumCount default value = "1000")<br>
* &nbsp;* partition (default value = "-1")<br>
* &nbsp;* format (default value = "DEFAULT")<br>
* &nbsp;* keyFormat (default value = "DEFAULT")<br>
* &nbsp;* startTimestamp (default value = "1970-01-01 00:00:00.000")
* @param keys Keys to filter messages (not required)
* @param errors
* @return JSON array of found messages (sorted by timestamp) and completionDetails about search results
*/
@Operation(summary = "searchMessages", description = "Search messages and return results as JSON")
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = "Success"),
@ApiResponse(responseCode = "400", description = "Body has validation errors"),
@ApiResponse(responseCode = "404", description = "Invalid topic name")
})
@GetMapping(value = "/topic/{name:.+}/search-messages", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public SearchResultsVO searchMessages(@PathVariable("name") String topicName,
@Valid @ModelAttribute SearchMessageFormForJson searchMessageForm,
@Parameter(description = "Keys to filter messages", explode = Explode.TRUE)
@RequestParam(name = "keys", required = false) String[] keys,
BindingResult errors) {

if (errors.hasErrors()) throw new ResponseStatusException(HttpStatus.BAD_REQUEST, errors.getAllErrors().toString());

kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));

final var searchResultsVO = searchMessageVOs(topicName, searchMessageForm, null, null);

if (keys != null) {
var filteredByKeyMessages = searchResultsVO.getMessages().stream()
.filter(
messageVO -> Arrays.asList(keys).contains(messageVO.getKey()))
.sorted(Comparator.comparing(MessageVO::getTimestamp))
.toList();

searchResultsVO.setMessages(filteredByKeyMessages);
}

return searchResultsVO;
}

/**
* Returns the selected message format based on the form submission
*
Expand Down Expand Up @@ -588,6 +677,5 @@ public Boolean getIsAnyProto() {
public void setIsAnyProto(Boolean isAnyProto) {
this.isAnyProto = isAnyProto;
}

}
}
74 changes: 2 additions & 72 deletions src/main/java/kafdrop/form/SearchMessageForm.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,14 @@

import java.util.Date;

public class SearchMessageForm {

@NotBlank
private String searchText;

@NotNull
@Min(1)
@Max(1000)
private Integer maximumCount;

private Integer partition;

private MessageFormat format;

private MessageFormat keyFormat;
public class SearchMessageForm extends SearchMessageFormForJson {

private String descFile;

private String msgTypeName;

@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS")
private Date startTimestamp;

public SearchMessageForm(String searchText, MessageFormat format) {
this.searchText = searchText;
this.format = format;
}

public Date getStartTimestamp() {
return startTimestamp;
}

public void setStartTimestamp(Date startTimestamp) {
this.startTimestamp = startTimestamp;
super (searchText, format);
}

public SearchMessageForm(String searchText) {
Expand All @@ -53,43 +27,6 @@ public SearchMessageForm(String searchText) {
public SearchMessageForm() {
}

@JsonIgnore
public boolean isEmpty() {
return searchText == null || searchText.isEmpty();
}

public String getSearchText() {
return searchText;
}

public void setSearchText(String searchText) {
this.searchText = searchText;
}

public Integer getMaximumCount() {
return maximumCount;
}

public void setMaximumCount(Integer maximumCount) {
this.maximumCount = maximumCount;
}

public MessageFormat getKeyFormat() {
return keyFormat;
}

public void setKeyFormat(MessageFormat keyFormat) {
this.keyFormat = keyFormat;
}

public MessageFormat getFormat() {
return format;
}

public void setFormat(MessageFormat format) {
this.format = format;
}

public String getDescFile() {
return descFile;
}
Expand All @@ -106,11 +43,4 @@ public void setMsgTypeName(String msgTypeName) {
this.msgTypeName = msgTypeName;
}

public Integer getPartition() {
return partition;
}

public void setPartition(Integer partition) {
this.partition = partition;
}
}
Loading