Skip to content

Commit d9ff6a9

Browse files
committed
Implemented exception wrappers for producer init lifecycle.
1 parent d775490 commit d9ff6a9

File tree

2 files changed

+18
-6
lines changed

2 files changed

+18
-6
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.github.majusko.pulsar.error.exception;
2+
3+
public class ProducerInitException extends RuntimeException {
4+
public ProducerInitException(String message, Throwable cause) {
5+
super(message, cause);
6+
}
7+
8+
public ProducerInitException(String message) {
9+
super(message);
10+
}
11+
}

src/main/java/io/github/majusko/pulsar/producer/ProducerCollector.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.github.majusko.pulsar.annotation.PulsarProducer;
44
import io.github.majusko.pulsar.collector.ProducerHolder;
55
import io.github.majusko.pulsar.constant.Serialization;
6+
import io.github.majusko.pulsar.error.exception.ProducerInitException;
67
import org.apache.pulsar.client.api.Producer;
78
import org.apache.pulsar.client.api.PulsarClient;
89
import org.apache.pulsar.client.api.PulsarClientException;
@@ -19,7 +20,7 @@ public class ProducerCollector implements BeanPostProcessor {
1920

2021
private final PulsarClient pulsarClient;
2122

22-
private Map<String, Producer> producers = new ConcurrentHashMap<>();
23+
private final Map<String, Producer<?>> producers = new ConcurrentHashMap<>();
2324

2425
public ProducerCollector(PulsarClient pulsarClient) {
2526
this.pulsarClient = pulsarClient;
@@ -29,7 +30,7 @@ public ProducerCollector(PulsarClient pulsarClient) {
2930
public Object postProcessBeforeInitialization(Object bean, String beanName) {
3031
final Class<?> beanClass = bean.getClass();
3132

32-
if(beanClass.isAnnotationPresent(PulsarProducer.class) && bean instanceof PulsarProducerFactory) {
33+
if (beanClass.isAnnotationPresent(PulsarProducer.class) && bean instanceof PulsarProducerFactory) {
3334
producers.putAll(((PulsarProducerFactory) bean).getTopics().entrySet().stream()
3435
.map($ -> new ProducerHolder($.getKey(), $.getValue().left, $.getValue().right))
3536
.collect(Collectors.toMap(ProducerHolder::getTopic, this::buildProducer)));
@@ -48,19 +49,19 @@ private Producer<?> buildProducer(ProducerHolder holder) {
4849
return pulsarClient.newProducer(getSchema(holder))
4950
.topic(holder.getTopic())
5051
.create();
51-
} catch(PulsarClientException e) {
52-
throw new RuntimeException("TODO Custom Exception!", e);
52+
} catch (PulsarClientException e) {
53+
throw new ProducerInitException("Failed to init producer.", e);
5354
}
5455
}
5556

5657
private <T> Schema<?> getSchema(ProducerHolder holder) throws RuntimeException {
5758
if (holder.getSerialization().equals(Serialization.JSON)) {
5859
return Schema.JSON(holder.getClazz());
5960
}
60-
throw new RuntimeException("TODO custom runtime exception");
61+
throw new ProducerInitException("Unknown producer schema.");
6162
}
6263

63-
Map<String, Producer> getProducers() {
64+
Map<String, Producer<?>> getProducers() {
6465
return producers;
6566
}
6667
}

0 commit comments

Comments
 (0)