Skip to content

Commit 97d782f

Browse files
authored
feat: consumer schema validation for avro (#59)
Signed-off-by: Cece Ma <mayuqing131@gmail.com>
1 parent 63ab552 commit 97d782f

File tree

9 files changed

+369
-106
lines changed

9 files changed

+369
-106
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ build/
3434
### Local env ###
3535
.env
3636

37-
### Register-schema scripts (dev/local) ###
37+
### Register-schema scripts (not fully tested; exclude from consumer-schema-validation) ###
3838
development/scripts/register-schema/
3939

4040
### Mac OS ###

docs/source/byte-array/manifests/api-key/byte-arr-consumer-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ data:
1515
# Example: OAuth2 - authParams: "${PULSAR_OAUTH_CLIENT_SECRET}"
1616
consumer: # see here for all configurations: https://pulsar.apache.org/reference/#/4.0.x/client/client-configuration-consumer
1717
enabled: true
18+
useAutoConsumeSchema: true
1819
consumerConfig:
1920
# Single topic (string) or multiple topics (comma-separated string)
2021
topicNames: "persistent://public/default/test-topic"

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@
3333
<version>${lombok.version}</version>
3434
<optional>true</optional>
3535
</dependency>
36+
<dependency>
37+
<groupId>org.apache.avro</groupId>
38+
<artifactId>avro</artifactId>
39+
<version>1.11.3</version>
40+
<scope>test</scope>
41+
</dependency>
3642
<dependency>
3743
<groupId>org.springframework.boot</groupId>
3844
<artifactId>spring-boot-starter-test</artifactId>

src/main/java/io/numaproj/pulsar/config/consumer/PulsarConsumerProperties.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,15 @@ public class PulsarConsumerProperties {
2828

2929
private Map<String, Object> consumerConfig = new HashMap<>(); // Default to an empty map
3030

31+
/**
32+
* When true (default), the consumer uses Schema.AUTO_CONSUME so the client validates
33+
* each message against the topic schema when decoding. When false, uses Schema.BYTES
34+
* (no schema check; messages are read as raw bytes).
35+
* When the topic has no schema (Pulsar treats it as BYTES), no validation and no decoding
36+
* is performed on the message bytes; they are passed through as bytes.
37+
*/
38+
private boolean useAutoConsumeSchema = true;
39+
3140
@PostConstruct
3241
public void init() {
3342
// Pulsar expects topicNames to be type Set<String>. Config accepts a single string

src/main/java/io/numaproj/pulsar/consumer/PulsarConsumerManager.java

Lines changed: 49 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.apache.pulsar.client.api.PulsarClientException;
88
import org.apache.pulsar.client.api.Schema;
99
import org.apache.pulsar.client.api.SubscriptionType;
10+
import org.apache.pulsar.client.api.schema.GenericRecord;
1011
import org.springframework.beans.factory.annotation.Autowired;
1112
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
1213
import org.springframework.stereotype.Component;
@@ -17,9 +18,9 @@
1718
import java.util.concurrent.TimeUnit;
1819

1920
/**
20-
* PulsarConsumerManager creates and maintains a single Consumer instance.
21-
* A new consumer is created based on the provided batch size and read timeout,
22-
* but once created it will be reused until explicitly removed.
21+
* Creates and maintains Pulsar consumers: one for byte[] (Schema.BYTES) and one for
22+
* schema-backed messages (Schema.AUTO_CONSUME / GenericRecord). Only the consumer
23+
* matching the configured schema is created and used; the other remains null.
2324
*/
2425
@Slf4j
2526
@Component
@@ -32,52 +33,72 @@ public class PulsarConsumerManager {
3233
@Autowired
3334
private PulsarClient pulsarClient;
3435

35-
// The current consumer instance.
36-
private Consumer<byte[]> currentConsumer;
36+
private Consumer<byte[]> bytesConsumer;
37+
private Consumer<GenericRecord> genericRecordConsumer;
3738

38-
// Returns the current consumer if it exists. If not, creates a new one.
39-
public Consumer<byte[]> getOrCreateConsumer(long count, long timeoutMillis)
40-
throws PulsarClientException {
41-
if (currentConsumer != null) {
42-
return currentConsumer;
39+
/** Returns the byte-array consumer, creating it if necessary. Use when not using AUTO_CONSUME. */
40+
public Consumer<byte[]> getOrCreateBytesConsumer(long count, long timeoutMillis) throws PulsarClientException {
41+
if (bytesConsumer != null) {
42+
return bytesConsumer;
4343
}
44-
4544
BatchReceivePolicy batchPolicy = BatchReceivePolicy.builder()
4645
.maxNumMessages((int) count)
47-
.timeout((int) timeoutMillis, TimeUnit.MILLISECONDS) // We do not expect user to specify a number larger
48-
// than 2^63 - 1 which will cause an overflow
46+
.timeout((int) timeoutMillis, TimeUnit.MILLISECONDS) // We do not expect user to specify a number larger than 2^63 - 1 which will cause an overflow
4947
.build();
50-
51-
currentConsumer = pulsarClient.newConsumer(Schema.BYTES)
48+
bytesConsumer = pulsarClient.newConsumer(Schema.BYTES)
5249
.loadConf(pulsarConsumerProperties.getConsumerConfig())
5350
.batchReceivePolicy(batchPolicy)
5451
.subscriptionType(SubscriptionType.Shared) // Must be shared to support multiple pods
5552
.subscribe();
53+
log.info("Created byte-array consumer; batch receive: {}, timeoutMillis: {}", count, timeoutMillis);
54+
return bytesConsumer;
55+
}
5656

57-
log.info("Created new consumer with batch receive policy of: {} and timeoutMillis: {}", count, timeoutMillis);
58-
return currentConsumer;
57+
/** Returns the GenericRecord (AUTO_CONSUME) consumer, creating it if necessary. Use when using AUTO_CONSUME. */
58+
public Consumer<GenericRecord> getOrCreateGenericRecordConsumer(long count, long timeoutMillis) throws PulsarClientException {
59+
if (genericRecordConsumer != null) {
60+
return genericRecordConsumer;
61+
}
62+
BatchReceivePolicy batchPolicy = BatchReceivePolicy.builder()
63+
.maxNumMessages((int) count)
64+
.timeout((int) timeoutMillis, TimeUnit.MILLISECONDS) // We do not expect user to specify a number larger than 2^63 - 1 which will cause an overflow
65+
.build();
66+
genericRecordConsumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
67+
.loadConf(pulsarConsumerProperties.getConsumerConfig())
68+
.batchReceivePolicy(batchPolicy)
69+
.subscriptionType(SubscriptionType.Shared) // Must be shared to support multiple pods
70+
.subscribe();
71+
log.info("Created AUTO_CONSUME (GenericRecord) consumer; batch receive: {}, timeoutMillis: {}", count, timeoutMillis);
72+
return genericRecordConsumer;
5973
}
6074

6175
@PreDestroy
6276
public void cleanup() {
63-
if (currentConsumer != null) {
77+
if (bytesConsumer != null) {
6478
try {
65-
currentConsumer.close();
66-
log.info("Consumer closed during cleanup.");
67-
} catch (PulsarClientException e) {
68-
log.error("Error while closing consumer in cleanup", e);
79+
bytesConsumer.close();
80+
log.info("Byte-array consumer closed.");
81+
} catch (Exception e) {
82+
log.error("Error closing byte-array consumer", e);
6983
}
84+
bytesConsumer = null;
85+
}
86+
if (genericRecordConsumer != null) {
87+
try {
88+
genericRecordConsumer.close();
89+
log.info("GenericRecord consumer closed.");
90+
} catch (Exception e) {
91+
log.error("Error closing GenericRecord consumer", e);
92+
}
93+
genericRecordConsumer = null;
7094
}
71-
7295
if (pulsarClient != null) {
7396
try {
7497
pulsarClient.close();
75-
log.info("Pulsar client closed during cleanup.");
98+
log.info("Pulsar client closed.");
7699
} catch (PulsarClientException e) {
77-
log.error("Error while closing the Pulsar client in cleanup", e);
100+
log.error("Error closing Pulsar client", e);
78101
}
79-
80102
}
81-
82103
}
83-
}
104+
}

src/main/java/io/numaproj/pulsar/consumer/PulsarSource.java

Lines changed: 107 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
import org.apache.pulsar.client.api.MessageId;
1616
import org.apache.pulsar.client.api.Messages;
1717
import org.apache.pulsar.client.api.PulsarClientException;
18+
import org.apache.pulsar.client.api.SchemaSerializationException;
19+
import org.apache.pulsar.client.api.schema.GenericRecord;
20+
import org.apache.pulsar.client.api.schema.Field;
1821
import org.apache.pulsar.client.admin.PulsarAdmin;
1922
import org.apache.pulsar.client.admin.PulsarAdminException;
2023
import org.apache.pulsar.common.policies.data.TopicStats;
@@ -24,6 +27,7 @@
2427
import org.springframework.stereotype.Component;
2528

2629
import javax.annotation.PostConstruct;
30+
import java.io.IOException;
2731
import java.nio.charset.StandardCharsets;
2832
import java.time.Instant;
2933
import java.util.ArrayList;
@@ -37,8 +41,11 @@
3741
@ConditionalOnProperty(prefix = "spring.pulsar.consumer", name = "enabled", havingValue = "true")
3842
public class PulsarSource extends Sourcer {
3943

40-
// Map tracking received messages (keyed by topicName + messageId for multi-topic support)
41-
private final Map<String, org.apache.pulsar.client.api.Message<byte[]>> messagesToAck = new HashMap<>();
44+
// Map tracking received messages (keyed by topicName + messageId for multi-topic support).
45+
// Value is Message<?> because we hold either Message<byte[]> or Message<GenericRecord> depending on
46+
// useAutoConsumeSchema. only use the Message interface from the map (getMessageId, getTopicName, etc.
47+
// for ack and buildHeaders), never the payload type, so a single map with Message<?> is appropriate;
48+
private final Map<String, org.apache.pulsar.client.api.Message<?>> messagesToAck = new HashMap<>();
4249

4350
private Server server;
4451

@@ -60,49 +67,112 @@ public void startServer() throws Exception {
6067

6168
@Override
6269
public void read(ReadRequest request, OutputObserver observer) {
63-
// If there are messages not acknowledged, return
6470
if (!messagesToAck.isEmpty()) {
6571
log.trace("messagesToAck not empty: {}", messagesToAck);
6672
return;
6773
}
6874

69-
Consumer<byte[]> consumer = null;
70-
7175
try {
72-
// Obtain a consumer with the desired settings.
73-
consumer = pulsarConsumerManager.getOrCreateConsumer(request.getCount(), request.getTimeout().toMillis());
76+
if (pulsarConsumerProperties.isUseAutoConsumeSchema()) {
77+
readWithAutoConsume(request, observer);
78+
} else {
79+
readWithBytes(request, observer);
80+
}
81+
} catch (Exception e) {
82+
log.error("Failed to read from Pulsar", e);
83+
throw new RuntimeException(e);
84+
}
85+
}
7486

75-
Messages<byte[]> batchMessages = consumer.batchReceive();
87+
private void readWithBytes(ReadRequest request, OutputObserver observer) throws PulsarClientException {
88+
Consumer<byte[]> consumer = pulsarConsumerManager.getOrCreateBytesConsumer(request.getCount(), request.getTimeout().toMillis());
89+
Messages<byte[]> batchMessages = consumer.batchReceive();
7690

77-
if (batchMessages == null || batchMessages.size() == 0) {
78-
log.trace("Received 0 messages, return early.");
79-
return;
80-
}
91+
if (batchMessages == null || batchMessages.size() == 0) {
92+
log.trace("Received 0 messages, return early.");
93+
return;
94+
}
8195

82-
// Process each message in the batch.
83-
for (org.apache.pulsar.client.api.Message<byte[]> pMsg : batchMessages) {
84-
String topicName = pMsg.getTopicName();
85-
String msgId = pMsg.getMessageId().toString();
86-
String topicMessageIdKey = topicName + msgId;
87-
88-
// TODO : change to .debug or .trace to reduce log noise
89-
log.info("Consumed Pulsar message [topic: {}, id: {}]: {}", topicName, pMsg.getMessageId(),
90-
new String(pMsg.getValue(), StandardCharsets.UTF_8));
96+
for (org.apache.pulsar.client.api.Message<byte[]> pMsg : batchMessages) {
97+
98+
// TODO : change to .debug or .trace to reduce log noise
99+
log.info("Consumed Pulsar message [topic: {}, id: {}]: {}", pMsg.getTopicName(), pMsg.getMessageId(),
100+
new String(pMsg.getValue(), StandardCharsets.UTF_8));
101+
sendMessage(pMsg, pMsg.getValue(), observer);
102+
}
103+
}
104+
105+
private void readWithAutoConsume(ReadRequest request, OutputObserver observer) throws PulsarClientException {
106+
Consumer<GenericRecord> consumer = pulsarConsumerManager.getOrCreateGenericRecordConsumer(request.getCount(), request.getTimeout().toMillis());
107+
Messages<GenericRecord> batchMessages = consumer.batchReceive();
108+
109+
if (batchMessages == null || batchMessages.size() == 0) {
110+
log.trace("Received 0 messages, return early.");
111+
return;
112+
}
113+
114+
for (org.apache.pulsar.client.api.Message<GenericRecord> pMsg : batchMessages) {
91115

92-
byte[] offsetBytes = topicMessageIdKey.getBytes(StandardCharsets.UTF_8);
93-
Offset offset = new Offset(offsetBytes);
116+
try {
117+
GenericRecord record = pMsg.getValue(); // This will throw SchemaSerializationException if the message is not valid based on the topic schema
118+
byte[] payloadBytes = pMsg.getData();
94119

95-
Map<String, String> headers = buildHeaders(pMsg);
120+
String decoded = recordToLogString(record);
121+
// TODO : change to .debug or .trace to reduce log noise
122+
log.info("Consumed Pulsar message (AUTO_CONSUME) [topic: {}, id: {}]: {} bytes, decoded={}", pMsg.getTopicName(), pMsg.getMessageId(), payloadBytes.length, decoded);
123+
sendMessage(pMsg, payloadBytes, observer);
124+
} catch (Exception e) {
125+
if (isSchemaValidationFailure(e)) {
126+
throw new RuntimeException("Schema validation failure", e);
127+
}
128+
throw new RuntimeException(e);
129+
}
130+
}
131+
}
96132

97-
Message message = new Message(pMsg.getValue(), offset, Instant.now(), headers);
98-
observer.send(message);
133+
/** Builds offset and headers, sends the message to the observer, and records it for ack. */
134+
private void sendMessage(org.apache.pulsar.client.api.Message<?> pMsg, byte[] payloadBytes, OutputObserver observer) {
135+
String topicMessageIdKey = pMsg.getTopicName() + pMsg.getMessageId().toString();
136+
byte[] offsetBytes = topicMessageIdKey.getBytes(StandardCharsets.UTF_8);
137+
Offset offset = new Offset(offsetBytes);
138+
Map<String, String> headers = buildHeaders(pMsg);
139+
observer.send(new Message(payloadBytes, offset, Instant.now(), headers));
140+
messagesToAck.put(topicMessageIdKey, pMsg);
141+
}
99142

100-
messagesToAck.put(topicMessageIdKey, pMsg);
143+
/** True if the exception indicates schema deserialization failure. The Pulsar client throws
144+
* SchemaSerializationException when decoding fails (e.g. schema mismatch, bad payload)
145+
* and wraps underlying IOException from the decoder as the cause. */
146+
private static boolean isSchemaValidationFailure(Throwable e) {
147+
for (Throwable t = e; t != null; t = t.getCause()) {
148+
if (t instanceof SchemaSerializationException) {
149+
return true;
101150
}
102-
} catch (PulsarClientException e) {
103-
log.error("Failed to get consumer or receive messages from Pulsar", e);
104-
throw new RuntimeException("Failed to get consumer or receive messages from Pulsar", e);
105151
}
152+
return false;
153+
}
154+
155+
// TODO : remove this logging later to reduce log noise
156+
/** Builds a log-friendly string of the decoded record (field names and values). */
157+
private static String recordToLogString(GenericRecord record) {
158+
if (record == null) {
159+
return "null";
160+
}
161+
List<Field> fields = record.getFields();
162+
if (fields == null || fields.isEmpty()) {
163+
return record.getSchemaType() + ":{}";
164+
}
165+
StringBuilder sb = new StringBuilder();
166+
sb.append(record.getSchemaType()).append(":{");
167+
for (int i = 0; i < fields.size(); i++) {
168+
if (i > 0) sb.append(", ");
169+
Field f = fields.get(i);
170+
String name = f.getName();
171+
Object value = record.getField(f);
172+
sb.append(name).append("=").append(value);
173+
}
174+
sb.append("}");
175+
return sb.toString();
106176
}
107177

108178
@Override
@@ -128,8 +198,11 @@ public void ack(AckRequest request) {
128198
.toList();
129199

130200
try {
131-
Consumer<byte[]> consumer = pulsarConsumerManager.getOrCreateConsumer(0, 0);
132-
consumer.acknowledge(messageIds);
201+
if (pulsarConsumerProperties.isUseAutoConsumeSchema()) {
202+
pulsarConsumerManager.getOrCreateGenericRecordConsumer(0, 0).acknowledge(messageIds);
203+
} else {
204+
pulsarConsumerManager.getOrCreateBytesConsumer(0, 0).acknowledge(messageIds);
205+
}
133206
log.info("Successfully acknowledged {} messages", messageIds.size());
134207
} catch (PulsarClientException e) {
135208
log.error("Failed to acknowledge Pulsar messages", e);
@@ -138,9 +211,9 @@ public void ack(AckRequest request) {
138211
}
139212

140213
/**
141-
* Builds headers from Pulsar message metadata
214+
* Builds headers from Pulsar message metadata. Works for both Message<?> (byte[] or GenericRecord).
142215
*/
143-
private Map<String, String> buildHeaders(org.apache.pulsar.client.api.Message<byte[]> pulsarMessage) {
216+
private Map<String, String> buildHeaders(org.apache.pulsar.client.api.Message<?> pulsarMessage) {
144217
Map<String, String> headers = new HashMap<>();
145218

146219
headers.put(NumaHeaderKeys.PULSAR_PRODUCER_NAME, pulsarMessage.getProducerName());
@@ -150,7 +223,6 @@ private Map<String, String> buildHeaders(org.apache.pulsar.client.api.Message<by
150223
headers.put(NumaHeaderKeys.PULSAR_EVENT_TIME, String.valueOf(pulsarMessage.getEventTime()));
151224
headers.put(NumaHeaderKeys.PULSAR_REDELIVERY_COUNT, String.valueOf(pulsarMessage.getRedeliveryCount()));
152225

153-
// Add message properties as headers
154226
if (pulsarMessage.getProperties() != null && !pulsarMessage.getProperties().isEmpty()) {
155227
pulsarMessage.getProperties().forEach((key, value) -> {
156228
if (key != null && value != null) {

src/test/java/io/numaproj/pulsar/config/consumer/PulsarConsumerPropertiesTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@ public void consumerProperties_DefaultSubscriptionName() {
7777
);
7878
}
7979

80+
@Test
81+
public void consumerProperties_useAutoConsumeSchema_defaultTrue() {
82+
PulsarConsumerProperties properties = new PulsarConsumerProperties();
83+
assertTrue("useAutoConsumeSchema should default to true", properties.isUseAutoConsumeSchema());
84+
properties.setUseAutoConsumeSchema(false);
85+
assertFalse(properties.isUseAutoConsumeSchema());
86+
}
87+
8088
/**
8189
* Verify that if 'subscriptionName' is specified in the consumerConfig,
8290
* it is not overwritten by the default value.

0 commit comments

Comments
 (0)