Skip to content

Commit 37d88e2

Browse files
committed
Implemented simple emmitter for catching exceptions during the message consumption.
1 parent b7f44c3 commit 37d88e2

File tree

5 files changed

+58
-7
lines changed

5 files changed

+58
-7
lines changed

pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
<testcontainers.version>1.14.3</testcontainers.version>
2424
<junit-jupiter.version>5.6.2</junit-jupiter.version>
2525
<awaitility.version>4.0.3</awaitility.version>
26+
<reactor.core.version>3.3.6.RELEASE</reactor.core.version>
2627
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
2728
</properties>
2829

@@ -38,6 +39,13 @@
3839
<version>${apache.pulsar.client.version}</version>
3940
</dependency>
4041

42+
<dependency>
43+
<groupId>io.projectreactor</groupId>
44+
<artifactId>reactor-core</artifactId>
45+
<version>${reactor.core.version}</version>
46+
</dependency>
47+
48+
<!-- TESTS -->
4149
<dependency>
4250
<groupId>org.springframework.boot</groupId>
4351
<artifactId>spring-boot-starter-test</artifactId>
@@ -63,7 +71,6 @@
6371
<scope>test</scope>
6472
</dependency>
6573

66-
<!-- Pulsar integration tests -->
6774
<dependency>
6875
<groupId>org.testcontainers</groupId>
6976
<artifactId>testcontainers</artifactId>

src/main/java/io/github/majusko/pulsar/consumer/ConsumerBuilder.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@
22

33
import io.github.majusko.pulsar.collector.ConsumerCollector;
44
import io.github.majusko.pulsar.collector.ConsumerHolder;
5+
import io.github.majusko.pulsar.error.FailedMessage;
6+
import io.github.majusko.pulsar.error.exception.ConsumerInitException;
57
import org.apache.pulsar.client.api.Consumer;
68
import org.apache.pulsar.client.api.PulsarClient;
79
import org.apache.pulsar.client.api.PulsarClientException;
810
import org.apache.pulsar.client.api.Schema;
911
import org.springframework.context.annotation.DependsOn;
1012
import org.springframework.stereotype.Component;
13+
import reactor.core.Disposable;
14+
import reactor.core.publisher.EmitterProcessor;
1115

1216
import javax.annotation.PostConstruct;
1317
import java.lang.reflect.Method;
@@ -18,10 +22,11 @@
1822
@DependsOn({"pulsarClient", "consumerCollector"})
1923
public class ConsumerBuilder {
2024

25+
private final EmitterProcessor<FailedMessage> exceptionEmitter = EmitterProcessor.create();
2126
private final ConsumerCollector consumerCollector;
2227
private final PulsarClient pulsarClient;
2328

24-
private List<Consumer> consumers;
29+
private List<Consumer<?>> consumers;
2530

2631
public ConsumerBuilder(ConsumerCollector consumerCollector, PulsarClient pulsarClient) {
2732
this.consumerCollector = consumerCollector;
@@ -53,15 +58,19 @@ private Consumer<?> subscribe(String name, ConsumerHolder holder) {
5358
consumer.acknowledge(msg);
5459
} catch (Exception e) {
5560
consumer.negativeAcknowledge(msg);
56-
throw new RuntimeException("TODO Custom Exception!", e);
61+
exceptionEmitter.onNext(new FailedMessage(e, consumer, msg));
5762
}
5863
}).subscribe();
5964
} catch (PulsarClientException e) {
60-
throw new RuntimeException("TODO Custom Exception!", e);
65+
throw new ConsumerInitException("Failed to init consumer.", e);
6166
}
6267
}
6368

64-
public List<Consumer> getConsumers() {
69+
public List<Consumer<?>> getConsumers() {
6570
return consumers;
6671
}
72+
73+
public Disposable onError(java.util.function.Consumer<? super FailedMessage> consumer) {
74+
return exceptionEmitter.subscribe(consumer);
75+
}
6776
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package io.github.majusko.pulsar.error;
2+
3+
import org.apache.pulsar.client.api.Consumer;
4+
import org.apache.pulsar.client.api.Message;
5+
6+
public class FailedMessage {
7+
private final Exception exception;
8+
private final Consumer<?> consumer;
9+
private final Message<?> message;
10+
11+
public FailedMessage(Exception exception, Consumer<?> consumer, Message<?> message) {
12+
this.exception = exception;
13+
this.consumer = consumer;
14+
this.message = message;
15+
}
16+
17+
public Exception getException() {
18+
return exception;
19+
}
20+
21+
public Consumer<?> getConsumer() {
22+
return consumer;
23+
}
24+
25+
public Message<?> getMessage() {
26+
return message;
27+
}
28+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.github.majusko.pulsar.error.exception;
2+
3+
public class ConsumerInitException extends RuntimeException {
4+
public ConsumerInitException(String message, Throwable cause) {
5+
super(message, cause);
6+
}
7+
}

src/test/java/io/github/majusko/pulsar/PulsarJavaSpringBootStarterApplicationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,11 @@ void testProducerSendMethod() throws PulsarClientException {
6464

6565
@Test
6666
void testConsumerRegistration1() throws Exception {
67-
final List<Consumer> consumers = consumerBuilder.getConsumers();
67+
final List<Consumer<?>> consumers = consumerBuilder.getConsumers();
6868

6969
Assertions.assertEquals(1, consumers.size());
7070

71-
final Consumer consumer = consumers.stream().findFirst().orElseThrow(Exception::new);
71+
final Consumer<?> consumer = consumers.stream().findFirst().orElseThrow(Exception::new);
7272

7373
Assertions.assertNotNull(consumer);
7474
Assertions.assertEquals("topic-one", consumer.getTopic());

0 commit comments

Comments
 (0)