
Commit 4865613

Merge pull request #18278 from LordMaduz/BAEL-6621
2 parents 88e7958 + 9f09cee

File tree

7 files changed: +357 −0 lines

spring-reactive-modules/pom.xml
Lines changed: 1 addition & 0 deletions

@@ -32,6 +32,7 @@
     <module>spring-reactor</module>
     <module>spring-webflux-amqp</module>
     <module>spring-reactive-kafka-stream-binder</module>
+    <module>spring-reactive-kafka</module>

     <!-- the following submodules are commented out as a workaround in order to use java 19+ and SB 3.2.x -->
     <!-- <module>spring-reactive-performance</module>--> <!-- JAVA-42050 -->
spring-reactive-kafka/pom.xml (new file)
Lines changed: 57 additions & 0 deletions

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <artifactId>spring-reactive-kafka</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>spring-reactive-kafka</name>
    <description>Spring Reactive Kafka</description>

    <parent>
        <groupId>com.baeldung.spring.reactive</groupId>
        <artifactId>spring-reactive-modules</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>io.projectreactor.kafka</groupId>
            <artifactId>reactor-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>
spring-reactive-kafka/src/main/java/com/baeldung/SpringReactiveKafkaApplication.java (new file)
Lines changed: 12 additions & 0 deletions

package com.baeldung;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringReactiveKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringReactiveKafkaApplication.class, args);
    }
}
spring-reactive-kafka/src/main/java/com/baeldung/config/KafkaConfig.java (new file)
Lines changed: 78 additions & 0 deletions

package com.baeldung.config;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.SenderOptions;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Plain String consumer: fixed group id, reading from the earliest offset.
    private Map<String, Object> consumerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-consumer-group");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return config;
    }

    // Same consumer settings, but with the String deserializers wrapped in
    // ErrorHandlingDeserializer, so a record that fails deserialization surfaces
    // as a handleable error instead of terminating the receiver.
    private Map<String, Object> errorHandlingConsumerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-consumer-group");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        config.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
        config.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, StringDeserializer.class);
        return config;
    }

    // String producer pointed at the same brokers.
    private Map<String, Object> producerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return config;
    }

    // Subscribes the receiver to the test-topic used throughout this module.
    private ReceiverOptions<String, String> receiverOptions() {
        Map<String, Object> consumerConfig = consumerConfig();
        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(consumerConfig);
        return receiverOptions.subscription(Collections.singletonList("test-topic"));
    }

    private SenderOptions<String, String> senderOptions() {
        Map<String, Object> producerConfig = producerConfig();
        return SenderOptions.create(producerConfig);
    }

    @Bean
    public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate() {
        return new ReactiveKafkaProducerTemplate<>(senderOptions());
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate() {
        return new ReactiveKafkaConsumerTemplate<>(receiverOptions());
    }
}
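KafkaConfig resolves the broker address from the spring.kafka.bootstrap-servers property. The commit ships no properties file (the test below injects the embedded broker's address via @TestPropertySource instead), so the following application.properties entry is only an illustrative sketch for running against a local broker:

    # illustrative only -- not part of this commit
    spring.kafka.bootstrap-servers=localhost:9092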
spring-reactive-kafka/src/main/java/com/baeldung/consumer/ConsumerService.java (new file)
Lines changed: 49 additions & 0 deletions

package com.baeldung.consumer;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.util.retry.Retry;

@Service
@RequiredArgsConstructor
@Slf4j
public class ConsumerService {

    private final ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate;

    // Streams record values one by one, logging each as it arrives.
    public Flux<String> consumeRecord() {
        return reactiveKafkaConsumerTemplate.receive()
            .map(ReceiverRecord::value)
            .doOnNext(msg -> log.info("Received: {}", msg));
    }

    // Collects records into batches of two before emitting their values downstream.
    public Flux<String> consumeAsABatch() {
        return reactiveKafkaConsumerTemplate.receive()
            .buffer(2)
            .flatMap(messages -> Flux.fromStream(messages.stream()
                .map(ReceiverRecord::value)));
    }

    // Applies backpressure by requesting at most two records at a time upstream.
    public Flux<String> consumeWithLimit() {
        return reactiveKafkaConsumerTemplate.receive()
            .limitRate(2)
            .map(ReceiverRecord::value);
    }

    // Fails the first two attempts, then succeeds; retryWhen resubscribes with a
    // fixed one-second delay, up to three times.
    public Flux<String> consumeWithRetryWithBackOff(AtomicInteger attempts) {
        return reactiveKafkaConsumerTemplate.receive()
            .flatMap(msg -> attempts.incrementAndGet() < 3 ? Flux.error(new RuntimeException("Failure")) : Flux.just(msg))
            .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(1)))
            .map(ReceiverRecord::value);
    }
}
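Each of these methods returns a cold Flux: no records are polled from Kafka until a subscriber attaches, which the tests below do through StepVerifier. As a hypothetical illustration (not part of this commit), a runner like the following would start consumption at application startup:

    package com.baeldung.consumer;

    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;

    import lombok.RequiredArgsConstructor;

    // Hypothetical sketch: subscribes to the consumer pipeline at startup so
    // the underlying KafkaReceiver begins polling.
    @Component
    @RequiredArgsConstructor
    public class ConsumerStartupRunner implements CommandLineRunner {

        private final ConsumerService consumerService;

        @Override
        public void run(String... args) {
            // receive() is an unbounded stream; this subscription lives until the
            // application shuts down or the returned Disposable is disposed.
            consumerService.consumeRecord().subscribe();
        }
    }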
spring-reactive-kafka/src/main/java/com/baeldung/publisher/PublisherService.java (new file)
Lines changed: 22 additions & 0 deletions

package com.baeldung.publisher;

import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.stereotype.Service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Service
@RequiredArgsConstructor
@Slf4j
public class PublisherService {

    private final ReactiveKafkaProducerTemplate<String, String> kafkaProducerTemplate;

    // Sends the message to the given topic, discarding the send result metadata.
    public Mono<Void> publish(String message, String topic) {
        return kafkaProducerTemplate.send(topic, message)
            .doOnError(error -> log.error("unable to send message due to: {}", error.getMessage()))
            .then();
    }
}
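The send() call is likewise lazy: the returned Mono does nothing until subscribed, so a caller must subscribe to it or compose it into a larger chain, as the tests below do via StepVerifier. A hypothetical fire-and-forget caller, for illustration:

    // hypothetical usage -- not part of this commit
    publisherService.publish("hello", "test-topic").subscribe();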
spring-reactive-kafka/src/test/java/com/baeldung/reactive/kafka/ReactorKafkaConsumerUnitTest.java (new file)
Lines changed: 138 additions & 0 deletions

package com.baeldung.reactive.kafka;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;

import com.baeldung.config.KafkaConfig;
import com.baeldung.consumer.ConsumerService;
import com.baeldung.publisher.PublisherService;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

@SpringBootTest(classes = { KafkaConfig.class, PublisherService.class, ConsumerService.class })
@EmbeddedKafka(partitions = 1, topics = { "test-topic" }, controlledShutdown = true)
@TestPropertySource(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@Slf4j
public class ReactorKafkaConsumerUnitTest {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Autowired
    private ConsumerService consumerService;

    @Autowired
    private PublisherService publisherService;

    private KafkaAdmin kafkaAdmin;

    @BeforeEach
    void setUp() {
        kafkaAdmin = new KafkaAdmin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
    }

    // Deletes the topic between tests so each test starts from a clean offset.
    @AfterEach
    public void cleanUp() throws ExecutionException, InterruptedException {
        AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties());

        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singletonList("test-topic"));
        deleteTopicsResult.all().get();
    }

    @Test
    void whenMessageIsPublished_thenConsumerReceivesSuccessfully() {
        Mono<Void> publisher = publisherService.publish("message", "test-topic");
        Flux<String> consumer = consumerService.consumeRecord();

        StepVerifier.create(publisher)
            .expectComplete()
            .verify();

        // receive() never completes on its own, so the verifier cancels the
        // subscription once the expected record has arrived.
        StepVerifier.create(consumer)
            .expectNext("message")
            .thenCancel()
            .verify();
    }

    @Test
    void whenMessagesArePublished_thenConsumedAsABatch() {
        Mono<Void> publisher = Mono.when(
            publisherService.publish("message-1", "test-topic"),
            publisherService.publish("message-2", "test-topic"));

        Flux<String> consumer = consumerService.consumeAsABatch();

        StepVerifier.create(publisher)
            .expectComplete()
            .verify();

        StepVerifier.create(consumer)
            .expectNext("message-1")
            .expectNext("message-2")
            .thenCancel()
            .verify();
    }

    @Test
    void whenMessagesArePublished_thenConsumedWithLimit() {
        Mono<Void> publisher = Mono.when(
            publisherService.publish("message-1", "test-topic"),
            publisherService.publish("message-2", "test-topic"));

        Flux<String> consumer = consumerService.consumeWithLimit();

        StepVerifier.create(publisher)
            .expectComplete()
            .verify();

        StepVerifier.create(consumer)
            .expectNextCount(2)
            .thenCancel()
            .verify();
    }

    @Test
    void whenMessageIsPublished_thenConsumedWithRetry() {
        AtomicInteger attempts = new AtomicInteger();
        Mono<Void> publisher = publisherService.publish("message", "test-topic");
        Flux<String> consumer = consumerService.consumeWithRetryWithBackOff(attempts);

        StepVerifier.create(publisher)
            .expectComplete()
            .verify();

        StepVerifier.create(consumer)
            .expectNext("message")
            .thenCancel()
            .verify();

        // Two failed attempts plus the successful third one.
        assertEquals(3, attempts.get());
    }
}
