Skip to content

Commit 5aa97e4

Browse files
alesharikonobc
authored andcommitted
Add @PulsarTypeMapping for default topic/schema
This commit introduces the @PulsarTypeMapping annotation which can be used on message classes to specify default topic and/or schema info.
1 parent 0da4142 commit 5aa97e4

File tree

6 files changed

+111
-3
lines changed

6 files changed

+111
-3
lines changed

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/topic-resolution.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,16 @@ NOTE: The `message-type` is the fully-qualified name of the message class.
3434

3535
WARNING: If the message (or the first message of a `Publisher` input) is `null`, the framework won't be able to determine the topic from it. Another method shall be used to specify the topic if your application is likely to send `null` messages.
3636

37+
=== Specified via annotation
38+
When no topic passed into API and no mappings configured, the system looks for `PulsarTopic` annotation. The following example configures topic for `Baz` class using annotation:
39+
40+
[source,java,indent=0,subs="verbatim"]
41+
----
42+
@PulsarTopic("baz-topic")
43+
record Baz(String value) {
44+
}
45+
----
46+
3747
=== Custom topic resolver
3848
The preferred method of adding mappings is via the property mentioned above.
3949
However, if more control is needed you can replace the default resolver by proving your own implementation, for example:
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.annotation;
18+
19+
import org.apache.pulsar.common.schema.SchemaType;
20+
21+
import java.lang.annotation.Documented;
22+
import java.lang.annotation.ElementType;
23+
import java.lang.annotation.Retention;
24+
import java.lang.annotation.RetentionPolicy;
25+
import java.lang.annotation.Target;
26+
27+
/**
28+
* Specifies default topic and schema for class.
29+
*
30+
* @author Aleksei Arsenev
31+
*/
32+
@Target(ElementType.TYPE)
33+
@Retention(RetentionPolicy.RUNTIME)
34+
@Documented
35+
public @interface PulsarTypeMapping {
36+
37+
/**
38+
* Default topic for class.
39+
* @return topic
40+
*/
41+
String topic() default "";
42+
43+
/**
44+
* Default schema type for class.
45+
* @return schema type
46+
*/
47+
SchemaType schemaType() default SchemaType.NONE;
48+
49+
/**
50+
* Message key type (must be specified when schema type is {@code KEY_VALUE})
51+
* @return message key type
52+
*/
53+
Class<?> messageKeyType() default Void.class;
54+
55+
}

spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultSchemaResolver.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,10 @@
3939
import org.apache.pulsar.common.schema.SchemaType;
4040

4141
import org.springframework.core.ResolvableType;
42+
import org.springframework.core.annotation.AnnotationUtils;
4243
import org.springframework.core.log.LogAccessor;
4344
import org.springframework.lang.Nullable;
45+
import org.springframework.pulsar.annotation.PulsarTypeMapping;
4446

4547
/**
4648
* Default schema resolver capable of handling basic message types.
@@ -138,6 +140,14 @@ public <T> Resolved<Schema<T>> resolveSchema(@Nullable Class<?> messageClass, bo
138140
@Nullable
139141
protected Schema<?> getCustomSchemaOrMaybeDefault(@Nullable Class<?> messageClass, boolean returnDefault) {
140142
Schema<?> schema = this.customSchemaMappings.get(messageClass);
143+
if (schema == null && messageClass != null) {
144+
PulsarTypeMapping annotation = AnnotationUtils.findAnnotation(messageClass, PulsarTypeMapping.class);
145+
if (annotation != null && annotation.schemaType() != SchemaType.NONE) {
146+
var resolvedSchema = resolveSchema(annotation.schemaType(), messageClass, annotation.messageKeyType());
147+
resolvedSchema.ifResolved(objectSchema -> addCustomSchemaMapping(messageClass, objectSchema));
148+
schema = resolvedSchema.get().orElse(null);
149+
}
150+
}
141151
if (schema == null && returnDefault) {
142152
if (messageClass != null) {
143153
try {

spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultTopicResolver.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717
package org.springframework.pulsar.core;
1818

1919
import java.util.Collections;
20-
import java.util.LinkedHashMap;
2120
import java.util.Map;
21+
import java.util.concurrent.ConcurrentHashMap;
2222
import java.util.function.Supplier;
2323

24+
import org.springframework.core.annotation.AnnotationUtils;
2425
import org.springframework.lang.Nullable;
26+
import org.springframework.pulsar.annotation.PulsarTypeMapping;
2527
import org.springframework.util.StringUtils;
2628

2729
/**
@@ -35,7 +37,7 @@
3537
*/
3638
public class DefaultTopicResolver implements TopicResolver {
3739

38-
private final Map<Class<?>, String> customTopicMappings = new LinkedHashMap<>();
40+
private final Map<Class<?>, String> customTopicMappings = new ConcurrentHashMap<>();
3941

4042
/**
4143
* Adds a custom mapping from message type to topic.
@@ -100,7 +102,19 @@ protected Resolved<String> doResolveTopic(@Nullable String userSpecifiedTopic, @
100102
if (messageType == null) {
101103
return Resolved.failed("Topic must be specified when the message is null");
102104
}
103-
String topic = this.customTopicMappings.getOrDefault(messageType, defaultTopicSupplier.get());
105+
106+
String topic = this.getCustomTopicMappings().get(messageType);
107+
if (topic == null) {
108+
PulsarTypeMapping annotation = AnnotationUtils.findAnnotation(messageType, PulsarTypeMapping.class);
109+
if (annotation != null && !annotation.topic().isBlank()) {
110+
this.addCustomTopicMapping(messageType, annotation.topic());
111+
topic = annotation.topic();
112+
}
113+
}
114+
115+
if (topic == null) {
116+
topic = defaultTopicSupplier.get();
117+
}
104118
return topic == null ? Resolved.failed("Topic must be specified when no default topic is configured")
105119
: Resolved.of(topic);
106120
}

spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultSchemaResolverTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.junit.jupiter.params.provider.MethodSource;
5050

5151
import org.springframework.core.ResolvableType;
52+
import org.springframework.pulsar.annotation.PulsarTypeMapping;
5253
import org.springframework.pulsar.listener.Proto;
5354
import org.springframework.pulsar.listener.Proto.Person;
5455

@@ -201,6 +202,11 @@ void customMessageTypes() {
201202
assertThat(resolver.resolveSchema(Bar.class, true).orElseThrow()).isEqualTo(Schema.BYTES);
202203
}
203204

205+
206+
@Test
207+
void annotatedMessageType() {
208+
assertThat(resolver.resolveSchema(Zaz.class, false).orElseThrow()).isEqualTo(Schema.STRING);
209+
}
204210
}
205211

206212
@Nested
@@ -371,4 +377,8 @@ record Bar<T>(T value) {
371377
record Zaa(String value) {
372378
}
373379

380+
@PulsarTypeMapping(schemaType = SchemaType.STRING)
381+
record Zaz(String value) {
382+
}
383+
374384
}

spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultTopicResolverTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.junit.jupiter.params.provider.MethodSource;
3131

3232
import org.springframework.lang.Nullable;
33+
import org.springframework.pulsar.annotation.PulsarTypeMapping;
3334

3435
/**
3536
* Unit tests for {@link DefaultTopicResolver}.
@@ -44,6 +45,8 @@ class DefaultTopicResolverTests {
4445

4546
private static final String fooTopic = "foo-topic1";
4647

48+
private static final String bazTopic = "baz-topic1";
49+
4750
private static final String stringTopic = "string-topic1";
4851

4952
private DefaultTopicResolver resolver = new DefaultTopicResolver();
@@ -111,6 +114,8 @@ static Stream<Arguments> resolveByMessageTypeProvider() {
111114
arguments("complexMessageWithUserTopic", userTopic, Foo.class, defaultTopic, userTopic),
112115
arguments("complexMessageNoUserTopic", null, Foo.class, defaultTopic, fooTopic),
113116
arguments("nullMessageWithUserTopicAndDefault", userTopic, null, defaultTopic, userTopic),
117+
arguments("annotationMessageWithUserTopic", userTopic, Baz.class, defaultTopic, userTopic),
118+
arguments("annotationMessageNoUserTopic", null, Baz.class, defaultTopic, bazTopic),
114119
arguments("nullMessageWithDefault", null, null, defaultTopic, null),
115120
arguments("noMatchWithUserTopicAndDefault", userTopic, Bar.class, defaultTopic, userTopic),
116121
arguments("noMatchWithUserTopic", userTopic, Bar.class, null, userTopic),
@@ -166,4 +171,8 @@ record Foo(String value) {
166171
record Bar(String value) {
167172
}
168173

174+
@PulsarTypeMapping(topic = bazTopic)
175+
record Baz(String value) {
176+
}
177+
169178
}

0 commit comments

Comments
 (0)