
Commit 1a57b3a

Add ReaderBufferer (#2)
- Add ReaderBufferer with its child struct BufferedReader
- AdaptivePool:
  - Add MinCap to NormalSlice and NormalBytesBuffer
  - Change PoolItemProvider.Sizeof to have a meaningful negative value
  - AdaptivePool.Put will not reuse items with negative size
- Stats: remove legacy implementations
- Improve docs and add recommended starting values
1 parent 36121d9 commit 1a57b3a

7 files changed: +630, -35 lines changed


adaptive_pool.go

Lines changed: 26 additions & 11 deletions
@@ -16,7 +16,13 @@ import (
 type PoolItemProvider[T any] interface {
     // Sizeof measures the size of an item. This measurement is used to compute
     // stats that allow efficiently reusing and creating items in an
-    // AdaptivePool. It should not hold references to the item.
+    // AdaptivePool. Items for which this method returns a negative number will
+    // not be put back into the pool nor will be fed into statistics. This
+    // allows handling items with a virtual size, like a slice. For instance, a
+    // slice with zero cap should return -1 (or any negative value) so that it's
+    // not unnecessarily put back into the pool, while it is totally fine to
+    // return 0 for a slice with cap greater than zero. Implementations should
+    // not hold references to the item.
     Sizeof(T) float64
     // Create returns a new item. It has a set of basic stats about the
     // AdaptivePool usage that allows efficient pre-allocation in many common
@@ -31,18 +37,24 @@ type PoolItemProvider[T any] interface {
 // NormalSlice is a generic [PoolItemProvider] for slice items, operating under
 // the assumption that their `len` follow a Normal Distribution.
 type NormalSlice[T any] struct {
+    MinCap    int     // Minimum capacity of a newly created slice
     Threshold float64 // Threshold must be non-negative.
 }
 
 // Sizeof returns the length of the slice.
 func (p NormalSlice[T]) Sizeof(v []T) float64 {
+    if cap(v) == 0 {
+        return -1
+    }
     return float64(len(v))
 }
 
 // Create returns a new slice with length zero and cap `mean + Threshold *
 // stdDev`, or `mean` if `stdDev` is `NaN`.
 func (p NormalSlice[T]) Create(mean, stdDev float64) []T {
-    return make([]T, 0, int(normalCreateSize(mean, stdDev, p.Threshold)))
+    size := int(normalCreateSize(mean, stdDev, p.Threshold))
+    size = max(size, p.MinCap)
+    return make([]T, 0, size)
 }
 
 // Accept will accept a new item if its length is in the inclusive range `mean ±
@@ -54,22 +66,24 @@ func (p NormalSlice[T]) Accept(mean, stdDev, itemSize float64) bool {
 // NormalBytesBuffer is a [PoolItemProvider] for [*bytes.Buffer] items,
 // operating under the assumption that their `Len` follow a Normal Distribution.
 type NormalBytesBuffer struct {
+    MinCap    int     // Minimum capacity of a newly created *bytes.Buffer
     Threshold float64 // Threshold must be non-negative.
 }
 
 // Sizeof returns the length of the buffer.
 func (p NormalBytesBuffer) Sizeof(v *bytes.Buffer) float64 {
-    if v == nil {
-        return 0
+    if v == nil || v.Cap() == 0 {
+        return -1
     }
     return float64(v.Len())
 }
 
 // Create returns a new buffer with `Len` zero and `Cap` `mean + Threshold *
 // stdDev`, or `mean` if `stdDev` is `NaN`.
 func (p NormalBytesBuffer) Create(mean, stdDev float64) *bytes.Buffer {
-    size := normalCreateSize(mean, stdDev, p.Threshold)
-    return bytes.NewBuffer(make([]byte, 0, int(size)))
+    size := int(normalCreateSize(mean, stdDev, p.Threshold))
+    size = max(size, p.MinCap)
+    return bytes.NewBuffer(make([]byte, 0, size))
 }
 
 // Accept will accept a new item if its `Len` is in the inclusive range `mean ±
@@ -80,10 +94,7 @@ func (p NormalBytesBuffer) Accept(mean, stdDev, itemSize float64) bool {
 
 // AdaptivePool is a [sync.Pool] that uses a [PoolItemProvider] to efficiently
 // create and reuse new pool items. Statistics are updated each time the `Put`
-// method is called for an item, regardless if it will be put back in the
-// sync.Pool. As with a regular sync.Pool, it can be "seeded" with objects by
-// calling `Put`, with the additional property that statistics will also be
-// seeded this way.
+// method is called for an item.
 type AdaptivePool[T any] struct {
     pool     pool
     provider PoolItemProvider[T]
@@ -128,9 +139,13 @@ func (p *AdaptivePool[T]) Get() T {
 }
 
 // Put updates the internal statistics with the size of the object and puts
-// it back to the pool if [PoolItemProvider.Accept] allows it.
+// it back to the pool if [PoolItemProvider.Accept] allows it. Items with a
+// negative size will not be put back into the pool.
 func (p *AdaptivePool[T]) Put(x T) {
     s := p.provider.Sizeof(x)
+    if s < 0 {
+        return
+    }
     mean, stdDev := p.writeThenRead(s)
     if p.provider.Accept(mean, stdDev, s) {
         p.pool.Put(x)
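
For illustration, the new negative-size contract also applies to custom providers. Below is a minimal sketch of a PoolItemProvider for []int slices that refuses to pool zero-cap slices; the fixedThresholdIntSlice name and the hard-coded threshold of 2 are invented for this example, and the Create/Accept signatures are assumed to mirror the ones NormalSlice implements.

package adaptivepool

import "math"

// fixedThresholdIntSlice is a hypothetical PoolItemProvider[[]int], used only
// to illustrate the negative-Sizeof contract introduced by this commit.
type fixedThresholdIntSlice struct{}

// Sizeof reports -1 for zero-cap slices so that AdaptivePool.Put drops them
// without touching the pool or the statistics.
func (fixedThresholdIntSlice) Sizeof(v []int) float64 {
    if cap(v) == 0 {
        return -1
    }
    return float64(len(v))
}

// Create pre-allocates capacity around the observed mean, falling back to the
// mean alone while the standard deviation is still NaN (no samples yet).
func (fixedThresholdIntSlice) Create(mean, stdDev float64) []int {
    if math.IsNaN(stdDev) {
        return make([]int, 0, int(mean))
    }
    return make([]int, 0, int(mean+2*stdDev))
}

// Accept keeps items whose size falls within mean ± 2*stdDev. A NaN stdDev
// rejects the item, which is acceptable for this sketch.
func (fixedThresholdIntSlice) Accept(mean, stdDev, itemSize float64) bool {
    return math.Abs(itemSize-mean) <= 2*stdDev
}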

adaptive_pool_test.go

Lines changed: 16 additions & 7 deletions
@@ -26,7 +26,11 @@ func TestAdaptivePool(t *testing.T) {
             return float64(cap(v))
         }
 
-        x := newAdaptivePoolAsserter(t, NormalSlice[int]{thresh}, capv)
+        x := newAdaptivePoolAsserter(t, NormalSlice[int]{
+            Threshold: thresh,
+        }, capv)
+        x.assertStats(0, 0, math.NaN())
+        x.assertPut(nil, true) // should be a nop
         x.assertStats(0, 0, math.NaN())
         x.assertGet(0)
         x.assertGet(0) // should not change capacity
@@ -59,7 +63,7 @@ func TestAdaptivePool(t *testing.T) {
         x.assertStats(15, 32, 16)
     })
 
-    t.Run("seeding", func(t *testing.T) {
+    t.Run("test data from file", func(t *testing.T) {
         t.Parallel()
         const thresh = 2.5
         v := func(n int) *bytes.Buffer {
@@ -69,7 +73,12 @@ func TestAdaptivePool(t *testing.T) {
             return float64(v.Cap())
         }
 
-        x := newAdaptivePoolAsserter(t, NormalBytesBuffer{thresh}, capv)
+        x := newAdaptivePoolAsserter(t, NormalBytesBuffer{
+            Threshold: thresh,
+        }, capv)
+        x.assertStats(0, 0, math.NaN())
+        x.assertPut(nil, true) // should be a nop
+        x.assertStats(0, 0, math.NaN())
         x.assertGet(0)
 
         values := make([]float64, 3)
@@ -81,11 +90,11 @@ func TestAdaptivePool(t *testing.T) {
                 break
             }
             i++
-            zero(t, err)
+            zero(t, err, "read CSV record #%d", i)
             equal(t, 3, len(rec), "number of CSV values in record #%d", i)
 
             err = parseFloats(rec, values)
-            zero(t, err)
+            zero(t, err, "parse floats from CSV record #%d; record: %v", i, rec)
 
             x.ap.Put(v(int(values[0])))
         }
@@ -130,8 +139,8 @@ func newAdaptivePoolAsserter[T any](
 func (a adaptivePoolAsserter[T]) assertGet(expectedSize float64) {
     a.t.Helper()
     item := a.ap.Get()
-    if s := a.provider.Sizeof(item); s != 0 {
-        a.t.Fatalf("created items should have size zero, got %v", s)
+    if s := a.provider.Sizeof(item); s > 0 {
+        a.t.Fatalf("created items should have non-positive size, got %v", s)
     }
     got := a.capv(item)
     if got != expectedSize {
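
Note that adding MinCap ahead of Threshold is what forced the tests above to switch from positional literals like NormalSlice[int]{thresh} to named fields. A tiny sketch of constructing the updated providers with both fields; the variable names and values below are arbitrary, not part of the commit.

package adaptivepool

// Providers are now built with named fields, since MinCap precedes Threshold
// in the struct definitions.
var (
    exampleSliceProvider  = NormalSlice[byte]{MinCap: 512, Threshold: 2}
    exampleBufferProvider = NormalBytesBuffer{MinCap: 512, Threshold: 2}
)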

buffered_reader.go

Lines changed: 224 additions & 0 deletions
@@ -0,0 +1,224 @@
+package adaptivepool
+
+import (
+    "bytes"
+    "errors"
+    "fmt"
+    "io"
+    "sync"
+)
+
+// ReaderBufferer buffers data from [io.Reader]s and [io.ReadCloser]s into
+// [BufferedReader]s that, upon calling their `Close` method, will put the data
+// back into an [AdaptivePool] for reuse.
+type ReaderBufferer struct {
+    bufPool AdaptivePool[[]byte]
+    rdPool  sync.Pool
+}
+
+// NewReaderBufferer returns a new ReaderBufferer. The `minCap` and `thresh`
+// arguments will be the values of the internal [NormalSlice.MinCap] and
+// [NormalSlice.Threshold], respectively. Example:
+//
+//    rb := NewReaderBufferer(512, 2, 500)
+func NewReaderBufferer(minCap int, thresh, maxN float64) *ReaderBufferer {
+    return new(ReaderBufferer).init(minCap, thresh, maxN)
+}
+
+func (p *ReaderBufferer) init(minCap int, thresh,
+    maxN float64) *ReaderBufferer {
+    p.rdPool.New = newBytesReader
+    p.bufPool.init(NormalSlice[byte]{
+        MinCap:    minCap,
+        Threshold: thresh,
+    }, maxN)
+    return p
+}
+
+func newBytesReader() any {
+    return bytes.NewReader(nil)
+}
+
+// Stats returns the statistics from the internal AdaptivePool.
+func (p *ReaderBufferer) Stats() Stats {
+    return p.bufPool.Stats()
+}
+
+// Reader buffers the contents of the given io.Reader in a BufferedReader.
+func (p *ReaderBufferer) Reader(r io.Reader) (*BufferedReader, error) {
+    return p.buf(r, nil)
+}
+
+// ReadCloser buffers the contents of the given io.ReadCloser in a
+// BufferedReader. It always calls Close, and it fails if it returns an error.
+func (p *ReaderBufferer) ReadCloser(rc io.ReadCloser) (*BufferedReader, error) {
+    return p.buf(rc, rc)
+}
+
+func (p *ReaderBufferer) buf(r io.Reader,
+    c io.Closer) (*BufferedReader, error) {
+    buf := p.bufPool.Get()
+    bytesBuf := bytes.NewBuffer(buf)
+    n, readErr := bytesBuf.ReadFrom(r)
+    if readErr != nil && c == nil {
+        p.put(buf)
+        return nil, fmt.Errorf("read io.Reader: %w; bytes read: %v", readErr, n)
+    }
+    buf = bytesBuf.Bytes()
+
+    var closeErr error
+    if c != nil {
+        closeErr = c.Close()
+        if readErr == nil && closeErr != nil {
+            p.put(buf)
+            return nil, fmt.Errorf("close io.ReadCloser: %w; bytes read: %v",
+                closeErr, n)
+        }
+    }
+
+    if readErr != nil || closeErr != nil {
+        p.put(buf)
+        return nil, fmt.Errorf("buffer io.ReadCloser: read error: %w; close"+
+            " error: %w; bytes read: %v", readErr, closeErr, n)
+    }
+
+    rd := p.rdPool.Get().(*bytes.Reader)
+    rd.Reset(buf)
+
+    return &BufferedReader{
+        reader:  rd,
+        buf:     buf,
+        release: p.release,
+    }, nil
+}
+
+func (p *ReaderBufferer) release(buf []byte, rd *bytes.Reader) {
+    rd.Reset(nil)
+    p.rdPool.Put(rd)
+    p.put(buf)
+}
+
+func (p *ReaderBufferer) put(buf []byte) {
+    if cap(buf) > 0 {
+        clear(buf[:cap(buf)])
+        p.bufPool.Put(buf[:0])
+    }
+}
+
+// NOTE: we explicitly do not want to offer io.ReaderAt in BufferedReader
+// because, as per its docs, "Clients of ReadAt can execute parallel ReadAt
+// calls on the same input source". This means that we should add a sync.RWMutex
+// to protect the underlying implementation and make it more heavyweight in
+// order to guard the parallel ReadAt operations from potential Close
+// operations. Clients can still use the Seek method and then Read as a
+// sequential workaround.
+
+// BufferedReader holds a read-only buffer of the contents extracted from an
+// [io.Reader] or [io.ReadCloser]. Its `Close` method releases internal buffers
+// for reuse, and after that it will be empty. It is not safe for concurrent
+// use.
+type BufferedReader struct {
+    reader  *bytes.Reader
+    buf     []byte
+    release func([]byte, *bytes.Reader)
+}
+
+// Bytes returns the internal buffered []byte, transferring their ownership to
+// the caller. The data will not be later put back into a pool by the
+// implementation, and subsequent calls to any method will behave as if `Close`
+// had been called. Subsequent calls to this method return nil, the same as if
+// `Close` had been called before.
+func (bb *BufferedReader) Bytes() []byte {
+    if bb.reader != nil {
+        bb.release(nil, bb.reader)
+        buf := bb.buf
+        *bb = BufferedReader{}
+        return buf
+    }
+    return nil
+}
+
+// Len returns the number of unread bytes.
+func (bb *BufferedReader) Len() int {
+    if bb.reader != nil {
+        return bb.reader.Len()
+    }
+    return 0
+}
+
+// Read is part of the implementation of the io.Reader interface.
+func (bb *BufferedReader) Read(p []byte) (int, error) {
+    if bb.reader != nil {
+        return bb.reader.Read(p)
+    }
+    return 0, io.EOF
+}
+
+// Close is part of the implementation of the io.Closer interface. This method
+// releases the internal buffer for reuse. After this, the *BufferedReader will
+// be empty. This method is idempotent and always returns a nil error.
+func (bb *BufferedReader) Close() error {
+    if bb.reader != nil {
+        bb.release(bb.buf, bb.reader)
+        *bb = BufferedReader{}
+    }
+    return nil
+}
+
+// Seek is part of the implementation of the io.Seeker interface.
+func (bb *BufferedReader) Seek(offset int64, whence int) (int64, error) {
+    if bb.reader != nil {
+        return bb.reader.Seek(offset, whence)
+    }
+
+    switch whence {
+    case io.SeekStart, io.SeekCurrent, io.SeekEnd:
+    default:
+        return 0, errors.New("BufferedReader.Seek: invalid whence")
+    }
+    if offset < 0 {
+        return 0, errors.New("BufferedReader.Seek: negative position")
+    }
+
+    return 0, nil
+}
+
+// ReadByte is part of the implementation of the io.ByteReader interface.
+func (bb *BufferedReader) ReadByte() (byte, error) {
+    if bb.reader != nil {
+        return bb.reader.ReadByte()
+    }
+    return 0, io.EOF
+}
+
+// UnreadByte is part of the implementation of the io.ByteScanner interface.
+func (bb *BufferedReader) UnreadByte() error {
+    if bb.reader != nil {
+        return bb.reader.UnreadByte()
+    }
+    return errors.New("BufferedReader.UnreadByte: resource closed")
+}
+
+// ReadRune is part of the implementation of the io.RuneReader interface.
+func (bb *BufferedReader) ReadRune() (r rune, size int, err error) {
+    if bb.reader != nil {
+        return bb.reader.ReadRune()
+    }
+    return 0, 0, io.EOF
+}
+
+// UnreadRune is part of the implementation of the io.RuneScanner interface.
+func (bb *BufferedReader) UnreadRune() error {
+    if bb.reader != nil {
+        return bb.reader.UnreadRune()
+    }
+    return errors.New("BufferedReader.UnreadRune: resource closed")
+}
+
+// WriteTo is part of the implementation of the io.WriterTo interface.
+func (bb *BufferedReader) WriteTo(w io.Writer) (n int64, err error) {
+    if bb.reader != nil {
+        return bb.reader.WriteTo(w)
+    }
+    return 0, nil
+}
