Skip to content

Commit c3d66cb

Browse files
committed
combined httpProtocol & Transport
1 parent 4da7623 commit c3d66cb

File tree

5 files changed

+431
-9
lines changed

5 files changed

+431
-9
lines changed

gremlin-go/driver/client.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,19 @@ type ClientSettings struct {
6060
EnableUserAgentOnConnect bool
6161
}
6262

63+
// protocol defines the interface for HTTP communication with Gremlin server
64+
type protocol interface {
65+
send(request *request) (ResultSet, error)
66+
close()
67+
}
68+
6369
// Client is used to connect and interact with a Gremlin-supported server.
6470
type Client struct {
6571
url string
6672
traversalSource string
6773
logHandler *logHandler
6874
connectionSettings *connectionSettings
69-
httpProtocol *httpProtocol
75+
protocol protocol
7076
}
7177

7278
// NewClient creates a Client and configures it with the given parameters.
@@ -111,14 +117,14 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C
111117

112118
logHandler := newLogHandler(settings.Logger, settings.LogVerbosity, settings.Language)
113119

114-
httpProt := newHttpProtocol(logHandler, url, connSettings)
120+
conn := newHttpConnection(logHandler, url, connSettings)
115121

116122
client := &Client{
117123
url: url,
118124
traversalSource: settings.TraversalSource,
119125
logHandler: logHandler,
120126
connectionSettings: connSettings,
121-
httpProtocol: httpProt,
127+
protocol: conn,
122128
}
123129

124130
return client, nil
@@ -142,7 +148,7 @@ func (client *Client) SubmitWithOptions(traversalString string, requestOptions R
142148

143149
// TODO interceptors (ie. auth)
144150

145-
rs, err := client.httpProtocol.send(&request)
151+
rs, err := client.protocol.send(&request)
146152
return rs, err
147153
}
148154

@@ -171,7 +177,7 @@ func (client *Client) submitGremlinLang(gremlinLang *GremlinLang) (ResultSet, er
171177
}
172178

173179
request := MakeStringRequest(gremlinLang.GetGremlin(), client.traversalSource, requestOptionsBuilder.Create())
174-
return client.httpProtocol.send(&request)
180+
return client.protocol.send(&request)
175181
}
176182

177183
func applyOptionsConfig(builder *RequestOptionsBuilder, config map[string]interface{}) *RequestOptionsBuilder {

gremlin-go/driver/driverRemoteConnection.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,14 @@ func NewDriverRemoteConnection(
100100

101101
logHandler := newLogHandler(settings.Logger, settings.LogVerbosity, settings.Language)
102102

103-
httpProt := newHttpProtocol(logHandler, url, connSettings)
103+
conn := newHttpConnection(logHandler, url, connSettings)
104104

105105
client := &Client{
106106
url: url,
107107
traversalSource: settings.TraversalSource,
108108
logHandler: logHandler,
109109
connectionSettings: connSettings,
110-
httpProtocol: httpProt,
110+
protocol: conn,
111111
}
112112

113113
return &DriverRemoteConnection{client: client, isClosed: false, settings: settings}, nil
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
*/
19+
20+
package gremlingo
21+
22+
import (
23+
"bytes"
24+
"compress/zlib"
25+
"io"
26+
"net/http"
27+
"time"
28+
)
29+
30+
// httpConnection handles HTTP request/response for Gremlin queries
31+
type httpConnection struct {
32+
url string
33+
httpClient *http.Client
34+
connSettings *connectionSettings
35+
logHandler *logHandler
36+
serializer *GraphBinarySerializer
37+
}
38+
39+
// Connection pool defaults aligned with Java driver
40+
const (
41+
defaultMaxConnsPerHost = 128 // Java: ConnectionPool.MAX_POOL_SIZE
42+
defaultMaxIdleConnsPerHost = 8 // Keep some connections warm
43+
defaultIdleConnTimeout = 180 * time.Second // Java: CONNECTION_IDLE_TIMEOUT_MILLIS
44+
defaultConnectionTimeout = 15 * time.Second // Java: CONNECTION_SETUP_TIMEOUT_MILLIS
45+
)
46+
47+
func newHttpConnection(handler *logHandler, url string, connSettings *connectionSettings) *httpConnection {
48+
timeout := connSettings.connectionTimeout
49+
if timeout == 0 {
50+
timeout = defaultConnectionTimeout
51+
}
52+
53+
transport := &http.Transport{
54+
TLSClientConfig: connSettings.tlsConfig,
55+
MaxConnsPerHost: defaultMaxConnsPerHost,
56+
MaxIdleConnsPerHost: defaultMaxIdleConnsPerHost,
57+
IdleConnTimeout: defaultIdleConnTimeout,
58+
DisableCompression: !connSettings.enableCompression,
59+
}
60+
61+
return &httpConnection{
62+
url: url,
63+
httpClient: &http.Client{Transport: transport, Timeout: timeout},
64+
connSettings: connSettings,
65+
logHandler: handler,
66+
serializer: newGraphBinarySerializer(handler),
67+
}
68+
}
69+
70+
// send sends request and streams results directly to ResultSet
71+
func (c *httpConnection) send(req *request) (ResultSet, error) {
72+
rs := newChannelResultSet()
73+
74+
data, err := c.serializer.SerializeMessage(req)
75+
if err != nil {
76+
rs.Close()
77+
return rs, err
78+
}
79+
80+
go c.executeAndStream(data, rs)
81+
82+
return rs, nil
83+
}
84+
85+
func (c *httpConnection) executeAndStream(data []byte, rs ResultSet) {
86+
defer rs.Close()
87+
88+
req, err := http.NewRequest(http.MethodPost, c.url, bytes.NewReader(data))
89+
if err != nil {
90+
c.logHandler.logf(Error, failedToSendRequest, err.Error())
91+
rs.setError(err)
92+
return
93+
}
94+
95+
c.setHeaders(req)
96+
97+
resp, err := c.httpClient.Do(req)
98+
if err != nil {
99+
c.logHandler.logf(Error, failedToSendRequest, err.Error())
100+
rs.setError(err)
101+
return
102+
}
103+
defer resp.Body.Close()
104+
105+
reader, zlibReader, err := c.getReader(resp)
106+
if err != nil {
107+
c.logHandler.logf(Error, failedToReceiveResponse, err.Error())
108+
rs.setError(err)
109+
return
110+
}
111+
if zlibReader != nil {
112+
defer zlibReader.Close()
113+
}
114+
115+
c.streamToResultSet(reader, rs)
116+
}
117+
118+
func (c *httpConnection) setHeaders(req *http.Request) {
119+
req.Header.Set("Content-Type", graphBinaryMimeType)
120+
req.Header.Set("Accept", graphBinaryMimeType)
121+
122+
if c.connSettings.enableUserAgentOnConnect {
123+
req.Header.Set(userAgentHeader, userAgent)
124+
}
125+
if c.connSettings.enableCompression {
126+
req.Header.Set("Accept-Encoding", "deflate")
127+
}
128+
if c.connSettings.authInfo != nil {
129+
if headers := c.connSettings.authInfo.GetHeader(); headers != nil {
130+
for k, vals := range headers {
131+
for _, v := range vals {
132+
req.Header.Add(k, v)
133+
}
134+
}
135+
}
136+
if ok, user, pass := c.connSettings.authInfo.GetBasicAuth(); ok {
137+
req.SetBasicAuth(user, pass)
138+
}
139+
}
140+
}
141+
142+
func (c *httpConnection) getReader(resp *http.Response) (io.Reader, io.Closer, error) {
143+
if resp.Header.Get("Content-Encoding") == "deflate" {
144+
zr, err := zlib.NewReader(resp.Body)
145+
if err != nil {
146+
return nil, nil, err
147+
}
148+
return zr, zr, nil
149+
}
150+
return resp.Body, nil, nil
151+
}
152+
153+
func (c *httpConnection) streamToResultSet(reader io.Reader, rs ResultSet) {
154+
d := newStreamingDeserializer(reader)
155+
if err := d.readHeader(); err != nil {
156+
if err != io.EOF {
157+
c.logHandler.logf(Error, failedToReceiveResponse, err.Error())
158+
rs.setError(err)
159+
}
160+
return
161+
}
162+
163+
for {
164+
obj, err := d.readFullyQualified()
165+
if err != nil {
166+
if err != io.EOF {
167+
c.logHandler.logf(Error, failedToReceiveResponse, err.Error())
168+
rs.setError(err)
169+
}
170+
return
171+
}
172+
173+
if marker, ok := obj.(Marker); ok && marker == EndOfStream() {
174+
code, msg, exc, err := d.readStatus()
175+
if err != nil {
176+
c.logHandler.logf(Error, failedToReceiveResponse, err.Error())
177+
rs.setError(err)
178+
return
179+
}
180+
if code != 200 && code != 0 {
181+
if exc != "" {
182+
rs.setError(newError(err0502ResponseHandlerReadLoopError, exc, code))
183+
} else {
184+
rs.setError(newError(err0502ResponseHandlerReadLoopError, msg, code))
185+
}
186+
}
187+
return
188+
}
189+
190+
rs.Channel() <- &Result{obj}
191+
}
192+
}
193+
194+
func (c *httpConnection) close() {
195+
c.httpClient.CloseIdleConnections()
196+
}

0 commit comments

Comments
 (0)