Skip to content

Commit 6768377

Browse files
committed
Adapt buffer pool to recycle buffers only when safe
1 parent e88d02a commit 6768377

File tree

9 files changed

+117
-63
lines changed

9 files changed

+117
-63
lines changed

pulsar/consumer_multitopic_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ func (dummyConnection) SendRequestNoWait(_ *pb.BaseCommand) error {
332332
return nil
333333
}
334334

335-
func (dummyConnection) WriteData(_ context.Context, _ internal.Buffer) {
335+
func (dummyConnection) WriteData(_ context.Context, _ *internal.SharedBuffer) {
336336
}
337337

338338
func (dummyConnection) RegisterListener(_ uint64, _ internal.ConnectionListener) error {

pulsar/internal/batch_builder.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
)
3131

3232
type BuffersPool interface {
33-
GetBuffer() Buffer
33+
GetBuffer() *SharedBuffer
3434
}
3535

3636
// BatcherBuilderProvider defines func which returns the BatchBuilder.
@@ -71,7 +71,7 @@ type BatchBuilder interface {
7171
}
7272

7373
type FlushBatch struct {
74-
BatchData Buffer
74+
BatchData *SharedBuffer
7575
SequenceID uint64
7676
Callbacks []interface{}
7777
Error error
@@ -271,7 +271,7 @@ func (bc *batchContainer) Flush() *FlushBatch {
271271

272272
buffer := bc.buffersPool.GetBuffer()
273273
if buffer == nil {
274-
buffer = NewBuffer(int(uncompressedSize * 3 / 2))
274+
buffer = NewSharedBuffer(NewBuffer(int(uncompressedSize * 3 / 2)))
275275
}
276276

277277
sequenceID := uint64(0)

pulsar/internal/buffer.go

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package internal
2020
import (
2121
"encoding/binary"
2222
"sync"
23+
"sync/atomic"
2324

2425
log "github.com/sirupsen/logrus"
2526
)
@@ -227,14 +228,10 @@ func (b *buffer) Clear() {
227228

228229
type BufferPool interface {
229230
// Get returns a cleared buffer if any is available, otherwise nil.
230-
Get() Buffer
231+
Get() *SharedBuffer
231232

232233
// Put puts the buffer back to the pool and available for other routines.
233-
Put(Buffer)
234-
235-
// Clone attempts to create a clone using a buffer from the pool, or returns
236-
// a new one if necessary.
237-
Clone(Buffer) Buffer
234+
Put(*SharedBuffer)
238235
}
239236

240237
type synchronizedBufferPool struct {
@@ -247,23 +244,40 @@ func newBufferPool(pool *sync.Pool) synchronizedBufferPool {
247244
return synchronizedBufferPool{pool: pool}
248245
}
249246

250-
func (p synchronizedBufferPool) Get() Buffer {
247+
func (p synchronizedBufferPool) Get() *SharedBuffer {
251248
buffer, ok := p.pool.Get().(Buffer)
252249
if ok {
253250
buffer.Clear()
251+
return NewSharedBuffer(buffer)
252+
}
253+
return nil
254+
}
255+
256+
func (p synchronizedBufferPool) Put(buffer *SharedBuffer) {
257+
if !buffer.isCurrentlyShared.CompareAndSwap(true, false) {
258+
p.pool.Put(buffer)
254259
}
255-
return buffer
256260
}
257261

258-
func (p synchronizedBufferPool) Put(buffer Buffer) {
259-
p.pool.Put(buffer)
262+
// SharedBuffer is a wrapper around a buffer that can keep track of a shared
263+
// reference of the buffer to later recycle it only when both references are
264+
// done with it.
265+
type SharedBuffer struct {
266+
Buffer
267+
isCurrentlyShared atomic.Bool
268+
}
269+
270+
func NewSharedBuffer(buffer Buffer) *SharedBuffer {
271+
return &SharedBuffer{Buffer: buffer}
260272
}
261273

262-
func (p synchronizedBufferPool) Clone(b Buffer) Buffer {
263-
newBuffer := p.Get()
264-
if newBuffer == nil {
265-
newBuffer = &buffer{}
274+
// Retain creates a new shared buffer backed by the same buffer but that will be
275+
// kept out of the pool until the shared reference is also done with it.
276+
func (b *SharedBuffer) Retain() *SharedBuffer {
277+
newBuffer := &SharedBuffer{
278+
Buffer: b.Buffer,
279+
isCurrentlyShared: atomic.Bool{},
266280
}
267-
newBuffer.Write(b.ReadableSlice())
281+
newBuffer.isCurrentlyShared.Store(true)
268282
return newBuffer
269283
}

pulsar/internal/buffer_test.go

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -36,52 +36,58 @@ func TestBuffer(t *testing.T) {
3636
assert.Equal(t, uint32(1024), b.Capacity())
3737
}
3838

39-
func TestSynchronizedBufferPool_Clone_returnsNewlyAllocatedBuffer(t *testing.T) {
39+
func TestSynchronizedBufferPool_Get_returnsNilWhenEmpty(t *testing.T) {
4040
pool := newBufferPool(&sync.Pool{})
41+
assert.Nil(t, pool.Get())
42+
}
4143

42-
buffer := NewBuffer(1024)
43-
buffer.Write([]byte{1, 2, 3})
44+
func TestSynchronizedBufferPool_Put_marksBufferAsNotSharedAnymore(t *testing.T) {
45+
buffer := NewSharedBuffer(NewBuffer(1024)).Retain()
46+
assert.True(t, buffer.isCurrentlyShared.Load())
4447

45-
res := pool.Clone(buffer)
46-
assert.Equal(t, []byte{1, 2, 3}, res.ReadableSlice())
48+
pool := newBufferPool(&sync.Pool{})
49+
pool.Put(buffer)
50+
assert.False(t, buffer.isCurrentlyShared.Load())
4751
}
4852

49-
func TestSynchronizedBufferPool_Clone_returnsRecycledBuffer(t *testing.T) {
53+
func TestSynchronizedBufferPool_Put_recyclesSharedBuffer(t *testing.T) {
5054
pool := newBufferPool(&sync.Pool{})
5155

5256
for range 100 {
53-
buffer := NewBuffer(1024)
54-
buffer.Write([]byte{1, 2, 3})
57+
buffer := NewSharedBuffer(NewBuffer(1024)).Retain()
58+
pool.Put(buffer)
5559
pool.Put(buffer)
56-
}
5760

58-
buffer := NewBuffer(1024)
59-
buffer.Write([]byte{1, 2, 3})
61+
if res := pool.Get(); res != nil {
62+
return
63+
}
64+
}
6065

61-
res := pool.Clone(buffer)
62-
assert.Equal(t, []byte{1, 2, 3}, res.ReadableSlice())
66+
t.Fatal("pool is not recycling buffers")
6367
}
6468

65-
// BenchmarkBufferPool_Clone demonstrates the cloning of a buffer without
66-
// allocation if the pool is filled making the process very efficient.
67-
func BenchmarkBufferPool_Clone(b *testing.B) {
68-
pool := GetDefaultBufferPool()
69-
buffer := NewBuffer(1024)
70-
buffer.Write(make([]byte, 1024))
69+
func TestSynchronizedBufferPool_Put_recyclesBuffer(t *testing.T) {
70+
pool := newBufferPool(&sync.Pool{})
71+
72+
for range 100 {
73+
buffer := NewSharedBuffer(NewBuffer(1024))
74+
pool.Put(buffer)
7175

72-
for range b.N {
73-
newBuffer := pool.Clone(buffer)
74-
pool.Put(newBuffer)
76+
if res := pool.Get(); res != nil {
77+
return
78+
}
7579
}
80+
81+
t.Fatal("pool is not recycling buffers")
7682
}
7783

7884
// --- Helpers
7985

8086
type capturingPool struct {
81-
buffers []Buffer
87+
buffers []*SharedBuffer
8288
}
8389

84-
func (p *capturingPool) Get() Buffer {
90+
func (p *capturingPool) Get() *SharedBuffer {
8591
if len(p.buffers) > 0 {
8692
value := p.buffers[0]
8793
p.buffers = p.buffers[1:]
@@ -90,10 +96,6 @@ func (p *capturingPool) Get() Buffer {
9096
return nil
9197
}
9298

93-
func (p *capturingPool) Put(value Buffer) {
99+
func (p *capturingPool) Put(value *SharedBuffer) {
94100
p.buffers = append(p.buffers, value)
95101
}
96-
97-
func (p *capturingPool) Clone(value Buffer) Buffer {
98-
return value
99-
}

pulsar/internal/connection.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ type ConnectionListener interface {
7979
type Connection interface {
8080
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
8181
SendRequestNoWait(req *pb.BaseCommand) error
82-
WriteData(ctx context.Context, data Buffer)
82+
WriteData(ctx context.Context, data *SharedBuffer)
8383
RegisterListener(id uint64, listener ConnectionListener) error
8484
UnregisterListener(id uint64)
8585
AddConsumeHandler(id uint64, handler ConsumerHandler) error
@@ -132,7 +132,7 @@ type request struct {
132132

133133
type dataRequest struct {
134134
ctx context.Context
135-
data Buffer
135+
data *SharedBuffer
136136
}
137137

138138
type connection struct {
@@ -474,7 +474,7 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) {
474474
}
475475
}
476476

477-
func (c *connection) WriteData(ctx context.Context, data Buffer) {
477+
func (c *connection) WriteData(ctx context.Context, data *SharedBuffer) {
478478
written := false
479479
defer func() {
480480
if !written {

pulsar/internal/connection_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestConnection_WriteData_recyclesBufferOnContextCanceled(t *testing.T) {
3838
ctx, cancel := context.WithCancel(context.Background())
3939
cancel()
4040

41-
c.WriteData(ctx, NewBuffer(1024))
41+
c.WriteData(ctx, NewSharedBuffer(NewBuffer(1024)))
4242
assert.NotNil(t, pool.Get())
4343
}
4444

@@ -49,7 +49,7 @@ func TestConnection_WriteData_recyclesBufferOnConnectionClosed(t *testing.T) {
4949
c.state.Store(connectionClosed)
5050
c.bufferPool = pool
5151

52-
c.WriteData(context.Background(), NewBuffer(1024))
52+
c.WriteData(context.Background(), NewSharedBuffer(NewBuffer(1024)))
5353
assert.NotNil(t, pool.Get())
5454
}
5555

@@ -58,7 +58,7 @@ func TestConnection_WriteData_doNotRecyclesBufferWhenWritten(t *testing.T) {
5858
c := makeTestConnection()
5959
c.bufferPool = pool
6060

61-
c.WriteData(context.Background(), NewBuffer(1024))
61+
c.WriteData(context.Background(), NewSharedBuffer(NewBuffer(1024)))
6262
assert.Nil(t, pool.Get())
6363
}
6464

@@ -67,7 +67,7 @@ func TestConnection_run_recyclesBufferOnceDone(t *testing.T) {
6767
c := makeTestConnection()
6868
c.bufferPool = pool
6969

70-
c.writeRequestsCh <- &dataRequest{ctx: context.Background(), data: NewBuffer(1024)}
70+
c.writeRequestsCh <- &dataRequest{ctx: context.Background(), data: NewSharedBuffer(NewBuffer(1024))}
7171
close(c.writeRequestsCh)
7272

7373
c.run()
@@ -82,7 +82,7 @@ func TestConnection_run_recyclesBufferEvenOnContextCanceled(t *testing.T) {
8282
ctx, cancel := context.WithCancel(context.Background())
8383
cancel()
8484

85-
c.writeRequestsCh <- &dataRequest{ctx: ctx, data: NewBuffer(1024)}
85+
c.writeRequestsCh <- &dataRequest{ctx: ctx, data: NewSharedBuffer(NewBuffer(1024))}
8686
close(c.writeRequestsCh)
8787

8888
c.run()

pulsar/internal/key_based_batch_builder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
type mockBufferPool struct {
3434
}
3535

36-
func (m *mockBufferPool) GetBuffer() Buffer {
36+
func (m *mockBufferPool) GetBuffer() *SharedBuffer {
3737
return nil
3838
}
3939

pulsar/producer_partition.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -393,9 +393,12 @@ func (p *partitionProducer) grabCnx(assignedBrokerURL string) error {
393393
// when resending pending batches, we update the sendAt timestamp to record the metric.
394394
pi.Lock()
395395
pi.sentAt = time.Now()
396+
// Buffer is sent again to the connection to be processed in an
397+
// asynchronous routine so we need to retain it again.
398+
pi.buffer = pi.buffer.Retain()
396399
pi.Unlock()
397400
p.pendingQueue.Put(pi)
398-
p._getConn().WriteData(pi.ctx, p.bufferPool.Clone(pi.buffer))
401+
p._getConn().WriteData(pi.ctx, pi.buffer)
399402

400403
if pi == lastViewItem {
401404
break
@@ -413,7 +416,7 @@ func (cc *connectionClosed) HasURL() bool {
413416
return len(cc.assignedBrokerURL) > 0
414417
}
415418

416-
func (p *partitionProducer) GetBuffer() internal.Buffer {
419+
func (p *partitionProducer) GetBuffer() *internal.SharedBuffer {
417420
return p.bufferPool.Get()
418421
}
419422

@@ -795,7 +798,7 @@ func (p *partitionProducer) internalSingleSend(
795798

796799
buffer := p.GetBuffer()
797800
if buffer == nil {
798-
buffer = internal.NewBuffer(int(payloadBuf.ReadableBytes() * 3 / 2))
801+
buffer = internal.NewSharedBuffer(internal.NewBuffer(int(payloadBuf.ReadableBytes() * 3 / 2)))
799802
}
800803

801804
sid := *mm.SequenceId
@@ -836,7 +839,7 @@ type pendingItem struct {
836839
sync.Mutex
837840
ctx context.Context
838841
cancel context.CancelFunc
839-
buffer internal.Buffer
842+
buffer *internal.SharedBuffer
840843
sequenceID uint64
841844
createdAt time.Time
842845
sentAt time.Time
@@ -883,7 +886,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
883886
p.writeData(batchData, sequenceID, callbacks)
884887
}
885888

886-
func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64, callbacks []interface{}) {
889+
func (p *partitionProducer) writeData(buffer *internal.SharedBuffer, sequenceID uint64, callbacks []interface{}) {
887890
select {
888891
case <-p.ctx.Done():
889892
for _, cb := range callbacks {
@@ -893,6 +896,11 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
893896
}
894897
return
895898
default:
899+
// As the buffer will be sent to the connection in an asynchronous
900+
// routine, we need to announce it to later recycle later only when we
901+
// are done in both the connection and the producer.
902+
buffer = buffer.Retain()
903+
896904
now := time.Now()
897905
ctx, cancel := context.WithCancel(context.Background())
898906
p.pendingQueue.Put(&pendingItem{
@@ -904,7 +912,8 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
904912
sequenceID: sequenceID,
905913
sendRequests: callbacks,
906914
})
907-
p._getConn().WriteData(ctx, p.bufferPool.Clone(buffer))
915+
916+
p._getConn().WriteData(ctx, buffer)
908917
}
909918
}
910919

pulsar/producer_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2607,3 +2607,32 @@ func TestSelectConnectionForSameProducer(t *testing.T) {
26072607

26082608
client.Close()
26092609
}
2610+
2611+
func TestProducerPutsBufferBackInPool(t *testing.T) {
2612+
client, err := NewClient(ClientOptions{
2613+
URL: serviceURL,
2614+
})
2615+
assert.NoError(t, err)
2616+
defer client.Close()
2617+
2618+
testProducer, err := client.CreateProducer(ProducerOptions{Topic: newTopicName()})
2619+
assert.NoError(t, err)
2620+
defer testProducer.Close()
2621+
2622+
ctx, cancel := context.WithCancel(context.Background())
2623+
defer cancel()
2624+
2625+
for range 100 {
2626+
// Due to the sync.Pool not keeping all buffers, we need a few to make
2627+
// sure there is at least one available at the end
2628+
_, err = testProducer.Send(ctx, &ProducerMessage{Payload: []byte{42}})
2629+
assert.NoError(t, err)
2630+
2631+
if buffer := internal.GetDefaultBufferPool().Get(); buffer != nil {
2632+
// Ok!
2633+
return
2634+
}
2635+
}
2636+
2637+
t.Fatal("no buffer has been recycled")
2638+
}

0 commit comments

Comments
 (0)