Skip to content

Commit 5275899

Browse files
Added functionality to allow a user to manually add a message to a topic partition (#703)
1 parent 0ed94d0 commit 5275899

15 files changed

+465
-14
lines changed

src/main/java/kafdrop/controller/MessageController.java

Lines changed: 100 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,28 @@
3939
import kafdrop.service.MessageInspector;
4040
import kafdrop.service.TopicNotFoundException;
4141
import kafdrop.util.AvroMessageDeserializer;
42+
import kafdrop.util.AvroMessageSerializer;
4243
import kafdrop.util.DefaultMessageDeserializer;
44+
import kafdrop.util.DefaultMessageSerializer;
4345
import kafdrop.util.Deserializers;
4446
import kafdrop.util.KeyFormat;
4547
import kafdrop.util.MessageDeserializer;
4648
import kafdrop.util.MessageFormat;
49+
import kafdrop.util.MessageSerializer;
4750
import kafdrop.util.MsgPackMessageDeserializer;
51+
import kafdrop.util.MsgPackMessageSerializer;
4852
import kafdrop.util.ProtobufMessageDeserializer;
53+
import kafdrop.util.ProtobufMessageSerializer;
4954
import kafdrop.util.ProtobufSchemaRegistryMessageDeserializer;
55+
56+
import java.io.File;
57+
import java.util.ArrayList;
58+
import java.util.Comparator;
59+
import java.util.List;
60+
61+
62+
import kafdrop.util.Serializers;
63+
import org.apache.kafka.clients.producer.RecordMetadata;
5064
import org.springframework.http.MediaType;
5165
import org.springframework.stereotype.Controller;
5266
import org.springframework.ui.Model;
@@ -57,11 +71,10 @@
5771
import org.springframework.web.bind.annotation.RequestParam;
5872
import org.springframework.web.bind.annotation.ResponseBody;
5973

60-
import java.io.File;
61-
import java.util.ArrayList;
62-
import java.util.Comparator;
6374
import java.util.Date;
64-
import java.util.List;
75+
76+
import org.springframework.web.bind.annotation.PostMapping;
77+
import kafdrop.model.CreateMessageVO;
6578

6679
@Tag(name = "message-controller", description = "Message Controller")
6780
@Controller
@@ -195,6 +208,60 @@ public String viewMessageForm(@PathVariable("name") String topicName,
195208
return "message-inspector";
196209
}
197210

211+
@PostMapping("/topic/{name:.+}/addmessage")
212+
public String addMessage(
213+
@PathVariable("name")
214+
String topicName,
215+
@ModelAttribute("addMessageForm") CreateMessageVO body,
216+
Model model) {
217+
try {
218+
final MessageFormat defaultFormat = messageFormatProperties.getFormat();
219+
final MessageFormat defaultKeyFormat = messageFormatProperties.getKeyFormat();
220+
221+
final var serializers = new Serializers(
222+
getSerializer(topicName, defaultKeyFormat, "", ""),
223+
getSerializer(topicName, defaultFormat, "", ""));
224+
RecordMetadata recordMetadata = kafkaMonitor.publishMessage(body, serializers);
225+
226+
final var deserializers = new Deserializers(
227+
getDeserializer(topicName, defaultKeyFormat, "", ""),
228+
getDeserializer(topicName, defaultFormat, "", "")
229+
);
230+
231+
final PartitionOffsetInfo defaultForm = new PartitionOffsetInfo();
232+
233+
defaultForm.setCount(100l);
234+
defaultForm.setOffset(recordMetadata.offset());
235+
defaultForm.setPartition(body.getTopicPartition());
236+
defaultForm.setFormat(defaultFormat);
237+
defaultForm.setKeyFormat(defaultFormat);
238+
239+
model.addAttribute("messageForm", defaultForm);
240+
241+
final TopicVO topic = kafkaMonitor.getTopic(topicName)
242+
.orElseThrow(() -> new TopicNotFoundException(topicName));
243+
244+
model.addAttribute("topic", topic);
245+
246+
model.addAttribute("defaultFormat", defaultFormat);
247+
model.addAttribute("messageFormats", MessageFormat.values());
248+
model.addAttribute("defaultKeyFormat", defaultKeyFormat);
249+
model.addAttribute("keyFormats", KeyFormat.values());
250+
model.addAttribute("descFiles", protobufProperties.getDescFilesList());
251+
model.addAttribute("messages",
252+
messageInspector.getMessages(topicName,
253+
body.getTopicPartition(),
254+
recordMetadata.offset(),
255+
100,
256+
deserializers));
257+
model.addAttribute("isAnyProtoOpts", List.of(true, false));
258+
259+
} catch (Exception ex) {
260+
model.addAttribute("errorMessage", ex.getMessage());
261+
}
262+
return "message-inspector";
263+
}
264+
198265
/**
199266
* Human friendly view of searching messages.
200267
*
@@ -339,6 +406,11 @@ List<Object> getPartitionOrMessages(
339406
}
340407
}
341408

409+
private MessageDeserializer getDeserializer(String topicName, MessageFormat format, String descFile,
410+
String msgTypeName) {
411+
return getDeserializer(topicName, format, descFile, msgTypeName, false);
412+
}
413+
342414
private MessageDeserializer getDeserializer(String topicName, MessageFormat format, String descFile,
343415
String msgTypeName, boolean isAnyProto) {
344416
final MessageDeserializer deserializer;
@@ -370,6 +442,30 @@ private MessageDeserializer getDeserializer(String topicName, MessageFormat form
370442
return deserializer;
371443
}
372444

445+
private MessageSerializer getSerializer(String topicName, MessageFormat format, String descFile, String msgTypeName) {
446+
final MessageSerializer serializer;
447+
448+
if (format == MessageFormat.AVRO) {
449+
final var schemaRegistryUrl = schemaRegistryProperties.getConnect();
450+
final var schemaRegistryAuth = schemaRegistryProperties.getAuth();
451+
452+
serializer = new AvroMessageSerializer(topicName, schemaRegistryUrl, schemaRegistryAuth);
453+
} else if (format == MessageFormat.PROTOBUF) {
454+
// filter the input file name
455+
final var descFileName = descFile.replace(".desc", "")
456+
.replaceAll("\\.", "")
457+
.replaceAll("/", "");
458+
final var fullDescFile = protobufProperties.getDirectory() + File.separator + descFileName + ".desc";
459+
serializer = new ProtobufMessageSerializer(fullDescFile, msgTypeName);
460+
} else if (format == MessageFormat.MSGPACK) {
461+
serializer = new MsgPackMessageSerializer();
462+
} else {
463+
serializer = new DefaultMessageSerializer();
464+
}
465+
466+
return serializer;
467+
}
468+
373469
/**
374470
* Encapsulates offset data for a single partition.
375471
*/
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package kafdrop.model;

import lombok.Data;
import lombok.RequiredArgsConstructor;

/**
 * Form-backing value object for manually publishing a message to a topic
 * partition. Lombok's {@code @Data} generates getters, setters, equals,
 * hashCode and toString; {@code @RequiredArgsConstructor} with no final
 * fields yields a no-args constructor — presumably kept for Spring MVC
 * {@code @ModelAttribute} form binding (confirm before removing).
 */
@Data
@RequiredArgsConstructor
public final class CreateMessageVO {

  // Partition the message should be written to.
  private int topicPartition;

  // Record key as entered by the user (serialized before sending).
  private String key;

  // Record value/payload as entered by the user (serialized before sending).
  private String value;

  // Name of the destination topic.
  private String topic;
}

src/main/java/kafdrop/service/KafkaHighLevelConsumer.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,10 +212,9 @@ synchronized SearchResults searchRecords(String topic,
212212
List<TopicPartition> partitions = determinePartitionsForTopic(topic);
213213
if (partition != -1) {
214214
var partitionOpt = partitions.stream().filter(p -> p.partition() == partition).findAny();
215-
if (partitionOpt.isEmpty()) {
216-
throw new IllegalArgumentException("Partition does not exist in topic");
217-
}
218-
partitions = List.of(partitionOpt.get());
215+
partitions = List.of(partitionOpt.orElseThrow(
216+
() -> new IllegalArgumentException("Partition " + partition + " does not exist in topic")
217+
));
219218
}
220219
kafkaConsumer.assign(partitions);
221220
seekToTimestamp(partitions, startTimestamp);
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package kafdrop.service;

import java.util.Properties;
import java.util.concurrent.Future;


import jakarta.annotation.PostConstruct;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import kafdrop.config.KafkaConfiguration;
import kafdrop.model.CreateMessageVO;
import kafdrop.util.Serializers;

/**
 * Thin wrapper around a lazily-created {@link KafkaProducer} used to publish
 * user-supplied messages. Key and value are serialized by the caller's
 * {@link Serializers}; the producer itself only ships raw bytes.
 */
@Service
public final class KafkaHighLevelProducer {

  private static final Logger LOG = LoggerFactory.getLogger(KafkaHighLevelProducer.class);
  private final KafkaConfiguration kafkaConfiguration;
  private KafkaProducer<byte[], byte[]> kafkaProducer;

  public KafkaHighLevelProducer(KafkaConfiguration kafkaConfiguration) {
    this.kafkaConfiguration = kafkaConfiguration;
  }

  /**
   * Creates the producer on first use. Synchronized so a request thread racing
   * the container's {@code @PostConstruct} invocation cannot create two
   * producers and leak one of them.
   */
  @PostConstruct
  private synchronized void initializeClient() {
    if (kafkaProducer == null) {
      final var properties = new Properties();
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
      properties.put(ProducerConfig.ACKS_CONFIG, "all");
      properties.put(ProducerConfig.RETRIES_CONFIG, 0);
      properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
      properties.put(ProducerConfig.CLIENT_ID_CONFIG, "kafdrop-producer");
      kafkaConfiguration.applyCommon(properties);

      kafkaProducer = new KafkaProducer<>(properties);
    }
  }

  /**
   * Releases the producer's network and buffer resources on application
   * shutdown; the original implementation never closed the producer.
   */
  @jakarta.annotation.PreDestroy
  private synchronized void closeClient() {
    if (kafkaProducer != null) {
      kafkaProducer.close();
      kafkaProducer = null;
    }
  }

  /**
   * Publishes a single record and blocks until the broker acknowledges it
   * (acks=all).
   *
   * @param message     topic, partition, key and value to publish
   * @param serializers key/value serializers applied before sending
   * @return metadata (partition/offset) of the written record
   * @throws KafkaProducerException if the send fails or the thread is interrupted
   */
  public RecordMetadata publishMessage(CreateMessageVO message, Serializers serializers) {
    initializeClient();

    final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(message.getTopic(),
        message.getTopicPartition(), serializers.getKeySerializer().serializeMessage(message.getKey()),
        serializers.getValueSerializer().serializeMessage(message.getValue()));

    final Future<RecordMetadata> result = kafkaProducer.send(record);
    try {
      final RecordMetadata recordMetadata = result.get();
      LOG.info("Record published successfully [{}]", recordMetadata);
      return recordMetadata;
    } catch (InterruptedException e) {
      // Restore the interrupt status instead of swallowing it.
      Thread.currentThread().interrupt();
      LOG.error("Failed to publish message", e);
      throw new KafkaProducerException(e);
    } catch (Exception e) {
      LOG.error("Failed to publish message", e);
      throw new KafkaProducerException(e);
    }
  }
}

src/main/java/kafdrop/service/KafkaMonitor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,17 @@
2222
import kafdrop.model.BrokerVO;
2323
import kafdrop.model.ClusterSummaryVO;
2424
import kafdrop.model.ConsumerVO;
25+
import kafdrop.model.CreateMessageVO;
2526
import kafdrop.model.CreateTopicVO;
2627
import kafdrop.model.MessageVO;
2728
import kafdrop.model.SearchResultsVO;
2829
import kafdrop.model.TopicVO;
2930
import kafdrop.util.Deserializers;
31+
import kafdrop.util.Serializers;
3032
import org.apache.kafka.common.TopicPartition;
3133

34+
import org.apache.kafka.clients.producer.RecordMetadata;
35+
3236
import java.util.Collection;
3337
import java.util.Date;
3438
import java.util.List;
@@ -79,5 +83,7 @@ SearchResultsVO searchMessages(String topic,
7983
*/
8084
void deleteTopic(String topic);
8185

86+
RecordMetadata publishMessage(CreateMessageVO message, Serializers serializers);
87+
8288
List<AclVO> getAcls();
8389
}

src/main/java/kafdrop/service/KafkaMonitorImpl.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,36 +21,40 @@
2121
import kafdrop.model.AclVO;
2222
import kafdrop.model.BrokerVO;
2323
import kafdrop.model.ClusterSummaryVO;
24-
import kafdrop.model.ConsumerPartitionVO;
25-
import kafdrop.model.ConsumerTopicVO;
2624
import kafdrop.model.ConsumerVO;
25+
import kafdrop.model.ConsumerTopicVO;
26+
import kafdrop.model.ConsumerPartitionVO;
27+
import kafdrop.model.CreateMessageVO;
2728
import kafdrop.model.CreateTopicVO;
2829
import kafdrop.model.MessageVO;
2930
import kafdrop.model.SearchResultsVO;
30-
import kafdrop.model.TopicPartitionVO;
3131
import kafdrop.model.TopicVO;
32+
import kafdrop.model.TopicPartitionVO;
33+
import kafdrop.util.Serializers;
3234
import kafdrop.util.Deserializers;
3335
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource;
3436
import org.apache.kafka.clients.admin.NewTopic;
3537
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
38+
import org.apache.kafka.clients.producer.RecordMetadata;
3639
import org.apache.kafka.common.PartitionInfo;
3740
import org.apache.kafka.common.TopicPartition;
3841
import org.apache.kafka.common.header.Headers;
3942
import org.slf4j.Logger;
4043
import org.slf4j.LoggerFactory;
4144
import org.springframework.stereotype.Service;
4245

46+
4347
import java.util.ArrayList;
4448
import java.util.Collection;
4549
import java.util.Collections;
4650
import java.util.Comparator;
4751
import java.util.Date;
4852
import java.util.List;
4953
import java.util.Map;
50-
import java.util.Map.Entry;
5154
import java.util.Optional;
5255
import java.util.Set;
5356
import java.util.TreeMap;
57+
import java.util.Map.Entry;
5458
import java.util.function.Function;
5559
import java.util.stream.Collectors;
5660

@@ -64,9 +68,13 @@ public final class KafkaMonitorImpl implements KafkaMonitor {
6468

6569
private final KafkaHighLevelAdminClient highLevelAdminClient;
6670

67-
public KafkaMonitorImpl(KafkaHighLevelConsumer highLevelConsumer, KafkaHighLevelAdminClient highLevelAdminClient) {
71+
private final KafkaHighLevelProducer highLevelProducer;
72+
73+
public KafkaMonitorImpl(KafkaHighLevelConsumer highLevelConsumer, KafkaHighLevelAdminClient highLevelAdminClient,
74+
KafkaHighLevelProducer highLevelProducer) {
6875
this.highLevelConsumer = highLevelConsumer;
6976
this.highLevelAdminClient = highLevelAdminClient;
77+
this.highLevelProducer = highLevelProducer;
7078
}
7179

7280
@Override
@@ -419,4 +427,9 @@ private List<ConsumerGroupOffsets> getConsumerOffsets(Set<String> topics) {
419427
.filter(not(ConsumerGroupOffsets::isEmpty))
420428
.collect(Collectors.toList());
421429
}
430+
431+
/**
 * Publishes a user-supplied message by delegating to the high-level producer.
 *
 * @see KafkaHighLevelProducer#publishMessage(CreateMessageVO, Serializers)
 */
@Override
public RecordMetadata publishMessage(CreateMessageVO message, Serializers serializers) {
  return highLevelProducer.publishMessage(message, serializers);
}
422435
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package kafdrop.service;
2+
3+
import org.springframework.http.HttpStatus;
4+
import org.springframework.web.bind.annotation.ResponseStatus;
5+
6+
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
7+
public class KafkaProducerException extends RuntimeException {
8+
9+
public KafkaProducerException(Throwable exception) {
10+
super(exception);
11+
}
12+
13+
public KafkaProducerException(String message) {
14+
super(message);
15+
}
16+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package kafdrop.util;
2+
3+
import java.util.HashMap;
4+
5+
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
6+
import io.confluent.kafka.serializers.KafkaAvroSerializer;
7+
8+
public class AvroMessageSerializer implements MessageSerializer {
9+
10+
private final String topicName;
11+
private final KafkaAvroSerializer serializer;
12+
13+
public AvroMessageSerializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) {
14+
this.topicName = topicName;
15+
this.serializer = getSerializer(schemaRegistryUrl, schemaRegistryAuth);
16+
}
17+
18+
@Override
19+
public byte[] serializeMessage(String value) {
20+
final var bytes = value.getBytes();
21+
return serializer.serialize(topicName, bytes);
22+
}
23+
24+
private KafkaAvroSerializer getSerializer(String schemaRegistryUrl, String schemaRegistryAuth) {
25+
final var config = new HashMap<String, Object>();
26+
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
27+
if (schemaRegistryAuth != null) {
28+
config.put(AbstractKafkaAvroSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
29+
config.put(AbstractKafkaAvroSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth);
30+
}
31+
final var kafkaAvroSerializer = new KafkaAvroSerializer();
32+
kafkaAvroSerializer.configure(config, false);
33+
return kafkaAvroSerializer;
34+
}
35+
36+
}

0 commit comments

Comments
 (0)