Skip to content

Commit bdfa6a0

Browse files
authored
Fix the issue of unable to parse non-batch messages that with non-empty properties and empty payloads. (#1435)
### Motivation ``` time="2025-11-03T12:22:21+08:00" level=info msg="Created producer" cnx="[::1]:55372 -> [::1]:6650" producerID=1 producer_name=standalone-0-14 topic="persistent://public/default/my-topic-965686000" time="2025-11-03T12:22:21+08:00" level=error msg="Discarding corrupted message" consumerID=1 msgID="ledgerId:19 entryId:0 partition:-1" name= subscription=reader-ebjpf topic="persistent://public/default/my-topic-965686000" validationError=BatchDeSerializeError ``` ### Modifications Correct parse non-batch messages that with non-empty properties and empty payloads.
1 parent 148c10c commit bdfa6a0

File tree

3 files changed

+58
-2
lines changed

3 files changed

+58
-2
lines changed

pulsar/internal/commands.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ type MessageReader struct {
7575
buffer Buffer
7676
// true if we are parsing a batched message - set after parsing the message metadata
7777
batched bool
78+
// true if the message has properties - set after parsing the message metadata
79+
hasProperties bool
7880
}
7981

8082
// ReadChecksum
@@ -118,6 +120,10 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) {
118120
r.batched = true
119121
}
120122

123+
if len(meta.Properties) > 0 {
124+
r.hasProperties = true
125+
}
126+
121127
return &meta, nil
122128
}
123129

@@ -137,6 +143,9 @@ func (r *MessageReader) ReadBrokerMetadata() (*pb.BrokerEntryMetadata, error) {
137143

138144
func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) {
139145
if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 {
146+
if !r.batched && r.hasProperties {
147+
return nil, []byte{}, nil
148+
}
140149
return nil, nil, ErrEOM
141150
}
142151
if !r.batched {

pulsar/internal/commands_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,9 @@ func TestReadMessageOldFormat(t *testing.T) {
9898
assert.Equal(t, true, ssm == nil)
9999
assert.Equal(t, "hello", string(payload))
100100

101-
_, _, err = reader.ReadMessage()
102-
assert.Equal(t, ErrEOM, err)
101+
_, payload, err = reader.ReadMessage()
102+
assert.Equal(t, []byte{}, payload)
103+
assert.Nil(t, err)
103104
}
104105

105106
func TestReadMessagesBatchSize1(t *testing.T) {

pulsar/reader_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1298,3 +1298,49 @@ func TestReaderReadFromLatest(t *testing.T) {
12981298
require.Error(t, err)
12991299
require.Nil(t, msg)
13001300
}
1301+
1302+
func TestReaderEmptyPayloadNonemptyPropsNonBatch(t *testing.T) {
1303+
client, err := NewClient(ClientOptions{
1304+
URL: lookupURL,
1305+
})
1306+
1307+
assert.Nil(t, err)
1308+
defer client.Close()
1309+
1310+
topic := newTopicName()
1311+
ctx := context.Background()
1312+
1313+
// create reader
1314+
reader, err := client.CreateReader(ReaderOptions{
1315+
Topic: topic,
1316+
StartMessageID: EarliestMessageID(),
1317+
})
1318+
assert.Nil(t, err)
1319+
defer reader.Close()
1320+
1321+
// create producer
1322+
producer, err := client.CreateProducer(ProducerOptions{
1323+
Topic: topic,
1324+
DisableBatching: true,
1325+
})
1326+
assert.Nil(t, err)
1327+
defer producer.Close()
1328+
1329+
// send 10 messages
1330+
for i := 0; i < 10; i++ {
1331+
_, err := producer.Send(ctx, &ProducerMessage{
1332+
Properties: map[string]string{"key": "value"},
1333+
Payload: []byte{},
1334+
})
1335+
assert.NoError(t, err)
1336+
}
1337+
1338+
// receive 10 messages
1339+
for i := 0; i < 10; i++ {
1340+
msg, err := reader.Next(ctx)
1341+
assert.NoError(t, err)
1342+
1343+
assert.Equal(t, map[string]string{"key": "value"}, msg.Properties())
1344+
assert.Equal(t, []byte{}, msg.Payload())
1345+
}
1346+
}

0 commit comments

Comments
 (0)