Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
*.out
coverage.html

perf/perf
pulsar-perf
bin

perf/perf/
pulsar-perf/
bin/
logs/
vendor/
7 changes: 5 additions & 2 deletions pulsar/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ type DefaultBackoff struct {
}

func NewDefaultBackoff() Policy {
return &DefaultBackoff{rnd: rand.New(rand.NewSource(time.Now().UnixNano()))}
return NewDefaultBackoffWithInitialBackOff(0)
}
func NewDefaultBackoffWithInitialBackOff(backoff time.Duration) Policy {
return &DefaultBackoff{backoff: backoff / 2}
return &DefaultBackoff{
rnd: rand.New(rand.NewSource(time.Now().UnixNano())),
backoff: backoff / 2,
}
}

const maxBackoff = 60 * time.Second
Expand Down
6 changes: 3 additions & 3 deletions pulsar/backoff/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ import (
)

func TestBackoff_NextMinValue(t *testing.T) {
backoff := &DefaultBackoff{}
backoff := NewDefaultBackoff()
delay := backoff.Next()
assert.GreaterOrEqual(t, int64(delay), int64(100*time.Millisecond))
assert.LessOrEqual(t, int64(delay), int64(120*time.Millisecond))
}

func TestBackoff_NextExponentialBackoff(t *testing.T) {
backoff := &DefaultBackoff{}
backoff := NewDefaultBackoff()
previousDelay := backoff.Next()
// the last value before capping to the max value is 51.2 s (.1, .2, .4, .8, 1.6, 3.2, 6.4, 12.8, 25.6, 51.2)
for previousDelay < 51*time.Second {
Expand All @@ -47,7 +47,7 @@ func TestBackoff_NextExponentialBackoff(t *testing.T) {
}

func TestBackoff_NextMaxValue(t *testing.T) {
backoff := &DefaultBackoff{}
backoff := NewDefaultBackoff()
var delay time.Duration
for delay < maxBackoff {
delay = backoff.Next()
Expand Down
2 changes: 1 addition & 1 deletion pulsar/consumer_multitopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (dummyConnection) SendRequestNoWait(_ *pb.BaseCommand) error {
return nil
}

func (dummyConnection) WriteData(_ context.Context, _ internal.Buffer) {
func (dummyConnection) WriteData(_ context.Context, _ *internal.SharedBuffer) {
}

func (dummyConnection) RegisterListener(_ uint64, _ internal.ConnectionListener) error {
Expand Down
6 changes: 3 additions & 3 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

type BuffersPool interface {
GetBuffer() Buffer
GetBuffer() *SharedBuffer
}

// BatcherBuilderProvider defines func which returns the BatchBuilder.
Expand Down Expand Up @@ -71,7 +71,7 @@ type BatchBuilder interface {
}

type FlushBatch struct {
BatchData Buffer
BatchData *SharedBuffer
SequenceID uint64
Callbacks []interface{}
Error error
Expand Down Expand Up @@ -271,7 +271,7 @@ func (bc *batchContainer) Flush() *FlushBatch {

buffer := bc.buffersPool.GetBuffer()
if buffer == nil {
buffer = NewBuffer(int(uncompressedSize * 3 / 2))
buffer = NewSharedBuffer(NewBuffer(int(uncompressedSize * 3 / 2)))
}

sequenceID := uint64(0)
Expand Down
67 changes: 67 additions & 0 deletions pulsar/internal/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,18 @@ package internal

import (
"encoding/binary"
"sync"
"sync/atomic"

log "github.com/sirupsen/logrus"
)

// defaultBufferPool is the process-wide pool of recycled buffers shared by all
// connections, backed by a sync.Pool.
var defaultBufferPool BufferPool = newBufferPool(&sync.Pool{})

// GetDefaultBufferPool returns the process-wide shared buffer pool.
func GetDefaultBufferPool() BufferPool {
	return defaultBufferPool
}

// Buffer is a variable-sized buffer of bytes with Read and Write methods.
// The zero value for Buffer is an empty buffer ready to use.
type Buffer interface {
Expand Down Expand Up @@ -217,3 +225,62 @@ func (b *buffer) Clear() {
b.readerIdx = 0
b.writerIdx = 0
}

// BufferPool recycles buffers between producers and connections so hot write
// paths can reuse allocations instead of creating a fresh buffer per message.
type BufferPool interface {
	// Get returns a cleared buffer if any is available, otherwise nil.
	Get() *SharedBuffer

	// Put puts the buffer back to the pool and available for other routines.
	Put(*SharedBuffer)
}

// synchronizedBufferPool adapts a sync.Pool to the BufferPool interface; the
// underlying sync.Pool provides the goroutine-safety.
type synchronizedBufferPool struct {
	pool *sync.Pool
}

// Compile-time check that synchronizedBufferPool satisfies BufferPool.
var _ BufferPool = synchronizedBufferPool{}

// newBufferPool wraps the given sync.Pool in a synchronizedBufferPool.
func newBufferPool(pool *sync.Pool) synchronizedBufferPool {
	return synchronizedBufferPool{pool: pool}
}

// Get retrieves a recycled buffer from the underlying pool, clears it and
// wraps it in a fresh, unshared SharedBuffer. It returns nil when the pool
// has nothing to hand out.
func (p synchronizedBufferPool) Get() *SharedBuffer {
	pooled, ok := p.pool.Get().(Buffer)
	if !ok {
		// Pool was empty (or held an unexpected type); caller allocates.
		return nil
	}
	pooled.Clear()
	return NewSharedBuffer(pooled)
}

func (p synchronizedBufferPool) Put(buffer *SharedBuffer) {
if !buffer.isCurrentlyShared.CompareAndSwap(true, false) {
// When swapping fails, it means that a previous go routine has already
// tried to recycle the buffer, indicating that the sharing is not there
// anymore and the buffer can safely be reused.
p.pool.Put(buffer)
}
Comment on lines +257 to +262
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If:

  • Buffer A created from pool
  • Call A.Retain, create a new buffer B (isCurrentlyShared=true)
  • Put back the A(isCurrentlyShared=false), the buffer A will be returned to the pool. And the buffer get recycled.
  • Panic! The buffer B still holds this buffer.

Why not just use the ref count here?

}

// SharedBuffer is a wrapper around a buffer that can keep track of a shared
// reference of the buffer to later recycle it only when both references are
// done with it.
type SharedBuffer struct {
	Buffer

	// isCurrentlyShared is true while a second reference created by Retain is
	// still outstanding; the first Put flips it to false and only the second
	// Put recycles. NOTE(review): a bool can track at most one extra
	// reference, so multiple Retains of the same buffer are not supported.
	isCurrentlyShared atomic.Bool
}

// NewSharedBuffer wraps buffer in an unshared SharedBuffer (zero-value flag).
func NewSharedBuffer(buffer Buffer) *SharedBuffer {
	return &SharedBuffer{Buffer: buffer}
}

// Retain creates a new shared buffer backed by the same buffer but that will be
// retained by the pool until the shared reference is also done.
func (b *SharedBuffer) Retain() *SharedBuffer {
	// Same backing Buffer, but flagged as shared so the pool defers recycling
	// until both references have been Put back.
	shared := &SharedBuffer{Buffer: b.Buffer}
	shared.isCurrentlyShared.Store(true)
	return shared
}
65 changes: 65 additions & 0 deletions pulsar/internal/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package internal

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -34,3 +35,67 @@ func TestBuffer(t *testing.T) {
assert.Equal(t, uint32(1019), b.WritableBytes())
assert.Equal(t, uint32(1024), b.Capacity())
}

// An empty pool must return nil from Get so the caller knows to allocate.
func TestSynchronizedBufferPool_Get_returnsNilWhenEmpty(t *testing.T) {
	pool := newBufferPool(&sync.Pool{})
	assert.Nil(t, pool.Get())
}

// The first Put of a retained buffer must only clear the shared flag, not
// recycle the buffer.
func TestSynchronizedBufferPool_Put_marksBufferAsNotSharedAnymore(t *testing.T) {
	buffer := NewSharedBuffer(NewBuffer(1024)).Retain()
	assert.True(t, buffer.isCurrentlyShared.Load())

	pool := newBufferPool(&sync.Pool{})
	pool.Put(buffer)
	assert.False(t, buffer.isCurrentlyShared.Load())
}

// A retained buffer must be recycled after the second Put. The loop retries
// because sync.Pool.Get is allowed to discard items at any time, so a single
// attempt could spuriously observe an empty pool.
func TestSynchronizedBufferPool_Put_recyclesSharedBuffer(t *testing.T) {
	pool := newBufferPool(&sync.Pool{})

	for range 100 {
		buffer := NewSharedBuffer(NewBuffer(1024)).Retain()
		pool.Put(buffer)
		pool.Put(buffer)

		if res := pool.Get(); res != nil {
			return
		}
	}

	t.Fatal("pool is not recycling buffers")
}

// An unshared buffer must be recycled on its first Put. Retried in a loop
// because sync.Pool may drop items between Put and Get.
func TestSynchronizedBufferPool_Put_recyclesBuffer(t *testing.T) {
	pool := newBufferPool(&sync.Pool{})

	for range 100 {
		buffer := NewSharedBuffer(NewBuffer(1024))
		pool.Put(buffer)

		if res := pool.Get(); res != nil {
			return
		}
	}

	t.Fatal("pool is not recycling buffers")
}

// --- Helpers

// capturingPool is a BufferPool test double backed by a FIFO slice: it records
// every buffer passed to Put and hands them back from Get in insertion order,
// returning nil once drained.
type capturingPool struct {
	buffers []*SharedBuffer
}

func (p *capturingPool) Get() *SharedBuffer {
	if len(p.buffers) == 0 {
		return nil
	}
	head := p.buffers[0]
	p.buffers = p.buffers[1:]
	return head
}

func (p *capturingPool) Put(value *SharedBuffer) {
	p.buffers = append(p.buffers, value)
}
34 changes: 31 additions & 3 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type ConnectionListener interface {
type Connection interface {
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
SendRequestNoWait(req *pb.BaseCommand) error
WriteData(ctx context.Context, data Buffer)
WriteData(ctx context.Context, data *SharedBuffer)
RegisterListener(id uint64, listener ConnectionListener) error
UnregisterListener(id uint64)
AddConsumeHandler(id uint64, handler ConsumerHandler) error
Expand Down Expand Up @@ -132,7 +132,7 @@ type request struct {

type dataRequest struct {
ctx context.Context
data Buffer
data *SharedBuffer
}

type connection struct {
Expand All @@ -150,6 +150,7 @@ type connection struct {
logicalAddr *url.URL
physicalAddr *url.URL

bufferPool BufferPool
writeBufferLock sync.Mutex
writeBuffer Buffer
reader *connectionReader
Expand Down Expand Up @@ -199,6 +200,7 @@ func newConnection(opts connectionOptions) *connection {
keepAliveInterval: opts.keepAliveInterval,
logicalAddr: opts.logicalAddr,
physicalAddr: opts.physicalAddr,
bufferPool: GetDefaultBufferPool(),
writeBuffer: NewBuffer(4096),
log: opts.logger.SubLogger(log.Fields{"remote_addr": opts.physicalAddr}),
pendingReqs: make(map[uint64]*request),
Expand Down Expand Up @@ -394,6 +396,18 @@ func (c *connection) failLeftRequestsWhenClose() {
}
}

// drainWriteRequests empties any write requests still queued on
// writeRequestsCh, returning each request's buffer to the pool so it can be
// reused. Called when the connection shuts down; the default case exits as
// soon as the channel is empty, so this never blocks.
func (c *connection) drainWriteRequests() {
	for {
		select {
		case req := <-c.writeRequestsCh:
			c.bufferPool.Put(req.data)

		default:
			return
		}
	}
}

func (c *connection) run() {
pingSendTicker := time.NewTicker(c.keepAliveInterval)
pingCheckTicker := time.NewTicker(c.keepAliveInterval)
Expand Down Expand Up @@ -421,6 +435,7 @@ func (c *connection) run() {
select {
case <-c.closeCh:
c.failLeftRequestsWhenClose()
c.drainWriteRequests()
return
case req := <-c.incomingRequestsCh:
if req == nil {
Expand All @@ -433,6 +448,9 @@ func (c *connection) run() {
}
c.internalWriteData(req.ctx, req.data)

// Write request is fully processed so we can release the buffer.
c.bufferPool.Put(req.data)

case <-pingSendTicker.C:
c.sendPing()
}
Expand All @@ -456,10 +474,19 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) {
}
}

func (c *connection) WriteData(ctx context.Context, data Buffer) {
func (c *connection) WriteData(ctx context.Context, data *SharedBuffer) {
written := false
defer func() {
if !written {
// Buffer has failed to be written and can now be reused.
c.bufferPool.Put(data)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a new method like Recycle into the SharedBuffer so we don't need to maintain the bufferPool in the connection. The SharedBuffer can hold the ref to the buffer pool.

}
}()

select {
case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
// Channel is not full
written = true
return
case <-ctx.Done():
c.log.Debug("Write data context cancelled")
Expand All @@ -472,6 +499,7 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) {
select {
case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
// Successfully wrote on the channel
written = true
return
case <-ctx.Done():
c.log.Debug("Write data context cancelled")
Expand Down
Loading
Loading