diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index ee5c1299dd..ee4249f7c5 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -34,6 +34,7 @@ type BatcherBuilderProvider func( maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor, + sequenceIDGenerator *SequenceIDGenerator, ) (BatchBuilder, error) // BatchBuilder is a interface of batch builders @@ -43,13 +44,14 @@ type BatchBuilder interface { // Add will add single message to batch. Add( - metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, + metadata *pb.SingleMessageMetadata, payload []byte, callback interface{}, replicateTo []string, deliverAt time.Time, schemaVersion []byte, multiSchemaEnabled bool, useTxn bool, mostSigBits uint64, leastSigBits uint64, + customSequenceID *int64, ) bool // Flush all the messages buffered in the client and wait until all messages have been successfully persisted. @@ -73,6 +75,50 @@ type FlushBatch struct { Error error } +type messageEntry struct { + smm *pb.SingleMessageMetadata + payload []byte + customSequenceID *int64 +} + +type messageBatch struct { + entries []messageEntry + estimatedSize uint32 +} + +func (mb *messageBatch) Append(smm *pb.SingleMessageMetadata, payload []byte, customSequenceID *int64) { + mb.entries = append(mb.entries, messageEntry{ + smm: smm, + payload: payload, + customSequenceID: customSequenceID, + }) + mb.estimatedSize += uint32(len(payload) + proto.Size(smm) + 4 + 11) // metadataSize:4 sequenceID: [2,11] +} + +func (mb *messageBatch) AssignSequenceIDs(sequenceIDGenerator *SequenceIDGenerator) { + for k, entry := range mb.entries { + if entry.smm.SequenceId != nil { + continue + } + + var sequenceID uint64 + if entry.customSequenceID != nil { + sequenceID = uint64(*entry.customSequenceID) + } else { + sequenceID = sequenceIDGenerator.Next() + } + mb.entries[k].smm.SequenceId = proto.Uint64(sequenceID) + } +} + +func (mb *messageBatch) FlushTo(wb Buffer) { + for _, entry := range mb.entries { + addSingleMessageToBatch(wb, entry.smm, entry.payload) + } + mb.entries = mb.entries[:0] + mb.estimatedSize = 0 +} + // batchContainer wraps the objects needed to a batch. // batchContainer implement BatchBuilder as a single batch container. type batchContainer struct { @@ -91,8 +137,10 @@ type batchContainer struct { maxMessageSize uint32 - producerName string - producerID uint64 + producerName string + producerID uint64 + sequenceIDGenerator *SequenceIDGenerator + messageBatch cmdSend *pb.BaseCommand msgMetadata *pb.MessageMetadata @@ -112,16 +160,18 @@ func newBatchContainer( maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor, + sequenceIDGenerator *SequenceIDGenerator, ) batchContainer { bc := batchContainer{ - buffer: NewBuffer(4096), - numMessages: 0, - maxMessages: maxMessages, - maxBatchSize: maxBatchSize, - maxMessageSize: maxMessageSize, - producerName: producerName, - producerID: producerID, + buffer: NewBuffer(4096), + numMessages: 0, + maxMessages: maxMessages, + maxBatchSize: maxBatchSize, + maxMessageSize: maxMessageSize, + producerName: producerName, + producerID: producerID, + sequenceIDGenerator: sequenceIDGenerator, cmdSend: baseCommand( pb.BaseCommand_SEND, &pb.CommandSend{ @@ -151,11 +201,12 @@ func NewBatchBuilder( maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor, + sequenceIDGenerator *SequenceIDGenerator, ) (BatchBuilder, error) { bc := newBatchContainer( maxMessages, maxBatchSize, maxMessageSize, producerName, producerID, compressionType, - level, bufferPool, metrics, logger, encryptor, + level, bufferPool, metrics, logger, encryptor, sequenceIDGenerator, ) return &bc, nil @@ -163,7 +214,9 @@ func NewBatchBuilder( // IsFull checks if the size in the current batch meets or exceeds the maximum size allowed by the batch func (bc *batchContainer) IsFull() bool { - return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize) + return bc.numMessages >= bc.maxMessages || + bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize) || + bc.estimatedSize >= uint32(bc.maxBatchSize) } // hasSpace should return true if and only if the batch container can accommodate another message of length payload. @@ -175,7 +228,8 @@ func (bc *batchContainer) hasSpace(payload []byte) bool { msgSize := uint32(len(payload)) expectedSize := bc.buffer.ReadableBytes() + msgSize return bc.numMessages+1 <= bc.maxMessages && - expectedSize <= uint32(bc.maxBatchSize) && expectedSize <= bc.maxMessageSize + expectedSize <= uint32(bc.maxBatchSize) && expectedSize <= bc.maxMessageSize && + bc.estimatedSize+msgSize <= uint32(bc.maxBatchSize) && bc.estimatedSize+msgSize <= bc.maxMessageSize } func (bc *batchContainer) hasSameSchema(schemaVersion []byte) bool { @@ -187,11 +241,11 @@ func (bc *batchContainer) hasSameSchema(schemaVersion []byte) bool { // Add will add single message to batch. func (bc *batchContainer) Add( - metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, + metadata *pb.SingleMessageMetadata, payload []byte, callback interface{}, replicateTo []string, deliverAt time.Time, schemaVersion []byte, multiSchemaEnabled bool, - useTxn bool, mostSigBits uint64, leastSigBits uint64, + useTxn bool, mostSigBits uint64, leastSigBits uint64, customSequenceID *int64, ) bool { if replicateTo != nil && bc.numMessages != 0 { @@ -211,13 +265,6 @@ func (bc *batchContainer) Add( } if bc.numMessages == 0 { - var sequenceID uint64 - if metadata.SequenceId != nil { - sequenceID = *metadata.SequenceId - } else { - sequenceID = GetAndAdd(sequenceIDGenerator, 1) - } - bc.msgMetadata.SequenceId = proto.Uint64(sequenceID) bc.msgMetadata.PublishTime = proto.Uint64(TimestampMillis(time.Now())) bc.msgMetadata.ProducerName = &bc.producerName bc.msgMetadata.ReplicateTo = replicateTo @@ -229,13 +276,12 @@ func (bc *batchContainer) Add( bc.msgMetadata.DeliverAtTime = proto.Int64(int64(TimestampMillis(deliverAt))) } - bc.cmdSend.Send.SequenceId = proto.Uint64(sequenceID) if useTxn { bc.cmdSend.Send.TxnidMostBits = proto.Uint64(mostSigBits) bc.cmdSend.Send.TxnidLeastBits = proto.Uint64(leastSigBits) } } - addSingleMessageToBatch(bc.buffer, metadata, payload) + bc.messageBatch.Append(metadata, payload, customSequenceID) bc.numMessages++ bc.callbacks = append(bc.callbacks, callback) @@ -264,6 +310,12 @@ func (bc *batchContainer) Flush() *FlushBatch { bc.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bc.numMessages)) bc.cmdSend.Send.NumMessages = proto.Int32(int32(bc.numMessages)) + bc.messageBatch.AssignSequenceIDs(bc.sequenceIDGenerator) + sequenceID := *bc.messageBatch.entries[0].smm.SequenceId + bc.msgMetadata.SequenceId = proto.Uint64(sequenceID) + bc.cmdSend.Send.SequenceId = proto.Uint64(sequenceID) + bc.messageBatch.FlushTo(bc.buffer) + uncompressedSize := bc.buffer.ReadableBytes() bc.msgMetadata.UncompressedSize = &uncompressedSize @@ -272,13 +324,12 @@ func (bc *batchContainer) Flush() *FlushBatch { bufferCount.Inc() buffer.SetReleaseCallback(func() { bufferCount.Dec() }) - sequenceID := uint64(0) var err error if err = serializeMessage( buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider, bc.encryptor, bc.maxMessageSize, true, - ); err == nil { // no error in serializing Batch - sequenceID = bc.cmdSend.Send.GetSequenceId() + ); err != nil { + sequenceID = 0 } callbacks := bc.callbacks diff --git a/pulsar/internal/batch_builder_test.go b/pulsar/internal/batch_builder_test.go new file mode 100644 index 0000000000..394cabdb55 --- /dev/null +++ b/pulsar/internal/batch_builder_test.go @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package internal + +import ( + "bytes" + "testing" + "time" + + "github.com/apache/pulsar-client-go/pulsar/internal/compression" + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" + "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" +) + +func TestBatchContainer_IsFull(t *testing.T) { + batcher, err := NewBatchBuilder( + 10, + 1000, + 10000, + "test", + 1, + pb.CompressionType_NONE, + compression.Level(0), + &bufferPoolImpl{}, + NewMetricsProvider(2, map[string]string{}, prometheus.DefaultRegisterer), + log.NewLoggerWithLogrus(logrus.StandardLogger()), + &mockEncryptor{}, + NewSequenceIDGenerator(1), + ) + if err != nil { + assert.Fail(t, "Failed to create batcher") + } + + f := func(payload []byte) bool { + return batcher.Add(&pb.SingleMessageMetadata{ + PayloadSize: proto.Int32(123), + }, payload, nil, nil, time.Now(), + nil, false, false, 0, 0, nil) + } + + // maxMessages + for i := 0; i < 9; i++ { + f([]byte("test")) + assert.False(t, batcher.IsFull()) + } + f([]byte("test")) + assert.True(t, batcher.IsFull()) + + batcher.Flush() + assert.False(t, batcher.IsFull()) + + // maxBatchSize + f(bytes.Repeat([]byte("a"), 1000)) + assert.True(t, batcher.IsFull()) +} diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go index d8083f08cf..f6a8c04f56 100644 --- a/pulsar/internal/key_based_batch_builder.go +++ b/pulsar/internal/key_based_batch_builder.go @@ -87,13 +87,14 @@ func NewKeyBasedBatchBuilder( maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor, + sequenceIDGenerator *SequenceIDGenerator, ) (BatchBuilder, error) { bb := &keyBasedBatchContainer{ batches: newKeyBasedBatches(), batchContainer: newBatchContainer( maxMessages, maxBatchSize, maxMessageSize, producerName, producerID, - compressionType, level, bufferPool, metrics, logger, encryptor, + compressionType, level, bufferPool, metrics, logger, encryptor, sequenceIDGenerator, ), compressionType: compressionType, level: level, @@ -108,7 +109,9 @@ func NewKeyBasedBatchBuilder( // IsFull checks if the size in the current batch meets or exceeds the maximum size allowed by the batch func (bc *keyBasedBatchContainer) IsFull() bool { - return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize) + return bc.numMessages >= bc.maxMessages || + bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize) || + bc.estimatedSize >= uint32(bc.maxBatchSize) } func (bc *keyBasedBatchContainer) IsMultiBatches() bool { @@ -123,18 +126,20 @@ func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool { return true } msgSize := uint32(len(payload)) - return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize) + return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize) && + bc.estimatedSize+msgSize <= uint32(bc.maxBatchSize) } // Add will add single message to key-based batch with message key. func (bc *keyBasedBatchContainer) Add( - metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64, + metadata *pb.SingleMessageMetadata, payload []byte, callback interface{}, replicateTo []string, deliverAt time.Time, schemaVersion []byte, multiSchemaEnabled bool, useTxn bool, mostSigBits uint64, leastSigBits uint64, + customSequenceID *int64, ) bool { if replicateTo != nil && bc.numMessages != 0 { // If the current batch is not empty and we're trying to set the replication clusters, @@ -155,7 +160,7 @@ func (bc *keyBasedBatchContainer) Add( // create batchContainer for new key t := newBatchContainer( bc.maxMessages, bc.maxBatchSize, bc.maxMessageSize, bc.producerName, bc.producerID, - bc.compressionType, bc.level, bc.buffersPool, bc.metrics, bc.log, bc.encryptor, + bc.compressionType, bc.level, bc.buffersPool, bc.metrics, bc.log, bc.encryptor, bc.sequenceIDGenerator, ) batchPart = &t bc.batches.Add(msgKey, &t) @@ -163,14 +168,14 @@ func (bc *keyBasedBatchContainer) Add( // add message to batch container add := batchPart.Add( - metadata, sequenceIDGenerator, payload, callback, replicateTo, + metadata, payload, callback, replicateTo, deliverAt, - schemaVersion, multiSchemaEnabled, useTxn, mostSigBits, leastSigBits, + schemaVersion, multiSchemaEnabled, useTxn, mostSigBits, leastSigBits, customSequenceID, ) if !add { return false } - addSingleMessageToBatch(bc.buffer, metadata, payload) + bc.messageBatch.Append(metadata, payload, customSequenceID) bc.numMessages++ bc.callbacks = append(bc.callbacks, callback) diff --git a/pulsar/internal/key_based_batch_builder_test.go b/pulsar/internal/key_based_batch_builder_test.go index 10092327a2..1e1ea296ab 100644 --- a/pulsar/internal/key_based_batch_builder_test.go +++ b/pulsar/internal/key_based_batch_builder_test.go @@ -51,19 +51,19 @@ func TestKeyBasedBatcherOrdering(t *testing.T) { NewMetricsProvider(2, map[string]string{}, prometheus.DefaultRegisterer), log.NewLoggerWithLogrus(logrus.StandardLogger()), &mockEncryptor{}, + NewSequenceIDGenerator(1), ) if err != nil { assert.Fail(t, "Failed to create key based batcher") } - sequenceID := uint64(0) for i := 0; i < 10; i++ { metadata := &pb.SingleMessageMetadata{ OrderingKey: []byte(fmt.Sprintf("key-%d", i)), PayloadSize: proto.Int32(0), } - assert.True(t, keyBatcher.Add(metadata, &sequenceID, []byte("test"), nil, nil, time.Now(), - nil, false, false, 0, 0)) + assert.True(t, keyBatcher.Add(metadata, []byte("test"), nil, nil, time.Now(), + nil, false, false, 0, 0, nil)) } batches := keyBatcher.FlushBatches() diff --git a/pulsar/internal/sequence_id_generator.go b/pulsar/internal/sequence_id_generator.go new file mode 100644 index 0000000000..72dd0c8380 --- /dev/null +++ b/pulsar/internal/sequence_id_generator.go @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package internal + +// SequenceIDGenerator generates sequential IDs in a simple, non-thread-safe manner +// without overflow handling or interface abstraction. +// The sequenceID lifecycle (generation -> local storage -> network transmission) must be +// serialized to ensure consistency. +type SequenceIDGenerator struct { + nextSequenceID uint64 +} + +func NewSequenceIDGenerator(nextSequenceID uint64) *SequenceIDGenerator { + return &SequenceIDGenerator{nextSequenceID: nextSequenceID} +} + +func (s *SequenceIDGenerator) Next() uint64 { + current := s.nextSequenceID + s.nextSequenceID++ + return current +} diff --git a/pulsar/internal/utils.go b/pulsar/internal/utils.go index 2dc8210147..9cbc97cd91 100644 --- a/pulsar/internal/utils.go +++ b/pulsar/internal/utils.go @@ -20,7 +20,6 @@ package internal import ( "strconv" "strings" - "sync/atomic" "time" "google.golang.org/protobuf/proto" @@ -37,16 +36,6 @@ func TimestampMillis(t time.Time) uint64 { return uint64(t.UnixNano()) / uint64(time.Millisecond) } -// GetAndAdd perform atomic read and update -func GetAndAdd(n *uint64, diff uint64) uint64 { - for { - v := atomic.LoadUint64(n) - if atomic.CompareAndSwapUint64(n, v, v+diff) { - return v - } - } -} - func ParseRelativeTimeInSeconds(relativeTime string) (time.Duration, error) { if relativeTime == "" { return -1, errors.New("time can not be empty") diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go index c2e1113df3..7c46ee2753 100644 --- a/pulsar/message_chunking_test.go +++ b/pulsar/message_chunking_test.go @@ -550,7 +550,6 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) { mm.NumChunksFromMsg = proto.Int32(int32(totalChunks)) mm.TotalChunkMsgSize = proto.Int32(int32(len(wholePayload))) mm.ChunkId = proto.Int32(int32(chunkID)) - producerImpl.updateMetadataSeqID(mm, msg) producerImpl.internalSingleSend( mm, msg.Payload, @@ -564,7 +563,6 @@ func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) { flushImmediately: true, totalChunks: totalChunks, chunkID: chunkID, - uuid: uuid, chunkRecorder: newChunkRecorder(), uncompressedPayload: wholePayload, uncompressedSize: int64(len(wholePayload)), diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 9841222fcd..00a28db759 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -105,7 +105,7 @@ type partitionProducer struct { userProvidedProducerName bool producerID uint64 batchBuilder internal.BatchBuilder - sequenceIDGenerator *uint64 + sequenceIDGenerator *internal.SequenceIDGenerator batchFlushTicker *time.Ticker encryptor internalcrypto.Encryptor compressionProvider compression.Provider @@ -353,8 +353,7 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error { } if p.sequenceIDGenerator == nil { - nextSequenceID := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1) - p.sequenceIDGenerator = &nextSequenceID + p.sequenceIDGenerator = internal.NewSequenceIDGenerator(uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)) } schemaVersion := res.Response.ProducerSuccess.GetSchemaVersion() @@ -374,7 +373,8 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error { buffersPool, p.client.metrics, p.log, - p.encryptor) + p.encryptor, + p.sequenceIDGenerator) if err != nil { return err } @@ -643,8 +643,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { } var lhs, rhs int - uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*sr.mm.SequenceId, 10)) - sr.mm.Uuid = proto.String(uuid) sr.mm.NumChunksFromMsg = proto.Int32(int32(sr.totalChunks)) sr.mm.TotalChunkMsgSize = proto.Int32(int32(sr.compressedSize)) cr := newChunkRecorder() @@ -667,7 +665,6 @@ func (p *partitionProducer) internalSend(sr *sendRequest) { flushImmediately: sr.flushImmediately, totalChunks: sr.totalChunks, chunkID: chunkID, - uuid: uuid, chunkRecorder: cr, transaction: sr.transaction, memLimit: sr.memLimit, @@ -704,9 +701,9 @@ func addRequestToBatch(smm *pb.SingleMessageMetadata, p *partitionProducer, leastSigBits = txnID.LeastSigBits } - return p.batchBuilder.Add(smm, p.sequenceIDGenerator, uncompressedPayload, request, + return p.batchBuilder.Add(smm, uncompressedPayload, request, msg.ReplicationClusters, deliverAt, schemaVersion, multiSchemaEnabled, useTxn, mostSigBits, - leastSigBits) + leastSigBits, msg.SequenceID) } func (p *partitionProducer) genMetadata(msg *ProducerMessage, @@ -742,22 +739,6 @@ func (p *partitionProducer) genMetadata(msg *ProducerMessage, return } -func (p *partitionProducer) updateMetadataSeqID(mm *pb.MessageMetadata, msg *ProducerMessage) { - if msg.SequenceID != nil { - mm.SequenceId = proto.Uint64(uint64(*msg.SequenceID)) - } else { - mm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1)) - } -} - -func (p *partitionProducer) updateSingleMessageMetadataSeqID(smm *pb.SingleMessageMetadata, msg *ProducerMessage) { - if msg.SequenceID != nil { - smm.SequenceId = proto.Uint64(uint64(*msg.SequenceID)) - } else { - smm.SequenceId = proto.Uint64(internal.GetAndAdd(p.sequenceIDGenerator, 1)) - } -} - func (p *partitionProducer) genSingleMessageMetadataInBatch( msg *ProducerMessage, uncompressedSize int, @@ -782,8 +763,6 @@ func (p *partitionProducer) genSingleMessageMetadataInBatch( smm.Properties = internal.ConvertFromStringMap(msg.Properties) } - p.updateSingleMessageMetadataSeqID(smm, msg) - return } @@ -803,7 +782,23 @@ func (p *partitionProducer) internalSingleSend( bufferCount.Inc() buffer.SetReleaseCallback(func() { bufferCount.Dec() }) - sid := *mm.SequenceId + var sequenceID uint64 + if sr.mm.SequenceId != nil { + sequenceID = *sr.mm.SequenceId + } else { + if msg.SequenceID != nil { + sequenceID = uint64(*msg.SequenceID) + } else { + sequenceID = p.sequenceIDGenerator.Next() + } + sr.mm.SequenceId = proto.Uint64(sequenceID) + } + + if sr.totalChunks > 1 { + uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(sequenceID, 10)) + sr.mm.Uuid = proto.String(uuid) + } + var useTxn bool var mostSigBits uint64 var leastSigBits uint64 @@ -818,7 +813,7 @@ func (p *partitionProducer) internalSingleSend( err := internal.SingleSend( buffer, p.producerID, - sid, + sequenceID, mm, payloadBuf, p.encryptor, @@ -834,7 +829,7 @@ func (p *partitionProducer) internalSingleSend( return } - p.writeData(buffer, sid, []interface{}{sr}) + p.writeData(buffer, sequenceID, []interface{}{sr}) } type pendingItem struct { @@ -1236,12 +1231,6 @@ func (p *partitionProducer) updateMetaData(sr *sendRequest) { sr.msg.ReplicationClusters == nil && deliverAt.UnixNano() < 0 - if !sr.sendAsBatch { - // update sequence id for metadata, make the size of msgMetadata more accurate - // batch sending will update sequence ID in the BatchBuilder - p.updateMetadataSeqID(sr.mm, sr.msg) - } - sr.deliverAt = deliverAt } @@ -1585,7 +1574,6 @@ type sendRequest struct { flushImmediately bool totalChunks int chunkID int - uuid string chunkRecorder *chunkRecorder // resource management