Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 76 additions & 30 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
type BatcherBuilderProvider func(
maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor,
bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor, sequenceIDGenerator *SequenceIDGenerator,
) (BatchBuilder, error)

// BatchBuilder is a interface of batch builders
Expand All @@ -43,13 +43,14 @@ type BatchBuilder interface {

// Add will add single message to batch.
Add(
metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
metadata *pb.SingleMessageMetadata,
payload []byte,
callback interface{}, replicateTo []string, deliverAt time.Time,
schemaVersion []byte, multiSchemaEnabled bool,
useTxn bool,
mostSigBits uint64,
leastSigBits uint64,
customSequenceID *int64,
) bool

// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
Expand All @@ -73,6 +74,50 @@ type FlushBatch struct {
Error error
}

type messageEntry struct {
smm *pb.SingleMessageMetadata
payload []byte
customSequenceID *int64
}

type messageBatch struct {
entries []messageEntry
estimatedSize uint32
}

func (mb *messageBatch) Append(smm *pb.SingleMessageMetadata, payload []byte, customSequenceID *int64) {
mb.entries = append(mb.entries, messageEntry{
smm: smm,
payload: payload,
customSequenceID: customSequenceID,
})
mb.estimatedSize += uint32(len(payload) + proto.Size(smm) + 4 + 11) // metadataSize:4 sequenceID: [2,11]
}

func (mb *messageBatch) AssignSequenceIDs(sequenceIDGenerator *SequenceIDGenerator) {
for k, entry := range mb.entries {
if entry.smm.SequenceId != nil {
continue
}

var sequenceID uint64
if entry.customSequenceID != nil {
sequenceID = uint64(*entry.customSequenceID)
} else {
sequenceID = sequenceIDGenerator.Next()
}
mb.entries[k].smm.SequenceId = proto.Uint64(sequenceID)
}
}

func (mb *messageBatch) FlushTo(wb Buffer) {
for _, entry := range mb.entries {
addSingleMessageToBatch(wb, entry.smm, entry.payload)
}
mb.entries = mb.entries[:0]
mb.estimatedSize = 0
}

// batchContainer wraps the objects needed to a batch.
// batchContainer implement BatchBuilder as a single batch container.
type batchContainer struct {
Expand All @@ -91,8 +136,10 @@ type batchContainer struct {

maxMessageSize uint32

producerName string
producerID uint64
producerName string
producerID uint64
sequenceIDGenerator *SequenceIDGenerator
messageBatch

cmdSend *pb.BaseCommand
msgMetadata *pb.MessageMetadata
Expand All @@ -111,17 +158,18 @@ type batchContainer struct {
func newBatchContainer(
maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor,
bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor, sequenceIDGenerator *SequenceIDGenerator,
) batchContainer {

bc := batchContainer{
buffer: NewBuffer(4096),
numMessages: 0,
maxMessages: maxMessages,
maxBatchSize: maxBatchSize,
maxMessageSize: maxMessageSize,
producerName: producerName,
producerID: producerID,
buffer: NewBuffer(4096),
numMessages: 0,
maxMessages: maxMessages,
maxBatchSize: maxBatchSize,
maxMessageSize: maxMessageSize,
producerName: producerName,
producerID: producerID,
sequenceIDGenerator: sequenceIDGenerator,
cmdSend: baseCommand(
pb.BaseCommand_SEND,
&pb.CommandSend{
Expand Down Expand Up @@ -150,20 +198,20 @@ func newBatchContainer(
func NewBatchBuilder(
maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor,
bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor, sequenceIDGenerator *SequenceIDGenerator,
) (BatchBuilder, error) {

bc := newBatchContainer(
maxMessages, maxBatchSize, maxMessageSize, producerName, producerID, compressionType,
level, bufferPool, metrics, logger, encryptor,
level, bufferPool, metrics, logger, encryptor, sequenceIDGenerator,
)

return &bc, nil
}

// IsFull checks if the size in the current batch meets or exceeds the maximum size allowed by the batch
func (bc *batchContainer) IsFull() bool {
return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize)
return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize) || bc.estimatedSize >= uint32(bc.maxBatchSize)
}

// hasSpace should return true if and only if the batch container can accommodate another message of length payload.
Expand All @@ -175,7 +223,8 @@ func (bc *batchContainer) hasSpace(payload []byte) bool {
msgSize := uint32(len(payload))
expectedSize := bc.buffer.ReadableBytes() + msgSize
return bc.numMessages+1 <= bc.maxMessages &&
expectedSize <= uint32(bc.maxBatchSize) && expectedSize <= bc.maxMessageSize
expectedSize <= uint32(bc.maxBatchSize) && expectedSize <= bc.maxMessageSize &&
bc.estimatedSize+msgSize <= uint32(bc.maxBatchSize) && bc.estimatedSize+msgSize <= bc.maxMessageSize
}

func (bc *batchContainer) hasSameSchema(schemaVersion []byte) bool {
Expand All @@ -187,11 +236,11 @@ func (bc *batchContainer) hasSameSchema(schemaVersion []byte) bool {

// Add will add single message to batch.
func (bc *batchContainer) Add(
metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
metadata *pb.SingleMessageMetadata,
payload []byte,
callback interface{}, replicateTo []string, deliverAt time.Time,
schemaVersion []byte, multiSchemaEnabled bool,
useTxn bool, mostSigBits uint64, leastSigBits uint64,
useTxn bool, mostSigBits uint64, leastSigBits uint64, customSequenceID *int64,
) bool {

if replicateTo != nil && bc.numMessages != 0 {
Expand All @@ -211,13 +260,6 @@ func (bc *batchContainer) Add(
}

if bc.numMessages == 0 {
var sequenceID uint64
if metadata.SequenceId != nil {
sequenceID = *metadata.SequenceId
} else {
sequenceID = GetAndAdd(sequenceIDGenerator, 1)
}
bc.msgMetadata.SequenceId = proto.Uint64(sequenceID)
bc.msgMetadata.PublishTime = proto.Uint64(TimestampMillis(time.Now()))
bc.msgMetadata.ProducerName = &bc.producerName
bc.msgMetadata.ReplicateTo = replicateTo
Expand All @@ -229,13 +271,12 @@ func (bc *batchContainer) Add(
bc.msgMetadata.DeliverAtTime = proto.Int64(int64(TimestampMillis(deliverAt)))
}

bc.cmdSend.Send.SequenceId = proto.Uint64(sequenceID)
if useTxn {
bc.cmdSend.Send.TxnidMostBits = proto.Uint64(mostSigBits)
bc.cmdSend.Send.TxnidLeastBits = proto.Uint64(leastSigBits)
}
}
addSingleMessageToBatch(bc.buffer, metadata, payload)
bc.messageBatch.Append(metadata, payload, customSequenceID)

bc.numMessages++
bc.callbacks = append(bc.callbacks, callback)
Expand Down Expand Up @@ -272,13 +313,18 @@ func (bc *batchContainer) Flush() *FlushBatch {
bufferCount.Inc()
buffer.SetReleaseCallback(func() { bufferCount.Dec() })

sequenceID := uint64(0)
bc.messageBatch.AssignSequenceIDs(bc.sequenceIDGenerator)
sequenceID := *bc.messageBatch.entries[0].smm.SequenceId
bc.msgMetadata.SequenceId = proto.Uint64(sequenceID)
bc.cmdSend.Send.SequenceId = proto.Uint64(sequenceID)
bc.messageBatch.FlushTo(bc.buffer)

var err error
if err = serializeMessage(
buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider,
bc.encryptor, bc.maxMessageSize, true,
); err == nil { // no error in serializing Batch
sequenceID = bc.cmdSend.Send.GetSequenceId()
); err != nil {
sequenceID = 0
}

callbacks := bc.callbacks
Expand Down
74 changes: 74 additions & 0 deletions pulsar/internal/batch_builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package internal

import (
"bytes"
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
)

func TestBatchContainer_IsFull(t *testing.T) {
batcher, err := NewBatchBuilder(
10,
1000,
10000,
"test",
1,
pb.CompressionType_NONE,
compression.Level(0),
&bufferPoolImpl{},
NewMetricsProvider(2, map[string]string{}, prometheus.DefaultRegisterer),
log.NewLoggerWithLogrus(logrus.StandardLogger()),
&mockEncryptor{},
NewSequenceIDGenerator(1),
)
if err != nil {
assert.Fail(t, "Failed to create batcher")
}

f := func(payload []byte) bool {

Check failure on line 53 in pulsar/internal/batch_builder_test.go

View workflow job for this annotation

GitHub Actions / lint

TestBatchContainer_IsFull$1 - result 0 (bool) is never used (unparam)
return batcher.Add(&pb.SingleMessageMetadata{
PayloadSize: proto.Int32(123),
}, payload, nil, nil, time.Now(),
nil, false, false, 0, 0, nil)
}

// maxMessages
for i := 0; i < 9; i++ {
f([]byte("test"))
assert.False(t, batcher.IsFull())
}
f([]byte("test"))
assert.True(t, batcher.IsFull())

batcher.Flush()
assert.False(t, batcher.IsFull())

// maxBatchSize
f(bytes.Repeat([]byte("a"), 1000))
assert.True(t, batcher.IsFull())
}
20 changes: 11 additions & 9 deletions pulsar/internal/key_based_batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ func (h *keyBasedBatches) Val(key string) *batchContainer {
func NewKeyBasedBatchBuilder(
maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor,
bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor, sequenceIDGenerator *SequenceIDGenerator,
) (BatchBuilder, error) {

bb := &keyBasedBatchContainer{
batches: newKeyBasedBatches(),
batchContainer: newBatchContainer(
maxMessages, maxBatchSize, maxMessageSize, producerName, producerID,
compressionType, level, bufferPool, metrics, logger, encryptor,
compressionType, level, bufferPool, metrics, logger, encryptor, sequenceIDGenerator,
),
compressionType: compressionType,
level: level,
Expand All @@ -108,7 +108,7 @@ func NewKeyBasedBatchBuilder(

// IsFull checks if the size in the current batch meets or exceeds the maximum size allowed by the batch
func (bc *keyBasedBatchContainer) IsFull() bool {
return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize)
return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize) || bc.estimatedSize >= uint32(bc.maxBatchSize)
}

func (bc *keyBasedBatchContainer) IsMultiBatches() bool {
Expand All @@ -123,18 +123,20 @@ func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool {
return true
}
msgSize := uint32(len(payload))
return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize) &&
bc.estimatedSize+msgSize <= uint32(bc.maxBatchSize)
}

// Add will add single message to key-based batch with message key.
func (bc *keyBasedBatchContainer) Add(
metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
metadata *pb.SingleMessageMetadata,
payload []byte,
callback interface{}, replicateTo []string, deliverAt time.Time,
schemaVersion []byte, multiSchemaEnabled bool,
useTxn bool,
mostSigBits uint64,
leastSigBits uint64,
customSequenceID *int64,
) bool {
if replicateTo != nil && bc.numMessages != 0 {
// If the current batch is not empty and we're trying to set the replication clusters,
Expand All @@ -155,22 +157,22 @@ func (bc *keyBasedBatchContainer) Add(
// create batchContainer for new key
t := newBatchContainer(
bc.maxMessages, bc.maxBatchSize, bc.maxMessageSize, bc.producerName, bc.producerID,
bc.compressionType, bc.level, bc.buffersPool, bc.metrics, bc.log, bc.encryptor,
bc.compressionType, bc.level, bc.buffersPool, bc.metrics, bc.log, bc.encryptor, bc.sequenceIDGenerator,
)
batchPart = &t
bc.batches.Add(msgKey, &t)
}

// add message to batch container
add := batchPart.Add(
metadata, sequenceIDGenerator, payload, callback, replicateTo,
metadata, payload, callback, replicateTo,
deliverAt,
schemaVersion, multiSchemaEnabled, useTxn, mostSigBits, leastSigBits,
schemaVersion, multiSchemaEnabled, useTxn, mostSigBits, leastSigBits, customSequenceID,
)
if !add {
return false
}
addSingleMessageToBatch(bc.buffer, metadata, payload)
bc.messageBatch.Append(metadata, payload, customSequenceID)

bc.numMessages++
bc.callbacks = append(bc.callbacks, callback)
Expand Down
6 changes: 3 additions & 3 deletions pulsar/internal/key_based_batch_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ func TestKeyBasedBatcherOrdering(t *testing.T) {
NewMetricsProvider(2, map[string]string{}, prometheus.DefaultRegisterer),
log.NewLoggerWithLogrus(logrus.StandardLogger()),
&mockEncryptor{},
NewSequenceIDGenerator(1),
)
if err != nil {
assert.Fail(t, "Failed to create key based batcher")
}

sequenceID := uint64(0)
for i := 0; i < 10; i++ {
metadata := &pb.SingleMessageMetadata{
OrderingKey: []byte(fmt.Sprintf("key-%d", i)),
PayloadSize: proto.Int32(0),
}
assert.True(t, keyBatcher.Add(metadata, &sequenceID, []byte("test"), nil, nil, time.Now(),
nil, false, false, 0, 0))
assert.True(t, keyBatcher.Add(metadata, []byte("test"), nil, nil, time.Now(),
nil, false, false, 0, 0, nil))
}

batches := keyBatcher.FlushBatches()
Expand Down
Loading
Loading