@@ -10,54 +10,42 @@ package mysql
1010
1111import (
1212 "io"
13- "net"
14- "time"
1513)
1614
1715const defaultBufSize = 4096
1816const maxCachedBufSize = 256 * 1024
1917
18+ // readerFunc is a function that compatible with io.Reader.
19+ // We use this function type instead of io.Reader because we want to
20+ // just pass mc.readWithTimeout.
21+ type readerFunc func ([]byte ) (int , error )
22+
2023// A buffer which is used for both reading and writing.
2124// This is possible since communication on each connection is synchronous.
2225// In other words, we can't write and read simultaneously on the same connection.
2326// The buffer is similar to bufio.Reader / Writer but zero-copy-ish
2427// Also highly optimized for this particular use case.
25- // This buffer is backed by two byte slices in a double-buffering scheme
2628type buffer struct {
27- buf []byte // buf is a byte buffer who's length and capacity are equal.
28- nc net.Conn
29- idx int
30- length int
31- timeout time.Duration
32- dbuf [2 ][]byte // dbuf is an array with the two byte slices that back this buffer
33- flipcnt uint // flipccnt is the current buffer counter for double-buffering
29+ buf []byte // read buffer.
30+ cachedBuf []byte // buffer that will be reused. len(cachedBuf) <= maxCachedBufSize.
3431}
3532
3633// newBuffer allocates and returns a new buffer.
37- func newBuffer (nc net.Conn ) buffer {
38- fg := make ([]byte , defaultBufSize )
34+ func newBuffer () buffer {
3935 return buffer {
40- buf : fg ,
41- nc : nc ,
42- dbuf : [2 ][]byte {fg , nil },
36+ cachedBuf : make ([]byte , defaultBufSize ),
4337 }
4438}
4539
46- // flip replaces the active buffer with the background buffer
47- // this is a delayed flip that simply increases the buffer counter;
48- // the actual flip will be performed the next time we call `buffer.fill`
49- func (b * buffer ) flip () {
50- b .flipcnt += 1
40+ // busy returns true if the read buffer is not empty.
41+ func (b * buffer ) busy () bool {
42+ return len (b .buf ) > 0
5143}
5244
53- // fill reads into the buffer until at least _need_ bytes are in it
54- func (b * buffer ) fill (need int ) error {
55- n := b .length
56- // fill data into its double-buffering target: if we've called
57- // flip on this buffer, we'll be copying to the background buffer,
58- // and then filling it with network data; otherwise we'll just move
59- // the contents of the current buffer to the front before filling it
60- dest := b .dbuf [b .flipcnt & 1 ]
45+ // fill reads into the read buffer until at least _need_ bytes are in it.
46+ func (b * buffer ) fill (need int , r readerFunc ) error {
47+ // we'll move the contents of the current buffer to dest before filling it.
48+ dest := b .cachedBuf
6149
6250 // grow buffer if necessary to fit the whole packet.
6351 if need > len (dest ) {
@@ -67,83 +55,67 @@ func (b *buffer) fill(need int) error {
6755 // if the allocated buffer is not too large, move it to backing storage
6856 // to prevent extra allocations on applications that perform large reads
6957 if len (dest ) <= maxCachedBufSize {
70- b .dbuf [ b . flipcnt & 1 ] = dest
58+ b .cachedBuf = dest
7159 }
7260 }
7361
74- // if we're filling the fg buffer, move the existing data to the start of it.
75- // if we're filling the bg buffer, copy over the data
76- if n > 0 {
77- copy (dest [:n ], b .buf [b .idx :])
78- }
79-
80- b .buf = dest
81- b .idx = 0
62+ // move the existing data to the start of the buffer.
63+ n := len (b .buf )
64+ copy (dest [:n ], b .buf )
8265
8366 for {
84- if b .timeout > 0 {
85- if err := b .nc .SetReadDeadline (time .Now ().Add (b .timeout )); err != nil {
86- return err
87- }
67+ nn , err := r (dest [n :])
68+ n += nn
69+
70+ if err == nil && n < need {
71+ continue
8872 }
8973
90- nn , err := b .nc .Read (b .buf [n :])
91- n += nn
74+ b .buf = dest [:n ]
9275
93- switch err {
94- case nil :
76+ if err == io .EOF {
9577 if n < need {
96- continue
78+ err = io .ErrUnexpectedEOF
79+ } else {
80+ err = nil
9781 }
98- b .length = n
99- return nil
100-
101- case io .EOF :
102- if n >= need {
103- b .length = n
104- return nil
105- }
106- return io .ErrUnexpectedEOF
107-
108- default :
109- return err
11082 }
83+ return err
11184 }
11285}
11386
11487// returns next N bytes from buffer.
11588// The returned slice is only guaranteed to be valid until the next read
116- func (b * buffer ) readNext (need int ) ([]byte , error ) {
117- if b . length < need {
89+ func (b * buffer ) readNext (need int , r readerFunc ) ([]byte , error ) {
90+ if len ( b . buf ) < need {
11891 // refill
119- if err := b .fill (need ); err != nil {
92+ if err := b .fill (need , r ); err != nil {
12093 return nil , err
12194 }
12295 }
12396
124- offset := b .idx
125- b .idx += need
126- b .length -= need
127- return b .buf [offset :b .idx ], nil
97+ data := b .buf [:need ]
98+ b .buf = b .buf [need :]
99+ return data , nil
128100}
129101
130102// takeBuffer returns a buffer with the requested size.
131103// If possible, a slice from the existing buffer is returned.
132104// Otherwise a bigger buffer is made.
133105// Only one buffer (total) can be used at a time.
134106func (b * buffer ) takeBuffer (length int ) ([]byte , error ) {
135- if b .length > 0 {
107+ if b .busy () {
136108 return nil , ErrBusyBuffer
137109 }
138110
139111 // test (cheap) general case first
140- if length <= cap (b .buf ) {
141- return b .buf [:length ], nil
112+ if length <= len (b .cachedBuf ) {
113+ return b .cachedBuf [:length ], nil
142114 }
143115
144- if length < maxPacketSize {
145- b .buf = make ([]byte , length )
146- return b .buf , nil
116+ if length < maxCachedBufSize {
117+ b .cachedBuf = make ([]byte , length )
118+ return b .cachedBuf , nil
147119 }
148120
149121 // buffer is larger than we want to store.
@@ -154,29 +126,26 @@ func (b *buffer) takeBuffer(length int) ([]byte, error) {
154126// known to be smaller than defaultBufSize.
155127// Only one buffer (total) can be used at a time.
156128func (b * buffer ) takeSmallBuffer (length int ) ([]byte , error ) {
157- if b .length > 0 {
129+ if b .busy () {
158130 return nil , ErrBusyBuffer
159131 }
160- return b .buf [:length ], nil
132+ return b .cachedBuf [:length ], nil
161133}
162134
163135// takeCompleteBuffer returns the complete existing buffer.
164136// This can be used if the necessary buffer size is unknown.
165137// cap and len of the returned buffer will be equal.
166138// Only one buffer (total) can be used at a time.
167139func (b * buffer ) takeCompleteBuffer () ([]byte , error ) {
168- if b .length > 0 {
140+ if b .busy () {
169141 return nil , ErrBusyBuffer
170142 }
171- return b .buf , nil
143+ return b .cachedBuf , nil
172144}
173145
174146// store stores buf, an updated buffer, if its suitable to do so.
175- func (b * buffer ) store (buf []byte ) error {
176- if b .length > 0 {
177- return ErrBusyBuffer
178- } else if cap (buf ) <= maxPacketSize && cap (buf ) > cap (b .buf ) {
179- b .buf = buf [:cap (buf )]
147+ func (b * buffer ) store (buf []byte ) {
148+ if cap (buf ) <= maxCachedBufSize && cap (buf ) > cap (b .cachedBuf ) {
149+ b .cachedBuf = buf [:cap (buf )]
180150 }
181- return nil
182151}
0 commit comments