Skip to content

Commit 63ab552

Browse files
authored
feat: producer avro schema validation (#60)
Signed-off-by: Cece Ma <mayuqing131@gmail.com>
1 parent 5c9f9bc commit 63ab552

File tree

8 files changed

+187
-10
lines changed

8 files changed

+187
-10
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,8 @@ build/
3434
### Local env ###
3535
.env
3636

37+
### Register-schema scripts (dev/local) ###
38+
development/scripts/register-schema/
39+
3740
### Mac OS ###
3841
.DS_Store

docs/sink/byte-array/manifests/api-key/byte-arr-producer-config.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@ data:
1515
# Example: OAuth2 - authParams: "${PULSAR_OAUTH_CLIENT_SECRET}"
1616
producer:
1717
enabled: true
18-
producerConfig:
18+
# Use broker schema to validate payloads; when false, uses Schema.BYTES (no validation).
19+
use-auto-produce-schema: true
20+
# When true, drop messages that fail schema/serialization and continue publishing.
21+
drop-invalid-messages: false
22+
producerConfig:
1923
topicName: "persistent://public/default/test-topic"
2024
admin:
2125
adminConfig: # Accepts the same key-value pair configurations as pulsar client: https://pulsar.apache.org/reference/#/4.0.x/client/client-configuration-client

src/main/java/io/numaproj/pulsar/config/producer/PulsarProducerConfig.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313

1414
import lombok.extern.slf4j.Slf4j;
1515

16+
import org.apache.pulsar.common.schema.SchemaInfo;
17+
18+
import java.nio.charset.StandardCharsets;
1619
import java.util.Map;
1720
import java.util.UUID;
1821

@@ -25,29 +28,46 @@ public class PulsarProducerConfig {
2528

2629
@Bean
2730
@ConditionalOnProperty(prefix = "spring.pulsar.producer", name = "enabled", havingValue = "true", matchIfMissing = false)
28-
public Producer<byte[]> pulsarProducer(PulsarClient pulsarClient, PulsarProducerProperties pulsarProducerProperties, PulsarAdmin pulsarAdmin)
29-
throws Exception {
31+
public Producer<byte[]> pulsarProducer(PulsarClient pulsarClient, PulsarProducerProperties pulsarProducerProperties,
32+
PulsarAdmin pulsarAdmin) throws Exception {
3033
String podName = env.getProperty("NUMAFLOW_POD", "pod-" + UUID.randomUUID());
3134
String producerName = "producerName";
3235

36+
pulsarProducerProperties.validateConfig();
37+
3338
Map<String, Object> producerConfig = pulsarProducerProperties.getProducerConfig();
3439
if (producerConfig.containsKey(producerName)) {
3540
log.warn("User configured a 'producerName' in the config, but this can cause errors if multiple pods spin "
3641
+ "up with the same name. Overriding with '{}'", podName);
3742
}
3843
producerConfig.put(producerName, podName);
3944

40-
// Validate that the topic configured in the producer config
45+
// Validate that the topic configured in the producer config exists in the Pulsar cluster
4146
String topicName = (String) producerConfig.get("topicName");
4247
if (topicName == null || topicName.trim().isEmpty()) {
4348
throw new IllegalArgumentException("Topic name must be configured in producer config");
4449
}
4550

4651
validateTopicExists(pulsarAdmin, topicName);
4752

48-
return pulsarClient.newProducer(Schema.BYTES)
53+
final Schema<byte[]> schema;
54+
if (pulsarProducerProperties.isUseAutoProduceSchema()) {
55+
schema = Schema.AUTO_PRODUCE_BYTES();
56+
} else {
57+
schema = Schema.BYTES;
58+
log.info("Producer using Schema.BYTES: no broker-side schema validation.");
59+
}
60+
61+
Producer<byte[]> producer = pulsarClient.newProducer(schema)
4962
.loadConf(producerConfig)
5063
.create();
64+
65+
SchemaInfo schemaInfo = schema.getSchemaInfo();
66+
log.info("Producer connected; schema initialized: type={}, name={}, schema={}",
67+
schemaInfo.getType(), schemaInfo.getName(),
68+
schemaInfo.getSchema() != null ? new String(schemaInfo.getSchema(), StandardCharsets.UTF_8) : "null");
69+
70+
return producer;
5171
}
5272

5373
private void validateTopicExists(PulsarAdmin pulsarAdmin, String topicName) throws PulsarAdminException {

src/main/java/io/numaproj/pulsar/config/producer/PulsarProducerProperties.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,26 @@
1515
@ConfigurationProperties(prefix = "spring.pulsar.producer")
1616
public class PulsarProducerProperties {
1717
private Map<String, Object> producerConfig = new HashMap<>(); // Default to an empty map
18+
19+
/**
20+
* When true (default), the producer uses Schema.AUTO_PRODUCE_BYTES so the broker enforces
21+
* format compatibility based on registered topic schema.
22+
*/
23+
private boolean useAutoProduceSchema = true;
24+
25+
/**
26+
* When true, messages that fail schema/serialization validation (e.g. SchemaSerializationException)
27+
* are dropped: the sink responds OK so the message is not retried, and the invalid payload is not
28+
* sent to Pulsar. When false (default), such messages are reported as failures downstream and may be retried.
29+
*/
30+
private boolean dropInvalidMessages = false;
31+
32+
public void validateConfig() {
33+
if (!useAutoProduceSchema && dropInvalidMessages) {
34+
throw new IllegalArgumentException(
35+
"Invalid combination: useAutoProduceSchema=false and dropInvalidMessages=true. "
36+
+ "dropInvalidMessages only applies when useAutoProduceSchema is true (broker validates schema). "
37+
+ "With Schema.BYTES there is no schema validation, so dropInvalidMessages has no effect.");
38+
}
39+
}
1840
}

src/main/java/io/numaproj/pulsar/producer/PulsarSink.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
import io.numaproj.numaflow.sinker.ResponseList;
77
import io.numaproj.numaflow.sinker.Server;
88
import io.numaproj.numaflow.sinker.Sinker;
9+
import io.numaproj.pulsar.config.producer.PulsarProducerProperties;
910
import lombok.extern.slf4j.Slf4j;
1011
import org.apache.pulsar.client.api.Producer;
1112
import org.apache.pulsar.client.api.PulsarClient;
1213
import org.apache.pulsar.client.api.PulsarClientException;
14+
import org.apache.pulsar.client.api.SchemaSerializationException;
1315
import org.springframework.beans.factory.annotation.Autowired;
1416
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
1517
import org.springframework.stereotype.Component;
@@ -19,6 +21,7 @@
1921
import java.util.ArrayList;
2022
import java.util.List;
2123
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.CompletionException;
2225

2326
@Slf4j
2427
@Component
@@ -31,6 +34,9 @@ public class PulsarSink extends Sinker {
3134
@Autowired
3235
private PulsarClient pulsarClient;
3336

37+
@Autowired
38+
private PulsarProducerProperties producerProperties;
39+
3440
private Server server;
3541

3642
@PostConstruct // starts server automatically when the spring context initializes
@@ -61,16 +67,24 @@ public ResponseList processMessages(DatumIterator datumIterator) {
6167
final byte[] msg = datum.getValue();
6268
final String msgId = datum.getId();
6369

64-
// Won't wait for broker to confirm receipt of msg before continuing
65-
// sendSync returns CompletableFuture which will complete when broker ack
6670
CompletableFuture<Void> future = producer.sendAsync(msg)
6771
.thenAccept(messageId -> {
6872
log.info("Processed message ID: {}, Content: {}", msgId, new String(msg));
6973
responseListBuilder.addResponse(Response.responseOK(msgId));
7074
})
7175
.exceptionally(ex -> {
72-
log.error("Error processing message ID {}: {}", msgId, ex.getMessage(), ex);
73-
responseListBuilder.addResponse(Response.responseFailure(msgId, ex.getMessage()));
76+
Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
77+
if (producerProperties.isDropInvalidMessages() && isSchemaSerializationFailure(cause != null ? cause : ex)) {
78+
log.warn("Dropping message ID {} due to schema/serialization error (drop-invalid-messages=true): {}",
79+
msgId, cause != null ? cause.getMessage() : ex.getMessage());
80+
responseListBuilder.addResponse(Response.responseOK(msgId));
81+
} else if (isSchemaSerializationFailure(cause != null ? cause : ex)) {
82+
log.warn("Message ID {} failed schema validation, messages produced do not align with topic schema: {}", msgId, cause != null ? cause.getMessage() : ex.getMessage());
83+
responseListBuilder.addResponse(Response.responseFailure(msgId, cause != null ? cause.getMessage() : ex.getMessage()));
84+
} else {
85+
log.error("Error processing message ID {}: {}", msgId, ex.getMessage(), ex);
86+
responseListBuilder.addResponse(Response.responseFailure(msgId, ex.getMessage()));
87+
}
7488
return null;
7589
});
7690

@@ -83,6 +97,19 @@ public ResponseList processMessages(DatumIterator datumIterator) {
8397
return responseListBuilder.build();
8498
}
8599

100+
/**
101+
* True if the failure is due to schema/serialization (e.g. invalid Avro, EOF).
102+
* Pulsar wraps these in SchemaSerializationException (e.g. around EOFException).
103+
*/
104+
private static boolean isSchemaSerializationFailure(Throwable t) {
105+
for (Throwable c = t; c != null; c = c.getCause()) {
106+
if (c instanceof SchemaSerializationException) {
107+
return true;
108+
}
109+
}
110+
return false;
111+
}
112+
86113
@PreDestroy
87114
public void cleanup() {
88115
try {

src/test/java/io/numaproj/pulsar/config/producer/PulsarProducerConfigTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,27 @@ public void pulsarProducer_topicDoesNotExist_throwsException() throws Exception
218218
verify(mockProducerBuilder, never()).create();
219219
}
220220

221+
// Test that useAutoProduceSchema=false and dropInvalidMessages=true is rejected
222+
@Test
223+
public void pulsarProducer_useAutoProduceSchemaFalse_dropInvalidMessagesTrue_throwsException() throws Exception {
224+
PulsarProducerProperties properties = new PulsarProducerProperties();
225+
properties.setUseAutoProduceSchema(false);
226+
properties.setDropInvalidMessages(true);
227+
Map<String, Object> producerConfig = new HashMap<>();
228+
producerConfig.put("topicName", "persistent://tenant/namespace/test-topic");
229+
properties.setProducerConfig(producerConfig);
230+
231+
IllegalArgumentException exception = assertThrows(
232+
IllegalArgumentException.class,
233+
() -> spiedConfig.pulsarProducer(mockClient, properties, mockAdmin));
234+
235+
assertTrue("Message should describe invalid combination",
236+
exception.getMessage().contains("useAutoProduceSchema=false and dropInvalidMessages=true"));
237+
assertTrue("Message should mention dropInvalidMessages only applies with auto-produce",
238+
exception.getMessage().contains("dropInvalidMessages only applies when useAutoProduceSchema is true"));
239+
verify(mockProducerBuilder, never()).create();
240+
}
241+
221242
// Test for partitioned topic that exists (happy path)
222243
@Test
223244
public void pulsarProducer_partitionedTopicExists() throws Exception {
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.numaproj.pulsar.config.producer;
2+
3+
import org.junit.Test;
4+
5+
import static org.junit.Assert.assertThrows;
6+
import static org.junit.Assert.assertTrue;
7+
8+
public class PulsarProducerPropertiesTest {
9+
10+
@Test
11+
public void validate_useAutoProduceSchemaFalse_dropInvalidMessagesTrue_throwsException() {
12+
PulsarProducerProperties properties = new PulsarProducerProperties();
13+
properties.setUseAutoProduceSchema(false);
14+
properties.setDropInvalidMessages(true);
15+
16+
IllegalArgumentException exception = assertThrows(
17+
IllegalArgumentException.class,
18+
properties::validateConfig);
19+
20+
assertTrue("Message should describe invalid combination",
21+
exception.getMessage().contains("useAutoProduceSchema=false and dropInvalidMessages=true"));
22+
assertTrue("Message should mention dropInvalidMessages only applies with auto-produce",
23+
exception.getMessage().contains("dropInvalidMessages only applies when useAutoProduceSchema is true"));
24+
}
25+
26+
@Test
27+
public void validate_useAutoProduceSchemaTrue_dropInvalidMessagesTrue_doesNotThrow() {
28+
PulsarProducerProperties properties = new PulsarProducerProperties();
29+
properties.setUseAutoProduceSchema(true);
30+
properties.setDropInvalidMessages(true);
31+
properties.validateConfig();
32+
}
33+
34+
@Test
35+
public void validate_useAutoProduceSchemaFalse_dropInvalidMessagesFalse_doesNotThrow() {
36+
PulsarProducerProperties properties = new PulsarProducerProperties();
37+
properties.setUseAutoProduceSchema(false);
38+
properties.setDropInvalidMessages(false);
39+
properties.validateConfig();
40+
}
41+
}

src/test/java/io/numaproj/pulsar/numaflow/PulsarSinkTest.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,22 @@
33
import io.numaproj.numaflow.sinker.Datum;
44
import io.numaproj.numaflow.sinker.DatumIterator;
55
import io.numaproj.numaflow.sinker.ResponseList;
6+
import io.numaproj.pulsar.config.producer.PulsarProducerProperties;
67
import io.numaproj.pulsar.producer.PulsarSink;
78

89
import org.apache.pulsar.client.api.MessageId;
910
import org.apache.pulsar.client.api.Producer;
1011
import org.apache.pulsar.client.api.PulsarClient;
1112
import org.apache.pulsar.client.api.PulsarClientException;
13+
import org.apache.pulsar.client.api.SchemaSerializationException;
1214
import org.junit.Test;
1315
import org.springframework.test.util.ReflectionTestUtils;
1416
import static org.junit.Assert.*;
1517
import static org.mockito.ArgumentMatchers.any;
1618
import static org.mockito.Mockito.*;
1719

1820
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.CompletionException;
1922

2023
public class PulsarSinkTest {
2124

@@ -52,7 +55,7 @@ public void processMessages_responseSuccess() throws Exception {
5255
// Failed to process messages because the thread waiting for the next datum is
5356
// interrupted; no new messages
5457
@Test
55-
public void processMessages_responseFailure_datumInterupted() throws Exception {
58+
public void processMessages_responseFailure_datumInterrupted() throws Exception {
5659
PulsarSink pulsarSink = new PulsarSink();
5760
ByteProducer mockProducer = mock(ByteProducer.class);
5861
DatumIterator mockIterator = mock(DatumIterator.class);
@@ -80,6 +83,9 @@ public void processMessages_responseFailure_addResponse() throws Exception {
8083
Datum mockDatum = mock(Datum.class);
8184

8285
ReflectionTestUtils.setField(pulsarSink, "producer", mockProducer);
86+
PulsarProducerProperties producerProperties = new PulsarProducerProperties();
87+
producerProperties.setDropInvalidMessages(false);
88+
ReflectionTestUtils.setField(pulsarSink, "producerProperties", producerProperties);
8389

8490
byte[] testMessage = "test message".getBytes();
8591

@@ -131,6 +137,9 @@ public void processMessages_responsePartialSuccess() throws Exception {
131137
Datum mockDatum2 = mock(Datum.class);
132138

133139
ReflectionTestUtils.setField(pulsarSink, "producer", mockProducer);
140+
PulsarProducerProperties producerProperties = new PulsarProducerProperties();
141+
producerProperties.setDropInvalidMessages(false);
142+
ReflectionTestUtils.setField(pulsarSink, "producerProperties", producerProperties);
134143

135144
byte[] testMessage1 = "message part 1".getBytes();
136145
byte[] testMessage2 = "message part 2".getBytes();
@@ -168,4 +177,34 @@ public void processMessages_responsePartialSuccess() throws Exception {
168177
assertTrue(response.getResponses().get(1).getErr().contains(exceptionMessage));
169178
}
170179

180+
// When future completes with SchemaSerializationException (async) and dropInvalidMessages is true, message is dropped.
181+
@Test
182+
public void processMessages_dropInvalidMessagesTrue_asyncSchemaSerializationException_dropsMessage() throws Exception {
183+
PulsarSink pulsarSink = new PulsarSink();
184+
ByteProducer mockProducer = mock(ByteProducer.class);
185+
DatumIterator mockIterator = mock(DatumIterator.class);
186+
Datum mockDatum = mock(Datum.class);
187+
188+
ReflectionTestUtils.setField(pulsarSink, "producer", mockProducer);
189+
PulsarProducerProperties producerProperties = new PulsarProducerProperties();
190+
producerProperties.setDropInvalidMessages(true);
191+
ReflectionTestUtils.setField(pulsarSink, "producerProperties", producerProperties);
192+
193+
byte[] testMessage = "invalid".getBytes();
194+
when(mockDatum.getValue()).thenReturn(testMessage);
195+
when(mockDatum.getId()).thenReturn("msg-schema");
196+
when(mockIterator.next()).thenReturn(mockDatum, (Datum) null);
197+
198+
CompletableFuture<MessageId> future = new CompletableFuture<>();
199+
future.completeExceptionally(new CompletionException(new SchemaSerializationException("Incompatible schema")));
200+
when(mockProducer.sendAsync(testMessage)).thenReturn(future);
201+
202+
ResponseList response = pulsarSink.processMessages(mockIterator);
203+
204+
verify(mockProducer).sendAsync(testMessage);
205+
assertEquals(1, response.getResponses().size());
206+
assertTrue("Expected message to be dropped (async schema error)", response.getResponses().get(0).getSuccess());
207+
assertEquals("msg-schema", response.getResponses().get(0).getId());
208+
}
209+
171210
}

0 commit comments

Comments
 (0)