feat: producer avro schema validation #60

Merged
KeranYang merged 6 commits into main from feature/schema-validation on Feb 25, 2026

@ccm32004 ccm32004 commented Feb 18, 2026

Added producer schema validation and tests:

  • If a message passes schema validation, it is processed and sent successfully (happy path).
Screenshot 2026-02-24 at 12 49 55 PM
  • If a message fails schema validation and drop is configured, it is reported as successful so as not to trigger resends.
Screenshot 2026-02-24 at 12 29 28 PM
  • If a message fails schema validation and drop is not configured, it is reported downstream to Numaflow as a failure.
Screenshot 2026-02-24 at 12 32 48 PM
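
The three cases above reduce to a small decision table. The following is a minimal sketch of that table only, not the code in this PR: the class name `SchemaValidationOutcome`, the `Response` enum, and the `respond` method are hypothetical names chosen for illustration.

```java
// Hypothetical illustration of the three outcomes described above.
// None of these names come from the PR itself.
final class SchemaValidationOutcome {

    // What the producer reports back for a single message.
    enum Response { OK, FAILURE }

    // passedValidation: the payload conformed to the topic schema and was sent.
    // dropInvalidMessages: whether invalid messages should be dropped silently.
    static Response respond(boolean passedValidation, boolean dropInvalidMessages) {
        if (passedValidation) {
            return Response.OK; // happy path: message was sent
        }
        // Invalid message: report OK when dropping (so no resend is triggered),
        // otherwise report the failure downstream.
        return dropInvalidMessages ? Response.OK : Response.FAILURE;
    }
}
```

Only the last case (invalid payload, drop not configured) surfaces a failure, which is the one that can lead to a resend.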

Other testing notes:

  • Tested on a StreamNative Pulsar cluster: registered a schema on the topic, then configured the producer pipeline to generate both valid and invalid message payloads.
  • Added validation for the combination of auto-produce schema set to false and drop set to true; it results in the following log:
Screenshot 2026-02-25 at 3 39 03 PM

Signed-off-by: Cece Ma <mayuqing131@gmail.com>
@ccm32004 ccm32004 requested a review from KeranYang February 24, 2026 18:59
@ccm32004 ccm32004 marked this pull request as ready for review February 24, 2026 18:59
producerConfig:
# Use broker schema to validate payloads; when false, uses Schema.BYTES (no validation).
use-auto-produce-schema: true
# When true, drop messages that fail schema/serialization (respond OK, no retry). When false, report failure.

Suggested change
- # When true, drop messages that fail schema/serialization (respond OK, no retry). When false, report failure.
+ # When true, drop messages that fail schema/serialization and continue publishing.
+ # When false, report failure.

@@ -15,4 +15,17 @@
@ConfigurationProperties(prefix = "spring.pulsar.producer")
public class PulsarProducerProperties {

Can we implement a validate method for PulsarProducerProperties? The combination useAutoProduceSchema: false, dropInvalidMessages: true is not valid.
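
One way such a check could look is sketched below. This is an assumption-laden illustration, not the method that was merged: the class `ProducerPropertiesSketch` is a hypothetical stand-in for `PulsarProducerProperties` (without the Spring binding), and the choice of `IllegalArgumentException` and the message text are made up for the example. Only the two field names come from the review comment.

```java
// Hypothetical stand-in for PulsarProducerProperties; plain Java, no Spring binding.
final class ProducerPropertiesSketch {
    private final boolean useAutoProduceSchema;
    private final boolean dropInvalidMessages;

    ProducerPropertiesSketch(boolean useAutoProduceSchema, boolean dropInvalidMessages) {
        this.useAutoProduceSchema = useAutoProduceSchema;
        this.dropInvalidMessages = dropInvalidMessages;
    }

    // Rejects the one combination flagged in review: without schema validation
    // there are no "invalid" messages to drop, so the settings contradict each other.
    void validate() {
        if (!useAutoProduceSchema && dropInvalidMessages) {
            throw new IllegalArgumentException(
                "dropInvalidMessages=true requires useAutoProduceSchema=true");
        }
    }
}
```

Calling `validate()` once at startup would surface the misconfiguration early rather than at publish time; whether to fail fast or only log a warning is a design choice left to the actual implementation.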

Signed-off-by: Cece Ma <mayuqing131@gmail.com>
@ccm32004 ccm32004 requested a review from KeranYang February 25, 2026 20:47
@KeranYang KeranYang merged commit 63ab552 into main Feb 25, 2026
2 checks passed
@KeranYang KeranYang deleted the feature/schema-validation branch February 25, 2026 21:41