Skip to content

Commit 3382edf

Browse files
feat(spring-kafka-example): add Producer kafka test and custom exception handling
1 parent 9f16e7e commit 3382edf

File tree

5 files changed

+130
-2
lines changed

5 files changed

+130
-2
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.io.example.exception;
2+
3+
public class DeserializationException extends RuntimeException {
4+
5+
public DeserializationException(String message) {
6+
super(message);
7+
}
8+
9+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.io.example.exception;
2+
3+
public class SerializationException extends RuntimeException {
4+
5+
public SerializationException(String message) {
6+
super(message);
7+
}
8+
9+
}

spring-kafka-example/src/main/java/com/io/example/mapper/JsonMapper.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import com.fasterxml.jackson.core.JsonProcessingException;
44
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.io.example.exception.SerializationException;
6+
import com.io.example.exception.DeserializationException;
57
import lombok.RequiredArgsConstructor;
68
import lombok.extern.slf4j.Slf4j;
79
import org.springframework.stereotype.Component;
@@ -18,7 +20,7 @@ public String toJsonString(Object obj) {
1820
return objectMapper.writeValueAsString(obj);
1921
} catch (JsonProcessingException e) {
2022
log.error("error converting an object of type {} to json. cause of the error: {}", obj.getClass(), e.getMessage());
21-
throw new RuntimeException("JSON serialization failed", e);
23+
throw new SerializationException("JSON serialization failed, cause: " + e.getMessage());
2224
}
2325
}
2426

@@ -27,7 +29,7 @@ public <T> T toObject(String jsonBody, Class<T> typeClass){
2729
return objectMapper.readValue(jsonBody, typeClass);
2830
} catch (JsonProcessingException e) {
2931
log.error("error converting an String to object. cause of the error: {}", e.getMessage());
30-
throw new IllegalArgumentException("Object deserialization failed for target type: " + typeClass.getSimpleName(), e);
32+
throw new DeserializationException("Object deserialization failed for target type: " + typeClass.getSimpleName() + ", cause: " + e.getMessage());
3133
}
3234
}
3335

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package com.io.example;
2+
3+
import com.io.example.controller.dto.request.MessageRequestDtoRequest;
4+
import com.io.example.mapper.JsonMapper;
5+
import com.io.example.producer.KafkaProducerServiceImpl;
6+
import org.apache.kafka.clients.consumer.ConsumerRecord;
7+
import org.apache.kafka.clients.consumer.ConsumerRecords;
8+
import org.apache.kafka.clients.consumer.KafkaConsumer;
9+
import org.junit.jupiter.api.BeforeEach;
10+
import org.junit.jupiter.api.Test;
11+
import org.springframework.beans.factory.annotation.Autowired;
12+
import org.springframework.beans.factory.annotation.Value;
13+
import org.springframework.boot.test.context.SpringBootTest;
14+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
15+
import org.springframework.kafka.test.context.EmbeddedKafka;
16+
import org.springframework.kafka.test.utils.KafkaTestUtils;
17+
import org.springframework.test.context.ActiveProfiles;
18+
19+
import java.time.Duration;
20+
import java.util.Collections;
21+
import java.util.Map;
22+
import java.util.Properties;
23+
24+
import static org.assertj.core.api.Assertions.assertThat;
25+
26+
@SpringBootTest
27+
@ActiveProfiles("test")
28+
@EmbeddedKafka(partitions = 1, topics = {"spring-kafka-example-simple-topic-test"})
29+
public class KafkaProducerServiceImplTest {
30+
31+
@SuppressWarnings("unused")
32+
@Value("${spring.kafka.topics.topic-1}")
33+
private String topic_1;
34+
35+
@SuppressWarnings("unused")
36+
@Value("${kafka.group-id}")
37+
private String group_id;
38+
39+
@Autowired
40+
@SuppressWarnings("unused")
41+
private KafkaProducerServiceImpl producer;
42+
43+
@Autowired
44+
@SuppressWarnings("unused")
45+
private JsonMapper mapper;
46+
47+
@Autowired
48+
@SuppressWarnings("unused")
49+
private EmbeddedKafkaBroker embeddedKafkaBroker;
50+
51+
private MessageRequestDtoRequest message;
52+
53+
@BeforeEach
54+
void init(){
55+
message = new MessageRequestDtoRequest("Teste Kafka");
56+
}
57+
58+
@Test
59+
void testSendMessage() {
60+
61+
producer.sendMessage(message);
62+
63+
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(this.getProps())) {
64+
consumer.subscribe(Collections.singleton(topic_1));
65+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
66+
67+
assertThat(records.count()).isGreaterThan(0);
68+
69+
ConsumerRecord<String, String> record = records.iterator().next();
70+
MessageRequestDtoRequest receivedMessage = mapper.toObject(record.value(), MessageRequestDtoRequest.class);
71+
assertThat(receivedMessage.message()).isEqualTo("Teste Kafka");
72+
73+
}
74+
}
75+
76+
private Properties getProps(){
77+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(group_id, "true", embeddedKafkaBroker);
78+
Properties props = new Properties();
79+
props.putAll(consumerProps);
80+
return props;
81+
}
82+
83+
84+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
spring:
2+
3+
application:
4+
name: spring-kafka-example-test
5+
6+
kafka:
7+
topics:
8+
topic-1: spring-kafka-example-simple-topic-test
9+
bootstrap-servers: localhost:9093
10+
consumer:
11+
group-id: spring-kafka-example-group-test
12+
auto-offset-reset: earliest
13+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
14+
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
15+
listener:
16+
missing-topics-fatal: false
17+
18+
logging:
19+
20+
pattern:
21+
console: "%d{yyyy-MM-dd'T'HH:mm:ss} | ${spring.application.name} | %class{30} | %level | %m%n"
22+
23+
kafka:
24+
group-id: group-test

0 commit comments

Comments
 (0)