Skip to content

Commit 68a7cd9

Browse files
committed
api: added future/response releasing methods
Added method Release to Future and Response's interface, that allows to free used data directly by calling. Used to reduce allocations in `reader` function. Closes #493
1 parent 40211c2 commit 68a7cd9

File tree

12 files changed

+145
-16
lines changed

12 files changed

+145
-16
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
1717
* Added Future.cond (sync.Cond) and Future.finished bool. Added Future.finish() marks Future as done (#496).
1818
* Added function String() for type datetime (#322).
1919
* New `Future` interface (#470).
20+
* Method `Release` for `Future` and `Response` interface that allows
21+
to free used data directly by calling (#493).
2022

2123
### Changed
2224

connection.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,6 @@ func Connect(ctx context.Context, dialer Dialer, opts Opts) (conn *Connection, e
373373
}
374374

375375
conn.cond = sync.NewCond(&conn.mutex)
376-
377376
if conn.opts.Reconnect > 0 {
378377
// We don't need these mutex.Lock()/mutex.Unlock() here, but
379378
// runReconnects() expects mutex.Lock() to be set, so it's
@@ -860,8 +859,10 @@ func (conn *Connection) reader(r io.Reader, c Conn) {
860859
conn.reconnect(err, c)
861860
return
862861
}
862+
863863
buf := smallBuf{b: respBytes}
864864
header, code, err := decodeHeader(conn.dec, &buf)
865+
865866
if err != nil {
866867
err = ClientError{
867868
ErrProtocolError,

dial.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,7 @@ func identify(ctx context.Context, conn Conn) (ProtocolInfo, error) {
481481
}
482482
return info, err
483483
}
484+
defer resp.Release()
484485
data, err := resp.Decode()
485486
if err != nil {
486487
return info, err
@@ -536,6 +537,7 @@ func checkProtocolInfo(required ProtocolInfo, actual ProtocolInfo) error {
536537
func authenticate(ctx context.Context, c Conn, auth Auth, user, pass, salt string) error {
537538
var req Request
538539
var err error
540+
var resp Response
539541

540542
switch auth {
541543
case ChapSha1Auth:
@@ -552,9 +554,10 @@ func authenticate(ctx context.Context, c Conn, auth Auth, user, pass, salt strin
552554
if err = writeRequest(ctx, c, req); err != nil {
553555
return err
554556
}
555-
if _, err = readResponse(ctx, c, req); err != nil {
557+
if resp, err = readResponse(ctx, c, req); err != nil {
556558
return err
557559
}
560+
resp.Release()
558561
return nil
559562
}
560563

@@ -620,7 +623,7 @@ func readResponse(ctx context.Context, conn Conn, req Request) (Response, error)
620623
return nil, fmt.Errorf("read error: %w", err)
621624
}
622625

623-
buf := smallBuf{b: respBytes}
626+
buf := smallBuf{b: respBytes, p: 0}
624627

625628
d := getDecoder(&buf)
626629
defer putDecoder(d)

example_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,7 @@ func ExampleConnection_Do_failure() {
13131313

13141314
// We got a future, the request actually not performed yet.
13151315
future := conn.Do(req)
1316+
defer future.Release()
13161317

13171318
// When the future receives the response, the result of the Future is set
13181319
// and becomes available. We could wait for that moment with Future.Get(),

future.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ type Future interface {
1111
Get() ([]interface{}, error)
1212
GetTyped(result interface{}) error
1313
GetResponse() (Response, error)
14+
Release()
1415
WaitChan() <-chan struct{}
1516
}
1617

@@ -188,3 +189,11 @@ func (fut *future) WaitChan() <-chan struct{} {
188189

189190
return fut.done
190191
}
192+
193+
// Release is freeing the Future resources.
194+
// After this, using this Future becomes invalid.
195+
func (fut *future) Release() {
196+
if fut.resp != nil {
197+
fut.resp.Release()
198+
}
199+
}

future_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,17 @@ type futureMockResponse struct {
4949

5050
decodeCnt int
5151
decodeTypedCnt int
52+
released bool
5253
}
5354

5455
func (resp *futureMockResponse) Header() Header {
5556
return resp.header
5657
}
5758

59+
func (resp *futureMockResponse) Release() {
60+
resp.released = true
61+
}
62+
5863
func (resp *futureMockResponse) Decode() ([]interface{}, error) {
5964
resp.decodeCnt++
6065

@@ -133,6 +138,23 @@ func TestFuture_GetResponse(t *testing.T) {
133138
assert.Equal(t, []interface{}{uint8('v'), uint8('2')}, data)
134139
}
135140

141+
func TestFuture_Release(t *testing.T) {
142+
fut, err := NewFutureWithResponse(&futureMockRequest{},
143+
Header{}, bytes.NewReader([]byte{'v', '3'}))
144+
assert.NoError(t, err)
145+
146+
resp, err := fut.GetResponse()
147+
assert.NoError(t, err)
148+
mockResp, ok := resp.(*futureMockResponse)
149+
assert.True(t, ok)
150+
assert.False(t, mockResp.released)
151+
152+
// Doing efficient work.
153+
154+
fut.Release()
155+
assert.True(t, mockResp.released)
156+
}
157+
136158
func BenchmarkFuture_Get(b *testing.B) {
137159
fut, err := NewFutureWithResponse(&futureMockRequest{},
138160
Header{}, bytes.NewReader([]byte{'v', '3'}))
@@ -219,6 +241,10 @@ func (*futureMock) WaitChan() <-chan struct{} {
219241
return nil
220242
}
221243

244+
func (*futureMock) Release() {
245+
// Nothing to do.
246+
}
247+
222248
func TestFuture(t *testing.T) {
223249
fut := &futureMock{value: 5}
224250

prepared.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (req *PrepareRequest) Response(header Header, body io.Reader) (Response, er
9090
if err != nil {
9191
return nil, err
9292
}
93-
return &PrepareResponse{baseResponse: baseResp}, nil
93+
return &PrepareResponse{baseResponse: *baseResp}, nil
9494
}
9595

9696
// UnprepareRequest helps you to create an unprepare request object for
@@ -204,5 +204,5 @@ func (req *ExecutePreparedRequest) Response(header Header, body io.Reader) (Resp
204204
if err != nil {
205205
return nil, err
206206
}
207-
return &ExecuteResponse{baseResponse: baseResp}, nil
207+
return &ExecuteResponse{baseResponse: *baseResp}, nil
208208
}

request.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ func (req *SelectRequest) Response(header Header, body io.Reader) (Response, err
624624
if err != nil {
625625
return nil, err
626626
}
627-
return &SelectResponse{baseResponse: baseResp}, nil
627+
return &SelectResponse{baseResponse: *baseResp}, nil
628628
}
629629

630630
// InsertRequest helps you to create an insert request object for execution
@@ -1154,7 +1154,7 @@ func (req *ExecuteRequest) Response(header Header, body io.Reader) (Response, er
11541154
if err != nil {
11551155
return nil, err
11561156
}
1157-
return &ExecuteResponse{baseResponse: baseResp}, nil
1157+
return &ExecuteResponse{baseResponse: *baseResp}, nil
11581158
}
11591159

11601160
// WatchOnceRequest synchronously fetches the value currently associated with a

response.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
type Response interface {
1313
// Header returns a response header.
1414
Header() Header
15+
// Release free responses data.
16+
Release()
1517
// Decode decodes a response.
1618
Decode() ([]interface{}, error)
1719
// DecodeTyped decodes a response into a given container res.
@@ -31,24 +33,28 @@ type baseResponse struct {
3133
err error
3234
}
3335

34-
func createBaseResponse(header Header, body io.Reader) (baseResponse, error) {
36+
func createBaseResponse(header Header, body io.Reader) (*baseResponse, error) {
3537
if body == nil {
36-
return baseResponse{header: header}, nil
38+
return &baseResponse{header: header}, nil
3739
}
3840
if buf, ok := body.(*smallBuf); ok {
39-
return baseResponse{header: header, buf: *buf}, nil
41+
return &baseResponse{header: header, buf: *buf}, nil
4042
}
4143
data, err := io.ReadAll(body)
4244
if err != nil {
43-
return baseResponse{}, err
45+
return nil, err
4446
}
45-
return baseResponse{header: header, buf: smallBuf{b: data}}, nil
47+
return &baseResponse{header: header, buf: smallBuf{b: data}}, nil
48+
}
49+
50+
func (resp *baseResponse) Release() {
51+
*resp = baseResponse{}
4652
}
4753

4854
// DecodeBaseResponse parse response header and body.
4955
func DecodeBaseResponse(header Header, body io.Reader) (Response, error) {
5056
resp, err := createBaseResponse(header, body)
51-
return &resp, err
57+
return resp, err
5258
}
5359

5460
// SelectResponse is used for the select requests.
@@ -670,6 +676,11 @@ func (resp *baseResponse) Header() Header {
670676
return resp.header
671677
}
672678

679+
func (resp *SelectResponse) Release() {
680+
resp.baseResponse.Release()
681+
resp.pos = nil
682+
}
683+
673684
// Pos returns a position descriptor of the last selected tuple for the SelectResponse.
674685
// If the response was not decoded, this method will call Decode().
675686
func (resp *SelectResponse) Pos() ([]byte, error) {

response_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,42 @@ func TestDecodeBaseResponse(t *testing.T) {
5454
})
5555
}
5656
}
57+
58+
func TestBaseResponseRelease(t *testing.T) {
59+
header := tarantool.Header{RequestId: 123}
60+
buf := []byte{'v', '3'}
61+
62+
resp, err := tarantool.DecodeBaseResponse(header, encodeResponseData(t, buf))
63+
require.NoError(t, err)
64+
65+
data, err := resp.Decode()
66+
require.NoError(t, err)
67+
require.Equal(t, resp.Header(), header)
68+
require.Equal(t, []interface{}{buf}, data)
69+
70+
resp.Release()
71+
72+
data, err = resp.Decode()
73+
require.NoError(t, err)
74+
require.Equal(t, []interface{}(nil), data)
75+
}
76+
77+
func TestSelectResponseRelease(t *testing.T) {
78+
header := tarantool.Header{RequestId: 123}
79+
buf := []byte{'v', '3'}
80+
req := tarantool.NewSelectRequest(nil)
81+
82+
resp, err := req.Response(header, encodeResponseData(t, buf))
83+
require.NoError(t, err)
84+
85+
data, err := resp.Decode()
86+
require.NoError(t, err)
87+
require.Equal(t, resp.Header(), header)
88+
require.Equal(t, []interface{}{buf}, data)
89+
90+
resp.Release()
91+
92+
selResp, ok := resp.(*tarantool.SelectResponse)
93+
require.True(t, ok)
94+
require.Equal(t, tarantool.SelectResponse{}, *selResp)
95+
}

0 commit comments

Comments
 (0)