From 1133efa676f963e0e8a8748c5b052ae8783c0a2f Mon Sep 17 00:00:00 2001 From: wangshaoyi Date: Mon, 31 Mar 2025 12:13:31 +0800 Subject: [PATCH] set error response for timeout request --- codis/pkg/proxy/backend.go | 53 ++++++++++++++++++++--------- codis/pkg/proxy/backend_test.go | 58 ++++++++++++++++++++++++++++++++ codis/pkg/proxy/redis/decoder.go | 10 ++++-- codis/pkg/proxy/request_test.go | 12 +++---- codis/pkg/utils/bufio2/bufio.go | 14 ++++++-- 5 files changed, 120 insertions(+), 27 deletions(-) diff --git a/codis/pkg/proxy/backend.go b/codis/pkg/proxy/backend.go index 7c76a82176..9c94e3ec2c 100644 --- a/codis/pkg/proxy/backend.go +++ b/codis/pkg/proxy/backend.go @@ -282,27 +282,48 @@ func (bc *BackendConn) loopReader(tasks <-chan *Request, c *redis.Conn, round in log.WarnErrorf(err, "backend conn [%p] to %s, db-%d reader-[%d] exit", bc, bc.addr, bc.database, round) }() + + var timeout_resp_cnt int for r := range tasks { - resp, err := c.Decode() - r.ReceiveFromServerTime = time.Now().UnixNano() - if err != nil { - return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err)) - } - if resp != nil && resp.IsError() { - switch { - case bytes.HasPrefix(resp.Value, errRespMasterDown): - if bc.state.CompareAndSwap(stateConnected, stateDataStale) { - log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'MASTERDOWN'", - bc, bc.addr, bc.database) + for { + resp, err := c.Decode() + r.ReceiveFromServerTime = time.Now().UnixNano() + if err != nil { + if ne, ok := errors.Cause(err).(net.Error); ok && ne.Timeout() { + timeout_resp_cnt++ + if timeout_resp_cnt%10 == 0 { + log.Warnf(`backend conn [%p] to %s, db-%d, reader-[%d] + accumulated timeout request num: %d`, + bc, bc.addr, bc.database, round, timeout_resp_cnt) + } + bc.setResponse(r, nil, fmt.Errorf("backend request timout, %s", err)) + break } - case bytes.HasPrefix(resp.Value, errRespLoading): - if bc.state.CompareAndSwap(stateConnected, stateDataStale) { - log.Warnf("backend conn 
[%p] to %s, db-%d state = DataStale, caused by 'LOADING'", - bc, bc.addr, bc.database) + return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err)) + } + + if timeout_resp_cnt != 0 { + timeout_resp_cnt-- + continue + } + + if resp != nil && resp.IsError() { + switch { + case bytes.HasPrefix(resp.Value, errRespMasterDown): + if bc.state.CompareAndSwap(stateConnected, stateDataStale) { + log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'MASTERDOWN'", + bc, bc.addr, bc.database) + } + case bytes.HasPrefix(resp.Value, errRespLoading): + if bc.state.CompareAndSwap(stateConnected, stateDataStale) { + log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'LOADING'", + bc, bc.addr, bc.database) + } } } + bc.setResponse(r, resp, nil) + break } - bc.setResponse(r, resp, nil) } return nil } diff --git a/codis/pkg/proxy/backend_test.go b/codis/pkg/proxy/backend_test.go index 028085ab4d..79c5845032 100644 --- a/codis/pkg/proxy/backend_test.go +++ b/codis/pkg/proxy/backend_test.go @@ -80,3 +80,61 @@ func TestBackend(t *testing.T) { assert.Must(string(r.Resp.Value) == strconv.Itoa(i)) } } + +func TestBackendTimeOut(t *testing.T) { + config := NewDefaultConfig() + config.BackendMaxPipeline = 3 + config.BackendSendTimeout.Set(10 * time.Second) + config.BackendRecvTimeout.Set(1 * time.Second) + + conn, bc := newConnPair(config) + defer bc.Close() + + var array = make([]*Request, 5) + for i := range array { + array[i] = &Request{Batch: &sync.WaitGroup{}} + } + + // mock backend server: sleep 1.1s before replying to every even-indexed + // request (i%2 == 0) to simulate request timeouts + go func() { + defer conn.Close() + time.Sleep(time.Millisecond * 20) + for i := range array { + _, err := conn.Decode() + if i%2 == 0 { + time.Sleep(time.Millisecond * 1100) + } + assert.MustNoError(err) + resp := redis.NewString([]byte(strconv.Itoa(i))) + assert.MustNoError(conn.Encode(resp, true)) + } + }() + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + go func() { 
+ for i := 0; i < 10; i++ { + <-ticker.C + } + log.Panicf("timeout") + }() + + for _, r := range array { + bc.PushBack(r) + } + + for i, r := range array { + r.Batch.Wait() + if i%2 == 0 { + // request timeout + assert.Must(r.Err != nil) + assert.Must(r.Resp == nil) + } else { + // request succeeds and the response value matches + assert.MustNoError(r.Err) + assert.Must(r.Resp != nil) + assert.Must(string(r.Resp.Value) == strconv.Itoa(i)) + } + } +} diff --git a/codis/pkg/proxy/redis/decoder.go b/codis/pkg/proxy/redis/decoder.go index 0add2a6a74..a3495e5d35 100644 --- a/codis/pkg/proxy/redis/decoder.go +++ b/codis/pkg/proxy/redis/decoder.go @@ -6,6 +6,7 @@ package redis import ( "bytes" "io" + "net" "strconv" "pika/codis/v2/pkg/utils/bufio2" @@ -87,9 +88,14 @@ func (d *Decoder) Decode() (*Resp, error) { } r, err := d.decodeResp() if err != nil { - d.Err = err + // if err is timeout, we reuse this conn, so don't set d.Err + if ne, ok := errors.Cause(err).(net.Error); ok && ne.Timeout() { + d.Err = nil + } else { + d.Err = err + } } - return r, d.Err + return r, err } func (d *Decoder) DecodeMultiBulk() ([]*Resp, error) { diff --git a/codis/pkg/proxy/request_test.go b/codis/pkg/proxy/request_test.go index 3bbe1cd96e..8ed253e06c 100644 --- a/codis/pkg/proxy/request_test.go +++ b/codis/pkg/proxy/request_test.go @@ -16,12 +16,12 @@ import ( func TestRequestChan1(t *testing.T) { var ch = NewRequestChanBuffer(0) for i := 0; i < 8192; i++ { - n := ch.PushBack(&Request{UnixNano: int64(i)}) + n := ch.PushBack(&Request{ReceiveTime: int64(i)}) assert.Must(n == i+1) } for i := 0; i < 8192; i++ { r, ok := ch.PopFront() - assert.Must(ok && r.UnixNano == int64(i)) + assert.Must(ok && r.ReceiveTime == int64(i)) } assert.Must(ch.Buffered() == 0) @@ -34,7 +34,7 @@ func TestRequestChan1(t *testing.T) { func TestRequestChan2(t *testing.T) { var ch = NewRequestChanBuffer(512) for i := 0; i < 8192; i++ { - n := ch.PushBack(&Request{UnixNano: int64(i)}) + n := ch.PushBack(&Request{ReceiveTime: 
int64(i)}) assert.Must(n == i+1) } ch.Close() @@ -43,7 +43,7 @@ func TestRequestChan2(t *testing.T) { for i := 0; i < 8192; i++ { r, ok := ch.PopFront() - assert.Must(ok && r.UnixNano == int64(i)) + assert.Must(ok && r.ReceiveTime == int64(i)) } assert.Must(ch.Buffered() == 0) @@ -61,7 +61,7 @@ func TestRequestChan3(t *testing.T) { go func() { defer wg.Done() for i := 0; i < n; i++ { - ch.PushBack(&Request{UnixNano: int64(i)}) + ch.PushBack(&Request{ReceiveTime: int64(i)}) if i%1024 == 0 { runtime.Gosched() } @@ -73,7 +73,7 @@ func TestRequestChan3(t *testing.T) { defer wg.Done() for i := 0; i < n; i++ { r, ok := ch.PopFront() - assert.Must(ok && r.UnixNano == int64(i)) + assert.Must(ok && r.ReceiveTime == int64(i)) if i%4096 == 0 { runtime.Gosched() } diff --git a/codis/pkg/utils/bufio2/bufio.go b/codis/pkg/utils/bufio2/bufio.go index c5dc3e5b79..d260290fec 100644 --- a/codis/pkg/utils/bufio2/bufio.go +++ b/codis/pkg/utils/bufio2/bufio.go @@ -7,6 +7,9 @@ import ( "bufio" "bytes" "io" + "net" + + "pika/codis/v2/pkg/utils/errors" ) const DefaultBufferSize = 1024 @@ -51,7 +54,12 @@ func (b *Reader) fill() error { } n, err := b.rd.Read(b.buf[b.wpos:]) if err != nil { - b.err = err + // if err is timeout, we reuse this conn, so don't set b.err + if ne, ok := errors.Cause(err).(net.Error); ok && ne.Timeout() { + return err + } else { + b.err = err + } } else if n == 0 { b.err = io.ErrNoProgress } else { @@ -90,8 +98,8 @@ func (b *Reader) ReadByte() (byte, error) { return 0, b.err } if b.buffered() == 0 { - if b.fill() != nil { - return 0, b.err + if err := b.fill(); err != nil { + return 0, err } } c := b.buf[b.rpos]