Skip to content

Commit 023d366

Browse files
authored
Merge pull request #79 from majusko/feature/string-byte-proto-support
Feature/string byte proto support
2 parents 07c77ec + 4064ce7 commit 023d366

File tree

15 files changed

+734
-21
lines changed

15 files changed

+734
-21
lines changed

src/main/java/io/github/majusko/pulsar/PulsarSpringStarterUtils.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.github.majusko.pulsar;
22

3+
import com.google.protobuf.GeneratedMessageV3;
34
import io.github.majusko.pulsar.constant.Serialization;
45
import io.github.majusko.pulsar.error.exception.ProducerInitException;
56
import org.apache.pulsar.client.api.Schema;
@@ -8,20 +9,49 @@
89

910
public class PulsarSpringStarterUtils {
1011

11-
public static <T> Schema<?> getSchema(Serialization serialization, Class<T> clazz) throws RuntimeException {
12+
private PulsarSpringStarterUtils() {
13+
}
14+
15+
private static <T> Schema<?> getGenericSchema(Serialization serialization, Class<T> clazz) throws RuntimeException {
1216
switch (serialization) {
1317
case JSON: {
1418
return Schema.JSON(clazz);
1519
}
1620
case AVRO: {
1721
return Schema.AVRO(clazz);
1822
}
23+
case STRING: {
24+
return Schema.STRING;
25+
}
1926
default: {
2027
throw new ProducerInitException("Unknown producer schema.");
2128
}
2229
}
2330
}
2431

32+
private static <T extends com.google.protobuf.GeneratedMessageV3> Schema<?> getProtoSchema(Serialization serialization, Class<T> clazz) throws RuntimeException {
33+
if (serialization == Serialization.PROTOBUF) {
34+
return Schema.PROTOBUF(clazz);
35+
}
36+
throw new ProducerInitException("Unknown producer schema.");
37+
}
38+
39+
public static Schema<?> getSchema(Serialization serialisation, Class<?> clazz) {
40+
if (clazz == byte[].class) {
41+
return Schema.BYTES;
42+
}
43+
44+
if (isProto(serialisation)) {
45+
return getProtoSchema(serialisation, (Class<? extends GeneratedMessageV3>) clazz);
46+
}
47+
48+
return getGenericSchema(serialisation, clazz);
49+
}
50+
51+
public static boolean isProto(Serialization serialization) {
52+
return serialization == Serialization.PROTOBUF;
53+
}
54+
2555
public static Class<?> getParameterType(Method method) {
2656
return method.getParameterTypes()[0];
2757
}

src/main/java/io/github/majusko/pulsar/annotation/PulsarConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
public @interface PulsarConsumer {
1414
String topic();
1515

16-
Class<?> clazz();
16+
Class<?> clazz() default byte[].class;
1717

1818
Serialization serialization() default Serialization.JSON;
1919

src/main/java/io/github/majusko/pulsar/collector/ConsumerCollector.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package io.github.majusko.pulsar.collector;
22

3-
import io.github.majusko.pulsar.PulsarSpringStarterUtils;
43
import io.github.majusko.pulsar.annotation.PulsarConsumer;
54
import org.springframework.beans.factory.annotation.Value;
65
import org.springframework.beans.factory.config.BeanPostProcessor;
76
import org.springframework.context.annotation.Configuration;
87

8+
import java.lang.reflect.Method;
9+
import java.lang.reflect.Type;
910
import java.util.Arrays;
1011
import java.util.Map;
1112
import java.util.Optional;
@@ -29,8 +30,9 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) {
2930
consumers.putAll(Arrays.stream(beanClass.getDeclaredMethods())
3031
.filter($ -> $.isAnnotationPresent(PulsarConsumer.class))
3132
.collect(Collectors.toMap(
32-
method -> beanClass.getName() + consumerNameDelimiter + method.getName(),
33-
method -> new ConsumerHolder(method.getAnnotation(PulsarConsumer.class), method, bean, getParameterType(method)))));
33+
method -> getConsumerName(beanClass, method),
34+
method -> new ConsumerHolder(method.getAnnotation(PulsarConsumer.class), method, bean,
35+
getParameterType(method)))));
3436

3537
return bean;
3638
}
@@ -47,4 +49,11 @@ public Map<String, ConsumerHolder> getConsumers() {
4749
public Optional<ConsumerHolder> getConsumer(String methodDescriptor) {
4850
return Optional.ofNullable(consumers.get(methodDescriptor));
4951
}
52+
53+
public String getConsumerName(Class<?> clazz, Method method) {
54+
return clazz.getName() + consumerNameDelimiter + method.getName() + Arrays
55+
.stream(method.getGenericParameterTypes())
56+
.map(Type::getTypeName)
57+
.collect(Collectors.joining(consumerNameDelimiter));
58+
}
5059
}

src/main/java/io/github/majusko/pulsar/constant/Serialization.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,8 @@
22

33
public enum Serialization {
44
JSON,
5-
AVRO
5+
AVRO,
6+
STRING,
7+
BYTE,
8+
PROTOBUF
69
}

src/main/java/io/github/majusko/pulsar/consumer/ConsumerAggregator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package io.github.majusko.pulsar.consumer;
22

3+
import com.google.protobuf.GeneratedMessageV3;
34
import io.github.majusko.pulsar.ConsumerProperties;
45
import io.github.majusko.pulsar.PulsarMessage;
56
import io.github.majusko.pulsar.PulsarSpringStarterUtils;
67
import io.github.majusko.pulsar.collector.ConsumerCollector;
78
import io.github.majusko.pulsar.collector.ConsumerHolder;
9+
import io.github.majusko.pulsar.constant.Serialization;
810
import io.github.majusko.pulsar.error.FailedMessage;
911
import io.github.majusko.pulsar.error.exception.ConsumerInitException;
1012
import org.apache.pulsar.client.api.*;

src/main/java/io/github/majusko/pulsar/producer/ProducerFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ public class ProducerFactory implements PulsarProducerFactory {
1212

1313
private final Map<String, ImmutablePair<Class<?>, Serialization>> topics = new HashMap<>();
1414

15+
public ProducerFactory addProducer(String topic) {
16+
return addProducer(topic, byte[].class, Serialization.BYTE);
17+
}
18+
1519
public ProducerFactory addProducer(String topic, Class<?> clazz) {
1620
topics.put(topic, new ImmutablePair<>(clazz, Serialization.JSON));
1721
return this;

src/test/java/io/github/majusko/pulsar/PulsarJavaSpringBootStarterApplicationTests.java

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
import io.github.majusko.pulsar.collector.ConsumerHolder;
55
import io.github.majusko.pulsar.constant.Serialization;
66
import io.github.majusko.pulsar.consumer.ConsumerAggregator;
7+
import io.github.majusko.pulsar.msg.AvroMsg;
8+
import io.github.majusko.pulsar.msg.MyMsg;
9+
import io.github.majusko.pulsar.msg.ProtoMsg;
710
import io.github.majusko.pulsar.producer.ProducerFactory;
811
import io.github.majusko.pulsar.producer.PulsarTemplate;
912
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -22,6 +25,7 @@
2225
import org.testcontainers.junit.jupiter.Testcontainers;
2326
import reactor.core.Disposable;
2427

28+
import java.nio.charset.StandardCharsets;
2529
import java.time.Duration;
2630
import java.util.*;
2731
import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,12 +57,23 @@ class PulsarJavaSpringBootStarterApplicationTests {
5357
@Autowired
5458
private PulsarTemplate<AvroMsg> producerForAvroTopic;
5559

60+
@Autowired
61+
private PulsarTemplate<ProtoMsg> producerForProtoTopic;
62+
63+
@Autowired
64+
private PulsarTemplate<byte[]> producerForByteTopic;
65+
66+
@Autowired
67+
private PulsarTemplate<String> producerForStringTopic;
68+
5669
@Container
5770
static PulsarContainer pulsarContainer = new PulsarContainer();
5871

5972
@Autowired
6073
private TestConsumers testConsumers;
6174

75+
public static final String VALIDATION_STRING = "validation-string";
76+
6277
@DynamicPropertySource
6378
static void propertySettings(DynamicPropertyRegistry registry) {
6479
registry.add("pulsar.serviceUrl", pulsarContainer::getPulsarBrokerUrl);
@@ -93,11 +108,11 @@ void testProducerSendAsyncMethod() throws PulsarClientException {
93108
@Test
94109
void testProducerCreateMessageMethod() throws PulsarClientException {
95110
producer.createMessage("topic-message", new MyMsg("my-message"))
96-
.property("my-key", "my-value")
97-
.property("my-other-key", "my-other-value")
98-
.sequenceId(123l)
99-
.key("my-key")
100-
.send();
111+
.property("my-key", "my-value")
112+
.property("my-other-key", "my-other-value")
113+
.sequenceId(123l)
114+
.key("my-key")
115+
.send();
101116

102117
await().untilTrue(testConsumers.mockTopicMessageListenerReceived);
103118
}
@@ -106,9 +121,10 @@ void testProducerCreateMessageMethod() throws PulsarClientException {
106121
void testConsumerRegistration1() throws Exception {
107122
final List<Consumer> consumers = consumerAggregator.getConsumers();
108123

109-
Assertions.assertEquals(6, consumers.size());
124+
Assertions.assertEquals(9, consumers.size());
110125

111-
final Consumer<?> consumer = consumers.stream().filter( $-> $.getTopic().equals("topic-one")).findFirst().orElseThrow(Exception::new);
126+
final Consumer<?> consumer =
127+
consumers.stream().filter($ -> $.getTopic().equals("topic-one")).findFirst().orElseThrow(Exception::new);
112128

113129
Assertions.assertNotNull(consumer);
114130
}
@@ -117,22 +133,26 @@ void testConsumerRegistration1() throws Exception {
117133
void testConsumerRegistration2() {
118134
final Class<TestConsumers> clazz = TestConsumers.class;
119135
final List<ConsumerHolder> consumerHolders = Arrays.stream(clazz.getMethods())
120-
.map($ -> consumerCollector.getConsumer(clazz.getName() + $.getName()).orElse(null))
136+
.map($ -> consumerCollector.getConsumer(consumerCollector.getConsumerName(clazz, $)))
137+
.filter(Optional::isPresent)
138+
.map(Optional::get)
121139
.collect(Collectors.toList());
122140

123141
Assertions.assertNotNull(consumerHolders);
124142
Assertions.assertTrue(consumerHolders.stream().anyMatch($ -> $.getAnnotation().topic().equals("topic-one")));
125-
Assertions.assertTrue(consumerHolders.stream().anyMatch($ -> $.getAnnotation().topic().equals("topic-for-error")));
143+
Assertions.assertTrue(consumerHolders.stream().anyMatch($ -> $.getAnnotation().topic().equals("topic-for" +
144+
"-error")));
126145
Assertions.assertTrue(consumerHolders.stream().anyMatch($ -> $.getBean().getClass().equals(TestConsumers.class)));
127-
Assertions.assertTrue(consumerHolders.stream().anyMatch($ -> $.getHandler().getName().equals("topicOneListener")));
146+
Assertions.assertTrue(consumerHolders.stream().anyMatch($ -> $.getHandler().getName().equals(
147+
"topicOneListener")));
128148
}
129149

130150
@Test
131151
void testProducerRegistration() {
132152

133153
final Map<String, ImmutablePair<Class<?>, Serialization>> topics = producerFactory.getTopics();
134154

135-
Assertions.assertEquals(7, topics.size());
155+
Assertions.assertEquals(10, topics.size());
136156

137157
final Set<String> topicNames = new HashSet<>(topics.keySet());
138158

@@ -166,4 +186,25 @@ void avroSerializationTestOk() throws Exception {
166186
producerForAvroTopic.send("topic-avro", testAvroMsg);
167187
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.avroTopicReceived.get());
168188
}
189+
190+
@Test
191+
void protoSerializationTestOk() throws Exception {
192+
final ProtoMsg msg = ProtoMsg.newBuilder().setData(VALIDATION_STRING).build();
193+
producerForProtoTopic.send("topic-proto", msg);
194+
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.protoTopicReceived.get());
195+
}
196+
197+
@Test
198+
void byteSerializationTestOk() throws Exception {
199+
byte[] data = VALIDATION_STRING.getBytes(StandardCharsets.UTF_8);
200+
201+
producerForByteTopic.send("topic-byte", data);
202+
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.byteTopicReceived.get());
203+
}
204+
205+
@Test
206+
void stringSerializationTestOk() throws Exception {
207+
producerForStringTopic.send("topic-string", VALIDATION_STRING);
208+
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.stringTopicReceived.get());
209+
}
169210
}

src/test/java/io/github/majusko/pulsar/TestConsumers.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@
22

33
import io.github.majusko.pulsar.annotation.PulsarConsumer;
44
import io.github.majusko.pulsar.constant.Serialization;
5+
import io.github.majusko.pulsar.msg.AvroMsg;
6+
import io.github.majusko.pulsar.msg.MyMsg;
7+
import io.github.majusko.pulsar.msg.ProtoMsg;
58
import org.apache.pulsar.client.api.SubscriptionType;
69
import org.junit.jupiter.api.Assertions;
710
import org.springframework.stereotype.Service;
811

12+
import java.nio.charset.StandardCharsets;
913
import java.util.concurrent.atomic.AtomicBoolean;
1014
import java.util.concurrent.atomic.AtomicInteger;
1115

@@ -16,6 +20,9 @@ public class TestConsumers {
1620
public AtomicBoolean mockTopicAsyncListenerReceived = new AtomicBoolean(false);
1721
public AtomicBoolean mockTopicMessageListenerReceived = new AtomicBoolean(false);
1822
public AtomicBoolean avroTopicReceived = new AtomicBoolean(false);
23+
public AtomicBoolean protoTopicReceived = new AtomicBoolean(false);
24+
public AtomicBoolean byteTopicReceived = new AtomicBoolean(false);
25+
public AtomicBoolean stringTopicReceived = new AtomicBoolean(false);
1926
public AtomicBoolean mockRetryCountListenerReceived = new AtomicBoolean(false);
2027
public AtomicInteger retryCount = new AtomicInteger(0);
2128

@@ -35,6 +42,30 @@ public void avroTopic(AvroMsg avroMsg) {
3542
avroTopicReceived.set(true);
3643
}
3744

45+
@PulsarConsumer(topic = "topic-proto", clazz = ProtoMsg.class, serialization = Serialization.PROTOBUF)
46+
public void protoTopic(ProtoMsg protoMsg) {
47+
Assertions.assertNotNull(protoMsg);
48+
Assertions.assertEquals(PulsarJavaSpringBootStarterApplicationTests.VALIDATION_STRING, protoMsg.getData());
49+
50+
protoTopicReceived.set(true);
51+
}
52+
53+
@PulsarConsumer(topic = "topic-byte")
54+
public void byteTopic(byte[] byteMsg) {
55+
Assertions.assertNotNull(byteMsg);
56+
Assertions.assertEquals(PulsarJavaSpringBootStarterApplicationTests.VALIDATION_STRING, new String(byteMsg, StandardCharsets.UTF_8));
57+
58+
byteTopicReceived.set(true);
59+
}
60+
61+
@PulsarConsumer(topic = "topic-string", clazz = String.class, serialization = Serialization.STRING)
62+
public void byteTopic(String stringMsg) {
63+
Assertions.assertNotNull(stringMsg);
64+
Assertions.assertEquals(PulsarJavaSpringBootStarterApplicationTests.VALIDATION_STRING, stringMsg);
65+
66+
stringTopicReceived.set(true);
67+
}
68+
3869
@PulsarConsumer(topic = "topic-async", clazz = MyMsg.class, serialization = Serialization.JSON)
3970
public void topicAsyncListener(MyMsg myMsg) {
4071
Assertions.assertNotNull(myMsg);
@@ -63,5 +94,11 @@ public void failTwice(MyMsg myMsg) throws Exception {
6394
}
6495
Assertions.assertNotNull(myMsg);
6596
mockRetryCountListenerReceived.set(true);
97+
98+
99+
}
100+
101+
public static Serialization aa() {
102+
return Serialization.BYTE;
66103
}
67104
}

src/test/java/io/github/majusko/pulsar/TestProducerConfiguration.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package io.github.majusko.pulsar;
22

33
import io.github.majusko.pulsar.constant.Serialization;
4+
import io.github.majusko.pulsar.msg.AvroMsg;
5+
import io.github.majusko.pulsar.msg.MyMsg;
6+
import io.github.majusko.pulsar.msg.MyMsg2;
7+
import io.github.majusko.pulsar.msg.ProtoMsg;
48
import io.github.majusko.pulsar.producer.ProducerFactory;
59
import org.springframework.context.annotation.Bean;
610
import org.springframework.context.annotation.Configuration;
@@ -17,6 +21,9 @@ public ProducerFactory producerFactory() {
1721
.addProducer("topic-avro", AvroMsg.class, Serialization.AVRO)
1822
.addProducer("topic-async", MyMsg.class)
1923
.addProducer("topic-message", MyMsg.class)
20-
.addProducer("topic-retry", MyMsg.class);
24+
.addProducer("topic-retry", MyMsg.class)
25+
.addProducer("topic-string", String.class, Serialization.STRING)
26+
.addProducer("topic-byte")
27+
.addProducer("topic-proto", ProtoMsg.class, Serialization.PROTOBUF);
2128
}
2229
}

src/test/java/io/github/majusko/pulsar/AvroMsg.java renamed to src/test/java/io/github/majusko/pulsar/msg/AvroMsg.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package io.github.majusko.pulsar;
1+
package io.github.majusko.pulsar.msg;
22

33
public class AvroMsg {
44

0 commit comments

Comments
 (0)