Skip to content

Commit 0e95f3c

Browse files
authored
Add support for AUTO_PRODUCE schema (#572)
This commit adds support for producing raw JSON or Avro payloads with AUTO_SCHEMA.
1 parent ca44f0b commit 0e95f3c

File tree

8 files changed

+161
-37
lines changed

8 files changed

+161
-37
lines changed

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/attributes/attributes-variables.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
:spring-cloud-stream-docs: https://docs.spring.io/spring-cloud-stream/docs/{spring-cloud-stream-version}/reference/html/
1515
:spring-cloud-function: https://spring.io/projects/spring-cloud-function
1616

17-
:apache-pulsar-docs: https://pulsar.apache.org/docs/3.1.x
17+
:apache-pulsar-docs: https://pulsar.apache.org/docs/3.2.x
1818
:apache-pulsar-cient-docs: {apache-pulsar-docs}/client-libraries-java
1919
:apache-pulsar-io-docs: {apache-pulsar-docs}/io-connectors
2020
:apache-pulsar-function-docs: {apache-pulsar-docs}/functions-overview
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
[source,java,subs="attributes,verbatim"]
2+
----
3+
void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {
4+
template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
5+
}
6+
----
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
[source,java,subs="attributes,verbatim"]
2+
----
3+
void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {
4+
template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
5+
}
6+
----
Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
2+
include::../../attributes/attributes-variables.adoc[]
3+
14
== Specifying Schema Information
25
If you use Java primitive types, the framework auto-detects the schema for you, and you need not specify any schema types for publishing the data.
36
For non-primitive types, if the Schema is not explicitly specified when invoking send operations on the `{template-class}`, the Spring for Apache Pulsar framework will try to build a `Schema.JSON` from the type.
47

5-
IMPORTANT: Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, and KEY_VALUE w/ INLINE encoding.
8+
IMPORTANT: Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_PRODUCE_BYTES, and KEY_VALUE w/ INLINE encoding.
69

710
=== Custom Schema Mapping
811
As an alternative to specifying the schema when invoking send operations on the `{template-class}` for complex types, the schema resolver can be configured with mappings for the types.
@@ -11,3 +14,14 @@ This removes the need to specify the schema as the framework consults the resolv
1114
include::custom-schema-mapping.adoc[]
1215

1316
With this configuration in place, there is no need to set specify the schema on send operations.
17+
18+
=== Producing with AUTO_SCHEMA
19+
If there is no chance to know the type of schema of a Pulsar topic in advance, you can use an {apache-pulsar-docs}/schema-get-started/#auto_produce[AUTO_PRODUCE] schema to publish a raw JSON or Avro payload as a `byte[]` safely.
20+
21+
In this case, the producer validates whether the outbound bytes are compatible with the schema of the destination topic.
22+
23+
Simply specify a schema of `Schema.AUTO_PRODUCE_BYTES()` on your template send operations as shown in the example below:
24+
25+
include::{template-class}/template-snippet.adoc[]
26+
27+
NOTE: This is only supported with Avro and JSON schema types.

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,6 +32,7 @@
3232
import org.apache.pulsar.client.api.PulsarClient;
3333
import org.apache.pulsar.client.api.PulsarClientException;
3434
import org.apache.pulsar.client.api.Schema;
35+
import org.apache.pulsar.client.api.SchemaSerializationException;
3536
import org.apache.pulsar.reactive.client.api.MessageSpec;
3637
import org.assertj.core.api.InstanceOfAssertFactories;
3738
import org.junit.jupiter.api.AfterEach;
@@ -47,9 +48,12 @@
4748
import org.springframework.pulsar.core.DefaultSchemaResolver;
4849
import org.springframework.pulsar.core.DefaultTopicResolver;
4950
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
51+
import org.springframework.pulsar.test.support.model.UserRecord;
5052
import org.springframework.util.function.ThrowingConsumer;
5153

54+
import com.fasterxml.jackson.databind.ObjectMapper;
5255
import reactor.core.publisher.Flux;
56+
import reactor.test.StepVerifier;
5357

5458
/**
5559
* Tests for {@link ReactivePulsarTemplate}.
@@ -157,8 +161,8 @@ void sendMessageWithMessageCustomizer() throws Exception {
157161
.withMessageCustomizer((mb) -> mb.key("test-key"))
158162
.send()
159163
.subscribe();
160-
Message<String> msg = sendAndConsume(sendFunction, "sendMessageWithMessageCustomizer", Schema.STRING,
161-
"test-message", true);
164+
Message<?> msg = sendAndConsume(sendFunction, "sendMessageWithMessageCustomizer", Schema.STRING, "test-message",
165+
true);
162166
assertThat(msg.getKey()).isEqualTo("test-key");
163167
}
164168

@@ -168,15 +172,15 @@ void sendMessageWithSenderCustomizer() throws Exception {
168172
.withSenderCustomizer((sb) -> sb.producerName("test-producer"))
169173
.send()
170174
.subscribe();
171-
Message<String> msg = sendAndConsume(sendFunction, "sendMessageWithSenderCustomizer", Schema.STRING,
172-
"test-message", true);
175+
Message<?> msg = sendAndConsume(sendFunction, "sendMessageWithSenderCustomizer", Schema.STRING, "test-message",
176+
true);
173177
assertThat(msg.getProducerName()).isEqualTo("test-producer");
174178
}
175179

176180
@ParameterizedTest
177181
@ValueSource(booleans = { true, false })
178182
void sendMessageWithTopicInferredByTypeMappings(boolean producerFactoryHasDefaultTopic) throws Exception {
179-
String topic = "ptt-topicInferred-" + producerFactoryHasDefaultTopic + "-topic";
183+
String topic = "rptt-topicInferred-" + producerFactoryHasDefaultTopic + "-topic";
180184
ReactivePulsarSenderFactory<Foo> producerFactory = DefaultReactivePulsarSenderFactory.<Foo>builderFor(client)
181185
.withDefaultTopic(producerFactoryHasDefaultTopic ? "fake-topic" : null)
182186
.build();
@@ -200,25 +204,25 @@ void sendMessageWithoutTopicFails() {
200204
.withMessage("Topic must be specified when no default topic is configured");
201205
}
202206

203-
private <T> Message<T> sendAndConsume(Consumer<ReactivePulsarTemplate<T>> sendFunction, String topic,
204-
Schema<T> schema, @Nullable T expectedValue, Boolean withDefaultTopic) throws Exception {
207+
private <T, V> Message<?> sendAndConsume(Consumer<ReactivePulsarTemplate<T>> sendFunction, String topic,
208+
Schema<V> schema, @Nullable V expectedValue, Boolean withDefaultTopic) throws Exception {
205209
ReactivePulsarSenderFactory<T> senderFactory = DefaultReactivePulsarSenderFactory.<T>builderFor(client)
206210
.withDefaultTopic(withDefaultTopic ? topic : null)
207211
.build();
208212
ReactivePulsarTemplate<T> pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory);
209213
return sendAndConsume(pulsarTemplate, sendFunction, topic, schema, expectedValue);
210214
}
211215

212-
private <T> Message<T> sendAndConsume(ReactivePulsarTemplate<T> template,
213-
Consumer<ReactivePulsarTemplate<T>> sendFunction, String topic, Schema<T> schema, @Nullable T expectedValue)
216+
private <T, V> Message<?> sendAndConsume(ReactivePulsarTemplate<T> template,
217+
Consumer<ReactivePulsarTemplate<T>> sendFunction, String topic, Schema<V> schema, @Nullable V expectedValue)
214218
throws Exception {
215-
try (org.apache.pulsar.client.api.Consumer<T> consumer = client.newConsumer(schema)
219+
try (org.apache.pulsar.client.api.Consumer<V> consumer = client.newConsumer(schema)
216220
.topic(topic)
217221
.subscriptionName(topic + "-sub")
218222
.subscribe()) {
219223
sendFunction.accept(template);
220-
221-
Message<T> msg = consumer.receive(3, TimeUnit.SECONDS);
224+
Message<?> msg = consumer.receive(3, TimeUnit.SECONDS);
225+
consumer.acknowledge(msg);
222226
assertThat(msg).isNotNull();
223227
assertThat(msg.getValue()).isEqualTo(expectedValue);
224228
return msg;
@@ -230,7 +234,7 @@ class SendNonPrimitiveSchemaTests {
230234

231235
@Test
232236
void withSpecifiedSchema() throws Exception {
233-
String topic = "ptt-specificSchema-topic";
237+
String topic = "rptt-specificSchema-topic";
234238
Foo foo = new Foo("Foo-" + UUID.randomUUID(), "Bar-" + UUID.randomUUID());
235239
ThrowingConsumer<ReactivePulsarTemplate<Foo>> sendFunction = (
236240
template) -> template.send(foo, Schema.AVRO(Foo.class)).subscribe();
@@ -239,15 +243,15 @@ void withSpecifiedSchema() throws Exception {
239243

240244
@Test
241245
void withSchemaInferredByMessageType() throws Exception {
242-
String topic = "ptt-nospecificSchema-topic";
246+
String topic = "rptt-nospecificSchema-topic";
243247
Foo foo = new Foo("Foo-" + UUID.randomUUID(), "Bar-" + UUID.randomUUID());
244248
ThrowingConsumer<ReactivePulsarTemplate<Foo>> sendFunction = (template) -> template.send(foo).subscribe();
245249
sendAndConsume(sendFunction, topic, Schema.JSON(Foo.class), foo, true);
246250
}
247251

248252
@Test
249253
void withSchemaInferredByTypeMappings() throws Exception {
250-
String topic = "ptt-schemaInferred-topic";
254+
String topic = "rptt-schemaInferred-topic";
251255
ReactivePulsarSenderFactory<Foo> producerFactory = DefaultReactivePulsarSenderFactory
252256
.<Foo>builderFor(client)
253257
.withDefaultTopic(topic)
@@ -293,6 +297,41 @@ void sendNullWithoutSchemaFails() {
293297

294298
}
295299

300+
@Nested
301+
class SendAutoProduceSchemaTests {
302+
303+
@Test
304+
void withJsonSchema() throws Exception {
305+
var topic = "rptt-auto-json-topic";
306+
307+
// First send to the topic as JSON to establish the schema for the topic
308+
var userJsonSchema = Schema.JSON(UserRecord.class);
309+
var user = new UserRecord("Jason", 5150);
310+
ThrowingConsumer<ReactivePulsarTemplate<UserRecord>> sendAsUserFunction = (
311+
template) -> template.send(user, userJsonSchema).subscribe();
312+
sendAndConsume(sendAsUserFunction, topic, userJsonSchema, user, true);
313+
314+
// Next send another user using byte[] with AUTO_PRODUCE - it should be
315+
// consumed fine
316+
var user2 = new UserRecord("Who", 6160);
317+
var user2Bytes = new ObjectMapper().writeValueAsBytes(user2);
318+
ThrowingConsumer<ReactivePulsarTemplate<byte[]>> sendAsBytesFunction = (
319+
template) -> template.send(user2Bytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
320+
sendAndConsume(sendAsBytesFunction, topic, userJsonSchema, user2, true);
321+
322+
// Finally send another user using byte[] with AUTO_PRODUCE w/ invalid payload
323+
// - it should be rejected
324+
var bytesSenderFactory = DefaultReactivePulsarSenderFactory.<byte[]>builderFor(client)
325+
.withDefaultTopic(topic)
326+
.build();
327+
var bytesTemplate = new ReactivePulsarTemplate<>(bytesSenderFactory);
328+
329+
StepVerifier.create(bytesTemplate.send("invalid-payload".getBytes(), Schema.AUTO_PRODUCE_BYTES()))
330+
.expectError(SchemaSerializationException.class);
331+
}
332+
333+
}
334+
296335
public static class Foo {
297336

298337
private String foo;

spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
import org.apache.pulsar.client.api.Schema;
3535
import org.apache.pulsar.client.api.TypedMessageBuilder;
3636
import org.apache.pulsar.client.api.transaction.Transaction;
37+
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
3738
import org.apache.pulsar.common.protocol.schema.SchemaHash;
39+
import org.apache.pulsar.common.schema.SchemaType;
3840

3941
import org.springframework.beans.factory.DisposableBean;
4042
import org.springframework.core.log.LogAccessor;
@@ -168,6 +170,8 @@ private void closeProducer(Producer<T> producer, boolean async) {
168170
*/
169171
static class ProducerCacheKey<T> {
170172

173+
private static final SchemaHash AUTO_PRODUCE_SCHEMA_HASH = SchemaHash.of(new byte[0], SchemaType.AUTO_PUBLISH);
174+
171175
private final Schema<T> schema;
172176

173177
private final SchemaHash schemaHash;
@@ -193,7 +197,8 @@ static class ProducerCacheKey<T> {
193197
Assert.notNull(schema, () -> "'schema' must be non-null");
194198
Assert.notNull(topic, () -> "'topic' must be non-null");
195199
this.schema = schema;
196-
this.schemaHash = SchemaHash.of(this.schema);
200+
this.schemaHash = (schema instanceof AutoProduceBytesSchema) ? AUTO_PRODUCE_SCHEMA_HASH
201+
: SchemaHash.of(this.schema);
197202
this.topic = topic;
198203
this.encryptionKeys = encryptionKeys;
199204
this.customizers = customizers;

spring-pulsar/src/test/java/org/springframework/pulsar/core/CachingPulsarProducerFactoryTests.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -49,6 +49,7 @@
4949
import org.springframework.pulsar.cache.provider.CacheProvider;
5050
import org.springframework.pulsar.core.CachingPulsarProducerFactory.ProducerCacheKey;
5151
import org.springframework.pulsar.core.CachingPulsarProducerFactory.ProducerWithCloseCallback;
52+
import org.springframework.pulsar.test.support.model.UserPojo;
5253
import org.springframework.test.util.ReflectionTestUtils;
5354
import org.springframework.util.ObjectUtils;
5455

@@ -74,7 +75,7 @@ void cleanupFromTests() {
7475
}
7576

7677
@Test
77-
void createProducerMultipleCalls() throws PulsarClientException {
78+
void createProducerMultipleCalls() {
7879
var producerFactory = newProducerFactory();
7980
var cacheKey = new ProducerCacheKey<>(schema, "topic1", null, null);
8081
var producer1 = producerFactory.createProducer(schema, "topic1");
@@ -103,7 +104,7 @@ void cachedProducerIsCloseSafeWrapper() throws PulsarClientException {
103104

104105
@SuppressWarnings("resource")
105106
@Test
106-
void createProducerWithMatrixOfCacheKeys() throws PulsarClientException {
107+
void createProducerWithMatrixOfCacheKeys() {
107108
String topic1 = "topic1";
108109
String topic2 = "topic2";
109110
var schema1 = new StringSchema();
@@ -167,7 +168,7 @@ void createProducerWithMatrixOfCacheKeys() throws PulsarClientException {
167168
}
168169

169170
@Test
170-
void factoryDestroyCleansUpCacheAndClosesProducers() throws PulsarClientException {
171+
void factoryDestroyCleansUpCacheAndClosesProducers() {
171172
CachingPulsarProducerFactory<String> producerFactory = producerFactory(pulsarClient, null, null);
172173
var actualProducer1 = actualProducer(producerFactory.createProducer(schema, "topic1"));
173174
var actualProducer2 = actualProducer(producerFactory.createProducer(schema, "topic2"));
@@ -183,7 +184,7 @@ void factoryDestroyCleansUpCacheAndClosesProducers() throws PulsarClientExceptio
183184
}
184185

185186
@Test
186-
void producerEvictedFromCache() throws PulsarClientException {
187+
void producerEvictedFromCache() {
187188
CachingPulsarProducerFactory<String> producerFactory = new CachingPulsarProducerFactory<>(pulsarClient, null,
188189
null, new DefaultTopicResolver(), Duration.ofSeconds(3L), 10L, 2);
189190
var actualProducer = actualProducer(producerFactory.createProducer(schema, "topic1"));
@@ -306,7 +307,21 @@ static Stream<Arguments> equalsAndHashCodeTestProvider() {
306307
arguments(
307308
Named.of("differentNullInterceptor",
308309
new ProducerCacheKey<>(Schema.STRING, "topic1", encryptionKeys1, customizers1)),
309-
new ProducerCacheKey<>(Schema.STRING, "topic1", null, null), false));
310+
new ProducerCacheKey<>(Schema.STRING, "topic1", null, null), false),
311+
arguments(
312+
Named.of("sameAutoProduceSchemaSameTopic",
313+
new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(), "topic1", null, null)),
314+
new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(), "topic1", null, null), true),
315+
arguments(
316+
Named.of("autoProduceSchemaOneWithAndOneWithoutSchemaInfo",
317+
new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(), "topic1", null, null)),
318+
new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(UserPojo.class)), "topic1",
319+
null, null),
320+
true),
321+
arguments(
322+
Named.of("sameAutoProduceSchemaDifferentTopic",
323+
new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(), "topic1", null, null)),
324+
new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(), "topic2", null, null), false));
310325
}
311326

312327
}
@@ -315,7 +330,7 @@ static Stream<Arguments> equalsAndHashCodeTestProvider() {
315330
class RestartFactoryTests {
316331

317332
@Test
318-
void restartLifecycle() throws PulsarClientException {
333+
void restartLifecycle() {
319334
var producerFactory = (CachingPulsarProducerFactory<String>) producerFactory(pulsarClient, null, null);
320335
producerFactory.start();
321336
var producer1 = producerFactory.createProducer(schema, "topic1");

0 commit comments

Comments
 (0)