Skip to content

Commit e61434b

Browse files
committed
api: added future/response releasing methods
Added method Release to Future and Response's interface, that allows to free used data directly by calling. Used to reduce allocations in `reader` function. Closes #493
1 parent 68a7cd9 commit e61434b

File tree

5 files changed

+90
-19
lines changed

5 files changed

+90
-19
lines changed

connection.go

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ var (
3939
"to the current connection or connection pool")
4040
)
4141

42+
var slicePool = &sync.Pool{
43+
New: func() interface{} {
44+
buf := make([]byte, 1024)
45+
return &buf
46+
},
47+
}
48+
4249
const (
4350
// Connected signals that connection is established or reestablished.
4451
Connected ConnEventKind = iota + 1
@@ -843,12 +850,36 @@ func readWatchEvent(reader io.Reader) (connWatchEvent, error) {
843850
return event, nil
844851
}
845852

853+
func getSlice(length int) *[]byte {
854+
bs, ok := slicePool.Get().(*[]byte)
855+
856+
if ok && cap(*bs) >= length {
857+
clear((*bs)[:cap(*bs)])
858+
*bs = (*bs)[:length]
859+
return bs
860+
}
861+
862+
if ok {
863+
slicePool.Put(bs)
864+
}
865+
866+
b := make([]byte, length)
867+
868+
return &b
869+
}
870+
871+
func putSlice(buf *[]byte) {
872+
slicePool.Put(buf)
873+
}
874+
846875
func (conn *Connection) reader(r io.Reader, c Conn) {
847876
events := make(chan connWatchEvent, 1024)
848877
defer close(events)
849878

850879
go conn.eventer(events)
851880

881+
buf := &smallBuf{}
882+
852883
for atomic.LoadUint32(&conn.state) != connClosed {
853884
respBytes, err := read(r, conn.lenbuf[:])
854885
if err != nil {
@@ -860,8 +891,11 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
860891
return
861892
}
862893

863-
buf := smallBuf{b: respBytes}
864-
header, code, err := decodeHeader(conn.dec, &buf)
894+
buf.b = *respBytes
895+
buf.p = 0
896+
buf.ptr = respBytes
897+
898+
header, code, err := decodeHeader(conn.dec, buf)
865899

866900
if err != nil {
867901
err = ClientError{
@@ -874,7 +908,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
874908

875909
var fut *future = nil
876910
if code == iproto.IPROTO_EVENT {
877-
if event, err := readWatchEvent(&buf); err == nil {
911+
if event, err := readWatchEvent(buf); err == nil {
878912
events <- event
879913
} else {
880914
err = ClientError{
@@ -888,7 +922,7 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
888922
conn.opts.Logger.Report(LogBoxSessionPushUnsupported, conn, header)
889923
} else {
890924
if fut = conn.fetchFuture(header.RequestId); fut != nil {
891-
if err := fut.setResponse(header, &buf); err != nil {
925+
if err := fut.setResponse(header, buf); err != nil {
892926
fut.setError(fmt.Errorf("failed to set response: %w", err))
893927
}
894928
conn.markDone(fut)
@@ -1191,7 +1225,9 @@ func (conn *Connection) timeouts() {
11911225
}
11921226
}
11931227

1194-
func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
1228+
// read uses args to allocate slices for responses using sync.Pool.
1229+
// data must be released later using Release.
1230+
func read(r io.Reader, lenbuf []byte) (response *[]byte, err error) {
11951231
var length uint64
11961232

11971233
if _, err = io.ReadFull(r, lenbuf); err != nil {
@@ -1215,8 +1251,8 @@ func read(r io.Reader, lenbuf []byte) (response []byte, err error) {
12151251
return
12161252
}
12171253

1218-
response = make([]byte, length)
1219-
_, err = io.ReadFull(r, response)
1254+
response = getSlice(int(length))
1255+
_, err = io.ReadFull(r, *response)
12201256

12211257
return
12221258
}

dial.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -623,17 +623,17 @@ func readResponse(ctx context.Context, conn Conn, req Request) (Response, error)
623623
return nil, fmt.Errorf("read error: %w", err)
624624
}
625625

626-
buf := smallBuf{b: respBytes, p: 0}
626+
buf := &smallBuf{b: *respBytes, p: 0}
627627

628-
d := getDecoder(&buf)
628+
d := getDecoder(buf)
629629
defer putDecoder(d)
630630

631-
header, _, err := decodeHeader(d, &buf)
631+
header, _, err := decodeHeader(d, buf)
632632
if err != nil {
633633
return nil, fmt.Errorf("decode response header error: %w", err)
634634
}
635635

636-
resp, err := req.Response(header, &buf)
636+
resp, err := req.Response(header, buf)
637637
if err != nil {
638638
return nil, fmt.Errorf("creating response error: %w", err)
639639
}

request.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -620,11 +620,12 @@ func (req *SelectRequest) Context(ctx context.Context) *SelectRequest {
620620

621621
// Response creates a response for the SelectRequest.
622622
func (req *SelectRequest) Response(header Header, body io.Reader) (Response, error) {
623-
baseResp, err := createBaseResponse(header, body)
623+
SelectResp, err := createSelectResponse(header, body)
624624
if err != nil {
625625
return nil, err
626626
}
627-
return &SelectResponse{baseResponse: *baseResp}, nil
627+
628+
return SelectResp, nil
628629
}
629630

630631
// InsertRequest helps you to create an insert request object for execution

response.go

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,31 @@ type baseResponse struct {
3434
}
3535

3636
func createBaseResponse(header Header, body io.Reader) (*baseResponse, error) {
37+
resp := &baseResponse{}
3738
if body == nil {
38-
return &baseResponse{header: header}, nil
39+
resp.header = header
40+
return resp, nil
3941
}
4042
if buf, ok := body.(*smallBuf); ok {
41-
return &baseResponse{header: header, buf: *buf}, nil
43+
resp.header = header
44+
resp.buf.b = buf.b
45+
resp.buf.p = buf.p
46+
resp.buf.ptr = buf.ptr
47+
return resp, nil
4248
}
4349
data, err := io.ReadAll(body)
4450
if err != nil {
45-
return nil, err
51+
return resp, err
4652
}
47-
return &baseResponse{header: header, buf: smallBuf{b: data}}, nil
53+
resp.header = header
54+
resp.buf.b = data
55+
return resp, nil
4856
}
4957

5058
func (resp *baseResponse) Release() {
59+
if resp.buf.ptr != nil {
60+
putSlice(resp.buf.ptr)
61+
}
5162
*resp = baseResponse{}
5263
}
5364

@@ -676,6 +687,28 @@ func (resp *baseResponse) Header() Header {
676687
return resp.header
677688
}
678689

690+
func createSelectResponse(header Header, body io.Reader) (*SelectResponse, error) {
691+
resp := &SelectResponse{}
692+
if body == nil {
693+
resp.header = header
694+
return resp, nil
695+
}
696+
if buf, ok := body.(*smallBuf); ok {
697+
resp.header = header
698+
resp.buf.b = buf.b
699+
resp.buf.p = buf.p
700+
resp.buf.ptr = buf.ptr
701+
return resp, nil
702+
}
703+
data, err := io.ReadAll(body)
704+
if err != nil {
705+
return resp, err
706+
}
707+
resp.header = header
708+
resp.buf.b = data
709+
return resp, nil
710+
}
711+
679712
func (resp *SelectResponse) Release() {
680713
resp.baseResponse.Release()
681714
resp.pos = nil

smallbuf.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ import (
66
)
77

88
type smallBuf struct {
9-
b []byte
10-
p int
9+
b []byte
10+
p int
11+
ptr *[]byte
1112
}
1213

1314
func (s *smallBuf) Read(d []byte) (l int, err error) {

0 commit comments

Comments
 (0)