diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index fe8f62808a..7b50530625 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -35,7 +35,7 @@ type BuffersPool interface { // BatcherBuilderProvider defines func which returns the BatchBuilder. type BatcherBuilderProvider func( - maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, + maxMessages uint, maxBatchSize uint, maxMessageSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor, ) (BatchBuilder, error) @@ -85,6 +85,8 @@ type batchContainer struct { // without needing costly re-allocations. maxBatchSize uint + maxMessageSize uint + producerName string producerID uint64 @@ -102,18 +104,19 @@ type batchContainer struct { // newBatchContainer init a batchContainer func newBatchContainer( - maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, + maxMessages uint, maxBatchSize uint, maxMessageSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor, ) batchContainer { bc := batchContainer{ - buffer: NewBuffer(4096), - numMessages: 0, - maxMessages: maxMessages, - maxBatchSize: maxBatchSize, - producerName: producerName, - producerID: producerID, + buffer: NewBuffer(4096), + numMessages: 0, + maxMessages: maxMessages, + maxBatchSize: maxBatchSize, + maxMessageSize: maxMessageSize, + producerName: producerName, + producerID: producerID, cmdSend: baseCommand( pb.BaseCommand_SEND, &pb.CommandSend{ @@ -139,13 +142,13 @@ func newBatchContainer( // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container. func NewBatchBuilder( - maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, + maxMessages uint, maxBatchSize uint, maxMessageSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor, ) (BatchBuilder, error) { bc := newBatchContainer( - maxMessages, maxBatchSize, producerName, producerID, compressionType, + maxMessages, maxBatchSize, maxMessageSize, producerName, producerID, compressionType, level, bufferPool, logger, encryptor, ) @@ -259,9 +262,13 @@ func (bc *batchContainer) Flush() ( } if err = serializeBatch( - buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider, bc.encryptor, + buffer, bc.cmdSend, bc.msgMetadata, bc.maxMessageSize, bc.buffer, bc.compressionProvider, bc.encryptor, ); err == nil { // no error in serializing Batch sequenceID = bc.cmdSend.Send.GetSequenceId() + } else { + bc.log.WithError(err). + WithField("payloadSize", len(bc.buffer.ReadableSlice())). + Errorf("BatchBuilder flush: MaxMessageSize %d", bc.maxMessageSize) } callbacks = bc.callbacks diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go index 1af837e0b8..d0cc983795 100644 --- a/pulsar/internal/commands.go +++ b/pulsar/internal/commands.go @@ -50,6 +50,8 @@ var ErrEOM = errors.New("EOF") var ErrConnectionClosed = errors.New("connection closed") +var ErrMessageTooLarge = errors.New("batch message payload size exceed MaxMessageSize") + func NewMessageReader(headersAndPayload Buffer) *MessageReader { return &MessageReader{ buffer: headersAndPayload, @@ -241,6 +243,7 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [ func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageMetadata, + maxMessageSize uint, uncompressedPayload Buffer, compressionProvider compression.Provider, encryptor crypto.Encryptor) error { @@ -257,6 +260,12 @@ func serializeBatch(wb Buffer, return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error :%v", err) } + // check whether payload is bigger than MaxMessageSize + // refer to https://pulsar.apache.org/docs/developing-binary-protocol + if len(encryptedPayload) > int(maxMessageSize) { + return ErrMessageTooLarge + } + cmdSize := uint32(proto.Size(cmdSend)) msgMetadataSize := uint32(proto.Size(msgMetadata)) diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go index 77fbb8c77a..40739e742f 100644 --- a/pulsar/internal/key_based_batch_builder.go +++ b/pulsar/internal/key_based_batch_builder.go @@ -84,7 +84,7 @@ func (h *keyBasedBatches) Val(key string) *batchContainer { // NewKeyBasedBatchBuilder init batch builder and return BatchBuilder // pointer. Build a new key based batch message container. func NewKeyBasedBatchBuilder( - maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, + maxMessages uint, maxBatchSize uint, maxMessageSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor, ) (BatchBuilder, error) { @@ -92,7 +92,7 @@ func NewKeyBasedBatchBuilder( bb := &keyBasedBatchContainer{ batches: newKeyBasedBatches(), batchContainer: newBatchContainer( - maxMessages, maxBatchSize, producerName, producerID, + maxMessages, maxBatchSize, maxMessageSize, producerName, producerID, compressionType, level, bufferPool, logger, encryptor, ), compressionType: compressionType, @@ -151,7 +151,7 @@ func (bc *keyBasedBatchContainer) Add( if batchPart == nil { // create batchContainer for new key t := newBatchContainer( - bc.maxMessages, bc.maxBatchSize, bc.producerName, bc.producerID, + bc.maxMessages, bc.maxBatchSize, bc.maxMessageSize, bc.producerName, bc.producerID, bc.compressionType, bc.level, bc.buffersPool, bc.log, bc.encryptor, ) batchPart = &t diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index ec924158c9..c773c9c98d 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -243,6 +243,7 @@ func (p *partitionProducer) grabCnx() error { p.log.WithError(err).Error("Failed to create producer at send PRODUCER request") return err } + p._setConn(res.Cnx) p.producerName = res.Response.ProducerSuccess.GetProducerName() @@ -258,7 +259,8 @@ func (p *partitionProducer) grabCnx() error { if p.options.DisableBatching { provider, _ := GetBatcherBuilderProvider(DefaultBatchBuilder) - p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize, + p.batchBuilder, err = provider(p.options.BatchingMaxMessages, + p.options.BatchingMaxSize, uint(p._getConn().GetMaxMessageSize()), p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType), compression.Level(p.options.CompressionLevel), p, @@ -273,7 +275,8 @@ func (p *partitionProducer) grabCnx() error { provider, _ = GetBatcherBuilderProvider(DefaultBatchBuilder) } - p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize, + p.batchBuilder, err = provider(p.options.BatchingMaxMessages, + p.options.BatchingMaxSize, uint(p._getConn().GetMaxMessageSize()), p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType), compression.Level(p.options.CompressionLevel), p, @@ -294,7 +297,6 @@ func (p *partitionProducer) grabCnx() error { p.schemaCache.Put(p.schemaInfo, schemaVersion) } - p._setConn(res.Cnx) err = p._getConn().RegisterListener(p.producerID, p) if err != nil { return err @@ -519,8 +521,9 @@ func (p *partitionProducer) internalSend(request *sendRequest) { } } - // if msg is too large - if len(payload) > int(p._getConn().GetMaxMessageSize()) { + // if msg is too large and compress is not enabled + if len(payload) > int(p._getConn().GetMaxMessageSize()) && + p.options.CompressionType == NoCompression { p.publishSemaphore.Release() request.callback(nil, request.msg, errMessageTooLarge) p.log.WithError(errMessageTooLarge). @@ -625,6 +628,10 @@ func (p *partitionProducer) internalFlushCurrentBatch() { if err != nil { for _, cb := range callbacks { if sr, ok := cb.(*sendRequest); ok { + if errors.Is(err, internal.ErrMessageTooLarge) { + err = errMessageTooLarge + p.metrics.PublishErrorsMsgTooLarge.Inc() + } sr.callback(nil, sr.msg, err) } } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index dc13f50dcb..e243986239 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -916,7 +916,7 @@ func TestBatchingDisabled(t *testing.T) { } func TestMaxMessageSize(t *testing.T) { - serverMaxMessageSize := 1024 * 1024 + serverMaxMessageSize := 1024 * 1024 * 5 client, err := NewClient(ClientOptions{ URL: serviceURL, @@ -924,28 +924,38 @@ func TestMaxMessageSize(t *testing.T) { assert.NoError(t, err) defer client.Close() - // Need to set BatchingMaxSize > serverMaxMessageSize to avoid errMessageTooLarge - // being masked by an earlier errFailAddToBatch - producer, err := client.CreateProducer(ProducerOptions{ + f := func(option ProducerOptions) { + // Need to set BatchingMaxSize > serverMaxMessageSize to avoid errMessageTooLarge + // being masked by an earlier errFailAddToBatch + producer, err := client.CreateProducer(option) + assert.NoError(t, err) + assert.NotNil(t, producer) + defer producer.Close() + + for i := 0; i < 10; i++ { + ID, err := producer.Send(context.Background(), &ProducerMessage{ + Payload: make([]byte, serverMaxMessageSize), + }) + if option.CompressionType == NoCompression { + assert.Equal(t, errMessageTooLarge, err) + } else { + assert.NoError(t, err) + assert.NotNil(t, ID) + } + } + } + + f(ProducerOptions{ Topic: newTopicName(), BatchingMaxSize: uint(2 * serverMaxMessageSize), }) - assert.NoError(t, err) - assert.NotNil(t, producer) - defer producer.Close() - for bias := -1; bias <= 1; bias++ { - payload := make([]byte, serverMaxMessageSize+bias) - ID, err := producer.Send(context.Background(), &ProducerMessage{ - Payload: payload, - }) - if bias <= 0 { - assert.NoError(t, err) - assert.NotNil(t, ID) - } else { - assert.Equal(t, errMessageTooLarge, err) - } - } + f(ProducerOptions{ + Topic: newTopicName(), + BatchingMaxSize: uint(2 * serverMaxMessageSize), + CompressionType: LZ4, + CompressionLevel: Default, + }) } func TestFailedSchemaEncode(t *testing.T) {