Compression broken if batching is off. #899

@flowchartsman

Description

If compression is enabled for a producer but batching is off, messages are compressed on send but never decompressed on receive. This appears to be related to the chunking code in producer_partition.go introduced with #805, which compresses by default but does not appear to set MessageMetadata.Compression or test for it. As a result, the consumer is not told that messages are compressed and assumes they are not.

The simplest way to reproduce this is to turn batching off and enable compression. Enabling compression in TestLargeMessage also causes that test to fail, which means chunking is broken whenever compression is enabled.

func TestProducerCompressionUnbatched(t *testing.T) {
	topicName := "compressed-unbatched"
	client, err := NewClient(ClientOptions{
		URL: serviceURL,
	})
	assert.Nil(t, err)
	defer client.Close()

	// Batching off plus compression on is the combination that fails.
	producer, err := client.CreateProducer(ProducerOptions{
		Topic:           topicName,
		DisableBatching: true,
		CompressionType: LZ4,
	})

	assert.Nil(t, err)
	defer producer.Close()

	consumer, err := client.Subscribe(ConsumerOptions{
		Topic:            topicName,
		SubscriptionName: "my-sub",
	})
	assert.Nil(t, err)
	defer consumer.Close()

	payload := []byte(`{"json":"message"}`)

	ID, err := producer.Send(context.Background(), &ProducerMessage{
		Payload: payload,
	})
	assert.Nil(t, err)
	assert.NotNil(t, ID)

	// The consumer should hand back the original bytes, not the compressed ones.
	msg, err := consumer.Receive(context.Background())
	assert.Nil(t, err)
	assert.Equal(t, payload, msg.Payload())
}

Result:

    producer_test.go:1768:
                Error Trace:    /pulsar-client-go/pulsar/producer_test.go:1768
                Error:          Not equal:
                                expected: []byte{0x7b, 0x22, 0x6a, 0x73, 0x6f, 0x6e, 0x22, 0x3a, 0x22, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x7d}
                                actual  : []byte{0xf0, 0x3, 0x7b, 0x22, 0x6a, 0x73, 0x6f, 0x6e, 0x22, 0x3a, 0x22, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x7d}

                                Diff:
                                --- Expected
                                +++ Actual
                                @@ -1,4 +1,4 @@
                                -([]uint8) (len=18) {
                                - 00000000  7b 22 6a 73 6f 6e 22 3a  22 6d 65 73 73 61 67 65  |{"json":"message|
                                - 00000010  22 7d                                             |"}|
                                +([]uint8) (len=20) {
                                + 00000000  f0 03 7b 22 6a 73 6f 6e  22 3a 22 6d 65 73 73 61  |..{"json":"messa|
                                + 00000010  67 65 22 7d                                       |ge"}|
                                 }
                Test:           TestProducerCompressionUnbatched
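
The two extra leading bytes in the actual payload (`f0 03`) are consistent with a raw LZ4 block that consists only of literals: the high nibble of the token `0xf0` gives a literal length of 15, and the following byte adds 3, for 18 literal bytes, which is exactly the length of the original payload. The sketch below decodes just that first literal run to illustrate the reading. It assumes the body is a raw LZ4 block, and `literalRun` is a hypothetical helper written for this example, not part of any library.

```go
package main

import (
	"errors"
	"fmt"
)

// literalRun decodes the literal part of the first sequence in a raw
// LZ4 block. Hypothetical helper for illustration only; it does not
// handle matches and is not a full LZ4 decoder.
func literalRun(block []byte) ([]byte, error) {
	if len(block) == 0 {
		return nil, errors.New("empty block")
	}
	n := int(block[0] >> 4) // high nibble of the token: literal length
	pos := 1
	if n == 15 {
		// A length of 15 means extra length bytes follow; each is
		// added to the total, and a byte below 255 ends the run.
		for {
			if pos >= len(block) {
				return nil, errors.New("truncated literal length")
			}
			b := block[pos]
			pos++
			n += int(b)
			if b != 255 {
				break
			}
		}
	}
	if pos+n > len(block) {
		return nil, errors.New("truncated literals")
	}
	return block[pos : pos+n], nil
}

func main() {
	// The "actual" bytes reported by the failing assertion.
	actual := []byte{0xf0, 0x03, 0x7b, 0x22, 0x6a, 0x73, 0x6f, 0x6e, 0x22, 0x3a,
		0x22, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x7d}
	lit, err := literalRun(actual)
	fmt.Printf("%s %v\n", lit, err) // prints: {"json":"message"} <nil>
}
```

In other words, the consumer appears to have received a well-formed compressed body and passed it through untouched.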

Expected behavior

The test passes: the consumer receives the original, decompressed payload.

Actual behavior

The test fails: the received message has not been decompressed.

Steps to reproduce

Run the test case above.

System configuration

Pulsar version: 2.8.
