Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type BuffersPool interface {

// BatcherBuilderProvider defines func which returns the BatchBuilder.
type BatcherBuilderProvider func(
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
maxMessages uint, maxBatchSize uint, maxMessageSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
) (BatchBuilder, error)
Expand Down Expand Up @@ -85,6 +85,8 @@ type batchContainer struct {
// without needing costly re-allocations.
maxBatchSize uint

maxMessageSize uint

producerName string
producerID uint64

Expand All @@ -102,18 +104,19 @@ type batchContainer struct {

// newBatchContainer initializes a batchContainer.
func newBatchContainer(
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
maxMessages uint, maxBatchSize uint, maxMessageSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
) batchContainer {

bc := batchContainer{
buffer: NewBuffer(4096),
numMessages: 0,
maxMessages: maxMessages,
maxBatchSize: maxBatchSize,
producerName: producerName,
producerID: producerID,
buffer: NewBuffer(4096),
numMessages: 0,
maxMessages: maxMessages,
maxBatchSize: maxBatchSize,
maxMessageSize: maxMessageSize,
producerName: producerName,
producerID: producerID,
cmdSend: baseCommand(
pb.BaseCommand_SEND,
&pb.CommandSend{
Expand All @@ -139,13 +142,13 @@ func newBatchContainer(

// NewBatchBuilder initializes a batch builder and returns a BatchBuilder pointer. It builds a new batch message container.
func NewBatchBuilder(
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
maxMessages uint, maxBatchSize uint, maxMessageSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
) (BatchBuilder, error) {

bc := newBatchContainer(
maxMessages, maxBatchSize, producerName, producerID, compressionType,
maxMessages, maxBatchSize, maxMessageSize, producerName, producerID, compressionType,
level, bufferPool, logger, encryptor,
)

Expand Down Expand Up @@ -259,9 +262,13 @@ func (bc *batchContainer) Flush() (
}

if err = serializeBatch(
buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider, bc.encryptor,
buffer, bc.cmdSend, bc.msgMetadata, bc.maxMessageSize, bc.buffer, bc.compressionProvider, bc.encryptor,
); err == nil { // no error in serializing Batch
sequenceID = bc.cmdSend.Send.GetSequenceId()
} else {
bc.log.WithError(err).
WithField("payloadSize", len(bc.buffer.ReadableSlice())).
Errorf("BatchBuilder flush: MaxMessageSize %d", bc.maxMessageSize)
}

callbacks = bc.callbacks
Expand Down
9 changes: 9 additions & 0 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ var ErrEOM = errors.New("EOF")

var ErrConnectionClosed = errors.New("connection closed")

var ErrMessageTooLarge = errors.New("batch message payload size exceed MaxMessageSize")

func NewMessageReader(headersAndPayload Buffer) *MessageReader {
return &MessageReader{
buffer: headersAndPayload,
Expand Down Expand Up @@ -241,6 +243,7 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [
func serializeBatch(wb Buffer,
cmdSend *pb.BaseCommand,
msgMetadata *pb.MessageMetadata,
maxMessageSize uint,
uncompressedPayload Buffer,
compressionProvider compression.Provider,
encryptor crypto.Encryptor) error {
Expand All @@ -257,6 +260,12 @@ func serializeBatch(wb Buffer,
return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error :%v", err)
}

// check whether payload is bigger than MaxMessageSize
// refer to https://pulsar.apache.org/docs/developing-binary-protocol
if len(encryptedPayload) > int(maxMessageSize) {
return ErrMessageTooLarge
}

cmdSize := uint32(proto.Size(cmdSend))
msgMetadataSize := uint32(proto.Size(msgMetadata))

Expand Down
6 changes: 3 additions & 3 deletions pulsar/internal/key_based_batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ func (h *keyBasedBatches) Val(key string) *batchContainer {
// NewKeyBasedBatchBuilder initializes a batch builder and returns a BatchBuilder
// pointer. It builds a new key-based batch message container.
func NewKeyBasedBatchBuilder(
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
maxMessages uint, maxBatchSize uint, maxMessageSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
) (BatchBuilder, error) {

bb := &keyBasedBatchContainer{
batches: newKeyBasedBatches(),
batchContainer: newBatchContainer(
maxMessages, maxBatchSize, producerName, producerID,
maxMessages, maxBatchSize, maxMessageSize, producerName, producerID,
compressionType, level, bufferPool, logger, encryptor,
),
compressionType: compressionType,
Expand Down Expand Up @@ -151,7 +151,7 @@ func (bc *keyBasedBatchContainer) Add(
if batchPart == nil {
// create batchContainer for new key
t := newBatchContainer(
bc.maxMessages, bc.maxBatchSize, bc.producerName, bc.producerID,
bc.maxMessages, bc.maxBatchSize, bc.maxMessageSize, bc.producerName, bc.producerID,
bc.compressionType, bc.level, bc.buffersPool, bc.log, bc.encryptor,
)
batchPart = &t
Expand Down
17 changes: 12 additions & 5 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func (p *partitionProducer) grabCnx() error {
p.log.WithError(err).Error("Failed to create producer at send PRODUCER request")
return err
}
p._setConn(res.Cnx)

p.producerName = res.Response.ProducerSuccess.GetProducerName()

Expand All @@ -258,7 +259,8 @@ func (p *partitionProducer) grabCnx() error {

if p.options.DisableBatching {
provider, _ := GetBatcherBuilderProvider(DefaultBatchBuilder)
p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
p.batchBuilder, err = provider(p.options.BatchingMaxMessages,
p.options.BatchingMaxSize, uint(p._getConn().GetMaxMessageSize()),
p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
compression.Level(p.options.CompressionLevel),
p,
Expand All @@ -273,7 +275,8 @@ func (p *partitionProducer) grabCnx() error {
provider, _ = GetBatcherBuilderProvider(DefaultBatchBuilder)
}

p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
p.batchBuilder, err = provider(p.options.BatchingMaxMessages,
p.options.BatchingMaxSize, uint(p._getConn().GetMaxMessageSize()),
p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
compression.Level(p.options.CompressionLevel),
p,
Expand All @@ -294,7 +297,6 @@ func (p *partitionProducer) grabCnx() error {
p.schemaCache.Put(p.schemaInfo, schemaVersion)
}

p._setConn(res.Cnx)
err = p._getConn().RegisterListener(p.producerID, p)
if err != nil {
return err
Expand Down Expand Up @@ -519,8 +521,9 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
}
}

// if msg is too large
if len(payload) > int(p._getConn().GetMaxMessageSize()) {
// if msg is too large and compression is not enabled
if len(payload) > int(p._getConn().GetMaxMessageSize()) &&
p.options.CompressionType == NoCompression {
p.publishSemaphore.Release()
request.callback(nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
Expand Down Expand Up @@ -625,6 +628,10 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
if err != nil {
for _, cb := range callbacks {
if sr, ok := cb.(*sendRequest); ok {
if errors.Is(err, internal.ErrMessageTooLarge) {
err = errMessageTooLarge
p.metrics.PublishErrorsMsgTooLarge.Inc()
}
sr.callback(nil, sr.msg, err)
}
}
Expand Down
48 changes: 29 additions & 19 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,36 +916,46 @@ func TestBatchingDisabled(t *testing.T) {
}

func TestMaxMessageSize(t *testing.T) {
serverMaxMessageSize := 1024 * 1024
serverMaxMessageSize := 1024 * 1024 * 5

client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.NoError(t, err)
defer client.Close()

// Need to set BatchingMaxSize > serverMaxMessageSize to avoid errMessageTooLarge
// being masked by an earlier errFailAddToBatch
producer, err := client.CreateProducer(ProducerOptions{
f := func(option ProducerOptions) {
// Need to set BatchingMaxSize > serverMaxMessageSize to avoid errMessageTooLarge
// being masked by an earlier errFailAddToBatch
producer, err := client.CreateProducer(option)
assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()

for i := 0; i < 10; i++ {
ID, err := producer.Send(context.Background(), &ProducerMessage{
Payload: make([]byte, serverMaxMessageSize),
})
if option.CompressionType == NoCompression {
assert.Equal(t, errMessageTooLarge, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, ID)
}
}
}

f(ProducerOptions{
Topic: newTopicName(),
BatchingMaxSize: uint(2 * serverMaxMessageSize),
})
assert.NoError(t, err)
assert.NotNil(t, producer)
defer producer.Close()

for bias := -1; bias <= 1; bias++ {
payload := make([]byte, serverMaxMessageSize+bias)
ID, err := producer.Send(context.Background(), &ProducerMessage{
Payload: payload,
})
if bias <= 0 {
assert.NoError(t, err)
assert.NotNil(t, ID)
} else {
assert.Equal(t, errMessageTooLarge, err)
}
}
f(ProducerOptions{
Topic: newTopicName(),
BatchingMaxSize: uint(2 * serverMaxMessageSize),
CompressionType: LZ4,
CompressionLevel: Default,
})
}

func TestFailedSchemaEncode(t *testing.T) {
Expand Down