Skip to content

Commit 69cd656

Browse files
authored
Merge pull request #49 from TrashCoder96/master
Avro serialization support
2 parents 30f4946 + e9bffd7 commit 69cd656

File tree

9 files changed

+66
-12
lines changed

9 files changed

+66
-12
lines changed

src/main/java/io/github/majusko/pulsar/PulsarAutoConfiguration.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import org.apache.pulsar.client.api.PulsarClient;
44
import org.apache.pulsar.client.api.PulsarClientException;
55
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
6-
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
76
import org.springframework.boot.context.properties.EnableConfigurationProperties;
87
import org.springframework.context.annotation.Bean;
98
import org.springframework.context.annotation.ComponentScan;
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.github.majusko.pulsar;
2+
3+
import io.github.majusko.pulsar.constant.Serialization;
4+
import io.github.majusko.pulsar.error.exception.ProducerInitException;
5+
import org.apache.pulsar.client.api.Schema;
6+
7+
public class PulsarSpringStarterUtils {
8+
9+
public static <T> Schema<?> getSchema(Serialization serialization, Class<T> clazz) throws RuntimeException {
10+
switch (serialization) {
11+
case JSON: {
12+
return Schema.JSON(clazz);
13+
}
14+
case AVRO: {
15+
return Schema.AVRO(clazz);
16+
}
17+
default: {
18+
throw new ProducerInitException("Unknown producer schema.");
19+
}
20+
}
21+
}
22+
23+
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.github.majusko.pulsar.constant;
22

33
public enum Serialization {
4-
JSON
4+
JSON,
5+
AVRO
56
}

src/main/java/io/github/majusko/pulsar/consumer/ConsumerBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package io.github.majusko.pulsar.consumer;
22

3+
import io.github.majusko.pulsar.PulsarSpringStarterUtils;
34
import io.github.majusko.pulsar.collector.ConsumerCollector;
45
import io.github.majusko.pulsar.collector.ConsumerHolder;
56
import io.github.majusko.pulsar.error.FailedMessage;
67
import io.github.majusko.pulsar.error.exception.ConsumerInitException;
78
import org.apache.pulsar.client.api.Consumer;
89
import org.apache.pulsar.client.api.PulsarClient;
910
import org.apache.pulsar.client.api.PulsarClientException;
10-
import org.apache.pulsar.client.api.Schema;
1111
import org.springframework.context.EmbeddedValueResolverAware;
1212
import org.springframework.context.annotation.DependsOn;
1313
import org.springframework.stereotype.Component;
@@ -46,7 +46,7 @@ private void init() {
4646
private Consumer<?> subscribe(String name, ConsumerHolder holder) {
4747
try {
4848
return pulsarClient
49-
.newConsumer(Schema.JSON(holder.getAnnotation().clazz()))
49+
.newConsumer(PulsarSpringStarterUtils.getSchema(holder.getAnnotation().serialization(), holder.getAnnotation().clazz()))
5050
.consumerName("consumer-" + name)
5151
.subscriptionName("subscription-" + name)
5252
.topic(stringValueResolver.resolveStringValue(holder.getAnnotation().topic()))

src/main/java/io/github/majusko/pulsar/producer/ProducerCollector.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package io.github.majusko.pulsar.producer;
22

3+
import io.github.majusko.pulsar.PulsarSpringStarterUtils;
34
import io.github.majusko.pulsar.annotation.PulsarProducer;
45
import io.github.majusko.pulsar.collector.ProducerHolder;
5-
import io.github.majusko.pulsar.constant.Serialization;
66
import io.github.majusko.pulsar.error.exception.ProducerInitException;
77
import org.apache.pulsar.client.api.Producer;
88
import org.apache.pulsar.client.api.PulsarClient;
@@ -55,10 +55,7 @@ private Producer<?> buildProducer(ProducerHolder holder) {
5555
}
5656

5757
private <T> Schema<?> getSchema(ProducerHolder holder) throws RuntimeException {
58-
if (holder.getSerialization().equals(Serialization.JSON)) {
59-
return Schema.JSON(holder.getClazz());
60-
}
61-
throw new ProducerInitException("Unknown producer schema.");
58+
return PulsarSpringStarterUtils.getSchema(holder.getSerialization(), holder.getClazz());
6259
}
6360

6461
Map<String, Producer> getProducers() {
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.github.majusko.pulsar;
2+
3+
public class AvroMsg {
4+
5+
private String data;
6+
7+
public String getData() {
8+
return data;
9+
}
10+
11+
public void setData(String data) {
12+
this.data = data;
13+
}
14+
}

src/test/java/io/github/majusko/pulsar/PulsarJavaSpringBootStarterApplicationTests.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.testcontainers.junit.jupiter.Container;
2121
import org.testcontainers.junit.jupiter.Testcontainers;
2222

23+
import java.time.Duration;
2324
import java.util.HashSet;
2425
import java.util.List;
2526
import java.util.Map;
@@ -48,6 +49,9 @@ class PulsarJavaSpringBootStarterApplicationTests {
4849
@Autowired
4950
private PulsarTemplate<String> producerForError;
5051

52+
@Autowired
53+
private PulsarTemplate<AvroMsg> producerForAvroTopic;
54+
5155
@Container
5256
static PulsarContainer pulsarContainer = new PulsarContainer();
5357

@@ -70,7 +74,7 @@ void testProducerSendMethod() throws PulsarClientException {
7074
void testConsumerRegistration1() throws Exception {
7175
final List<Consumer> consumers = consumerBuilder.getConsumers();
7276

73-
Assertions.assertEquals(2, consumers.size());
77+
Assertions.assertEquals(3, consumers.size());
7478

7579
final Consumer<?> consumer = consumers.stream().filter( $-> $.getTopic().equals("topic-one")).findFirst().orElseThrow(Exception::new);
7680

@@ -94,7 +98,7 @@ void testProducerRegistration() {
9498

9599
final Map<String, ImmutablePair<Class<?>, Serialization>> topics = producerFactory.getTopics();
96100

97-
Assertions.assertEquals(3, topics.size());
101+
Assertions.assertEquals(4, topics.size());
98102

99103
final Set<String> topicNames = new HashSet<>(topics.keySet());
100104

@@ -119,4 +123,12 @@ void testMessageErrorHandling() throws PulsarClientException {
119123

120124
await().untilTrue(receivedError);
121125
}
126+
127+
@Test
128+
void avroSerializationTestOk() throws Exception {
129+
AvroMsg testAvroMsg = new AvroMsg();
130+
testAvroMsg.setData("avro-test");
131+
producerForAvroTopic.send("topic-avro", testAvroMsg);
132+
await().atMost(Duration.ofSeconds(10)).until(() -> testConsumers.avroTopicReceived.get());
133+
}
122134
}

src/test/java/io/github/majusko/pulsar/TestConsumers.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
public class TestConsumers {
1212

1313
public AtomicBoolean mockTopicListenerReceived = new AtomicBoolean(false);
14+
public AtomicBoolean avroTopicReceived = new AtomicBoolean(false);
1415

1516
@PulsarConsumer(topic = "topic-one", clazz = MyMsg.class, serialization = Serialization.JSON)
1617
public void topicOneListener(MyMsg myMsg) {
@@ -21,4 +22,10 @@ public void topicOneListener(MyMsg myMsg) {
2122
@PulsarConsumer(topic = "topic-for-error", clazz = String.class, serialization = Serialization.JSON)
2223
public void topicForErrorListener(Integer myMsg) {
2324
}
25+
26+
@PulsarConsumer(topic = "topic-avro", clazz = AvroMsg.class, serialization = Serialization.AVRO)
27+
public void avroTopic(AvroMsg avroMsg) {
28+
Assertions.assertNotNull(avroMsg);
29+
avroTopicReceived.set(true);
30+
}
2431
}

src/test/java/io/github/majusko/pulsar/TestProducerConfiguration.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public ProducerFactory producerFactory() {
1313
return new ProducerFactory()
1414
.addProducer("topic-for-error", String.class)
1515
.addProducer("topic-one", MyMsg.class)
16-
.addProducer("topic-two", MyMsg2.class, Serialization.JSON);
16+
.addProducer("topic-two", MyMsg2.class, Serialization.JSON)
17+
.addProducer("topic-avro", AvroMsg.class, Serialization.AVRO);
1718
}
1819
}

0 commit comments

Comments
 (0)