Skip to content

Commit 9d3ef8f

Browse files
authored
Merge pull request #41 from HarrisChu/retry_with_timeout
client retry with timeout
2 parents 2d3f779 + 29e1224 commit 9d3ef8f

File tree

1 file changed

+32
-10
lines changed

1 file changed

+32
-10
lines changed

pkg/nebulagraph/client.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
)
1414

1515
const EnvRetryTimes = "NEBULA_RETRY_TIMES"
16+
const EnvRetryIntervalUs = "NEBULA_RETRY_INTERVAL_US"
17+
const EnvRetryTimeoutUs = "NEBULA_RETRY_TIMEOUT_US"
1618

1719
type (
1820
// GraphPool nebula connection pool
@@ -29,6 +31,8 @@ type (
2931
mutex sync.Mutex
3032
csvReader common.ICsvReader
3133
retryTimes int
34+
retryIntervalUs int
35+
retryTimeoutUs int
3236
}
3337

3438
// GraphClient a wrapper for nebula client, could read data from DataCh
@@ -106,7 +110,12 @@ func (gp *GraphPool) Init(address string, concurrent int) (common.IGraphClientPo
106110

107111
// InitWithSize initializes nebula pool with channel buffer size
108112
func (gp *GraphPool) InitWithSize(address string, concurrent int, chanSize int) (common.IGraphClientPool, error) {
109-
var retryTimes int
113+
var (
114+
retryTimes int
115+
retryIntervalUs int
116+
retryTimeoutUs int
117+
)
118+
110119
gp.mutex.Lock()
111120
defer gp.mutex.Unlock()
112121
if gp.initialized {
@@ -115,10 +124,19 @@ func (gp *GraphPool) InitWithSize(address string, concurrent int, chanSize int)
115124
if os.Getenv(EnvRetryTimes) != "" {
116125
retryTimes, _ = strconv.Atoi(os.Getenv(EnvRetryTimes))
117126
}
127+
if os.Getenv(EnvRetryIntervalUs) != "" {
128+
retryIntervalUs, _ = strconv.Atoi(os.Getenv(EnvRetryIntervalUs))
129+
}
130+
if os.Getenv(EnvRetryTimeoutUs) != "" {
131+
retryTimeoutUs, _ = strconv.Atoi(os.Getenv(EnvRetryTimeoutUs))
132+
}
118133

119134
if retryTimes == 0 {
120135
retryTimes = 200
121136
}
137+
if retryIntervalUs == 0 {
138+
retryIntervalUs = 100 * 1e3
139+
}
122140

123141
err := gp.initAndVerifyPool(address, concurrent, chanSize)
124142
if err != nil {
@@ -127,6 +145,8 @@ func (gp *GraphPool) InitWithSize(address string, concurrent int, chanSize int)
127145
gp.DataCh = make(chan common.Data, chanSize)
128146
gp.initialized = true
129147
gp.retryTimes = retryTimes
148+
gp.retryIntervalUs = retryIntervalUs
149+
gp.retryTimeoutUs = retryTimeoutUs
130150

131151
return gp, nil
132152
}
@@ -249,29 +269,31 @@ func (gc *GraphClient) GetData() (common.Data, error) {
249269
}
250270

251271
func (gc *GraphClient) executeRetry(stmt string) (*graph.ResultSet, error) {
252-
// retry only when leader changed
272+
// retry only when execution error
253273
// if other errors, e.g. SemanticError, would return directly
254274
var (
255275
resp *graph.ResultSet
256276
err error
257277
)
278+
start := time.Now()
258279
for i := 0; i < gc.Pool.retryTimes; i++ {
280+
if gc.Pool.retryTimeoutUs != 0 && time.Since(start).Microseconds() > int64(gc.Pool.retryTimeoutUs) {
281+
return resp, nil
282+
}
259283
resp, err = gc.Client.Execute(stmt)
260284
if err != nil {
261285
return nil, err
262286
}
263287
graphErr := resp.GetErrorCode()
264-
if graphErr != graph.ErrorCode_SUCCEEDED {
265-
if graphErr == graph.ErrorCode_E_EXECUTION_ERROR {
266-
<-time.After(100 * time.Millisecond)
267-
continue
268-
}
288+
if graphErr == graph.ErrorCode_SUCCEEDED {
269289
return resp, nil
270290
}
271-
return resp, nil
291+
// only retry for execution error
292+
if graphErr != graph.ErrorCode_E_EXECUTION_ERROR {
293+
break
294+
}
295+
<-time.After(time.Duration(gc.Pool.retryIntervalUs) * time.Microsecond)
272296
}
273-
// still leader changed
274-
fmt.Printf("retry %d times, but still error: %s, return directly\n", gc.Pool.retryTimes, resp.GetErrorMsg())
275297
return resp, nil
276298
}
277299

0 commit comments

Comments
 (0)