Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 37 additions & 16 deletions codis/pkg/proxy/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,27 +282,48 @@ func (bc *BackendConn) loopReader(tasks <-chan *Request, c *redis.Conn, round in
log.WarnErrorf(err, "backend conn [%p] to %s, db-%d reader-[%d] exit",
bc, bc.addr, bc.database, round)
}()

var timeout_resp_cnt int
for r := range tasks {
resp, err := c.Decode()
r.ReceiveFromServerTime = time.Now().UnixNano()
if err != nil {
return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
}
if resp != nil && resp.IsError() {
switch {
case bytes.HasPrefix(resp.Value, errRespMasterDown):
if bc.state.CompareAndSwap(stateConnected, stateDataStale) {
log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'MASTERDOWN'",
bc, bc.addr, bc.database)
for {
resp, err := c.Decode()
r.ReceiveFromServerTime = time.Now().UnixNano()
if err != nil {
if ne, ok := errors.Cause(err).(net.Error); ok && ne.Timeout() {
timeout_resp_cnt++
if timeout_resp_cnt%10 == 0 {
log.Warnf(`backend conn [%p] to %s, db-%d, reader-[%d]
accumulated timeout request num: %d`,
bc, bc.addr, bc.database, round, timeout_resp_cnt)
}
bc.setResponse(r, nil, fmt.Errorf("backend request timout, %s", err))
break
}
case bytes.HasPrefix(resp.Value, errRespLoading):
if bc.state.CompareAndSwap(stateConnected, stateDataStale) {
log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'LOADING'",
bc, bc.addr, bc.database)
return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
}

if timeout_resp_cnt != 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里应该先处理当前响应,再减少timeout计数。过早的continue会导致即使后续请求成功,客户端也无法收到响应结果。

timeout_resp_cnt--
continue
}

if resp != nil && resp.IsError() {
switch {
case bytes.HasPrefix(resp.Value, errRespMasterDown):
if bc.state.CompareAndSwap(stateConnected, stateDataStale) {
log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'MASTERDOWN'",
bc, bc.addr, bc.database)
}
case bytes.HasPrefix(resp.Value, errRespLoading):
if bc.state.CompareAndSwap(stateConnected, stateDataStale) {
log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'LOADING'",
bc, bc.addr, bc.database)
}
}
}
bc.setResponse(r, resp, nil)
break
}
bc.setResponse(r, resp, nil)
}
return nil
}
Expand Down
58 changes: 58 additions & 0 deletions codis/pkg/proxy/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,61 @@ func TestBackend(t *testing.T) {
assert.Must(string(r.Resp.Value) == strconv.Itoa(i))
}
}

func TestBackendTimeOut(t *testing.T) {
config := NewDefaultConfig()
config.BackendMaxPipeline = 3
config.BackendSendTimeout.Set(10 * time.Second)
config.BackendRecvTimeout.Set(1 * time.Second)

conn, bc := newConnPair(config)
defer bc.Close()

var array = make([]*Request, 5)
for i := range array {
array[i] = &Request{Batch: &sync.WaitGroup{}}
}

// mock backend server, sleep 1.1s for 50% requests
// to simulate request timeout
go func() {
defer conn.Close()
time.Sleep(time.Millisecond * 20)
for i := range array {
_, err := conn.Decode()
if i%2 == 0 {
time.Sleep(time.Millisecond * 1100)
}
assert.MustNoError(err)
resp := redis.NewString([]byte(strconv.Itoa(i)))
assert.MustNoError(conn.Encode(resp, true))
}
}()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
go func() {
for i := 0; i < 10; i++ {
<-ticker.C
}
log.Panicf("timeout")
}()

for _, r := range array {
bc.PushBack(r)
}

for i, r := range array {
r.Batch.Wait()
if i%2 == 0 {
// request timeout
assert.Must(r.Err != nil)
assert.Must(r.Resp == nil)
} else {
// request succeed and response value matches
assert.MustNoError(r.Err)
assert.Must(r.Resp != nil)
assert.Must(string(r.Resp.Value) == strconv.Itoa(i))
}
}
}
10 changes: 8 additions & 2 deletions codis/pkg/proxy/redis/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package redis
import (
"bytes"
"io"
"net"
"strconv"

"pika/codis/v2/pkg/utils/bufio2"
Expand Down Expand Up @@ -87,9 +88,14 @@ func (d *Decoder) Decode() (*Resp, error) {
}
r, err := d.decodeResp()
if err != nil {
d.Err = err
// if err is timeout, we reuse this conn, so don't set d.Err
if ne, ok := errors.Cause(err).(net.Error); ok && ne.Timeout() {
d.Err = nil
} else {
d.Err = err
}
}
return r, d.Err
return r, err
}

func (d *Decoder) DecodeMultiBulk() ([]*Resp, error) {
Expand Down
12 changes: 6 additions & 6 deletions codis/pkg/proxy/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
func TestRequestChan1(t *testing.T) {
var ch = NewRequestChanBuffer(0)
for i := 0; i < 8192; i++ {
n := ch.PushBack(&Request{UnixNano: int64(i)})
n := ch.PushBack(&Request{ReceiveTime: int64(i)})
assert.Must(n == i+1)
}
for i := 0; i < 8192; i++ {
r, ok := ch.PopFront()
assert.Must(ok && r.UnixNano == int64(i))
assert.Must(ok && r.ReceiveTime == int64(i))
}
assert.Must(ch.Buffered() == 0)

Expand All @@ -34,7 +34,7 @@ func TestRequestChan1(t *testing.T) {
func TestRequestChan2(t *testing.T) {
var ch = NewRequestChanBuffer(512)
for i := 0; i < 8192; i++ {
n := ch.PushBack(&Request{UnixNano: int64(i)})
n := ch.PushBack(&Request{ReceiveTime: int64(i)})
assert.Must(n == i+1)
}
ch.Close()
Expand All @@ -43,7 +43,7 @@ func TestRequestChan2(t *testing.T) {

for i := 0; i < 8192; i++ {
r, ok := ch.PopFront()
assert.Must(ok && r.UnixNano == int64(i))
assert.Must(ok && r.ReceiveTime == int64(i))
}
assert.Must(ch.Buffered() == 0)

Expand All @@ -61,7 +61,7 @@ func TestRequestChan3(t *testing.T) {
go func() {
defer wg.Done()
for i := 0; i < n; i++ {
ch.PushBack(&Request{UnixNano: int64(i)})
ch.PushBack(&Request{ReceiveTime: int64(i)})
if i%1024 == 0 {
runtime.Gosched()
}
Expand All @@ -73,7 +73,7 @@ func TestRequestChan3(t *testing.T) {
defer wg.Done()
for i := 0; i < n; i++ {
r, ok := ch.PopFront()
assert.Must(ok && r.UnixNano == int64(i))
assert.Must(ok && r.ReceiveTime == int64(i))
if i%4096 == 0 {
runtime.Gosched()
}
Expand Down
14 changes: 11 additions & 3 deletions codis/pkg/utils/bufio2/bufio.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"bufio"
"bytes"
"io"
"net"

"pika/codis/v2/pkg/utils/errors"
)

const DefaultBufferSize = 1024
Expand Down Expand Up @@ -51,7 +54,12 @@ func (b *Reader) fill() error {
}
n, err := b.rd.Read(b.buf[b.wpos:])
if err != nil {
b.err = err
// if err is timeout, we reuse this conn, so don't set b.err
if ne, ok := errors.Cause(err).(net.Error); ok && ne.Timeout() {
return err
} else {
b.err = err
}
} else if n == 0 {
b.err = io.ErrNoProgress
} else {
Expand Down Expand Up @@ -90,8 +98,8 @@ func (b *Reader) ReadByte() (byte, error) {
return 0, b.err
}
if b.buffered() == 0 {
if b.fill() != nil {
return 0, b.err
if err := b.fill(); err != nil {
return 0, err
}
}
c := b.buf[b.rpos]
Expand Down
Loading