Skip to content

Commit 3a8f481

Browse files
author
wangshaoyi
committed
set error response for timeout request
1 parent 2b5363e commit 3a8f481

File tree

5 files changed

+120
-27
lines changed

5 files changed

+120
-27
lines changed

codis/pkg/proxy/backend.go

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -282,27 +282,48 @@ func (bc *BackendConn) loopReader(tasks <-chan *Request, c *redis.Conn, round in
282282
log.WarnErrorf(err, "backend conn [%p] to %s, db-%d reader-[%d] exit",
283283
bc, bc.addr, bc.database, round)
284284
}()
285+
286+
var timeout_resp_cnt int
285287
for r := range tasks {
286-
resp, err := c.Decode()
287-
r.ReceiveFromServerTime = time.Now().UnixNano()
288-
if err != nil {
289-
return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
290-
}
291-
if resp != nil && resp.IsError() {
292-
switch {
293-
case bytes.HasPrefix(resp.Value, errRespMasterDown):
294-
if bc.state.CompareAndSwap(stateConnected, stateDataStale) {
295-
log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'MASTERDOWN'",
296-
bc, bc.addr, bc.database)
288+
for {
289+
resp, err := c.Decode()
290+
r.ReceiveFromServerTime = time.Now().UnixNano()
291+
if err != nil {
292+
if ne, ok := errors.Cause(err).(net.Error); ok && ne.Timeout() {
293+
timeout_resp_cnt++
294+
if timeout_resp_cnt%10 == 0 {
295+
log.Warnf(`backend conn [%p] to %s, db-%d, reader-[%d]
296+
accumulated timeout request num: %d`,
297+
bc, bc.addr, bc.database, round, timeout_resp_cnt)
298+
}
299+
bc.setResponse(r, nil, fmt.Errorf("backend request timout, %s", err))
300+
break
297301
}
298-
case bytes.HasPrefix(resp.Value, errRespLoading):
299-
if bc.state.CompareAndSwap(stateConnected, stateDataStale) {
300-
log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'LOADING'",
301-
bc, bc.addr, bc.database)
302+
return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
303+
}
304+
305+
if timeout_resp_cnt != 0 {
306+
timeout_resp_cnt--
307+
continue
308+
}
309+
310+
if resp != nil && resp.IsError() {
311+
switch {
312+
case bytes.HasPrefix(resp.Value, errRespMasterDown):
313+
if bc.state.CompareAndSwap(stateConnected, stateDataStale) {
314+
log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'MASTERDOWN'",
315+
bc, bc.addr, bc.database)
316+
}
317+
case bytes.HasPrefix(resp.Value, errRespLoading):
318+
if bc.state.CompareAndSwap(stateConnected, stateDataStale) {
319+
log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'LOADING'",
320+
bc, bc.addr, bc.database)
321+
}
302322
}
303323
}
324+
bc.setResponse(r, resp, nil)
325+
break
304326
}
305-
bc.setResponse(r, resp, nil)
306327
}
307328
return nil
308329
}

codis/pkg/proxy/backend_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,61 @@ func TestBackend(t *testing.T) {
8080
assert.Must(string(r.Resp.Value) == strconv.Itoa(i))
8181
}
8282
}
83+
84+
func TestBackendTimeOut(t *testing.T) {
85+
config := NewDefaultConfig()
86+
config.BackendMaxPipeline = 3
87+
config.BackendSendTimeout.Set(10 * time.Second)
88+
config.BackendRecvTimeout.Set(1 * time.Second)
89+
90+
conn, bc := newConnPair(config)
91+
defer bc.Close()
92+
93+
var array = make([]*Request, 5)
94+
for i := range array {
95+
array[i] = &Request{Batch: &sync.WaitGroup{}}
96+
}
97+
98+
// mock backend server, sleep 1.1s for 50% requests
99+
// to simulate request timeout
100+
go func() {
101+
defer conn.Close()
102+
time.Sleep(time.Millisecond * 20)
103+
for i := range array {
104+
_, err := conn.Decode()
105+
if i%2 == 0 {
106+
time.Sleep(time.Millisecond * 1100)
107+
}
108+
assert.MustNoError(err)
109+
resp := redis.NewString([]byte(strconv.Itoa(i)))
110+
assert.MustNoError(conn.Encode(resp, true))
111+
}
112+
}()
113+
114+
ticker := time.NewTicker(time.Second)
115+
defer ticker.Stop()
116+
go func() {
117+
for i := 0; i < 10; i++ {
118+
<-ticker.C
119+
}
120+
log.Panicf("timeout")
121+
}()
122+
123+
for _, r := range array {
124+
bc.PushBack(r)
125+
}
126+
127+
for i, r := range array {
128+
r.Batch.Wait()
129+
if i%2 == 0 {
130+
// request timeout
131+
assert.Must(r.Err != nil)
132+
assert.Must(r.Resp == nil)
133+
} else {
134+
// request succeed and response value matches
135+
assert.MustNoError(r.Err)
136+
assert.Must(r.Resp != nil)
137+
assert.Must(string(r.Resp.Value) == strconv.Itoa(i))
138+
}
139+
}
140+
}

codis/pkg/proxy/redis/decoder.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package redis
66
import (
77
"bytes"
88
"io"
9+
"net"
910
"strconv"
1011

1112
"pika/codis/v2/pkg/utils/bufio2"
@@ -87,9 +88,14 @@ func (d *Decoder) Decode() (*Resp, error) {
8788
}
8889
r, err := d.decodeResp()
8990
if err != nil {
90-
d.Err = err
91+
// if err is timeout, we reuse this conn, so don't set d.Err
92+
if ne, ok := errors.Cause(err).(net.Error); ok && ne.Timeout() {
93+
d.Err = nil
94+
} else {
95+
d.Err = err
96+
}
9197
}
92-
return r, d.Err
98+
return r, err
9399
}
94100

95101
func (d *Decoder) DecodeMultiBulk() ([]*Resp, error) {

codis/pkg/proxy/request_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ import (
1616
func TestRequestChan1(t *testing.T) {
1717
var ch = NewRequestChanBuffer(0)
1818
for i := 0; i < 8192; i++ {
19-
n := ch.PushBack(&Request{UnixNano: int64(i)})
19+
n := ch.PushBack(&Request{ReceiveTime: int64(i)})
2020
assert.Must(n == i+1)
2121
}
2222
for i := 0; i < 8192; i++ {
2323
r, ok := ch.PopFront()
24-
assert.Must(ok && r.UnixNano == int64(i))
24+
assert.Must(ok && r.ReceiveTime == int64(i))
2525
}
2626
assert.Must(ch.Buffered() == 0)
2727

@@ -34,7 +34,7 @@ func TestRequestChan1(t *testing.T) {
3434
func TestRequestChan2(t *testing.T) {
3535
var ch = NewRequestChanBuffer(512)
3636
for i := 0; i < 8192; i++ {
37-
n := ch.PushBack(&Request{UnixNano: int64(i)})
37+
n := ch.PushBack(&Request{ReceiveTime: int64(i)})
3838
assert.Must(n == i+1)
3939
}
4040
ch.Close()
@@ -43,7 +43,7 @@ func TestRequestChan2(t *testing.T) {
4343

4444
for i := 0; i < 8192; i++ {
4545
r, ok := ch.PopFront()
46-
assert.Must(ok && r.UnixNano == int64(i))
46+
assert.Must(ok && r.ReceiveTime == int64(i))
4747
}
4848
assert.Must(ch.Buffered() == 0)
4949

@@ -61,7 +61,7 @@ func TestRequestChan3(t *testing.T) {
6161
go func() {
6262
defer wg.Done()
6363
for i := 0; i < n; i++ {
64-
ch.PushBack(&Request{UnixNano: int64(i)})
64+
ch.PushBack(&Request{ReceiveTime: int64(i)})
6565
if i%1024 == 0 {
6666
runtime.Gosched()
6767
}
@@ -73,7 +73,7 @@ func TestRequestChan3(t *testing.T) {
7373
defer wg.Done()
7474
for i := 0; i < n; i++ {
7575
r, ok := ch.PopFront()
76-
assert.Must(ok && r.UnixNano == int64(i))
76+
assert.Must(ok && r.ReceiveTime == int64(i))
7777
if i%4096 == 0 {
7878
runtime.Gosched()
7979
}

codis/pkg/utils/bufio2/bufio.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ import (
77
"bufio"
88
"bytes"
99
"io"
10+
"net"
11+
12+
"pika/codis/v2/pkg/utils/errors"
1013
)
1114

1215
const DefaultBufferSize = 1024
@@ -51,7 +54,12 @@ func (b *Reader) fill() error {
5154
}
5255
n, err := b.rd.Read(b.buf[b.wpos:])
5356
if err != nil {
54-
b.err = err
57+
// if err is timeout, we reuse this conn, so don't set b.err
58+
if ne, ok := errors.Cause(err).(net.Error); ok && ne.Timeout() {
59+
return err
60+
} else {
61+
b.err = err
62+
}
5563
} else if n == 0 {
5664
b.err = io.ErrNoProgress
5765
} else {
@@ -90,8 +98,8 @@ func (b *Reader) ReadByte() (byte, error) {
9098
return 0, b.err
9199
}
92100
if b.buffered() == 0 {
93-
if b.fill() != nil {
94-
return 0, b.err
101+
if err := b.fill(); err != nil {
102+
return 0, err
95103
}
96104
}
97105
c := b.buf[b.rpos]

0 commit comments

Comments
 (0)