Skip to content

Commit 17cfb7c

Browse files
fancy-rabbitspinlock
authored andcommitted
backend: handle LOADING the same way as MASTERDOWN
1 parent 7457304 commit 17cfb7c

File tree

1 file changed

+56
-0
lines changed

1 file changed

+56
-0
lines changed

pkg/proxy/backend.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
const (
2323
stateConnected = 1
2424
stateDataStale = 2
25+
stateLoading = 3
2526
)
2627

2728
type BackendConn struct {
@@ -137,6 +138,55 @@ func (bc *BackendConn) KeepAlive() bool {
137138
bc, bc.addr, bc.database)
138139
}
139140
}
141+
142+
case stateLoading:
143+
m := &Request{}
144+
m.Multi = []*redis.Resp{
145+
redis.NewBulkBytes([]byte("INFO")),
146+
}
147+
m.Batch = &sync.WaitGroup{}
148+
bc.PushBack(m)
149+
150+
keepAliveCallback <- func() {
151+
m.Batch.Wait()
152+
var err = func() error {
153+
if err := m.Err; err != nil {
154+
return err
155+
}
156+
switch resp := m.Resp; {
157+
case resp == nil:
158+
return ErrRespIsRequired
159+
case resp.IsError():
160+
return fmt.Errorf("bad info resp: %s", resp.Value)
161+
case resp.IsBulkBytes():
162+
var info = make(map[string]string)
163+
for _, line := range strings.Split(string(resp.Value), "\n") {
164+
kv := strings.SplitN(line, ":", 2)
165+
if len(kv) != 2 {
166+
continue
167+
}
168+
if key := strings.TrimSpace(kv[0]); key != "" {
169+
info[key] = strings.TrimSpace(kv[1])
170+
}
171+
}
172+
if info["loading"] == "1" {
173+
return nil
174+
}
175+
if bc.state.CompareAndSwap(stateLoading, stateConnected) {
176+
log.Warnf("backend conn [%p] to %s, db-%d state = Connected (keepalive)",
177+
bc, bc.addr, bc.database)
178+
}
179+
return nil
180+
default:
181+
return fmt.Errorf("bad info resp: should be string, but got %s", resp.Type)
182+
}
183+
}()
184+
if err != nil && bc.closed.IsFalse() {
185+
log.WarnErrorf(err, "backend conn [%p] to %s, db-%d recover from Loading failed",
186+
bc, bc.addr, bc.database)
187+
}
188+
}
189+
140190
}
141191
return true
142192
}
@@ -266,6 +316,7 @@ func (bc *BackendConn) run() {
266316
}
267317

268318
var errMasterDown = []byte("MASTERDOWN")
319+
var errLoading = []byte("LOADING")
269320

270321
func (bc *BackendConn) loopReader(tasks <-chan *Request, c *redis.Conn, round int) (err error) {
271322
defer func() {
@@ -288,6 +339,11 @@ func (bc *BackendConn) loopReader(tasks <-chan *Request, c *redis.Conn, round in
288339
log.Warnf("backend conn [%p] to %s, db-%d state = DataStale, caused by 'MASTERDOWN'",
289340
bc, bc.addr, bc.database)
290341
}
342+
case bytes.HasPrefix(resp.Value, errLoading):
343+
if bc.state.CompareAndSwap(stateConnected, stateLoading) {
344+
log.Warnf("backend conn [%p] to %s, db-%d state = Loading",
345+
bc, bc.addr, bc.database)
346+
}
291347
}
292348
}
293349
bc.setResponse(r, resp, nil)

0 commit comments

Comments
 (0)