feat: consumer schema validation for avro #59

Merged
KeranYang merged 14 commits into main from feature/consumer-schema-validation on Feb 25, 2026

@ccm32004 commented Feb 18, 2026

When the consumer receives messages from a topic that is registered with an Avro schema, it validates that the message payload defines the correct fields and uses the correct format (for example, Avro rather than JSON). This uses the Apache Pulsar client consumer's AUTO_CONSUME schema.

  • added configuration behaviour:
    • users can turn AUTO_CONSUME on or off to enable or disable consumer schema validation
  • added tests to verify the behaviour

Testing notes:

  • tested with a StreamNative Pulsar cluster instance
  • created a topic registered with an Avro schema that has only one field (i.e. a name field), published one-field messages to that topic, then re-registered the same topic with a different Avro schema that has two fields
  • created a consumer pipeline connected to the Pulsar topic; the consumer pulls the two-field topic schema but receives Avro messages with only one field, so decoding the message payload fails
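
To illustrate why the one-field messages fail to decode against the two-field schema, here is a minimal, hand-rolled sketch of Avro's binary layout for string fields (a zigzag varint length followed by UTF-8 bytes, with record fields written back to back). It does not use the Pulsar or Avro libraries; `AvroMismatchSketch` and its method names are hypothetical, and the real client may surface the failure through a different exception type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of this PR: models Avro's binary encoding for string-only records.
final class AvroMismatchSketch {

    // Avro writes a long (including string lengths) as a zigzag-encoded varint.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63);
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80));
            n >>>= 7;
        }
        out.write((int) n);
    }

    static long readLong(ByteArrayInputStream in) throws IOException {
        long n = 0;
        for (int shift = 0; shift < 64; shift += 7) {
            int b = in.read();
            if (b < 0) {
                throw new EOFException("payload ended while reading a length");
            }
            n |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return (n >>> 1) ^ -(n & 1); // undo zigzag
            }
        }
        throw new IOException("varint is too long");
    }

    // A record is just its fields in schema order, with no field names or delimiters.
    static byte[] encodeStringRecord(String... fields) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String field : fields) {
            byte[] utf8 = field.getBytes(StandardCharsets.UTF_8);
            writeLong(out, utf8.length);
            out.write(utf8, 0, utf8.length);
        }
        return out.toByteArray();
    }

    // Decodes with the reader's idea of how many string fields the record has.
    static List<String> decodeStringRecord(byte[] payload, int fieldCount) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(payload);
        List<String> fields = new ArrayList<>();
        for (int i = 0; i < fieldCount; i++) {
            long length = readLong(in);
            if (length < 0 || length > in.available()) {
                throw new EOFException("field " + i + " needs " + length + " bytes");
            }
            byte[] utf8 = new byte[(int) length];
            in.read(utf8, 0, utf8.length);
            fields.add(new String(utf8, StandardCharsets.UTF_8));
        }
        return fields;
    }
}
```

Decoding `encodeStringRecord("alice")` with `fieldCount` 1 returns the single name, while decoding the same bytes with `fieldCount` 2 throws `EOFException` because no bytes remain for the second field.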

Figure 1: application fails after the pipeline consumes messages that don't match the schema registered to the topic
Screenshot 2026-02-24 at 2 48 53 PM

Also tested the happy path, where messages matched the topic schema, and verified in the logs that the correct messages were being consumed. The sink shows malformed JSON because it cannot deserialize Avro bytes.
Screenshot 2026-02-19 at 10 22 33 AM

Signed-off-by: Cece Ma <mayuqing131@gmail.com>
Signed-off-by: Cece Ma <mayuqing131@gmail.com>
Signed-off-by: Cece Ma <mayuqing131@gmail.com>
Signed-off-by: Cece Ma <mayuqing131@gmail.com>
Signed-off-by: Cece Ma <mayuqing131@gmail.com>
@ccm32004 requested a review from KeranYang February 18, 2026 16:19
@ccm32004 marked this pull request as draft February 18, 2026 17:29
Signed-off-by: Cece Ma <mayuqing131@gmail.com>
Signed-off-by: Cece Ma <mayuqing131@gmail.com>
@ccm32004 marked this pull request as ready for review February 19, 2026 16:02
Signed-off-by: Cece Ma <mayuqing131@gmail.com>
Signed-off-by: Cece Ma <mayuqing131@gmail.com>
bytesConsumer = pulsarClient.newConsumer(Schema.BYTES)
.loadConf(pulsarConsumerProperties.getConsumerConfig())
.batchReceivePolicy(batchPolicy)
.subscriptionType(SubscriptionType.Shared)

Suggested change
.subscriptionType(SubscriptionType.Shared)
.subscriptionType(SubscriptionType.Shared) // Must be shared to support multiple pods

.loadConf(pulsarConsumerProperties.getConsumerConfig())
.batchReceivePolicy(batchPolicy)
.subscriptionType(SubscriptionType.Shared) // Must be shared to support multiple pods
.subscriptionType(SubscriptionType.Shared)

Suggested change
.subscriptionType(SubscriptionType.Shared)
.subscriptionType(SubscriptionType.Shared) // Must be shared to support multiple pods

* Returns the consumer used for receiving and acknowledgment, creating it if necessary.
* Matches useAutoConsumeSchema. Never null.
*/
public Consumer<?> getConsumer() throws PulsarClientException {

There are two methods a caller can call to get a byte array consumer. Please keep only one. See if we can mark getOrCreateGenericRecordConsumer and getOrCreateGenericRecordConsumer as private.
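
One way to read this request is a single public accessor backed by private lazy creators. The sketch below is only an illustration under assumptions: `ConsumerManagerSketch`, `FakeConsumer`, and `getOrCreateBytesConsumer` are hypothetical names, and the stand-in objects replace real Pulsar consumers so the example compiles without the client library.

```java
// Hypothetical sketch, not the code in this PR.
final class ConsumerManagerSketch {

    // Stand-in for a Pulsar Consumer<?>; records which schema it was created with.
    static final class FakeConsumer {
        final String schemaKind;

        FakeConsumer(String schemaKind) {
            this.schemaKind = schemaKind;
        }
    }

    private final boolean useAutoConsumeSchema;
    private FakeConsumer bytesConsumer;
    private FakeConsumer genericRecordConsumer;

    ConsumerManagerSketch(boolean useAutoConsumeSchema) {
        this.useAutoConsumeSchema = useAutoConsumeSchema;
    }

    // The only accessor callers see. Matches useAutoConsumeSchema. Never null.
    FakeConsumer getConsumer() {
        return useAutoConsumeSchema
                ? getOrCreateGenericRecordConsumer()
                : getOrCreateBytesConsumer();
    }

    // Private: created on first use, then reused.
    private FakeConsumer getOrCreateBytesConsumer() {
        if (bytesConsumer == null) {
            bytesConsumer = new FakeConsumer("BYTES");
        }
        return bytesConsumer;
    }

    // Private: created on first use, then reused.
    private FakeConsumer getOrCreateGenericRecordConsumer() {
        if (genericRecordConsumer == null) {
            genericRecordConsumer = new FakeConsumer("AUTO_CONSUME");
        }
        return genericRecordConsumer;
    }
}
```

With this shape, callers cannot pick a consumer that disagrees with the configured flag, because the choice is made in one place.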

Comment on lines +146 to +148
if (t instanceof SchemaSerializationException
|| t instanceof IOException
|| t instanceof UnsupportedOperationException) {

Please document each of these exceptions in more detail. When is each one thrown in a schema validation case?
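
A possible shape for that documentation is sketched below. The conditions in the Javadoc are assumptions that would need to be confirmed against the Pulsar client, not statements about when the library actually throws. The nested `SchemaSerializationException` is a local stand-in so the sketch compiles without the client, and walking the cause chain is an addition that the quoted check does not show.

```java
import java.io.IOException;

// Hypothetical sketch, not the code in this PR.
final class SchemaFailureSketch {

    /** Local stand-in for Pulsar's SchemaSerializationException. */
    static final class SchemaSerializationException extends RuntimeException {
        SchemaSerializationException(String message) {
            super(message);
        }
    }

    /**
     * Returns true when t, or an exception in its cause chain, is treated as a
     * schema validation failure.
     *
     * SchemaSerializationException: assumed to mean the payload could not be
     * decoded with the topic schema.
     * IOException: for example an EOFException when a decoder runs out of bytes
     * before all fields are read.
     * UnsupportedOperationException: assumed; the triggering condition is not
     * stated in this thread and should be filled in by the author.
     */
    static boolean isSchemaValidationFailure(Throwable t) {
        Throwable cur = t;
        // Bounded walk so an unusual cyclic cause chain cannot loop forever.
        for (int hops = 0; cur != null && hops < 32; hops++) {
            if (cur instanceof SchemaSerializationException
                    || cur instanceof IOException
                    || cur instanceof UnsupportedOperationException) {
                return true;
            }
            cur = cur.getCause();
        }
        return false;
    }
}
```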

Signed-off-by: Cece Ma <mayuqing131@gmail.com>
Signed-off-by: Cece Ma <mayuqing131@gmail.com>
Signed-off-by: Cece Ma <mayuqing131@gmail.com>
@ccm32004 requested a review from KeranYang February 24, 2026 20:38
.maxNumMessages((int) count)
.timeout((int) timeoutMillis, TimeUnit.MILLISECONDS) // We do not expect user to specify a number larger
// than 2^63 - 1 which will cause an overflow
.timeout((int) timeoutMillis, TimeUnit.MILLISECONDS)

Please keep the original comments.
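
For context on the `(int)` casts in the quoted snippet: a narrowing cast from `long` to `int` keeps only the low 32 bits, so any value above 2^31 - 1 wraps silently. A small sketch of that behaviour follows, together with one hypothetical alternative that clamps instead. `TimeoutCastSketch` and `clampToInt` are illustrative names and are not part of this PR.

```java
// Hypothetical sketch, not the code in this PR.
final class TimeoutCastSketch {

    // Same narrowing as the quoted snippet: values outside the int range wrap.
    static int plainCast(long timeoutMillis) {
        return (int) timeoutMillis;
    }

    // Alternative: clamp into the int range so an oversized value stays large.
    static int clampToInt(long timeoutMillis) {
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, timeoutMillis));
    }
}
```

For example, `plainCast(2_147_483_648L)` yields `-2147483648`, while `clampToInt(2_147_483_648L)` yields `2147483647`. `Math.toIntExact` is another option when throwing an `ArithmeticException` is preferable to clamping.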

consumerManagerMock = mock(PulsarConsumerManager.class);
consumerMock = mock(Consumer.class);
// Inject the mocked PulsarConsumerManager into pulsarSource using
// Inject the mocked PulsarConsumerManager into pulsarSource using

Suggested change
// Inject the mocked PulsarConsumerManager into pulsarSource using
// Inject the mocked PulsarConsumerManager into pulsarSource using

Consider configuring Format on Save.

Signed-off-by: Cece Ma <mayuqing131@gmail.com>
@ccm32004 requested a review from KeranYang February 25, 2026 20:56
@KeranYang merged commit 97d782f into main Feb 25, 2026
2 checks passed
@KeranYang deleted the feature/consumer-schema-validation branch February 25, 2026 21:53