
Commit e9dea4e

Recycle a buffer in several async routines

1 parent: 27580b8

File tree: 5 files changed, +280 -21 lines


pulsar/internal/buffer.go

Lines changed: 50 additions & 0 deletions
@@ -19,10 +19,17 @@ package internal
 
 import (
 	"encoding/binary"
+	"sync"
 
 	log "github.com/sirupsen/logrus"
 )
 
+var defaultBufferPool = newBufferPool(&sync.Pool{})
+
+func GetDefaultBufferPool() synchronizedBufferPool {
+	return defaultBufferPool
+}
+
 // Buffer is a variable-sized buffer of bytes with Read and Write methods.
 // The zero value for Buffer is an empty buffer ready to use.
 type Buffer interface {

@@ -217,3 +224,46 @@ func (b *buffer) Clear() {
 	b.readerIdx = 0
 	b.writerIdx = 0
 }
+
+type BufferPool interface {
+	// Get returns a cleared buffer if any is available, otherwise nil.
+	Get() Buffer
+
+	// Put returns the buffer to the pool, making it available to other routines.
+	Put(Buffer)
+
+	// Clone attempts to create a clone using a buffer from the pool, or returns
+	// a new one if necessary.
+	Clone(Buffer) Buffer
+}
+
+type synchronizedBufferPool struct {
+	pool *sync.Pool
+}
+
+var _ BufferPool = synchronizedBufferPool{}
+
+func newBufferPool(pool *sync.Pool) synchronizedBufferPool {
+	return synchronizedBufferPool{pool: pool}
+}
+
+func (p synchronizedBufferPool) Get() Buffer {
+	buffer, ok := p.pool.Get().(Buffer)
+	if ok {
+		buffer.Clear()
+	}
+	return buffer
+}
+
+func (p synchronizedBufferPool) Put(buffer Buffer) {
+	p.pool.Put(buffer)
+}
+
+func (p synchronizedBufferPool) Clone(b Buffer) Buffer {
+	newBuffer := p.Get()
+	if newBuffer == nil {
+		newBuffer = &buffer{}
+	}
+	newBuffer.Write(b.ReadableSlice())
+	return newBuffer
+}
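
The pool's contract, as defined above: Get returns a cleared recycled buffer (or nil), Clone copies a buffer's readable bytes into a recycled or freshly allocated buffer, and Put makes a buffer available for reuse. A rough usage sketch (illustrative only, not part of the commit; it assumes it sits in the pulsar/internal package, and sendCopy, payload and out are hypothetical names):

// Illustrative sketch only, assuming access to the pulsar/internal API
// added above (GetDefaultBufferPool, NewBuffer, Buffer).
func sendCopy(payload []byte, out chan<- Buffer) {
	pool := GetDefaultBufferPool()

	frame := NewBuffer(1024)
	frame.Write(payload)

	// Clone reuses a pooled buffer when one is available and only
	// allocates a new one when the pool is empty.
	cp := pool.Clone(frame)
	out <- cp

	// Whichever goroutine drains `out` is expected to call pool.Put(cp)
	// once the bytes are consumed, so the buffer can be recycled.
}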

pulsar/internal/buffer_test.go

Lines changed: 63 additions & 0 deletions
@@ -18,6 +18,7 @@
 package internal
 
 import (
+	"sync"
 	"testing"
 
 	"github.com/stretchr/testify/assert"

@@ -34,3 +35,65 @@ func TestBuffer(t *testing.T) {
 	assert.Equal(t, uint32(1019), b.WritableBytes())
 	assert.Equal(t, uint32(1024), b.Capacity())
 }
+
+func TestSynchronizedBufferPool_Clone_returnsNewlyAllocatedBuffer(t *testing.T) {
+	pool := newBufferPool(&sync.Pool{})
+
+	buffer := NewBuffer(1024)
+	buffer.Write([]byte{1, 2, 3})
+
+	res := pool.Clone(buffer)
+	assert.Equal(t, []byte{1, 2, 3}, res.ReadableSlice())
+}
+
+func TestSynchronizedBufferPool_Clone_returnsRecycledBuffer(t *testing.T) {
+	pool := newBufferPool(&sync.Pool{})
+
+	for range 100 {
+		buffer := NewBuffer(1024)
+		buffer.Write([]byte{1, 2, 3})
+		pool.Put(buffer)
+	}
+
+	buffer := NewBuffer(1024)
+	buffer.Write([]byte{1, 2, 3})
+
+	res := pool.Clone(buffer)
+	assert.Equal(t, []byte{1, 2, 3}, res.ReadableSlice())
+}
+
+// BenchmarkBufferPool_Clone demonstrates cloning a buffer without allocation
+// when the pool is filled, which makes the process very efficient.
+func BenchmarkBufferPool_Clone(b *testing.B) {
+	pool := GetDefaultBufferPool()
+	buffer := NewBuffer(1024)
+	buffer.Write(make([]byte, 1024))
+
+	for range b.N {
+		newBuffer := pool.Clone(buffer)
+		pool.Put(newBuffer)
+	}
+}
+
+// --- Helpers
+
+type capturingPool struct {
+	buffers []Buffer
+}
+
+func (p *capturingPool) Get() Buffer {
+	if len(p.buffers) > 0 {
+		value := p.buffers[0]
+		p.buffers = p.buffers[1:]
+		return value
+	}
+	return nil
+}
+
+func (p *capturingPool) Put(value Buffer) {
+	p.buffers = append(p.buffers, value)
+}
+
+func (p *capturingPool) Clone(value Buffer) Buffer {
+	return value
+}

pulsar/internal/connection.go

Lines changed: 15 additions & 0 deletions
@@ -150,6 +150,7 @@ type connection struct {
 	logicalAddr  *url.URL
 	physicalAddr *url.URL
 
+	bufferPool      BufferPool
 	writeBufferLock sync.Mutex
 	writeBuffer     Buffer
 	reader          *connectionReader

@@ -199,6 +200,7 @@ func newConnection(opts connectionOptions) *connection {
 		keepAliveInterval: opts.keepAliveInterval,
 		logicalAddr:       opts.logicalAddr,
 		physicalAddr:      opts.physicalAddr,
+		bufferPool:        GetDefaultBufferPool(),
 		writeBuffer:       NewBuffer(4096),
 		log:               opts.logger.SubLogger(log.Fields{"remote_addr": opts.physicalAddr}),
 		pendingReqs:       make(map[uint64]*request),

@@ -433,6 +435,9 @@ func (c *connection) run() {
 			}
 			c.internalWriteData(req.ctx, req.data)
 
+			// The write request is fully processed, so the buffer can be released.
+			c.bufferPool.Put(req.data)
+
 		case <-pingSendTicker.C:
 			c.sendPing()
 		}

@@ -457,9 +462,18 @@ func (c *connection) runPingCheck(pingCheckTicker *time.Ticker) {
 }
 
 func (c *connection) WriteData(ctx context.Context, data Buffer) {
+	written := false
+	defer func() {
+		if !written {
+			// The buffer failed to be written and can now be reused.
+			c.bufferPool.Put(data)
+		}
+	}()
+
 	select {
 	case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
 		// Channel is not full
+		written = true
 		return
 	case <-ctx.Done():
 		c.log.Debug("Write data context cancelled")

@@ -472,6 +486,7 @@ func (c *connection) WriteData(ctx context.Context, data Buffer) {
 	select {
 	case c.writeRequestsCh <- &dataRequest{ctx: ctx, data: data}:
 		// Successfully wrote on the channel
+		written = true
 		return
 	case <-ctx.Done():
 		c.log.Debug("Write data context cancelled")
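
Taken together, the changes above define a simple ownership rule: once WriteData successfully places the dataRequest on writeRequestsCh (written = true), the run loop owns the buffer and calls bufferPool.Put right after internalWriteData; if the hand-off fails because the context is cancelled or the connection is closing, the deferred block in WriteData returns the buffer to the pool instead. A minimal, self-contained sketch of that pattern (enqueue, requests and recycle are stand-in names, not the connection code):

package sketch

import "context"

// enqueue mirrors the written-flag pattern used by WriteData: the producer
// keeps ownership of data until the hand-off succeeds, and recycles it
// itself if the hand-off never happens.
func enqueue(ctx context.Context, requests chan<- []byte, recycle func([]byte), data []byte) {
	written := false
	defer func() {
		if !written {
			// Hand-off failed; return the buffer for reuse.
			recycle(data)
		}
	}()

	select {
	case requests <- data:
		// Ownership moves to the consumer, which recycles the buffer
		// once it has finished processing the request.
		written = true
	case <-ctx.Done():
		// Cancelled before the hand-off; the deferred recycle runs.
	}
}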

pulsar/internal/connection_test.go

Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@ (new file; all lines added)

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package internal

import (
	"context"
	"io"
	"net"
	"testing"
	"time"

	"github.com/apache/pulsar-client-go/pulsar/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/stretchr/testify/assert"
)

func TestConnection_WriteData_recyclesBufferOnContextCanceled(t *testing.T) {
	pool := &capturingPool{}
	c := makeTestConnection()
	c.writeRequestsCh = make(chan *dataRequest)
	c.bufferPool = pool

	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	c.WriteData(ctx, NewBuffer(1024))
	assert.NotNil(t, pool.Get())
}

func TestConnection_WriteData_recyclesBufferOnConnectionClosed(t *testing.T) {
	pool := &capturingPool{}
	c := makeTestConnection()
	c.writeRequestsCh = make(chan *dataRequest)
	c.state.Store(connectionClosed)
	c.bufferPool = pool

	c.WriteData(context.Background(), NewBuffer(1024))
	assert.NotNil(t, pool.Get())
}

func TestConnection_WriteData_doNotRecyclesBufferWhenWritten(t *testing.T) {
	pool := &capturingPool{}
	c := makeTestConnection()
	c.bufferPool = pool

	c.WriteData(context.Background(), NewBuffer(1024))
	assert.Nil(t, pool.Get())
}

func TestConnection_run_recyclesBufferOnceDone(t *testing.T) {
	pool := &capturingPool{}
	c := makeTestConnection()
	c.bufferPool = pool

	c.writeRequestsCh <- &dataRequest{ctx: context.Background(), data: NewBuffer(1024)}
	close(c.writeRequestsCh)

	c.run()
	assert.NotNil(t, pool.Get())
}

func TestConnection_run_recyclesBufferEvenOnContextCanceled(t *testing.T) {
	pool := &capturingPool{}
	c := makeTestConnection()
	c.bufferPool = pool

	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	c.writeRequestsCh <- &dataRequest{ctx: ctx, data: NewBuffer(1024)}
	close(c.writeRequestsCh)

	c.run()
	assert.NotNil(t, pool.Get())
}

// --- Helpers

func makeTestConnection() *connection {
	c := &connection{
		log:               log.DefaultNopLogger(),
		keepAliveInterval: 30 * time.Second,
		writeRequestsCh:   make(chan *dataRequest, 10),
		closeCh:           make(chan struct{}),
		cnx:               happyCnx{},
		metrics:           NewMetricsProvider(0, make(map[string]string), prometheus.DefaultRegisterer),
		bufferPool:        GetDefaultBufferPool(),
	}
	c.reader = newConnectionReader(c)
	c.state.Store(connectionReady)
	return c
}

type happyCnx struct{}

func (happyCnx) Read(b []byte) (n int, err error) {
	return 0, io.EOF
}

func (happyCnx) Write(b []byte) (n int, err error) {
	return len(b), nil
}

func (happyCnx) Close() error {
	return nil
}

func (happyCnx) LocalAddr() net.Addr {
	return &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
}

func (happyCnx) RemoteAddr() net.Addr {
	return &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
}

func (happyCnx) SetDeadline(t time.Time) error {
	return nil
}

func (happyCnx) SetReadDeadline(t time.Time) error {
	return nil
}

func (happyCnx) SetWriteDeadline(t time.Time) error {
	return nil
}
