@@ -3,19 +3,29 @@ package gorethink
33import (
44 "bufio"
55 "encoding/binary"
6+ "encoding/json"
67 "fmt"
78 "io"
89 "net"
910 "sync"
1011 "time"
1112
12- "code.google.com/p/goprotobuf/proto"
13+ "gopkg.in/fatih/pool.v1"
14+
1315 p "github.com/dancannon/gorethink/ql2"
1416)
1517
18+ type Response struct {
19+ Token int64
20+ Type p.Response_ResponseType `json:"t"`
21+ Responses []interface {} `json:"r"`
22+ Backtrace []interface {} `json:"b"`
23+ Profile interface {} `json:"p"`
24+ }
25+
1626type Conn interface {
1727 SendQuery (s * Session , q * p.Query , t Term , opts map [string ]interface {}, async bool ) (* Cursor , error )
18- ReadResponse (s * Session , token int64 ) (* p. Response , error )
28+ ReadResponse (s * Session , token int64 ) (* Response , error )
1929 Close () error
2030}
2131
@@ -31,55 +41,54 @@ type Connection struct {
3141}
3242
3343// Dial closes the previous connection and attempts to connect again.
34- func Dial (s * Session ) (* Connection , error ) {
35- conn , err := net .Dial ("tcp" , s .address )
36- if err != nil {
37- return nil , RqlConnectionError {err .Error ()}
38- }
39-
40- // Send the protocol version to the server as a 4-byte little-endian-encoded integer
41- if err := binary .Write (conn , binary .LittleEndian , p .VersionDummy_V0_3 ); err != nil {
42- return nil , RqlConnectionError {err .Error ()}
43- }
44+ func Dial (s * Session ) pool.Factory {
45+ return func () (net.Conn , error ) {
46+ conn , err := net .Dial ("tcp" , s .address )
47+ if err != nil {
48+ return nil , RqlConnectionError {err .Error ()}
49+ }
4450
45- // Send the length of the auth key to the server as a 4-byte little-endian-encoded integer
46- if err := binary .Write (conn , binary .LittleEndian , uint32 ( len ( s . authkey )) ); err != nil {
47- return nil , RqlConnectionError {err .Error ()}
48- }
51+ // Send the protocol version to the server as a 4-byte little-endian-encoded integer
52+ if err := binary .Write (conn , binary .LittleEndian , p . VersionDummy_V0_3 ); err != nil {
53+ return nil , RqlConnectionError {err .Error ()}
54+ }
4955
50- // Send the auth key as an ASCII string
51- // If there is no auth key, skip this step
52- if s .authkey != "" {
53- if _ , err := io .WriteString (conn , s .authkey ); err != nil {
56+ // Send the length of the auth key to the server as a 4-byte little-endian-encoded integer
57+ if err := binary .Write (conn , binary .LittleEndian , uint32 (len (s .authkey ))); err != nil {
5458 return nil , RqlConnectionError {err .Error ()}
5559 }
56- }
5760
58- // Send the protocol type as a 4-byte little-endian-encoded integer
59- if err := binary .Write (conn , binary .LittleEndian , p .VersionDummy_PROTOBUF ); err != nil {
60- return nil , RqlConnectionError {err .Error ()}
61- }
61+ // Send the auth key as an ASCII string
62+ // If there is no auth key, skip this step
63+ if s .authkey != "" {
64+ if _ , err := io .WriteString (conn , s .authkey ); err != nil {
65+ return nil , RqlConnectionError {err .Error ()}
66+ }
67+ }
6268
63- // read server response to authorization key (terminated by NUL)
64- reader := bufio .NewReader (conn )
65- line , err := reader .ReadBytes ('\x00' )
66- if err != nil {
67- if err == io .EOF {
68- return nil , fmt .Errorf ("Unexpected EOF: %s" , string (line ))
69+ // Send the protocol type as a 4-byte little-endian-encoded integer
70+ if err := binary .Write (conn , binary .LittleEndian , p .VersionDummy_JSON ); err != nil {
71+ return nil , RqlConnectionError {err .Error ()}
6972 }
70- return nil , RqlDriverError {err .Error ()}
71- }
72- // convert to string and remove trailing NUL byte
73- response := string (line [:len (line )- 1 ])
74- if response != "SUCCESS" {
75- // we failed authorization or something else terrible happened
76- return nil , RqlDriverError {fmt .Sprintf ("Server dropped connection with message: \" %s\" " , response )}
77- }
7873
79- return & Connection {
80- s : s ,
81- Conn : conn ,
82- }, nil
74+ // read server response to authorization key (terminated by NUL)
75+ reader := bufio .NewReader (conn )
76+ line , err := reader .ReadBytes ('\x00' )
77+ if err != nil {
78+ if err == io .EOF {
79+ return nil , fmt .Errorf ("Unexpected EOF: %s" , string (line ))
80+ }
81+ return nil , RqlDriverError {err .Error ()}
82+ }
83+ // convert to string and remove trailing NUL byte
84+ response := string (line [:len (line )- 1 ])
85+ if response != "SUCCESS" {
86+ // we failed authorization or something else terrible happened
87+ return nil , RqlDriverError {fmt .Sprintf ("Server dropped connection with message: \" %s\" " , response )}
88+ }
89+
90+ return conn , nil
91+ }
8392}
8493
8594func TestOnBorrow (c * Connection , t time.Time ) error {
@@ -97,26 +106,36 @@ func TestOnBorrow(c *Connection, t time.Time) error {
97106 return nil
98107}
99108
100- func (c * Connection ) ReadResponse (s * Session , token int64 ) (* p. Response , error ) {
109+ func (c * Connection ) ReadResponse (s * Session , token int64 ) (* Response , error ) {
101110 for {
111+ // Read the 8-byte token of the query the response corresponds to.
112+ var responseToken int64
113+ if err := binary .Read (c , binary .LittleEndian , & responseToken ); err != nil {
114+ return nil , RqlConnectionError {err .Error ()}
115+ }
116+
117+ // Read the length of the JSON-encoded response as a 4-byte
118+ // little-endian-encoded integer.
102119 var messageLength uint32
103120 if err := binary .Read (c , binary .LittleEndian , & messageLength ); err != nil {
104- c .Close ()
105121 return nil , RqlConnectionError {err .Error ()}
106122 }
107123
108- buffer := make ([] byte , messageLength )
109- if _ , err := io . ReadFull ( c , buffer ); err != nil {
110- c . Close ()
124+ // Read the JSON encoding of the Response itself.
125+ b := make ([] byte , messageLength )
126+ if _ , err := io . ReadFull ( c , b ); err != nil {
111127 return nil , RqlDriverError {err .Error ()}
112128 }
113129
114- response := & p.Response {}
115- if err := proto .Unmarshal (buffer , response ); err != nil {
130+ // Decode the response
131+ var response = new (Response )
132+ response .Token = responseToken
133+ err := json .Unmarshal (b , response )
134+ if err != nil {
116135 return nil , RqlDriverError {err .Error ()}
117136 }
118137
119- if response . GetToken () == token {
138+ if responseToken == token {
120139 return response , nil
121140 } else if cursor , ok := s .checkCache (token ); ok {
122141 // Handle batch response
@@ -127,10 +146,15 @@ func (c *Connection) ReadResponse(s *Session, token int64) (*p.Response, error)
127146 }
128147}
129148
130- func (c * Connection ) SendQuery (s * Session , q * p.Query , t Term , opts map [string ]interface {}, async bool ) (* Cursor , error ) {
131- var data []byte
149+ func (c * Connection ) SendQuery (s * Session , q Query , opts map [string ]interface {}, async bool ) (* Cursor , error ) {
132150 var err error
133151
152+ // Build query
153+ b , err := json .Marshal (q .build ())
154+ if err != nil {
155+ return nil , RqlDriverError {"Error building query" }
156+ }
157+
134158 // Ensure that the connection is not closed
135159 if s .closed {
136160 return nil , RqlDriverError {"Connection is closed" }
@@ -143,63 +167,52 @@ func (c *Connection) SendQuery(s *Session, q *p.Query, t Term, opts map[string]i
143167 c .SetDeadline (time .Now ().Add (s .timeout ))
144168 }
145169
146- // Send query
147- if data , err = proto .Marshal (q ); err != nil {
148- return nil , RqlDriverError {err .Error ()}
149- }
150- if err = binary .Write (c , binary .LittleEndian , uint32 (len (data ))); err != nil {
170+ // Send a unique 8-byte token
171+ if err = binary .Write (c , binary .LittleEndian , q .Token ); err != nil {
151172 c .Close ()
152173 return nil , RqlConnectionError {err .Error ()}
153174 }
154175
155- if err = binary .Write (c , binary .BigEndian , data ); err != nil {
156- c .Close ()
176+ // Send the length of the JSON-encoded query as a 4-byte
177+ // little-endian-encoded integer.
178+ if err = binary .Write (c , binary .LittleEndian , uint32 (len (b ))); err != nil {
179+ return nil , RqlConnectionError {err .Error ()}
180+ }
181+
182+ // Send the JSON encoding of the query itself.
183+ if err = binary .Write (c , binary .BigEndian , b ); err != nil {
157184 return nil , RqlConnectionError {err .Error ()}
158185 }
159186
160187 // Return immediately if the noreply option was set
161- if noreply , ok := opts ["noreply" ]; ok && noreply .(bool ) {
162- c .Close ()
163- return nil , nil
164- } else if async {
188+ if noreply , ok := opts ["noreply" ]; (ok && noreply .(bool )) || async {
165189 return nil , nil
166190 }
167191
168192 // Get response
169- response , err := c .ReadResponse (s , * q .Token )
193+ response , err := c .ReadResponse (s , q .Token )
170194 if err != nil {
171195 return nil , err
172196 }
173197
174- err = checkErrorResponse (response , t )
198+ err = checkErrorResponse (response , q . Term )
175199 if err != nil {
176200 return nil , err
177201 }
178202
179- // De-construct the profile datum if it exists
180- var profile interface {}
181- if response .GetProfile () != nil {
182- var err error
183-
184- profile , err = deconstructDatum (response .GetProfile (), opts )
185- if err != nil {
186- return nil , RqlDriverError {err .Error ()}
187- }
188- }
189-
190203 // De-construct datum and return a cursor
191- switch response .GetType () {
204+ switch response .Type {
192205 case p .Response_SUCCESS_PARTIAL , p .Response_SUCCESS_SEQUENCE , p .Response_SUCCESS_FEED :
193206 cursor := & Cursor {
194207 session : s ,
195208 conn : c ,
196209 query : q ,
197- term : t ,
210+ term : * q . Term ,
198211 opts : opts ,
199- profile : profile ,
212+ profile : response . Profile ,
200213 }
201214
202- s .setCache (* q .Token , cursor )
215+ s .setCache (q .Token , cursor )
203216
204217 cursor .extend (response )
205218
@@ -208,17 +221,15 @@ func (c *Connection) SendQuery(s *Session, q *p.Query, t Term, opts map[string]i
208221 var value []interface {}
209222 var err error
210223
211- if len (response .GetResponse () ) < 1 {
224+ if len (response .Responses ) < 1 {
212225 value = []interface {}{}
213- } else if response .GetResponse ()[0 ].GetType () == p .Datum_R_ARRAY {
214- value , err = deconstructDatums (response .GetResponse ()[0 ].GetRArray (), opts )
215- if err != nil {
216- return nil , err
217- }
218226 } else {
219227 var v interface {}
220228
221- v , err = deconstructDatum (response .GetResponse ()[0 ], opts )
229+ v , err = recursivelyConvertPseudotype (response .Responses [0 ], opts )
230+ if err != nil {
231+ return nil , err
232+ }
222233 if err != nil {
223234 return nil , RqlDriverError {err .Error ()}
224235 }
@@ -236,9 +247,9 @@ func (c *Connection) SendQuery(s *Session, q *p.Query, t Term, opts map[string]i
236247 session : s ,
237248 conn : c ,
238249 query : q ,
239- term : t ,
250+ term : * q . Term ,
240251 opts : opts ,
241- profile : profile ,
252+ profile : response . Profile ,
242253 buffer : value ,
243254 finished : true ,
244255 }
@@ -247,7 +258,7 @@ func (c *Connection) SendQuery(s *Session, q *p.Query, t Term, opts map[string]i
247258 case p .Response_WAIT_COMPLETE :
248259 return nil , nil
249260 default :
250- return nil , RqlDriverError {fmt .Sprintf ("Unexpected response type received: %s" , response .GetType () )}
261+ return nil , RqlDriverError {fmt .Sprintf ("Unexpected response type received: %s" , response .Type )}
251262 }
252263}
253264
@@ -265,28 +276,28 @@ func (c *Connection) CloseNoWait() error {
265276 c .closed = true
266277 c .Unlock ()
267278
268- return c .Conn . Close ( )
279+ return c .s . pool . Put ( c . Conn )
269280}
270281
271282// noreplyWaitQuery sends the NOREPLY_WAIT query to the server.
272283// TODO: Removed duplicated functions in connection and session
273284// for NoReplyWait
274285func (c * Connection ) NoreplyWait () error {
275- q := & p. Query {
276- Type : p .Query_NOREPLY_WAIT . Enum () ,
277- Token : proto . Int64 ( c .s .nextToken () ),
286+ q := Query {
287+ Type : p .Query_NOREPLY_WAIT ,
288+ Token : c .s .nextToken (),
278289 }
279290
280- _ , err := c .SendQuery (c .s , q , Term {}, map [string ]interface {}{}, false )
291+ _ , err := c .SendQuery (c .s , q , map [string ]interface {}{}, false )
281292 if err != nil {
282293 return err
283294 }
284295
285296 return nil
286297}
287298
288- func checkErrorResponse (response * p. Response , t Term ) error {
289- switch response .GetType () {
299+ func checkErrorResponse (response * Response , t * Term ) error {
300+ switch response .Type {
290301 case p .Response_CLIENT_ERROR :
291302 return RqlClientError {rqlResponseError {response , t }}
292303 case p .Response_COMPILE_ERROR :
0 commit comments