Skip to content

Commit 634f6c9

Browse files
Improve error message and retry behaviour (#111)
Be more restrictive on retries: only retry on 429 and 503 status codes or on connection errors. Return better error messages regarding retry reasons and invalid user input.

Signed-off-by: Andre Furlan <[email protected]>
1 parent f27a47b commit 634f6c9

File tree

5 files changed

+103
-27
lines changed

5 files changed

+103
-27
lines changed

driver_e2e_test.go

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -266,7 +266,7 @@ func TestContextTimeoutExample(t *testing.T) {
266266

267267
defer ts.Close()
268268

269-
db, err := sql.Open("databricks", ts.URL)
269+
db, err := sql.Open("databricks", ts.URL+"/path")
270270
require.NoError(t, err)
271271
defer db.Close()
272272

@@ -363,7 +363,7 @@ func TestRetries(t *testing.T) {
363363

364364
connector, err := NewConnector(
365365
WithServerHostname("localhost"),
366-
WithHTTPPath("/500-5-retries"),
366+
WithHTTPPath("/503-5-retries"),
367367
WithPort(port),
368368
WithRetries(2, 10*time.Millisecond, 1*time.Second),
369369
)
@@ -397,7 +397,7 @@ func TestRetries(t *testing.T) {
397397

398398
connector, err := NewConnector(
399399
WithServerHostname("localhost"),
400-
WithHTTPPath("/500-5-retries"),
400+
WithHTTPPath("/429-2-retries"),
401401
WithPort(port),
402402
WithRetries(-1, 0, 0),
403403
)

internal/client/client.go

Lines changed: 62 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,9 @@ import (
88
"net"
99
"net/http"
1010
"net/http/httptrace"
11+
"net/url"
1112
"os"
13+
"regexp"
1214
"time"
1315

1416
"github.com/apache/thrift/lib/go/thrift"
@@ -175,7 +177,11 @@ func (tsc *ThriftServiceClient) CancelOperation(ctx context.Context, req *cli_se
175177
// InitThriftClient is a wrapper of the http transport, so we can have access to response code and headers.
176178
// It is important to know the code and headers to know if we need to retry or not
177179
func InitThriftClient(cfg *config.Config, httpclient *http.Client) (*ThriftServiceClient, error) {
178-
endpoint := cfg.ToEndpointURL()
180+
var err error
181+
endpoint, err := cfg.ToEndpointURL()
182+
if err != nil {
183+
return nil, err
184+
}
179185
tcfg := &thrift.TConfiguration{
180186
TLSConfig: cfg.TLSConfig,
181187
}
@@ -200,7 +206,6 @@ func InitThriftClient(cfg *config.Config, httpclient *http.Client) (*ThriftServi
200206
}
201207

202208
var tTrans thrift.TTransport
203-
var err error
204209

205210
switch cfg.ThriftTransport {
206211
case "http":
@@ -270,6 +275,8 @@ func SprintGuid(bts []byte) string {
270275
return fmt.Sprintf("%x", bts)
271276
}
272277

278+
var retryableStatusCode = []int{http.StatusTooManyRequests, http.StatusServiceUnavailable}
279+
273280
type Transport struct {
274281
Base *http.Transport
275282
Authr auth.Authenticator
@@ -309,6 +316,27 @@ func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
309316
if err != nil {
310317
return nil, err
311318
}
319+
if resp.StatusCode != http.StatusOK {
320+
reason := resp.Header.Get("X-Databricks-Reason-Phrase")
321+
terrmsg := resp.Header.Get("X-Thriftserver-Error-Message")
322+
for _, c := range retryableStatusCode {
323+
if c == resp.StatusCode {
324+
if terrmsg != "" {
325+
logger.Warn().Msg(terrmsg)
326+
}
327+
return resp, nil
328+
}
329+
}
330+
if reason != "" {
331+
logger.Err(fmt.Errorf(reason)).Msg("non retryable error")
332+
return nil, errors.New(reason)
333+
}
334+
if terrmsg != "" {
335+
logger.Err(fmt.Errorf(terrmsg)).Msg("non retryable error")
336+
return nil, errors.New(terrmsg)
337+
}
338+
return nil, errors.New(resp.Status)
339+
}
312340

313341
return resp, nil
314342
}
@@ -322,7 +350,7 @@ func RetryableClient(cfg *config.Config) *http.Client {
322350
RetryWaitMax: cfg.RetryWaitMax,
323351
RetryMax: cfg.RetryMax,
324352
ErrorHandler: errorHandler,
325-
CheckRetry: retryablehttp.DefaultRetryPolicy,
353+
CheckRetry: RetryPolicy,
326354
Backoff: retryablehttp.DefaultBackoff,
327355
}
328356
return retryableClient.StandardClient()
@@ -412,3 +440,34 @@ func errorHandler(resp *http.Response, err error, numTries int) (*http.Response,
412440

413441
return resp, werr
414442
}
443+
444+
func RetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error) {
445+
var lostConn = regexp.MustCompile(`EOF`)
446+
447+
// do not retry on context.Canceled or context.DeadlineExceeded
448+
if ctx.Err() != nil {
449+
return false, ctx.Err()
450+
}
451+
452+
if err != nil {
453+
if v, ok := err.(*url.Error); ok {
454+
if lostConn.MatchString(v.Error()) {
455+
return true, v
456+
}
457+
}
458+
return false, nil
459+
}
460+
461+
// 429 Too Many Requests or 503 service unavailable is recoverable. Sometimes the server puts
462+
// a Retry-After response header to indicate when the server is
463+
// available to start processing request from client.
464+
465+
for _, c := range retryableStatusCode {
466+
if c == resp.StatusCode {
467+
return true, nil
468+
}
469+
}
470+
471+
return false, nil
472+
473+
}

internal/config/config.go

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -36,10 +36,19 @@ type Config struct {
3636
}
3737

3838
// ToEndpointURL generates the endpoint URL from Config that a Thrift client will connect to
39-
func (c *Config) ToEndpointURL() string {
39+
func (c *Config) ToEndpointURL() (string, error) {
4040
var userInfo string
4141
endpointUrl := fmt.Sprintf("%s://%s%s:%d%s", c.Protocol, userInfo, c.Host, c.Port, c.HTTPPath)
42-
return endpointUrl
42+
if c.Host == "" {
43+
return endpointUrl, errors.New("databricks: missing Hostname")
44+
}
45+
if c.Port == 0 {
46+
return endpointUrl, errors.New("databricks: missing Port")
47+
}
48+
if c.HTTPPath == "" && c.Host != "localhost" {
49+
return endpointUrl, errors.New("databricks: missing HTTP Path")
50+
}
51+
return endpointUrl, nil
4352
}
4453

4554
// DeepCopy returns a true deep copy of Config

internal/config/config_test.go

Lines changed: 21 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -17,11 +17,12 @@ func TestParseConfig(t *testing.T) {
1717
}
1818
tz, _ := time.LoadLocation("America/Vancouver")
1919
tests := []struct {
20-
name string
21-
args args
22-
wantCfg UserConfig
23-
wantURL string
24-
wantErr bool
20+
name string
21+
args args
22+
wantCfg UserConfig
23+
wantURL string
24+
wantErr bool
25+
wantEndpointErr bool
2526
}{
2627
{
2728
name: "base case",
@@ -253,11 +254,12 @@ func TestParseConfig(t *testing.T) {
253254
RetryWaitMin: 1 * time.Second,
254255
RetryWaitMax: 30 * time.Second,
255256
},
256-
wantURL: "https://example.cloud.databricks.com:443",
257-
wantErr: false,
257+
wantURL: "https://example.cloud.databricks.com:443",
258+
wantErr: false,
259+
wantEndpointErr: true,
258260
},
259261
{
260-
name: "missing http path",
262+
name: "missing http path 2",
261263
args: args{dsn: "token:[email protected]:443?catalog=default&schema=system&timeout=100&maxRows=1000"},
262264
wantCfg: UserConfig{
263265
Protocol: "https",
@@ -274,8 +276,9 @@ func TestParseConfig(t *testing.T) {
274276
RetryWaitMin: 1 * time.Second,
275277
RetryWaitMax: 30 * time.Second,
276278
},
277-
wantURL: "https://example.cloud.databricks.com:443",
278-
wantErr: false,
279+
wantURL: "https://example.cloud.databricks.com:443",
280+
wantErr: false,
281+
wantEndpointErr: true,
279282
},
280283
{
281284
name: "with wrong port",
@@ -331,8 +334,9 @@ func TestParseConfig(t *testing.T) {
331334
RetryWaitMin: 1 * time.Second,
332335
RetryWaitMax: 30 * time.Second,
333336
},
334-
wantURL: "https://:443",
335-
wantErr: false,
337+
wantURL: "https://:443",
338+
wantErr: false,
339+
wantEndpointErr: true,
336340
},
337341
}
338342
for _, tt := range tests {
@@ -348,7 +352,11 @@ func TestParseConfig(t *testing.T) {
348352
}
349353
if err == nil {
350354
cfg := &Config{UserConfig: got}
351-
gotUrl := cfg.ToEndpointURL()
355+
gotUrl, err := cfg.ToEndpointURL()
356+
if (err != nil) != tt.wantEndpointErr {
357+
t.Errorf("ParseConfig() error = %v, wantErr %v", err, tt.wantErr)
358+
return
359+
}
352360
if gotUrl != tt.wantURL {
353361
t.Errorf("ToEndpointURL() = %v, want %v", got, tt.wantErr)
354362
return

testserver.go

Lines changed: 6 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ type thriftHandler struct {
1212
processor thrift.TProcessor
1313
inPfactory, outPfactory thrift.TProtocolFactory
1414
count503_2_retries int
15-
count500_5_retries int
15+
count503_5_retries int
1616
count429_2_retries int
1717
}
1818

@@ -36,13 +36,13 @@ func (h *thriftHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
3636
} else {
3737
h.count429_2_retries = 0
3838
}
39-
case "/500-5-retries":
40-
if h.count500_5_retries <= 5 {
41-
w.WriteHeader(http.StatusInternalServerError)
42-
h.count500_5_retries++
39+
case "/503-5-retries":
40+
if h.count503_5_retries <= 5 {
41+
w.WriteHeader(http.StatusServiceUnavailable)
42+
h.count503_5_retries++
4343
return
4444
} else {
45-
h.count500_5_retries = 0
45+
h.count503_5_retries = 0
4646
}
4747
}
4848

0 commit comments

Comments
 (0)