Skip to content

Commit 0ff2b13

Browse files
authored
Merge pull request #26 from majusko/feature/issue-25-message-error-handling
Feature/issue 25 message error handling
2 parents b7f44c3 + ba01716 commit 0ff2b13

File tree

9 files changed

+101
-12
lines changed

9 files changed

+101
-12
lines changed

pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
<testcontainers.version>1.14.3</testcontainers.version>
2424
<junit-jupiter.version>5.6.2</junit-jupiter.version>
2525
<awaitility.version>4.0.3</awaitility.version>
26+
<reactor.core.version>3.3.6.RELEASE</reactor.core.version>
2627
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
2728
</properties>
2829

@@ -38,6 +39,13 @@
3839
<version>${apache.pulsar.client.version}</version>
3940
</dependency>
4041

42+
<dependency>
43+
<groupId>io.projectreactor</groupId>
44+
<artifactId>reactor-core</artifactId>
45+
<version>${reactor.core.version}</version>
46+
</dependency>
47+
48+
<!-- TESTS -->
4149
<dependency>
4250
<groupId>org.springframework.boot</groupId>
4351
<artifactId>spring-boot-starter-test</artifactId>
@@ -63,7 +71,6 @@
6371
<scope>test</scope>
6472
</dependency>
6573

66-
<!-- Pulsar integration tests -->
6774
<dependency>
6875
<groupId>org.testcontainers</groupId>
6976
<artifactId>testcontainers</artifactId>

src/main/java/io/github/majusko/pulsar/consumer/ConsumerBuilder.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@
22

33
import io.github.majusko.pulsar.collector.ConsumerCollector;
44
import io.github.majusko.pulsar.collector.ConsumerHolder;
5+
import io.github.majusko.pulsar.error.FailedMessage;
6+
import io.github.majusko.pulsar.error.exception.ConsumerInitException;
57
import org.apache.pulsar.client.api.Consumer;
68
import org.apache.pulsar.client.api.PulsarClient;
79
import org.apache.pulsar.client.api.PulsarClientException;
810
import org.apache.pulsar.client.api.Schema;
911
import org.springframework.context.annotation.DependsOn;
1012
import org.springframework.stereotype.Component;
13+
import reactor.core.Disposable;
14+
import reactor.core.publisher.EmitterProcessor;
1115

1216
import javax.annotation.PostConstruct;
1317
import java.lang.reflect.Method;
@@ -18,6 +22,7 @@
1822
@DependsOn({"pulsarClient", "consumerCollector"})
1923
public class ConsumerBuilder {
2024

25+
private final EmitterProcessor<FailedMessage> exceptionEmitter = EmitterProcessor.create();
2126
private final ConsumerCollector consumerCollector;
2227
private final PulsarClient pulsarClient;
2328

@@ -53,15 +58,19 @@ private Consumer<?> subscribe(String name, ConsumerHolder holder) {
5358
consumer.acknowledge(msg);
5459
} catch (Exception e) {
5560
consumer.negativeAcknowledge(msg);
56-
throw new RuntimeException("TODO Custom Exception!", e);
61+
exceptionEmitter.onNext(new FailedMessage(e, consumer, msg));
5762
}
5863
}).subscribe();
5964
} catch (PulsarClientException e) {
60-
throw new RuntimeException("TODO Custom Exception!", e);
65+
throw new ConsumerInitException("Failed to init consumer.", e);
6166
}
6267
}
6368

6469
public List<Consumer> getConsumers() {
6570
return consumers;
6671
}
72+
73+
public Disposable onError(java.util.function.Consumer<? super FailedMessage> consumer) {
74+
return exceptionEmitter.subscribe(consumer);
75+
}
6776
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.github.majusko.pulsar.error;
2+
3+
import org.apache.pulsar.client.api.Consumer;
4+
import org.apache.pulsar.client.api.Message;
5+
6+
public class FailedMessage {
7+
private final Exception exception;
8+
private final Consumer<?> consumer;
9+
private final Message<?> message;
10+
11+
public FailedMessage(Exception exception, Consumer<?> consumer, Message<?> message) {
12+
this.exception = exception;
13+
this.consumer = consumer;
14+
this.message = message;
15+
}
16+
17+
public Exception getException() {
18+
return exception;
19+
}
20+
21+
public Consumer<?> getConsumer() {
22+
return consumer;
23+
}
24+
25+
public Message<?> getMessage() {
26+
return message;
27+
}
28+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.github.majusko.pulsar.error.exception;
2+
3+
public class ConsumerInitException extends RuntimeException {
4+
public ConsumerInitException(String message, Throwable cause) {
5+
super(message, cause);
6+
}
7+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.github.majusko.pulsar.error.exception;
2+
3+
public class ProducerInitException extends RuntimeException {
4+
public ProducerInitException(String message, Throwable cause) {
5+
super(message, cause);
6+
}
7+
8+
public ProducerInitException(String message) {
9+
super(message);
10+
}
11+
}

src/main/java/io/github/majusko/pulsar/producer/ProducerCollector.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.github.majusko.pulsar.annotation.PulsarProducer;
44
import io.github.majusko.pulsar.collector.ProducerHolder;
55
import io.github.majusko.pulsar.constant.Serialization;
6+
import io.github.majusko.pulsar.error.exception.ProducerInitException;
67
import org.apache.pulsar.client.api.Producer;
78
import org.apache.pulsar.client.api.PulsarClient;
89
import org.apache.pulsar.client.api.PulsarClientException;
@@ -19,7 +20,7 @@ public class ProducerCollector implements BeanPostProcessor {
1920

2021
private final PulsarClient pulsarClient;
2122

22-
private Map<String, Producer> producers = new ConcurrentHashMap<>();
23+
private final Map<String, Producer> producers = new ConcurrentHashMap<>();
2324

2425
public ProducerCollector(PulsarClient pulsarClient) {
2526
this.pulsarClient = pulsarClient;
@@ -29,7 +30,7 @@ public ProducerCollector(PulsarClient pulsarClient) {
2930
public Object postProcessBeforeInitialization(Object bean, String beanName) {
3031
final Class<?> beanClass = bean.getClass();
3132

32-
if(beanClass.isAnnotationPresent(PulsarProducer.class) && bean instanceof PulsarProducerFactory) {
33+
if (beanClass.isAnnotationPresent(PulsarProducer.class) && bean instanceof PulsarProducerFactory) {
3334
producers.putAll(((PulsarProducerFactory) bean).getTopics().entrySet().stream()
3435
.map($ -> new ProducerHolder($.getKey(), $.getValue().left, $.getValue().right))
3536
.collect(Collectors.toMap(ProducerHolder::getTopic, this::buildProducer)));
@@ -48,16 +49,16 @@ private Producer<?> buildProducer(ProducerHolder holder) {
4849
return pulsarClient.newProducer(getSchema(holder))
4950
.topic(holder.getTopic())
5051
.create();
51-
} catch(PulsarClientException e) {
52-
throw new RuntimeException("TODO Custom Exception!", e);
52+
} catch (PulsarClientException e) {
53+
throw new ProducerInitException("Failed to init producer.", e);
5354
}
5455
}
5556

5657
private <T> Schema<?> getSchema(ProducerHolder holder) throws RuntimeException {
5758
if (holder.getSerialization().equals(Serialization.JSON)) {
5859
return Schema.JSON(holder.getClazz());
5960
}
60-
throw new RuntimeException("TODO custom runtime exception");
61+
throw new ProducerInitException("Unknown producer schema.");
6162
}
6263

6364
Map<String, Producer> getProducers() {

src/test/java/io/github/majusko/pulsar/PulsarJavaSpringBootStarterApplicationTests.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.Set;
27+
import java.util.concurrent.atomic.AtomicBoolean;
2728

2829
import static org.awaitility.Awaitility.await;
2930

@@ -44,6 +45,9 @@ class PulsarJavaSpringBootStarterApplicationTests {
4445
@Autowired
4546
private PulsarTemplate<MyMsg> producer;
4647

48+
@Autowired
49+
private PulsarTemplate<String> producerForError;
50+
4751
@Container
4852
static PulsarContainer pulsarContainer = new PulsarContainer();
4953

@@ -66,12 +70,11 @@ void testProducerSendMethod() throws PulsarClientException {
6670
void testConsumerRegistration1() throws Exception {
6771
final List<Consumer> consumers = consumerBuilder.getConsumers();
6872

69-
Assertions.assertEquals(1, consumers.size());
73+
Assertions.assertEquals(2, consumers.size());
7074

71-
final Consumer consumer = consumers.stream().findFirst().orElseThrow(Exception::new);
75+
final Consumer<?> consumer = consumers.stream().filter( $-> $.getTopic().equals("topic-one")).findFirst().orElseThrow(Exception::new);
7276

7377
Assertions.assertNotNull(consumer);
74-
Assertions.assertEquals("topic-one", consumer.getTopic());
7578
}
7679

7780
@Test
@@ -91,11 +94,29 @@ void testProducerRegistration() {
9194

9295
final Map<String, ImmutablePair<Class<?>, Serialization>> topics = producerFactory.getTopics();
9396

94-
Assertions.assertEquals(2, topics.size());
97+
Assertions.assertEquals(3, topics.size());
9598

9699
final Set<String> topicNames = new HashSet<>(topics.keySet());
97100

98101
Assertions.assertTrue(topicNames.contains("topic-one"));
99102
Assertions.assertTrue(topicNames.contains("topic-two"));
100103
}
104+
105+
@Test
106+
void testMessageErrorHandling() throws PulsarClientException {
107+
final AtomicBoolean receivedError = new AtomicBoolean(false);
108+
final String messageToSend = "This message will never arrive.";
109+
110+
producerForError.send("topic-for-error", messageToSend);
111+
112+
consumerBuilder.onError(($) -> {
113+
Assertions.assertEquals($.getConsumer().getTopic(), "topic-for-error");
114+
Assertions.assertEquals($.getMessage().getValue(), messageToSend);
115+
Assertions.assertNotNull($.getException());
116+
117+
receivedError.set(true);
118+
});
119+
120+
await().untilTrue(receivedError);
121+
}
101122
}

src/test/java/io/github/majusko/pulsar/TestConsumers.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,8 @@ public void topicOneListener(MyMsg myMsg) {
1717
Assertions.assertNotNull(myMsg);
1818
mockTopicListenerReceived.set(true);
1919
}
20+
21+
@PulsarConsumer(topic = "topic-for-error", clazz = String.class, serialization = Serialization.JSON)
22+
public void topicForErrorListener(Integer myMsg) {
23+
}
2024
}

src/test/java/io/github/majusko/pulsar/TestProducerConfiguration.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ public class TestProducerConfiguration {
1111
@Bean
1212
public ProducerFactory producerFactory() {
1313
return new ProducerFactory()
14+
.addProducer("topic-for-error", String.class)
1415
.addProducer("topic-one", MyMsg.class)
1516
.addProducer("topic-two", MyMsg2.class, Serialization.JSON);
1617
}

0 commit comments

Comments
 (0)