diff --git a/.gitignore b/.gitignore
index bfa04f037f..c1a09dc724 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,8 +11,8 @@
 *.out
 coverage.html
 
-perf/perf
-pulsar-perf
-bin
-
+perf/perf/
+pulsar-perf/
+bin/
+logs/
 vendor/
diff --git a/pulsar/backoff/backoff.go b/pulsar/backoff/backoff.go
index e3cf25c0a3..2db6ad02d3 100644
--- a/pulsar/backoff/backoff.go
+++ b/pulsar/backoff/backoff.go
@@ -43,10 +43,13 @@ type DefaultBackoff struct {
 }
 
 func NewDefaultBackoff() Policy {
-	return &DefaultBackoff{rnd: rand.New(rand.NewSource(time.Now().UnixNano()))}
+	return NewDefaultBackoffWithInitialBackOff(0)
 }
 func NewDefaultBackoffWithInitialBackOff(backoff time.Duration) Policy {
-	return &DefaultBackoff{backoff: backoff / 2}
+	return &DefaultBackoff{
+		rnd:     rand.New(rand.NewSource(time.Now().UnixNano())),
+		backoff: backoff / 2,
+	}
 }
 
 const maxBackoff = 60 * time.Second
diff --git a/pulsar/backoff/backoff_test.go b/pulsar/backoff/backoff_test.go
index fc0a49232b..05791aea4a 100644
--- a/pulsar/backoff/backoff_test.go
+++ b/pulsar/backoff/backoff_test.go
@@ -25,14 +25,14 @@ import (
 )
 
 func TestBackoff_NextMinValue(t *testing.T) {
-	backoff := &DefaultBackoff{}
+	backoff := NewDefaultBackoff()
 	delay := backoff.Next()
 	assert.GreaterOrEqual(t, int64(delay), int64(100*time.Millisecond))
 	assert.LessOrEqual(t, int64(delay), int64(120*time.Millisecond))
 }
 
 func TestBackoff_NextExponentialBackoff(t *testing.T) {
-	backoff := &DefaultBackoff{}
+	backoff := NewDefaultBackoff()
 	previousDelay := backoff.Next()
 	// the last value before capping to the max value is 51.2 s (.1, .2, .4, .8, 1.6, 3.2, 6.4, 12.8, 25.6, 51.2)
 	for previousDelay < 51*time.Second {
@@ -47,7 +47,7 @@ func TestBackoff_NextExponentialBackoff(t *testing.T) {
 }
 
 func TestBackoff_NextMaxValue(t *testing.T) {
-	backoff := &DefaultBackoff{}
+	backoff := NewDefaultBackoff()
 	var delay time.Duration
 	for delay < maxBackoff {
 		delay = backoff.Next()
diff --git a/pulsar/consumer_multitopic_test.go b/pulsar/consumer_multitopic_test.go
index 30ae5ccd17..2b1ebeef18 100644
--- a/pulsar/consumer_multitopic_test.go
+++ b/pulsar/consumer_multitopic_test.go
@@ -332,7 +332,7 @@ func (dummyConnection) SendRequestNoWait(_ *pb.BaseCommand) error {
 	return nil
 }
 
-func (dummyConnection) WriteData(_ context.Context, _ internal.Buffer) {
+func (dummyConnection) WriteData(_ context.Context, _ *internal.SharedBuffer) {
 }
 
 func (dummyConnection) RegisterListener(_ uint64, _ internal.ConnectionListener) error {
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 1074be82ce..d7b1a52306 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -30,7 +30,7 @@ import (
 )
 
 type BuffersPool interface {
-	GetBuffer() Buffer
+	GetBuffer() *SharedBuffer
 }
 
 // BatcherBuilderProvider defines func which returns the BatchBuilder.
@@ -71,7 +71,7 @@ type BatchBuilder interface {
 }
 
 type FlushBatch struct {
-	BatchData  Buffer
+	BatchData  *SharedBuffer
 	SequenceID uint64
 	Callbacks  []interface{}
 	Error      error
@@ -271,7 +271,7 @@ func (bc *batchContainer) Flush() *FlushBatch {
 
 	buffer := bc.buffersPool.GetBuffer()
 	if buffer == nil {
-		buffer = NewBuffer(int(uncompressedSize * 3 / 2))
+		buffer = NewSharedBuffer(NewBuffer(int(uncompressedSize * 3 / 2)))
 	}
 
 	sequenceID := uint64(0)
diff --git a/pulsar/internal/buffer.go b/pulsar/internal/buffer.go
index b3e23fbeb6..6019697b4c 100644
--- a/pulsar/internal/buffer.go
+++ b/pulsar/internal/buffer.go
@@ -19,10 +19,18 @@ package internal
 
 import (
 	"encoding/binary"
+	"sync"
+	"sync/atomic"
 
 	log "github.com/sirupsen/logrus"
 )
 
+var defaultBufferPool BufferPool = newBufferPool(&sync.Pool{})
+
+func GetDefaultBufferPool() BufferPool {
+	return defaultBufferPool
+}
+
 // Buffer is a variable-sized buffer of bytes with Read and Write methods.
 // The zero value for Buffer is an empty buffer ready to use.
 type Buffer interface {
@@ -217,3 +225,62 @@ func (b *buffer) Clear() {
 	b.readerIdx = 0
 	b.writerIdx = 0
 }
+
+type BufferPool interface {
+	// Get returns a cleared buffer if any is available, otherwise nil.
+	Get() *SharedBuffer
+
+	// Put puts the buffer back to the pool and available for other routines.
+	Put(*SharedBuffer)
+}
+
+type synchronizedBufferPool struct {
+	pool *sync.Pool
+}
+
+var _ BufferPool = synchronizedBufferPool{}
+
+func newBufferPool(pool *sync.Pool) synchronizedBufferPool {
+	return synchronizedBufferPool{pool: pool}
+}
+
+func (p synchronizedBufferPool) Get() *SharedBuffer {
+	buffer, ok := p.pool.Get().(Buffer)
+	if ok {
+		buffer.Clear()
+		return NewSharedBuffer(buffer)
+	}
+	return nil
+}
+
+func (p synchronizedBufferPool) Put(buffer *SharedBuffer) {
+	if !buffer.isCurrentlyShared.CompareAndSwap(true, false) {
+		// When swapping fails, it means that a previous go routine has already
+		// tried to recycle the buffer, indicating that the sharing is not there
+		// anymore and the buffer can safely be reused.
+		p.pool.Put(buffer)
+	}
+}
+
+// SharedBuffer is a wrapper around a buffer that can keep track of a shared
+// reference of the buffer to later recycle it only when both references are
+// done with it.
+type SharedBuffer struct {
+	Buffer
+	isCurrentlyShared atomic.Bool
+}
+
+func NewSharedBuffer(buffer Buffer) *SharedBuffer {
+	return &SharedBuffer{Buffer: buffer}
+}
+
+// Retain creates a new shared buffer backed by the same buffer but that will be
+// retained by the pool until the shared reference is also done.
+func (b *SharedBuffer) Retain() *SharedBuffer {
+	newBuffer := &SharedBuffer{
+		Buffer:            b.Buffer,
+		isCurrentlyShared: atomic.Bool{},
+	}
+	newBuffer.isCurrentlyShared.Store(true)
+	return newBuffer
+}
diff --git a/pulsar/internal/buffer_test.go b/pulsar/internal/buffer_test.go
index 8edc60c169..808f54260e 100644
--- a/pulsar/internal/buffer_test.go
+++ b/pulsar/internal/buffer_test.go
@@ -18,6 +18,7 @@
 package internal
 
 import (
+	"sync"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -34,3 +35,67 @@ func TestBuffer(t *testing.T) {
 	assert.Equal(t, uint32(1019), b.WritableBytes())
 	assert.Equal(t, uint32(1024), b.Capacity())
 }
+
+func TestSynchronizedBufferPool_Get_returnsNilWhenEmpty(t *testing.T) {
+	pool := newBufferPool(&sync.Pool{})
+	assert.Nil(t, pool.Get())
+}
+
+func TestSynchronizedBufferPool_Put_marksBufferAsNotSharedAnymore(t *testing.T) {
+	buffer := NewSharedBuffer(NewBuffer(1024)).Retain()
+	assert.True(t, buffer.isCurrentlyShared.Load())
+
+	pool := newBufferPool(&sync.Pool{})
+	pool.Put(buffer)
+	assert.False(t, buffer.isCurrentlyShared.Load())
+}
+
+func TestSynchronizedBufferPool_Put_recyclesSharedBuffer(t *testing.T) {
+	pool := newBufferPool(&sync.Pool{})
+
+	for range 100 {
+		buffer := NewSharedBuffer(NewBuffer(1024)).Retain()
+		pool.Put(buffer)
+		pool.Put(buffer)
+
+		if res := pool.Get(); res != nil {
+			return
+		}
+	}
+
+	t.Fatal("pool is not recycling buffers")
+}
+
+func TestSynchronizedBufferPool_Put_recyclesBuffer(t *testing.T) {
+	pool := newBufferPool(&sync.Pool{})
+
+	for range 100 {
+		buffer := NewSharedBuffer(NewBuffer(1024))
+		pool.Put(buffer)
+
+		if res := pool.Get(); res != nil {
+			return
+		}
+	}
+
+	t.Fatal("pool is not recycling buffers")
+}
+
+// --- Helpers
+
+type capturingPool struct {
+	buffers []*SharedBuffer
+}
+
+func (p *capturingPool) Get() *SharedBuffer {
+	if len(p.buffers) > 0 {
+		value := p.buffers[0]
+		p.buffers = p.buffers[1:]
+		return value
+	}
+	return nil
+}
+
+func (p *capturingPool) Put(value *SharedBuffer) {
+	p.buffers = append(p.buffers, value)
+}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 2ad1acb5fd..1ceafc9619 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -79,7 +79,7 @@ type ConnectionListener interface {
 type Connection interface {
 	SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
 	SendRequestNoWait(req *pb.BaseCommand) error
-	WriteData(ctx context.Context, data Buffer)
+	WriteData(ctx context.Context, data *SharedBuffer)
 	RegisterListener(id uint64, listener ConnectionListener) error
 	UnregisterListener(id uint64)
 	AddConsumeHandler(id uint64, handler ConsumerHandler) error
@@ -132,7 +132,7 @@ type request struct {
 
 type dataRequest struct {
 	ctx  context.Context
-	data Buffer
+	data *SharedBuffer
 }
 
 type connection struct {
@@ -150,6 +150,7 @@ type connection struct {
 	logicalAddr  *url.URL
 	physicalAddr *url.URL
 
+	bufferPool      BufferPool
 	writeBufferLock sync.Mutex
 	writeBuffer     Buffer
 	reader          *connectionReader
@@ -199,6 +200,7 @@ func newConnection(opts connectionOptions) *connection {
 		keepAliveInterval: opts.keepAliveInterval,
 		logicalAddr:       opts.logicalAddr,
 		physicalAddr:      opts.physicalAddr,
+		bufferPool:        GetDefaultBufferPool(),
 		writeBuffer:       NewBuffer(4096),
 		log:               opts.logger.SubLogger(log.Fields{"remote_addr": opts.physicalAddr}),
 		pendingReqs:       make(map[uint64]*request),
@@ -394,6 +396,18 @@ func (c *connection) failLeftRequestsWhenClose() {
 	}
 }
 
+func (c *connection) drainWriteRequests() {
+	for {
+		select {
+		case req := <-c.writeRequestsCh:
+			c.bufferPool.Put(req.data)
+
+		default:
+			return
+		}
+	}
+}
+
 func (c *connection) run() {
 	pingSendTicker := time.NewTicker(c.keepAliveInterval)
 	pingCheckTicker := time.NewTicker(c.keepAliveInterval)
@@ -421,6 +435,7 @@ func (c *connection) run() {
 		select {
 		case <-c.closeCh:
 			c.failLeftRequestsWhenClose()
+			c.drainWriteRequests()
 			return
 		case req := <-c.incomingRequestsCh:
 			if req == nil {
@@ -433,6 +448,9 @@ func (c *connection) run() {
 			}
 
 			c.internalWriteData(req.ctx, req.data)
+			// Write request is fully processed so we can release the buffer.
+			c.bufferPool.Put(req.data)
+
 		case <-pingSendTicker.C:
 			c.sendPing()
 		}
@@ -456,10 +474,19 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) {
 	}
 }
 
-func (c *connection) WriteData(ctx context.Context, data Buffer) {
+func (c *connection) WriteData(ctx context.Context, data *SharedBuffer) {
+	written := false
+	defer func() {
+		if !written {
+			// Buffer has failed to be written and can now be reused.
+			c.bufferPool.Put(data)
+		}
+	}()
+
 	select {
 	case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
 		// Channel is not full
+		written = true
 		return
 	case <-ctx.Done():
 		c.log.Debug("Write data context cancelled")
@@ -472,6 +499,7 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) {
 	select {
 	case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
 		// Successfully wrote on the channel
+		written = true
 		return
 	case <-ctx.Done():
 		c.log.Debug("Write data context cancelled")
diff --git a/pulsar/internal/connection_test.go b/pulsar/internal/connection_test.go
new file mode 100644
index 0000000000..305ae1093a
--- /dev/null
+++ b/pulsar/internal/connection_test.go
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+	"context"
+	"io"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/apache/pulsar-client-go/pulsar/log"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestConnection_WriteData_recyclesBufferOnContextCanceled(t *testing.T) {
+	pool := &capturingPool{}
+	c := makeTestConnection()
+	c.writeRequestsCh = make(chan *dataRequest)
+	c.bufferPool = pool
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	c.WriteData(ctx, NewSharedBuffer(NewBuffer(1024)))
+	assert.NotNil(t, pool.Get())
+}
+
+func TestConnection_WriteData_recyclesBufferOnConnectionClosed(t *testing.T) {
+	pool := &capturingPool{}
+	c := makeTestConnection()
+	c.writeRequestsCh = make(chan *dataRequest)
+	c.state.Store(connectionClosed)
+	c.bufferPool = pool
+
+	c.WriteData(context.Background(), NewSharedBuffer(NewBuffer(1024)))
+	assert.NotNil(t, pool.Get())
+}
+
+func TestConnection_WriteData_doNotRecyclesBufferWhenWritten(t *testing.T) {
+	pool := &capturingPool{}
+	c := makeTestConnection()
+	c.bufferPool = pool
+
+	c.WriteData(context.Background(), NewSharedBuffer(NewBuffer(1024)))
+	assert.Nil(t, pool.Get())
+}
+
+func TestConnection_run_recyclesBufferOnceDone(t *testing.T) {
+	pool := &capturingPool{}
+	c := makeTestConnection()
+	c.bufferPool = pool
+
+	c.writeRequestsCh <- &dataRequest{ctx: context.Background(), data: NewSharedBuffer(NewBuffer(1024))}
+	close(c.writeRequestsCh)
+
+	c.run()
+	assert.NotNil(t, pool.Get())
+}
+
+func TestConnection_run_recyclesBufferEvenOnContextCanceled(t *testing.T) {
+	pool := &capturingPool{}
+	c := makeTestConnection()
+	c.bufferPool = pool
+
+	ctx, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	c.writeRequestsCh <- &dataRequest{ctx: ctx, data: NewSharedBuffer(NewBuffer(1024))}
+	close(c.writeRequestsCh)
+
+	c.run()
+	assert.NotNil(t, pool.Get())
+}
+
+// --- Helpers
+
+func makeTestConnection() *connection {
+	c := &connection{
+		log:               log.DefaultNopLogger(),
+		keepAliveInterval: 30 * time.Second,
+		writeRequestsCh:   make(chan *dataRequest, 10),
+		closeCh:           make(chan struct{}),
+		cnx:               happyCnx{},
+		metrics:           NewMetricsProvider(0, make(map[string]string), prometheus.DefaultRegisterer),
+		bufferPool:        GetDefaultBufferPool(),
+	}
+	c.reader = newConnectionReader(c)
+	c.state.Store(connectionReady)
+	return c
+}
+
+type happyCnx struct{}
+
+func (happyCnx) Read(_ []byte) (n int, err error) {
+	return 0, io.EOF
+}
+
+func (happyCnx) Write(b []byte) (n int, err error) {
+	return len(b), nil
+}
+
+func (happyCnx) Close() error {
+	return nil
+}
+
+func (happyCnx) LocalAddr() net.Addr {
+	return &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
+}
+
+func (happyCnx) RemoteAddr() net.Addr {
+	return &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
+}
+
+func (happyCnx) SetDeadline(_ time.Time) error {
+	return nil
+}
+
+func (happyCnx) SetReadDeadline(_ time.Time) error {
+	return nil
+}
+
+func (happyCnx) SetWriteDeadline(_ time.Time) error {
+	return nil
+}
diff --git a/pulsar/internal/key_based_batch_builder_test.go b/pulsar/internal/key_based_batch_builder_test.go
index 9df33b0780..4f53b93056 100644
--- a/pulsar/internal/key_based_batch_builder_test.go
+++ b/pulsar/internal/key_based_batch_builder_test.go
@@ -33,7 +33,7 @@ import (
 )
 
 type mockBufferPool struct {
 }
 
-func (m *mockBufferPool) GetBuffer() Buffer {
+func (m *mockBufferPool) GetBuffer() *SharedBuffer {
 	return nil
 }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index a4c2e3f8c4..38cc16f0d4 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -69,7 +69,6 @@ var (
 	ErrProducerBlockedQuotaExceeded = newError(ProducerBlockedQuotaExceededException, "producer blocked")
 	ErrProducerFenced               = newError(ProducerFenced, "producer fenced")
 
-	buffersPool     sync.Pool
 	sendRequestPool *sync.Pool
 )
 
@@ -125,6 +124,7 @@ type partitionProducer struct {
 	ctx               context.Context
 	cancelFunc        context.CancelFunc
 	backOffPolicyFunc func() backoff.Policy
+	bufferPool        internal.BufferPool
 }
 
 type schemaCache struct {
@@ -199,6 +199,7 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 		ctx:               ctx,
 		cancelFunc:        cancelFunc,
 		backOffPolicyFunc: boFunc,
+		bufferPool:        internal.GetDefaultBufferPool(),
 	}
 	if p.options.DisableBatching {
 		p.batchFlushTicker.Stop()
@@ -392,6 +393,9 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
 		// when resending pending batches, we update the sendAt timestamp to record the metric.
 		pi.Lock()
 		pi.sentAt = time.Now()
+		// Buffer is sent again to the connection to be processed in an
+		// asynchronous routine so we need to retain it again.
+		pi.buffer = pi.buffer.Retain()
 		pi.Unlock()
 		p.pendingQueue.Put(pi)
 		p._getConn().WriteData(pi.ctx, pi.buffer)
@@ -412,12 +416,8 @@ func (cc *connectionClosed) HasURL() bool {
 	return len(cc.assignedBrokerURL) > 0
 }
 
-func (p *partitionProducer) GetBuffer() internal.Buffer {
-	b, ok := buffersPool.Get().(internal.Buffer)
-	if ok {
-		b.Clear()
-	}
-	return b
+func (p *partitionProducer) GetBuffer() *internal.SharedBuffer {
+	return p.bufferPool.Get()
 }
 
 func (p *partitionProducer) ConnectionClosed(closeProducer *pb.CommandCloseProducer) {
@@ -798,7 +798,7 @@ func (p *partitionProducer) internalSingleSend(
 
 	buffer := p.GetBuffer()
 	if buffer == nil {
-		buffer = internal.NewBuffer(int(payloadBuf.ReadableBytes() * 3 / 2))
+		buffer = internal.NewSharedBuffer(internal.NewBuffer(int(payloadBuf.ReadableBytes() * 3 / 2)))
 	}
 
 	sid := *mm.SequenceId
@@ -839,7 +839,7 @@ type pendingItem struct {
 	sync.Mutex
 	ctx          context.Context
 	cancel       context.CancelFunc
-	buffer       internal.Buffer
+	buffer       *internal.SharedBuffer
 	sequenceID   uint64
 	createdAt    time.Time
 	sentAt       time.Time
@@ -886,7 +886,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
 	p.writeData(batchData, sequenceID, callbacks)
 }
 
-func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64, callbacks []interface{}) {
+func (p *partitionProducer) writeData(buffer *internal.SharedBuffer, sequenceID uint64, callbacks []interface{}) {
 	select {
 	case <-p.ctx.Done():
 		for _, cb := range callbacks {
@@ -896,6 +896,11 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
 		}
 		return
 	default:
+		// As the buffer will be sent to the connection in an asynchronous
+		// routine, we need to announce it to recycle later only when we are
+		// done in both the connection and the producer.
+		buffer = buffer.Retain()
+
 		now := time.Now()
 		ctx, cancel := context.WithCancel(context.Background())
 		p.pendingQueue.Put(&pendingItem{
@@ -907,6 +912,7 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
 			sequenceID:   sequenceID,
 			sendRequests: callbacks,
 		})
+
 		p._getConn().WriteData(ctx, buffer)
 	}
 }
@@ -991,7 +997,7 @@ func (p *partitionProducer) failTimeoutMessages() {
 				}
 
 				// flag the sending has completed with error, flush make no effect
-				pi.done(ErrSendTimeout)
+				pi.done(ErrSendTimeout, p.bufferPool)
 				pi.Unlock()
 
 				// finally reached the last view item, current iteration ends
@@ -1427,7 +1433,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 	}
 
 	// Mark this pending item as done
-	pi.done(nil)
+	pi.done(nil, p.bufferPool)
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {
@@ -1498,7 +1504,7 @@ func (p *partitionProducer) failPendingMessages(err error) {
 		}
 
 		// flag the sending has completed with error, flush make no effect
-		pi.done(err)
+		pi.done(err, p.bufferPool)
 		pi.Unlock()
 
 		// finally reached the last view item, current iteration ends
@@ -1737,14 +1743,14 @@ type flushRequest struct {
 	err error
 }
 
-func (i *pendingItem) done(err error) {
+func (i *pendingItem) done(err error, bufferPool internal.BufferPool) {
 	if i.isDone {
 		return
 	}
 	i.isDone = true
 
 	// return the buffer to the pool after all callbacks have been called.
-	defer buffersPool.Put(i.buffer)
+	defer bufferPool.Put(i.buffer)
 	if i.flushCallback != nil {
 		i.flushCallback(err)
 	}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 00cb6db558..f136f8d05f 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2607,3 +2607,32 @@ func TestSelectConnectionForSameProducer(t *testing.T) {
 
 	client.Close()
 }
+
+func TestProducerPutsBufferBackInPool(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	testProducer, err := client.CreateProducer(ProducerOptions{Topic: newTopicName()})
+	assert.NoError(t, err)
+	defer testProducer.Close()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	for range 100 {
+		// Due to the sync.Pool not keeping all buffers, we need a few to make
+		// sure there is at least one available at the end
+		_, err = testProducer.Send(ctx, &ProducerMessage{Payload: []byte{42}})
+		assert.NoError(t, err)
+
+		if buffer := internal.GetDefaultBufferPool().Get(); buffer != nil {
+			// Ok!
+			return
+		}
+	}
+
+	t.Fatal("no buffer has been recycled")
+}
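For reviewers, a minimal standalone sketch of the recycle-on-second-release scheme that `SharedBuffer.Retain` and `synchronizedBufferPool.Put` implement in this patch: the producer retains the buffer before handing it to the connection, each owner releases it once, and only the second release, whose compare-and-swap fails, actually returns the buffer to the `sync.Pool`. The `sharedBuf` and `bufPool` names below are illustrative and not part of the patch.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sharedBuf stands in for SharedBuffer: a payload plus a flag recording
// whether a second owner still holds a reference.
type sharedBuf struct {
	data   []byte
	shared atomic.Bool
}

// retain mirrors SharedBuffer.Retain: a new handle over the same payload,
// marked as currently shared.
func (b *sharedBuf) retain() *sharedBuf {
	nb := &sharedBuf{data: b.data}
	nb.shared.Store(true)
	return nb
}

// bufPool mirrors synchronizedBufferPool: recycle only on the second release.
type bufPool struct{ pool sync.Pool }

func (p *bufPool) put(b *sharedBuf) {
	// First release: CAS true->false succeeds, only the flag is cleared.
	// Second release: CAS fails, both owners are done, pool the buffer.
	if !b.shared.CompareAndSwap(true, false) {
		p.pool.Put(b)
	}
}

func (p *bufPool) get() *sharedBuf {
	if v, ok := p.pool.Get().(*sharedBuf); ok {
		return v
	}
	return nil
}

func main() {
	p := &bufPool{}

	// The producer retains the buffer before handing it to the connection,
	// so the producer and the connection each release it once.
	buf := (&sharedBuf{data: make([]byte, 1024)}).retain()

	p.put(buf)                  // connection done: flag cleared, nothing pooled yet
	fmt.Println(p.get() == nil) // true: the buffer is still considered in use

	p.put(buf)                  // producer done: CAS fails, buffer returns to the pool
	fmt.Println(p.get() != nil) // usually true (sync.Pool may drop idle items)
}
```

The same two-owner release is why `pendingItem.done` now takes the pool as an argument: whichever of the producer or the connection finishes last is the one whose `Put` actually recycles the buffer.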