diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go index b9f462341..7cca37d38 100644 --- a/pulsar/internal/commands.go +++ b/pulsar/internal/commands.go @@ -75,6 +75,8 @@ type MessageReader struct { buffer Buffer // true if we are parsing a batched message - set after parsing the message metadata batched bool + // true if the message has properties - set after parsing the message metadata + hasProperties bool } // ReadChecksum @@ -118,6 +120,10 @@ func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error) { r.batched = true } + if len(meta.Properties) > 0 { + r.hasProperties = true + } + return &meta, nil } @@ -137,6 +143,9 @@ func (r *MessageReader) ReadBrokerMetadata() (*pb.BrokerEntryMetadata, error) { func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error) { if r.buffer.ReadableBytes() == 0 && r.buffer.Capacity() > 0 { + if !r.batched && r.hasProperties { + return nil, []byte{}, nil + } return nil, nil, ErrEOM } if !r.batched { diff --git a/pulsar/internal/commands_test.go b/pulsar/internal/commands_test.go index c236e10cb..4371823df 100644 --- a/pulsar/internal/commands_test.go +++ b/pulsar/internal/commands_test.go @@ -98,8 +98,8 @@ func TestReadMessageOldFormat(t *testing.T) { assert.Equal(t, true, ssm == nil) assert.Equal(t, "hello", string(payload)) - _, _, err = reader.ReadMessage() - assert.Equal(t, ErrEOM, err) + _, payload, _ = reader.ReadMessage() + assert.Equal(t, []byte{}, payload) } func TestReadMessagesBatchSize1(t *testing.T) { diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 58c72525c..e9d01ee12 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -1298,3 +1298,49 @@ func TestReaderReadFromLatest(t *testing.T) { require.Error(t, err) require.Nil(t, msg) } + +func TestReaderEmptyPayloadNonemptyPropsNonBatch(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + + // create reader + reader, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessageID(), + }) + assert.Nil(t, err) + defer reader.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + _, err := producer.Send(ctx, &ProducerMessage{ + Properties: map[string]string{"key": "value"}, + Payload: []byte{}, + }) + assert.NoError(t, err) + } + + // receive 10 messages + for i := 0; i < 10; i++ { + msg, err := reader.Next(ctx) + assert.NoError(t, err) + + assert.Equal(t, map[string]string{"key": "value"}, msg.Properties()) + assert.Equal(t, []byte{}, msg.Payload()) + } +}