Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/main/java/kafdrop/config/ObjectMapperConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package kafdrop.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;

import java.util.TimeZone;

@Configuration
public class ObjectMapperConfig {

@Bean
public ObjectMapper objectMapper(Jackson2ObjectMapperBuilder builder) {
return builder
.timeZone(TimeZone.getDefault())
.build();
}
}
149 changes: 113 additions & 36 deletions src/main/java/kafdrop/controller/MessageController.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.enums.Explode;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.tags.Tag;
Expand All @@ -32,49 +34,23 @@
import kafdrop.config.ProtobufDescriptorConfiguration.ProtobufDescriptorProperties;
import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties;
import kafdrop.form.SearchMessageForm;
import kafdrop.model.MessageVO;
import kafdrop.model.TopicPartitionVO;
import kafdrop.model.TopicVO;
import kafdrop.form.SearchMessageFormForJson;
import kafdrop.model.*;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our coding convention, we don't use wildcard imports. Please restore the original specific imports.

import kafdrop.service.KafkaMonitor;
import kafdrop.service.MessageInspector;
import kafdrop.service.TopicNotFoundException;
import kafdrop.util.AvroMessageDeserializer;
import kafdrop.util.AvroMessageSerializer;
import kafdrop.util.DefaultMessageDeserializer;
import kafdrop.util.DefaultMessageSerializer;
import kafdrop.util.Deserializers;
import kafdrop.util.KeyFormat;
import kafdrop.util.MessageDeserializer;
import kafdrop.util.MessageFormat;
import kafdrop.util.MessageSerializer;
import kafdrop.util.MsgPackMessageDeserializer;
import kafdrop.util.MsgPackMessageSerializer;
import kafdrop.util.ProtobufMessageDeserializer;
import kafdrop.util.ProtobufMessageSerializer;
import kafdrop.util.ProtobufSchemaRegistryMessageDeserializer;

import java.io.File;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;


import kafdrop.util.Serializers;
import kafdrop.util.*;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ModelAttribute;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.*;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here

import org.springframework.web.server.ResponseStatusException;

import java.util.Date;

import org.springframework.web.bind.annotation.PostMapping;
import kafdrop.model.CreateMessageVO;
import java.io.File;
import java.util.*;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here


@Tag(name = "message-controller", description = "Message Controller")
@Controller
Expand Down Expand Up @@ -127,7 +103,7 @@ public String viewAllMessages(@PathVariable("name") String topicName,
getDeserializer(topicName, defaultKeyFormat, "", "", protobufProperties.getParseAnyProto()),
getDeserializer(topicName, defaultFormat, "", "", protobufProperties.getParseAnyProto()));

final List<MessageVO> messages = messageInspector.getMessages(topicName, size, deserializers);
final List<MessageVO> messages = new ArrayList<>();

for (TopicPartitionVO partition : topic.getPartitions()) {
messages.addAll(messageInspector.getMessages(topicName,
Expand All @@ -143,6 +119,47 @@ public String viewAllMessages(@PathVariable("name") String topicName,
return "topic-messages";
}

/**
* JSON array of reading all topic messages sorted by timestamp.
*
* @param topicName Name of topic
* @param count Count of messages
* @return JSON array for seeing all messages in a topic sorted by timestamp.
*/
@Operation(summary = "getAllMessages", description = "Get all messages from topic")
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = "Success"),
@ApiResponse(responseCode = "404", description = "Invalid topic name")
})
@GetMapping(value = "/topic/{name:.+}/allmessages", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public List<MessageVO> getAllMessages(@PathVariable("name") String topicName,
@RequestParam(name = "count", required = false) Integer count) {
final int size = (count != null ? count : 100);
final MessageFormat defaultFormat = messageFormatProperties.getFormat();
final MessageFormat defaultKeyFormat = messageFormatProperties.getKeyFormat();
final TopicVO topic = kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));

final var deserializers = new Deserializers(
getDeserializer(topicName, defaultKeyFormat, "", "", protobufProperties.getParseAnyProto()),
getDeserializer(topicName, defaultFormat, "", "", protobufProperties.getParseAnyProto()));

final List<MessageVO> messages = new ArrayList<>();

for (TopicPartitionVO partition : topic.getPartitions()) {
messages.addAll(messageInspector.getMessages(topicName,
partition.getId(),
partition.getFirstOffset(),
size,
deserializers));
}

messages.sort(Comparator.comparing(MessageVO::getTimestamp));
Comment on lines +144 to +176
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This duplicates the lines 102-116 of viewAllMessages. Extract these into a private method and call that method here instead of duplicating them.


return messages;
}

/**
* Human friendly view of reading messages.
*
Expand Down Expand Up @@ -328,6 +345,67 @@ public String searchMessageForm(@PathVariable("name") String topicName,
return "search-message";
}

/**
*
* @param topicName Name of topic
* @param searchMessageForm Message form for submitting requests to search messages (all fields are not required):<br>
* &nbsp;* searchText (default value = "")<br>
* &nbsp;* maximumCount default value = "1000")<br>
* &nbsp;* partition (default value = "-1")<br>
* &nbsp;* format (default value = "DEFAULT")<br>
* &nbsp;* keyFormat (default value = "DEFAULT")<br>
* &nbsp;* startTimestamp (default value = "1970-01-01 00:00:00.000")
* @param keys Keys to filter messages (not required)
* @param errors
* @return JSON array of found messages (sorted by timestamp) and completionDetails about search results
*/
@Operation(summary = "searchMessages", description = "Search messages and return results as JSON")
@ApiResponses(value = {
@ApiResponse(responseCode = "200", description = "Success"),
@ApiResponse(responseCode = "400", description = "Body has validation errors"),
@ApiResponse(responseCode = "404", description = "Invalid topic name")
})
@GetMapping(value = "/topic/{name:.+}/search-messages", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public SearchResultsVO searchMessages(@PathVariable("name") String topicName,
@Valid @ModelAttribute SearchMessageFormForJson searchMessageForm,
@Parameter(description = "Keys to filter messages", explode = Explode.TRUE)
@RequestParam(name = "keys", required = false) String[] keys,
BindingResult errors) {

if (errors.hasErrors()) throw new ResponseStatusException(HttpStatus.BAD_REQUEST, errors.getAllErrors().toString());

kafkaMonitor.getTopic(topicName)
.orElseThrow(() -> new TopicNotFoundException(topicName));

final var deserializers = new Deserializers(
getDeserializer(topicName, searchMessageForm.getKeyFormat(), null, null,
protobufProperties.getParseAnyProto()),
getDeserializer(topicName, searchMessageForm.getFormat(), null, null,
protobufProperties.getParseAnyProto())
);

var searchResultsVO = kafkaMonitor.searchMessages(
topicName,
searchMessageForm.getSearchText(),
searchMessageForm.getPartition(),
searchMessageForm.getMaximumCount(),
searchMessageForm.getStartTimestamp(),
deserializers);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These lines duplicate the lines 325-339 of searchMessageForm, except that descFile and msgTypeName are set to null. Extract the lines into a method that takes descFile and msgTypeName as parameters and call the method here and above.


if (keys != null) {
var filteredByKeyMessages = searchResultsVO.getMessages().stream()
.filter(
messageVO -> Arrays.asList(keys).contains(messageVO.getKey()))
.sorted(Comparator.comparing(MessageVO::getTimestamp))
.toList();

searchResultsVO.setMessages(filteredByKeyMessages);
}

return searchResultsVO;
}

/**
* Returns the selected message format based on the form submission
*
Expand Down Expand Up @@ -588,6 +666,5 @@ public Boolean getIsAnyProto() {
public void setIsAnyProto(Boolean isAnyProto) {
this.isAnyProto = isAnyProto;
}

}
}
41 changes: 41 additions & 0 deletions src/main/java/kafdrop/form/SearchMessageFormForJson.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package kafdrop.form;

import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import kafdrop.util.MessageFormat;
import lombok.Data;
import org.springframework.format.annotation.DateTimeFormat;

import java.util.Date;

@Data
public class SearchMessageFormForJson {
@Schema(example = "Some search text")
private String searchText;
@Min(1)
@Max(1000)
@Schema(example = "1000")
private Integer maximumCount;
@Schema(example = "-1")
private Integer partition;
@Schema(example = "DEFAULT")
private MessageFormat format;
@Schema(example = "DEFAULT")
private MessageFormat keyFormat;
@Schema(type = "string", example = "1970-01-01 03:00:00.000")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS")
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss.SSS")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ndanilin Why this format instead of the regular ISO 8601 format?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Bert-R this format was for "SearchMessageForm" before my changes (ed9d1cd). I decided to do the same thing. Should we change it to ISO 8601?

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, please change it to the ISO format. That's a better fit for an API.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Bert-R
fixed this moment:

  1. APIs work with iso format (input and output)
  2. UI still works with "yyyy-MM-dd HH:mm:ss.SSS" (haven't dealt with ftlh files before it, so check please)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ndanilin Sorry, that's not what I meant. My comment only applied to the JSON API. Just this change should suffice:

- @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd HH:mm:ss.SSS")
+ @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", timezone = "UTC")

Copy link
Copy Markdown
Contributor Author

@ndanilin ndanilin Jul 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Bert-R sorry to complicate your idea.

  1. canceled a previous commit
  2. replaced the "searchMessages" API with the POST method (because GET doesn't work with @JSON format)
  3. corrected documentation for "searchMessages"
  4. changed @jsonformat to "SearchMessageFormForJson" and "MessageVO", as you said

private Date startTimestamp;

public SearchMessageFormForJson(String searchText, Integer maximumCount, Integer partition, MessageFormat format,
MessageFormat keyFormat, Date startTimestamp) {
this.searchText = (searchText == null) ? "" : searchText;
this.maximumCount = (maximumCount == null) ? 1000 : maximumCount;
this.partition = (partition == null) ? -1 : partition;
this.format = (format == null) ? MessageFormat.DEFAULT : format;
this.keyFormat = (keyFormat == null) ? MessageFormat.DEFAULT : keyFormat;
this.startTimestamp = (startTimestamp == null) ? new Date(0) : startTimestamp;
}
}
3 changes: 3 additions & 0 deletions src/main/java/kafdrop/model/MessageVO.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package kafdrop.model;

import com.fasterxml.jackson.annotation.JsonFormat;

import java.util.Date;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -28,6 +30,7 @@ public final class MessageVO {
private String message;
private String key;
private Map<String, String> headers;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this format instead of ISO 8601?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Bert-R it's the same as for "SearchMessageForm" and "SearchMessageFormForJson" (my comment below)

private Date timestamp;

public int getPartition() {
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/kafdrop/service/KafkaHighLevelConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,16 @@ synchronized SearchResults searchRecords(String topic,
kafkaConsumer.assign(partitions);
seekToTimestamp(partitions, startTimestamp);

return searchRecords(searchString, maximumCount, deserializers);
var records = searchRecords(searchString, maximumCount, deserializers);
var filteredByTimestampResults = records.getResults().stream()
.filter(result -> result.timestamp() >= startTimestamp.getTime())
.toList();

return new SearchResults(
filteredByTimestampResults,
records.getCompletionReason(),
records.getFinalMessageTimestamp(),
records.getMessagesScannedCount());
}

private void seekToTimestamp(List<TopicPartition> partitions, Date startTimestamp) {
Expand Down