Skip to content

Commit 441666a

Browse files
committed
Merge branch 'release/v0.6.1'
2 parents d929862 + 3273f8e commit 441666a

25 files changed

+472
-193
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## v0.6.1 - 13 Feb 2015
4+
5+
- Reduce GC by using buffers when reading and writing
6+
- Fixed encoding `time.Time` ignoring millseconds
7+
- Fixed pointers in structs that implement the `Marshaler`/`Unmarshaler` interfaces being ignored
8+
39
## v0.6.0 - 1 Feb 2015
410

511
There are some major changes to the driver with this release that are not related to the RethinkDB v1.16 release. Please have a read through them:

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
[Go](http://golang.org/) driver for [RethinkDB](http://www.rethinkdb.com/)
88

99

10-
Current version: v0.6.0 (RethinkDB v1.16.0)
10+
Current version: v0.6.1 (RethinkDB v1.16)
1111

1212
**Version 0.6 introduced some small API changes and some significant internal changes, for more information check the [change log](CHANGELOG.md) and please be aware the driver is not yet stable**
1313

buffer.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package gorethink
2+
3+
import "io"
4+
5+
const defaultBufSize = 4096
6+
7+
// A buffer which is used for both reading and writing.
8+
// This is possible since communication on each connection is synchronous.
9+
// In other words, we can't write and read simultaneously on the same connection.
10+
// The buffer is similar to bufio.Reader / Writer but zero-copy-ish
11+
// Also highly optimized for this particular use case.
12+
type buffer struct {
13+
buf []byte
14+
rd io.Reader
15+
idx int
16+
length int
17+
}
18+
19+
func newBuffer(rd io.Reader) buffer {
20+
var b [defaultBufSize]byte
21+
return buffer{
22+
buf: b[:],
23+
rd: rd,
24+
}
25+
}
26+
27+
// fill reads into the buffer until at least _need_ bytes are in it
28+
func (b *buffer) fill(need int) error {
29+
n := b.length
30+
31+
// move existing data to the beginning
32+
if n > 0 && b.idx > 0 {
33+
copy(b.buf[0:n], b.buf[b.idx:])
34+
}
35+
36+
// grow buffer if necessary
37+
// TODO: let the buffer shrink again at some point
38+
// Maybe keep the org buf slice and swap back?
39+
if need > len(b.buf) {
40+
// Round up to the next multiple of the default size
41+
newBuf := make([]byte, ((need/defaultBufSize)+1)*defaultBufSize)
42+
copy(newBuf, b.buf)
43+
b.buf = newBuf
44+
}
45+
46+
b.idx = 0
47+
48+
for {
49+
nn, err := b.rd.Read(b.buf[n:])
50+
n += nn
51+
52+
switch err {
53+
case nil:
54+
if n < need {
55+
continue
56+
}
57+
b.length = n
58+
return nil
59+
60+
case io.EOF:
61+
if n >= need {
62+
b.length = n
63+
return nil
64+
}
65+
return io.ErrUnexpectedEOF
66+
67+
default:
68+
return err
69+
}
70+
}
71+
}
72+
73+
// returns next N bytes from buffer.
74+
// The returned slice is only guaranteed to be valid until the next read
75+
func (b *buffer) readNext(need int) ([]byte, error) {
76+
if b.length < need {
77+
// refill
78+
if err := b.fill(need); err != nil {
79+
return nil, err
80+
}
81+
}
82+
83+
offset := b.idx
84+
b.idx += need
85+
b.length -= need
86+
return b.buf[offset:b.idx], nil
87+
}
88+
89+
// returns a buffer with the requested size.
90+
// If possible, a slice from the existing buffer is returned.
91+
// Otherwise a bigger buffer is made.
92+
// Only one buffer (total) can be used at a time.
93+
func (b *buffer) takeBuffer(length int) []byte {
94+
if b.length > 0 {
95+
return nil
96+
}
97+
98+
// test (cheap) general case first
99+
if length <= defaultBufSize || length <= cap(b.buf) {
100+
return b.buf[:length]
101+
}
102+
103+
return make([]byte, length)
104+
}
105+
106+
// shortcut which can be used if the requested buffer is guaranteed to be
107+
// smaller than defaultBufSize
108+
// Only one buffer (total) can be used at a time.
109+
func (b *buffer) takeSmallBuffer(length int) []byte {
110+
if b.length == 0 {
111+
return b.buf[:length]
112+
}
113+
return nil
114+
}
115+
116+
// takeCompleteBuffer returns the complete existing buffer.
117+
// This can be used if the necessary buffer size is unknown.
118+
// Only one buffer (total) can be used at a time.
119+
func (b *buffer) takeCompleteBuffer() []byte {
120+
if b.length == 0 {
121+
return b.buf
122+
}
123+
return nil
124+
}
125+
126+
var responseCache = make(chan *Response, 16)
127+
128+
func newCachedResponse() *Response {
129+
select {
130+
case r := <-responseCache:
131+
return r
132+
default:
133+
return new(Response)
134+
}
135+
}
136+
137+
func putResponse(r *Response) {
138+
*r = Response{} // zero it
139+
select {
140+
case responseCache <- r:
141+
default:
142+
}
143+
}

connection.go

Lines changed: 48 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package gorethink
22

33
import (
4-
"bufio"
54
"encoding/binary"
65
"encoding/json"
7-
"fmt"
86
"io"
97
"net"
108
"sync/atomic"
@@ -13,6 +11,10 @@ import (
1311
p "github.com/dancannon/gorethink/ql2"
1412
)
1513

14+
const (
15+
respHeaderLen = 12
16+
)
17+
1618
type Response struct {
1719
Token int64
1820
Type p.Response_ResponseType `json:"t"`
@@ -29,62 +31,53 @@ type Connection struct {
2931
token int64
3032
cursors map[int64]*Cursor
3133
bad bool
34+
35+
headerBuf [respHeaderLen]byte
36+
buf buffer
3237
}
3338

3439
// Dial closes the previous connection and attempts to connect again.
3540
func NewConnection(opts *ConnectOpts) (*Connection, error) {
36-
conn, err := net.Dial("tcp", opts.Address)
37-
if err != nil {
38-
return nil, RqlConnectionError{err.Error()}
39-
}
41+
var err error
4042

41-
// Send the protocol version to the server as a 4-byte little-endian-encoded integer
42-
if err := binary.Write(conn, binary.LittleEndian, p.VersionDummy_V0_3); err != nil {
43-
return nil, RqlConnectionError{err.Error()}
43+
// New mysqlConn
44+
c := &Connection{
45+
opts: opts,
46+
cursors: make(map[int64]*Cursor),
4447
}
4548

46-
// Send the length of the auth key to the server as a 4-byte little-endian-encoded integer
47-
if err := binary.Write(conn, binary.LittleEndian, uint32(len(opts.AuthKey))); err != nil {
48-
return nil, RqlConnectionError{err.Error()}
49+
// Connect to Server
50+
nd := net.Dialer{Timeout: c.opts.Timeout}
51+
c.conn, err = nd.Dial("tcp", c.opts.Address)
52+
if err != nil {
53+
return nil, err
4954
}
5055

51-
// Send the auth key as an ASCII string
52-
// If there is no auth key, skip this step
53-
if opts.AuthKey != "" {
54-
if _, err := io.WriteString(conn, opts.AuthKey); err != nil {
55-
return nil, RqlConnectionError{err.Error()}
56+
// Enable TCP Keepalives on TCP connections
57+
if tc, ok := c.conn.(*net.TCPConn); ok {
58+
if err := tc.SetKeepAlive(true); err != nil {
59+
// Don't send COM_QUIT before handshake.
60+
c.conn.Close()
61+
c.conn = nil
62+
return nil, err
5663
}
5764
}
5865

59-
// Send the protocol type as a 4-byte little-endian-encoded integer
60-
if err := binary.Write(conn, binary.LittleEndian, p.VersionDummy_JSON); err != nil {
61-
return nil, RqlConnectionError{err.Error()}
62-
}
66+
c.buf = newBuffer(c.conn)
6367

64-
// read server response to authorization key (terminated by NUL)
65-
reader := bufio.NewReader(conn)
66-
line, err := reader.ReadBytes('\x00')
67-
if err != nil {
68-
if err == io.EOF {
69-
return nil, fmt.Errorf("Unexpected EOF: %s", string(line))
70-
}
71-
return nil, RqlConnectionError{err.Error()}
72-
}
73-
// convert to string and remove trailing NUL byte
74-
response := string(line[:len(line)-1])
75-
if response != "SUCCESS" {
76-
// we failed authorization or something else terrible happened
77-
return nil, RqlDriverError{fmt.Sprintf("Server dropped connection with message: \"%s\"", response)}
68+
// Send handshake request
69+
if err = c.writeHandshakeReq(); err != nil {
70+
c.Close()
71+
return nil, err
7872
}
7973

80-
c := &Connection{
81-
opts: opts,
82-
conn: conn,
83-
cursors: make(map[int64]*Cursor),
74+
// Read handshake response
75+
err = c.readHandshakeSuccess()
76+
if err != nil {
77+
c.Close()
78+
return nil, err
8479
}
8580

86-
c.conn.SetDeadline(time.Time{})
87-
8881
return c, nil
8982
}
9083

@@ -127,9 +120,8 @@ func (c *Connection) Query(q Query) (*Response, *Cursor, error) {
127120
return nil, nil, nil
128121
}
129122

130-
var response *Response
131123
for {
132-
response, err = c.readResponse()
124+
response, err := c.readResponse()
133125
if err != nil {
134126
return nil, nil, err
135127
}
@@ -140,6 +132,8 @@ func (c *Connection) Query(q Query) (*Response, *Cursor, error) {
140132
} else if _, ok := c.cursors[response.Token]; ok {
141133
// If the token is in the cursor cache then process the response
142134
c.processResponse(q, response)
135+
} else {
136+
putResponse(response)
143137
}
144138
}
145139
}
@@ -158,21 +152,8 @@ func (c *Connection) sendQuery(q Query) error {
158152
c.conn.SetDeadline(time.Now().Add(c.opts.Timeout))
159153
}
160154

161-
// Send a unique 8-byte token
162-
if err = binary.Write(c.conn, binary.LittleEndian, q.Token); err != nil {
163-
c.bad = true
164-
return RqlConnectionError{err.Error()}
165-
}
166-
167-
// Send the length of the JSON-encoded query as a 4-byte
168-
// little-endian-encoded integer.
169-
if err = binary.Write(c.conn, binary.LittleEndian, uint32(len(b))); err != nil {
170-
c.bad = true
171-
return RqlConnectionError{err.Error()}
172-
}
173-
174155
// Send the JSON encoding of the query itself.
175-
if err = binary.Write(c.conn, binary.BigEndian, b); err != nil {
156+
if err = c.writeQuery(q.Token, b); err != nil {
176157
c.bad = true
177158
return RqlConnectionError{err.Error()}
178159
}
@@ -187,30 +168,24 @@ func (c *Connection) nextToken() int64 {
187168
}
188169

189170
func (c *Connection) readResponse() (*Response, error) {
190-
// Read the 8-byte token of the query the response corresponds to.
191-
var responseToken int64
192-
if err := binary.Read(c.conn, binary.LittleEndian, &responseToken); err != nil {
193-
c.bad = true
194-
return nil, RqlConnectionError{err.Error()}
171+
// Read response header (token+length)
172+
_, err := io.ReadFull(c.conn, c.headerBuf[:respHeaderLen])
173+
if err != nil {
174+
return nil, err
195175
}
196176

197-
// Read the length of the JSON-encoded response as a 4-byte
198-
// little-endian-encoded integer.
199-
var messageLength uint32
200-
if err := binary.Read(c.conn, binary.LittleEndian, &messageLength); err != nil {
201-
c.bad = true
202-
return nil, RqlConnectionError{err.Error()}
203-
}
177+
responseToken := int64(binary.LittleEndian.Uint64(c.headerBuf[:8]))
178+
messageLength := binary.LittleEndian.Uint32(c.headerBuf[8:])
204179

205180
// Read the JSON encoding of the Response itself.
206-
b := make([]byte, messageLength)
207-
if _, err := io.ReadFull(c.conn, b); err != nil {
181+
b := c.buf.takeBuffer(int(messageLength))
182+
if _, err := io.ReadFull(c.conn, b[:]); err != nil {
208183
c.bad = true
209184
return nil, RqlConnectionError{err.Error()}
210185
}
211186

212187
// Decode the response
213-
var response = new(Response)
188+
var response = newCachedResponse()
214189
if err := json.Unmarshal(b, response); err != nil {
215190
c.bad = true
216191
return nil, RqlDriverError{err.Error()}
@@ -239,6 +214,7 @@ func (c *Connection) processResponse(q Query, response *Response) (*Response, *C
239214
case p.Response_WAIT_COMPLETE:
240215
return c.processWaitResponse(q, response)
241216
default:
217+
putResponse(response)
242218
return nil, nil, RqlDriverError{"Unexpected response type"}
243219
}
244220
}

0 commit comments

Comments
 (0)