Skip to content

Conversation

@coderzc
Copy link
Member

@coderzc coderzc commented Nov 3, 2025

Motivation

time="2025-11-03T12:22:21+08:00" level=info msg="Created producer" cnx="[::1]:55372 -> [::1]:6650" producerID=1 producer_name=standalone-0-14 topic="persistent://public/default/my-topic-965686000"
time="2025-11-03T12:22:21+08:00" level=error msg="Discarding corrupted message" consumerID=1 msgID="ledgerId:19 entryId:0 partition:-1" name= subscription=reader-ebjpf topic="persistent://public/default/my-topic-965686000" validationError=BatchDeSerializeError

Modifications

Correct parse non-batch messages that with non-empty properties and empty payloads.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@coderzc coderzc requested a review from RobertIndie November 3, 2025 04:34
@coderzc coderzc force-pushed the fix_message_parse_err branch from 0a18a00 to 778835c Compare November 4, 2025 13:36
@crossoverJie crossoverJie requested a review from Copilot November 4, 2025 14:22
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR fixes handling of non-batched messages with empty payloads but non-empty properties. Previously, the ReadMessage() function would return an error (ErrEOM) when encountering such messages, but the fix allows these messages to be properly read by returning an empty byte slice instead.

Key changes:

  • Added hasProperties field to MessageReader to track whether the message metadata contains properties
  • Modified ReadMessage() to return []byte{} (empty payload) instead of ErrEOM for non-batched messages with properties and empty payloads
  • Updated existing test to verify the new behavior and added comprehensive end-to-end test

Reviewed Changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

File Description
pulsar/internal/commands.go Added hasProperties field to track message properties; modified ReadMessage() to handle empty payloads with properties
pulsar/internal/commands_test.go Updated test to verify empty payload is returned instead of ErrEOM for old format messages with properties
pulsar/reader_test.go Added end-to-end test for empty payload messages with properties in non-batched mode

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +123 to +125
if len(meta.Properties) > 0 {
r.hasProperties = true
}
Copy link

Copilot AI Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hasProperties field is never reset between message reads when a MessageReader is reused. If the same reader processes multiple messages and a message without properties follows one with properties, hasProperties will incorrectly remain true. Consider resetting this field in the ResetBuffer() method or at the start of ReadMessageMetadata().

Copilot uses AI. Check for mistakes.
_, _, err = reader.ReadMessage()
assert.Equal(t, ErrEOM, err)
_, payload, _ = reader.ReadMessage()
assert.Equal(t, []byte{}, payload)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to check the err is nil here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants