Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type MessageReader struct {
buffer Buffer
// true if we are parsing a batched message - set after parsing the message metadata
batched bool
// true if the message has properties - set after parsing the message metadata
hasProperties bool
}

// ReadChecksum
Expand Down Expand Up @@ -118,6 +120,10 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
r.batched = true
}

if len(meta.Properties) > 0 {
r.hasProperties = true
}
Comment on lines +123 to +125
Copy link

Copilot AI Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hasProperties field is never reset between message reads when a MessageReader is reused. If the same reader processes multiple messages and a message without properties follows one with properties, hasProperties will incorrectly remain true. Consider resetting this field in the ResetBuffer() method or at the start of ReadMessageMetadata().

Copilot uses AI. Check for mistakes.

return &meta, nil
}

Expand All @@ -137,6 +143,9 @@ func (r *MessageReader) ReadBrokerMetadata() (*pb.BrokerEntryMetadata, error) {

func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) {
if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 {
if !r.batched && r.hasProperties {
return nil, []byte{}, nil
}
return nil, nil, ErrEOM
}
if !r.batched {
Expand Down
4 changes: 2 additions & 2 deletions pulsar/internal/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func TestReadMessageOldFormat(t *testing.T) {
assert.Equal(t, true, ssm == nil)
assert.Equal(t, "hello", string(payload))

_, _, err = reader.ReadMessage()
assert.Equal(t, ErrEOM, err)
_, payload, _ = reader.ReadMessage()
assert.Equal(t, []byte{}, payload)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to check the err is nil here.

}

func TestReadMessagesBatchSize1(t *testing.T) {
Expand Down
46 changes: 46 additions & 0 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1298,3 +1298,49 @@ func TestReaderReadFromLatest(t *testing.T) {
require.Error(t, err)
require.Nil(t, msg)
}

func TestReaderEmptyPayloadNonemptyPropsNonBatch(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
ctx := context.Background()

// create reader
reader, err := client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: EarliestMessageID(),
})
assert.Nil(t, err)
defer reader.Close()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()

// send 10 messages
for i := 0; i < 10; i++ {
_, err := producer.Send(ctx, &ProducerMessage{
Properties: map[string]string{"key": "value"},
Payload: []byte{},
})
assert.NoError(t, err)
}

// receive 10 messages
for i := 0; i < 10; i++ {
msg, err := reader.Next(ctx)
assert.NoError(t, err)

assert.Equal(t, map[string]string{"key": "value"}, msg.Properties())
assert.Equal(t, []byte{}, msg.Payload())
}
}