Skip to content

Commit e52cced

Browse files
committed
Added support for bytes, string and protobuf. Fixed issue with duplicated key. Test coverage for new features.
1 parent 544d49b commit e52cced

File tree

8 files changed

+92
-33
lines changed

8 files changed

+92
-33
lines changed

src/main/java/io/github/majusko/pulsar/PulsarSpringStarterUtils.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,29 +30,26 @@ private static <T> Schema<?> getGenericSchema(Serialization serialization, Class
3030
}
3131

3232
private static <T extends com.google.protobuf.GeneratedMessageV3> Schema<?> getProtoSchema(Serialization serialization, Class<T> clazz) throws RuntimeException {
33-
switch (serialization) {
34-
case PROTOBUF: {
35-
return Schema.PROTOBUF(clazz);
36-
}
37-
case PROTOBUF_NATIVE: {
38-
return Schema.PROTOBUF_NATIVE(clazz);
39-
}
40-
default: {
41-
throw new ProducerInitException("Unknown producer schema.");
42-
}
33+
if (serialization == Serialization.PROTOBUF) {
34+
return Schema.PROTOBUF(clazz);
4335
}
36+
throw new ProducerInitException("Unknown producer schema.");
4437
}
4538

4639
public static Schema<?> getSchema(Serialization serialisation, Class<?> clazz) {
47-
if(isProto(serialisation)) {
40+
if (clazz == byte[].class) {
41+
return Schema.BYTES;
42+
}
43+
44+
if (isProto(serialisation)) {
4845
return getProtoSchema(serialisation, (Class<? extends GeneratedMessageV3>) clazz);
4946
}
5047

5148
return getGenericSchema(serialisation, clazz);
5249
}
5350

5451
public static boolean isProto(Serialization serialization) {
55-
return serialization == Serialization.PROTOBUF || serialization == Serialization.PROTOBUF_NATIVE;
52+
return serialization == Serialization.PROTOBUF;
5653
}
5754

5855
public static Class<?> getParameterType(Method method) {

src/main/java/io/github/majusko/pulsar/annotation/PulsarConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
public @interface PulsarConsumer {
1414
String topic();
1515

16-
Class<?> clazz();
16+
Class<?> clazz() default byte[].class;
1717

1818
Serialization serialization() default Serialization.JSON;
1919

src/main/java/io/github/majusko/pulsar/collector/ConsumerCollector.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package io.github.majusko.pulsar.collector;
22

3-
import io.github.majusko.pulsar.PulsarSpringStarterUtils;
43
import io.github.majusko.pulsar.annotation.PulsarConsumer;
54
import org.springframework.beans.factory.annotation.Value;
65
import org.springframework.beans.factory.config.BeanPostProcessor;
76
import org.springframework.context.annotation.Configuration;
87

8+
import java.lang.reflect.Method;
9+
import java.lang.reflect.Type;
910
import java.util.Arrays;
1011
import java.util.Map;
1112
import java.util.Optional;
@@ -29,8 +30,9 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) {
2930
consumers.putAll(Arrays.stream(beanClass.getDeclaredMethods())
3031
.filter($ -> $.isAnnotationPresent(PulsarConsumer.class))
3132
.collect(Collectors.toMap(
32-
method -> beanClass.getName() + consumerNameDelimiter + method.getName(),
33-
method -> new ConsumerHolder(method.getAnnotation(PulsarConsumer.class), method, bean, getParameterType(method)))));
33+
method -> getConsumerName(beanClass, method),
34+
method -> new ConsumerHolder(method.getAnnotation(PulsarConsumer.class), method, bean,
35+
getParameterType(method)))));
3436

3537
return bean;
3638
}
@@ -47,4 +49,11 @@ public Map<String, ConsumerHolder> getConsumers() {
4749
public Optional<ConsumerHolder> getConsumer(String methodDescriptor) {
4850
return Optional.ofNullable(consumers.get(methodDescriptor));
4951
}
52+
53+
public String getConsumerName(Class<?> clazz, Method method) {
54+
return clazz.getName() + consumerNameDelimiter + method.getName() + Arrays
55+
.stream(method.getGenericParameterTypes())
56+
.map(Type::getTypeName)
57+
.collect(Collectors.joining(consumerNameDelimiter));
58+
}
5059
}

src/main/java/io/github/majusko/pulsar/constant/Serialization.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,5 @@ public enum Serialization {
55
AVRO,
66
STRING,
77
BYTE,
8-
PROTOBUF,
9-
PROTOBUF_NATIVE
8+
PROTOBUF
109
}

src/main/java/io/github/majusko/pulsar/producer/ProducerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public class ProducerFactory implements PulsarProducerFactory {
1313
private final Map<String, ImmutablePair<Class<?>, Serialization>> topics = new HashMap<>();
1414

1515
public ProducerFactory addProducer(String topic) {
16-
return addProducer(topic, Byte.class, Serialization.BYTE);
16+
return addProducer(topic, byte[].class, Serialization.BYTE);
1717
}
1818

1919
public ProducerFactory addProducer(String topic, Class<?> clazz) {

src/test/java/io/github/majusko/pulsar/PulsarJavaSpringBootStarterApplicationTests.java

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.testcontainers.junit.jupiter.Testcontainers;
2626
import reactor.core.Disposable;
2727

28+
import java.nio.charset.StandardCharsets;
2829
import java.time.Duration;
2930
import java.util.*;
3031
import java.util.concurrent.atomic.AtomicBoolean;
@@ -59,12 +60,20 @@ class PulsarJavaSpringBootStarterApplicationTests {
5960
@Autowired
6061
private PulsarTemplate<ProtoMsg> producerForProtoTopic;
6162

63+
@Autowired
64+
private PulsarTemplate<byte[]> producerForByteTopic;
65+
66+
@Autowired
67+
private PulsarTemplate<String> producerForStringTopic;
68+
6269
@Container
6370
static PulsarContainer pulsarContainer = new PulsarContainer();
6471

6572
@Autowired
6673
private TestConsumers testConsumers;
6774

75+
public static final String VALIDATION_STRING = "validation-string";
76+
6877
@DynamicPropertySource
6978
static void propertySettings(DynamicPropertyRegistry registry) {
7079
registry.add("pulsar.serviceUrl", pulsarContainer::getPulsarBrokerUrl);
@@ -99,11 +108,11 @@ void testProducerSendAsyncMethod() throws PulsarClientException {
99108
@Test
100109
void testProducerCreateMessageMethod() throws PulsarClientException {
101110
producer.createMessage("topic-message", new MyMsg("my-message"))
102-
.property("my-key", "my-value")
103-
.property("my-other-key", "my-other-value")
104-
.sequenceId(123l)
105-
.key("my-key")
106-
.send();
111+
.property("my-key", "my-value")
112+
.property("my-other-key", "my-other-value")
113+
.sequenceId(123l)
114+
.key("my-key")
115+
.send();
107116

108117
await().untilTrue(testConsumers.mockTopicMessageListenerReceived);
109118
}
@@ -112,9 +121,10 @@ void testProducerCreateMessageMethod() throws PulsarClientException {
112121
void testConsumerRegistration1() throws Exception {
113122
final List<Consumer> consumers = consumerAggregator.getConsumers();
114123

115-
Assertions.assertEquals(6, consumers.size());
124+
Assertions.assertEquals(9, consumers.size());
116125

117-
final Consumer<?> consumer = consumers.stream().filter( $-> $.getTopic().equals("topic-one")).findFirst().orElseThrow(Exception::new);
126+
final Consumer<?> consumer =
127+
consumers.stream().filter($ -> $.getTopic().equals("topic-one")).findFirst().orElseThrow(Exception::new);
118128

119129
Assertions.assertNotNull(consumer);
120130
}
@@ -123,22 +133,26 @@ void testConsumerRegistration1() throws Exception {
123133
void testConsumerRegistration2() {
124134
final Class<TestConsumers> clazz = TestConsumers.class;
125135
final List<ConsumerHolder> consumerHolders = Arrays.stream(clazz.getMethods())
126-
.map($ -> consumerCollector.getConsumer(clazz.getName() + $.getName()).orElse(null))
136+
.map($ -> consumerCollector.getConsumer(consumerCollector.getConsumerName(clazz, $)))
137+
.filter(Optional::isPresent)
138+
.map(Optional::get)
127139
.collect(Collectors.toList());
128140

129141
Assertions.assertNotNull(consumerHolders);
130142
Assertions.assertTrue(consumerHolders.stream().anyMatch($ -> $.getAnnotation().topic().equals("topic-one")));
131-
Assertions.assertTrue(consumerHolders.stream().anyMatch($ -> $.getAnnotation().topic().equals("topic-for-error")));
143+
Assertions.assertTrue(consumerHolders.stream().anyMatch($ -> $.getAnnotation().topic().equals("topic-for" +
144+
"-error")));
132145
Assertions.assertTrue(consumerHolders.stream().anyMatch($ -> $.getBean().getClass().equals(TestConsumers.class)));
133-
Assertions.assertTrue(consumerHolders.stream().anyMatch($ -> $.getHandler().getName().equals("topicOneListener")));
146+
Assertions.assertTrue(consumerHolders.stream().anyMatch($ -> $.getHandler().getName().equals(
147+
"topicOneListener")));
134148
}
135149

136150
@Test
137151
void testProducerRegistration() {
138152

139153
final Map<String, ImmutablePair<Class<?>, Serialization>> topics = producerFactory.getTopics();
140154

141-
Assertions.assertEquals(7, topics.size());
155+
Assertions.assertEquals(10, topics.size());
142156

143157
final Set<String> topicNames = new HashSet<>(topics.keySet());
144158

@@ -175,8 +189,22 @@ void avroSerializationTestOk() throws Exception {
175189

176190
@Test
177191
void protoSerializationTestOk() throws Exception {
178-
final ProtoMsg msg = ProtoMsg.newBuilder().setData("data").build();
192+
final ProtoMsg msg = ProtoMsg.newBuilder().setData(VALIDATION_STRING).build();
179193
producerForProtoTopic.send("topic-proto", msg);
180194
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.protoTopicReceived.get());
181195
}
196+
197+
@Test
198+
void byteSerializationTestOk() throws Exception {
199+
byte[] data = VALIDATION_STRING.getBytes(StandardCharsets.UTF_8);
200+
201+
producerForByteTopic.send("topic-byte", data);
202+
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.byteTopicReceived.get());
203+
}
204+
205+
@Test
206+
void stringSerializationTestOk() throws Exception {
207+
producerForStringTopic.send("topic-string", VALIDATION_STRING);
208+
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.stringTopicReceived.get());
209+
}
182210
}

src/test/java/io/github/majusko/pulsar/TestConsumers.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.junit.jupiter.api.Assertions;
1010
import org.springframework.stereotype.Service;
1111

12+
import java.nio.charset.StandardCharsets;
1213
import java.util.concurrent.atomic.AtomicBoolean;
1314
import java.util.concurrent.atomic.AtomicInteger;
1415

@@ -20,6 +21,8 @@ public class TestConsumers {
2021
public AtomicBoolean mockTopicMessageListenerReceived = new AtomicBoolean(false);
2122
public AtomicBoolean avroTopicReceived = new AtomicBoolean(false);
2223
public AtomicBoolean protoTopicReceived = new AtomicBoolean(false);
24+
public AtomicBoolean byteTopicReceived = new AtomicBoolean(false);
25+
public AtomicBoolean stringTopicReceived = new AtomicBoolean(false);
2326
public AtomicBoolean mockRetryCountListenerReceived = new AtomicBoolean(false);
2427
public AtomicInteger retryCount = new AtomicInteger(0);
2528

@@ -42,9 +45,27 @@ public void avroTopic(AvroMsg avroMsg) {
4245
@PulsarConsumer(topic = "topic-proto", clazz = ProtoMsg.class, serialization = Serialization.PROTOBUF)
4346
public void protoTopic(ProtoMsg protoMsg) {
4447
Assertions.assertNotNull(protoMsg);
48+
Assertions.assertEquals(PulsarJavaSpringBootStarterApplicationTests.VALIDATION_STRING, protoMsg.getData());
49+
4550
protoTopicReceived.set(true);
4651
}
4752

53+
@PulsarConsumer(topic = "topic-byte")
54+
public void byteTopic(byte[] byteMsg) {
55+
Assertions.assertNotNull(byteMsg);
56+
Assertions.assertEquals(PulsarJavaSpringBootStarterApplicationTests.VALIDATION_STRING, new String(byteMsg, StandardCharsets.UTF_8));
57+
58+
byteTopicReceived.set(true);
59+
}
60+
61+
@PulsarConsumer(topic = "topic-string", clazz = String.class, serialization = Serialization.STRING)
62+
public void byteTopic(String stringMsg) {
63+
Assertions.assertNotNull(stringMsg);
64+
Assertions.assertEquals(PulsarJavaSpringBootStarterApplicationTests.VALIDATION_STRING, stringMsg);
65+
66+
stringTopicReceived.set(true);
67+
}
68+
4869
@PulsarConsumer(topic = "topic-async", clazz = MyMsg.class, serialization = Serialization.JSON)
4970
public void topicAsyncListener(MyMsg myMsg) {
5071
Assertions.assertNotNull(myMsg);
@@ -73,5 +94,11 @@ public void failTwice(MyMsg myMsg) throws Exception {
7394
}
7495
Assertions.assertNotNull(myMsg);
7596
mockRetryCountListenerReceived.set(true);
97+
98+
99+
}
100+
101+
public static Serialization aa() {
102+
return Serialization.BYTE;
76103
}
77104
}

src/test/java/io/github/majusko/pulsar/TestProducerConfiguration.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ public ProducerFactory producerFactory() {
2424
.addProducer("topic-retry", MyMsg.class)
2525
.addProducer("topic-string", String.class, Serialization.STRING)
2626
.addProducer("topic-byte")
27-
.addProducer("topic-proto", ProtoMsg.class, Serialization.PROTOBUF)
28-
.addProducer("topic-proto-native", ProtoMsg.class, Serialization.PROTOBUF_NATIVE);
27+
.addProducer("topic-proto", ProtoMsg.class, Serialization.PROTOBUF);
2928
}
3029
}

0 commit comments

Comments
 (0)