Skip to content

Commit 783a2c9

Browse files
committed
api: added future/response releasing methods.
Added method Release to Future and Response's interface, that allows to free used data directly by calling. Used to reduce allocations in `reader` function. Closes #493
1 parent 40211c2 commit 783a2c9

File tree

12 files changed

+169
-29
lines changed

12 files changed

+169
-29
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1717
* Added Future.cond (sync.Cond) and Future.finished bool. Added Future.finish() marks Future as done (#496).
1818
* Added function String() for type datetime (#322).
1919
* New `Future` interface (#470).
20+
* Method `Release` for `Future` and `Response` interface that allows
21+
to free used data directly by calling (#493).
2022

2123
### Changed
2224

connection.go

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ var (
3939
"to the current connection or connection pool")
4040
)
4141

42+
var slicePool = &sync.Pool{
43+
New: func() interface{} {
44+
buf := make([]byte, 1024)
45+
return &buf
46+
},
47+
}
48+
4249
const (
4350
// Connected signals that connection is established or reestablished.
4451
Connected ConnEventKind = iota + 1
@@ -373,7 +380,6 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
373380
}
374381

375382
conn.cond = sync.NewCond(&conn.mutex)
376-
377383
if conn.opts.Reconnect > 0 {
378384
// We don't need these mutex.Lock()/mutex.Unlock() here, but
379385
// runReconnects() expects mutex.Lock() to be set, so it's
@@ -844,12 +850,36 @@ func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
844850
return event, nil
845851
}
846852

853+
func getSlice(length int) *[]byte {
854+
bs, ok := slicePool.Get().(*[]byte)
855+
856+
if ok && cap(*bs) >= int(length) {
857+
clear((*bs)[:cap(*bs)])
858+
*bs = (*bs)[:length]
859+
return bs
860+
}
861+
862+
if ok {
863+
slicePool.Put(bs)
864+
}
865+
866+
b := make([]byte, length)
867+
868+
return &b
869+
}
870+
871+
func putSlice(buf *[]byte) {
872+
slicePool.Put(buf)
873+
}
874+
847875
func (conn *Connection) reader(r io.Reader, c Conn) {
848876
events := make(chan connWatchEvent, 1024)
849877
defer close(events)
850878

851879
go conn.eventer(events)
852880

881+
buf := &smallBuf{}
882+
853883
for atomic.LoadUint32(&conn.state) != connClosed {
854884
respBytes, err := read(r, conn.lenbuf[:])
855885
if err != nil {
@@ -860,8 +890,12 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
860890
conn.reconnect(err, c)
861891
return
862892
}
863-
buf := smallBuf{b: respBytes}
864-
header, code, err := decodeHeader(conn.dec, &buf)
893+
894+
buf.b = *respBytes
895+
buf.p = 0
896+
buf.ptr = respBytes
897+
898+
header, code, err := decodeHeader(conn.dec, buf)
865899
if err != nil {
866900
err = ClientError{
867901
ErrProtocolError,
@@ -873,7 +907,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
873907

874908
var fut *future = nil
875909
if code == iproto.IPROTO_EVENT {
876-
if event, err := readWatchEvent(&buf); err == nil {
910+
if event, err := readWatchEvent(buf); err == nil {
877911
events <- event
878912
} else {
879913
err = ClientError{
@@ -887,7 +921,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
887921
conn.opts.Logger.Report(LogBoxSessionPushUnsupported, conn, header)
888922
} else {
889923
if fut = conn.fetchFuture(header.RequestId); fut != nil {
890-
if err := fut.setResponse(header, &buf); err != nil {
924+
if err := fut.setResponse(header, buf); err != nil {
891925
fut.setError(fmt.Errorf("failed to set response: %w", err))
892926
}
893927
conn.markDone(fut)
@@ -1190,7 +1224,9 @@ func (conn *Connection) timeouts() {
11901224
}
11911225
}
11921226

1193-
func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
1227+
// read uses args to allocate slices for responses using sync.Pool.
1228+
// data must be released later using Release.
1229+
func read(r io.Reader, lenbuf []byte) (response *[]byte, err error) {
11941230
var length uint64
11951231

11961232
if _, err = io.ReadFull(r, lenbuf); err != nil {
@@ -1214,8 +1250,8 @@ func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
12141250
return
12151251
}
12161252

1217-
response = make([]byte, length)
1218-
_, err = io.ReadFull(r, response)
1253+
response = getSlice(int(length))
1254+
_, err = io.ReadFull(r, *response)
12191255

12201256
return
12211257
}

dial.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,7 @@ func identify(ctx context.Context, conn Conn) (ProtocolInfo, error) {
481481
}
482482
return info, err
483483
}
484+
defer resp.Release()
484485
data, err := resp.Decode()
485486
if err != nil {
486487
return info, err
@@ -536,6 +537,7 @@ func checkProtocolInfo(required ProtocolInfo, actual ProtocolInfo) error {
536537
func authenticate(ctx context.Context, c Conn, auth Auth, user, pass, salt string) error {
537538
var req Request
538539
var err error
540+
var resp Response
539541

540542
switch auth {
541543
case ChapSha1Auth:
@@ -552,9 +554,10 @@ func authenticate(ctx context.Context, c Conn, auth Auth, user, pass, salt strin
552554
if err = writeRequest(ctx, c, req); err != nil {
553555
return err
554556
}
555-
if _, err = readResponse(ctx, c, req); err != nil {
557+
if resp, err = readResponse(ctx, c, req); err != nil {
556558
return err
557559
}
560+
resp.Release()
558561
return nil
559562
}
560563

@@ -620,17 +623,17 @@ func readResponse(ctx context.Context, conn Conn, req Request) (Response, error)
620623
return nil, fmt.Errorf("read error: %w", err)
621624
}
622625

623-
buf := smallBuf{b: respBytes}
626+
buf := &smallBuf{b: *respBytes, p: 0}
624627

625-
d := getDecoder(&buf)
628+
d := getDecoder(buf)
626629
defer putDecoder(d)
627630

628-
header, _, err := decodeHeader(d, &buf)
631+
header, _, err := decodeHeader(d, buf)
629632
if err != nil {
630633
return nil, fmt.Errorf("decode response header error: %w", err)
631634
}
632635

633-
resp, err := req.Response(header, &buf)
636+
resp, err := req.Response(header, buf)
634637
if err != nil {
635638
return nil, fmt.Errorf("creating response error: %w", err)
636639
}

example_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,7 @@ func ExampleConnection_Do_failure() {
13131313

13141314
// We got a future, the request actually not performed yet.
13151315
future := conn.Do(req)
1316+
defer future.Release()
13161317

13171318
// When the future receives the response, the result of the Future is set
13181319
// and becomes available. We could wait for that moment with Future.Get(),

future.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ type Future interface {
1111
Get() ([]interface{}, error)
1212
GetTyped(result interface{}) error
1313
GetResponse() (Response, error)
14+
Release()
1415
WaitChan() <-chan struct{}
1516
}
1617

@@ -188,3 +189,11 @@ func (fut *future) WaitChan() <-chan struct{} {
188189

189190
return fut.done
190191
}
192+
193+
// Release is freeing the Future resources.
194+
// After this, using this Future becomes invalid.
195+
func (fut *future) Release() {
196+
if fut.resp != nil {
197+
fut.resp.Release()
198+
}
199+
}

future_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ func (resp *futureMockResponse) Header() Header {
5555
return resp.header
5656
}
5757

58+
func (resp *futureMockResponse) Release() {
59+
// Nothing to do.
60+
}
61+
5862
func (resp *futureMockResponse) Decode() ([]interface{}, error) {
5963
resp.decodeCnt++
6064

@@ -219,6 +223,10 @@ func (*futureMock) WaitChan() <-chan struct{} {
219223
return nil
220224
}
221225

226+
func (f *futureMock) Release() {
227+
// Nothing to do.
228+
}
229+
222230
func TestFuture(t *testing.T) {
223231
fut := &futureMock{value: 5}
224232

prepared.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (req *PrepareRequest) Response(header Header, body io.Reader) (Response, er
9090
if err != nil {
9191
return nil, err
9292
}
93-
return &PrepareResponse{baseResponse: baseResp}, nil
93+
return &PrepareResponse{baseResponse: *baseResp}, nil
9494
}
9595

9696
// UnprepareRequest helps you to create an unprepare request object for
@@ -204,5 +204,5 @@ func (req *ExecutePreparedRequest) Response(header Header, body io.Reader) (Resp
204204
if err != nil {
205205
return nil, err
206206
}
207-
return &ExecuteResponse{baseResponse: baseResp}, nil
207+
return &ExecuteResponse{baseResponse: *baseResp}, nil
208208
}

request.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -620,11 +620,11 @@ func (req *SelectRequest) Context(ctx context.Context) *SelectRequest {
620620

621621
// Response creates a response for the SelectRequest.
622622
func (req *SelectRequest) Response(header Header, body io.Reader) (Response, error) {
623-
baseResp, err := createBaseResponse(header, body)
623+
SelectResp, err := createSelectResponse(header, body)
624624
if err != nil {
625625
return nil, err
626626
}
627-
return &SelectResponse{baseResponse: baseResp}, nil
627+
return SelectResp, nil
628628
}
629629

630630
// InsertRequest helps you to create an insert request object for execution
@@ -1154,7 +1154,7 @@ func (req *ExecuteRequest) Response(header Header, body io.Reader) (Response, er
11541154
if err != nil {
11551155
return nil, err
11561156
}
1157-
return &ExecuteResponse{baseResponse: baseResp}, nil
1157+
return &ExecuteResponse{baseResponse: *baseResp}, nil
11581158
}
11591159

11601160
// WatchOnceRequest synchronously fetches the value currently associated with a

response.go

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
type Response interface {
1313
// Header returns a response header.
1414
Header() Header
15+
// Release free responses data.
16+
Release()
1517
// Decode decodes a response.
1618
Decode() ([]interface{}, error)
1719
// DecodeTyped decodes a response into a given container res.
@@ -31,24 +33,39 @@ type baseResponse struct {
3133
err error
3234
}
3335

34-
func createBaseResponse(header Header, body io.Reader) (baseResponse, error) {
36+
func createBaseResponse(header Header, body io.Reader) (*baseResponse, error) {
37+
resp := &baseResponse{}
3538
if body == nil {
36-
return baseResponse{header: header}, nil
39+
resp.header = header
40+
return resp, nil
3741
}
3842
if buf, ok := body.(*smallBuf); ok {
39-
return baseResponse{header: header, buf: *buf}, nil
43+
resp.header = header
44+
resp.buf.b = buf.b
45+
resp.buf.p = buf.p
46+
resp.buf.ptr = buf.ptr
47+
return resp, nil
4048
}
4149
data, err := io.ReadAll(body)
4250
if err != nil {
43-
return baseResponse{}, err
51+
return resp, err
4452
}
45-
return baseResponse{header: header, buf: smallBuf{b: data}}, nil
53+
resp.header = header
54+
resp.buf.b = data
55+
return resp, nil
56+
}
57+
58+
func (resp *baseResponse) Release() {
59+
if resp.buf.ptr != nil {
60+
putSlice(resp.buf.ptr)
61+
}
62+
*resp = baseResponse{}
4663
}
4764

4865
// DecodeBaseResponse parse response header and body.
4966
func DecodeBaseResponse(header Header, body io.Reader) (Response, error) {
5067
resp, err := createBaseResponse(header, body)
51-
return &resp, err
68+
return resp, err
5269
}
5370

5471
// SelectResponse is used for the select requests.
@@ -670,6 +687,33 @@ func (resp *baseResponse) Header() Header {
670687
return resp.header
671688
}
672689

690+
func createSelectResponse(header Header, body io.Reader) (*SelectResponse, error) {
691+
resp := &SelectResponse{}
692+
if body == nil {
693+
resp.header = header
694+
return resp, nil
695+
}
696+
if buf, ok := body.(*smallBuf); ok {
697+
resp.header = header
698+
resp.buf.b = buf.b
699+
resp.buf.p = buf.p
700+
resp.buf.ptr = buf.ptr
701+
return resp, nil
702+
}
703+
data, err := io.ReadAll(body)
704+
if err != nil {
705+
return resp, err
706+
}
707+
resp.header = header
708+
resp.buf.b = data
709+
return resp, nil
710+
}
711+
712+
func (resp *SelectResponse) Release() {
713+
resp.baseResponse.Release()
714+
resp.pos = nil
715+
}
716+
673717
// Pos returns a position descriptor of the last selected tuple for the SelectResponse.
674718
// If the response was not decoded, this method will call Decode().
675719
func (resp *SelectResponse) Pos() ([]byte, error) {

smallbuf.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ import (
66
)
77

88
type smallBuf struct {
9-
b []byte
10-
p int
9+
b []byte
10+
p int
11+
ptr *[]byte
1112
}
1213

1314
func (s *smallBuf) Read(d []byte) (l int, err error) {

0 commit comments

Comments
 (0)