Skip to content

Commit bc6eb36

Browse files
committed
fix(retry): add status to fix MixRetry race issue
1 parent fefa17d commit bc6eb36

File tree

2 files changed

+47
-10
lines changed

2 files changed

+47
-10
lines changed

pkg/remote/codec/default_codec.go

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -194,11 +194,6 @@ func (c *defaultCodec) DecodeMeta(ctx context.Context, message remote.Message, i
194194
if flagBuf, err = in.Peek(2 * Size32); err != nil {
195195
return perrors.NewProtocolErrorWithErrMsg(err, fmt.Sprintf("default codec read failed: %s", err.Error()))
196196
}
197-
198-
if err = checkRPCState(ctx, message); err != nil {
199-
// there is one call has finished in retry task, it doesn't need to do decode for this call
200-
return err
201-
}
202197
isTTHeader := IsTTHeader(flagBuf)
203198
// 1. decode header
204199
if isTTHeader {
@@ -254,6 +249,11 @@ func (c *defaultCodec) DecodePayload(ctx context.Context, message remote.Message
254249

255250
// Decode implements the remote.Codec interface, it does complete message decode include header and payload.
256251
func (c *defaultCodec) Decode(ctx context.Context, message remote.Message, in remote.ByteBuffer) (err error) {
252+
defer updateRPCDecodeState(ctx, message, err)
253+
if err = checkRPCState(ctx, message); err != nil {
254+
// there is one call has finished in retry task, it doesn't need to do decode for this call
255+
return err
256+
}
257257
// 1. decode meta
258258
if err = c.DecodeMeta(ctx, message, in); err != nil {
259259
return err
@@ -381,6 +381,8 @@ func isThriftFramedBinary(flagBuf []byte) bool {
381381
return binary.BigEndian.Uint32(flagBuf[Size32:])&MagicMask == ThriftV1Magic
382382
}
383383

384+
// checkRPCState is used to avoid concurrent decoding situations
385+
// specific situation: request retry and timeout
384386
func checkRPCState(ctx context.Context, message remote.Message) error {
385387
if message.RPCRole() == remote.Server {
386388
return nil
@@ -398,6 +400,23 @@ func checkRPCState(ctx context.Context, message remote.Message) error {
398400
return nil
399401
}
400402

403+
// updateRPCDecodeState is used to update the decoding status on the client side, which is for retry scenarios.
404+
// If decoding is completed, update CtxRespOp to OpDone.
405+
// Note that the update will only occur when it is actually encoded, if it is returned by ErrRPCFinal, it cannot be updated
406+
func updateRPCDecodeState(ctx context.Context, message remote.Message, err error) {
407+
if err == kerrors.ErrRPCFinish {
408+
fmt.Println("ErrRPCFinish 不更新")
409+
return
410+
}
411+
if message.RPCRole() == remote.Server {
412+
return
413+
}
414+
if respOp, ok := ctx.Value(retry.CtxRespOp).(*int32); ok {
415+
atomic.CompareAndSwapInt32(respOp, retry.OpDoing, retry.OpDone)
416+
}
417+
return
418+
}
419+
401420
func checkPayload(flagBuf []byte, message remote.Message, in remote.ByteBuffer, isTTHeader bool, maxPayloadSize int) error {
402421
var transProto transport.Protocol
403422
var codecType serviceinfo.PayloadCodec

pkg/retry/mixed_retryer.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"runtime"
2324
"sync"
2425
"sync/atomic"
2526
"time"
@@ -157,10 +158,10 @@ func (r *mixedRetryer) Do(ctx context.Context, rpcCall RPCCallFunc, firstRI rpci
157158
}
158159
case res := <-callDone:
159160
// result retry
160-
if respOp, ok := ctx.Value(CtxRespOp).(*int32); ok {
161-
// must set as OpNo, or the new resp cannot be decoded
162-
atomic.StoreInt32(respOp, OpNo)
163-
}
161+
162+
// must set CtxRespOp as OpNo, or the new resp cannot be decoded
163+
respOp, _ := ctx.Value(CtxRespOp).(*int32)
164+
164165
doneCount++
165166
isFinishErr := res.err != nil && errors.Is(res.err, kerrors.ErrRPCFinish)
166167
if nonFinishedErrRes == nil || !isFinishErr {
@@ -175,30 +176,47 @@ func (r *mixedRetryer) Do(ctx context.Context, rpcCall RPCCallFunc, firstRI rpci
175176
// But if all requests return this error, it must be a bug, preventive panic to avoid dead loop
176177
panic(errUnexpectedFinish)
177178
}
179+
updateRespOpStat(respOp)
178180
continue
179181
}
180182
if callCount < retryTimes+1 {
181183
if msg, ok := r.ShouldRetry(ctx, nil, callCount, req, cbKey); ok {
182184
if r.isRetryResult(ctx, res.ri, res.resp, res.err, &r.policy.FailurePolicy) {
183185
doCall = true
184186
timer.Reset(retryDelay)
185-
continue
187+
updateRespOpStat(respOp)
188+
186189
}
187190
} else if msg != "" {
188191
appendMsg := fmt.Sprintf("retried %d, %s", callCount-1, msg)
189192
appendErrMsg(res.err, appendMsg)
190193
}
191194
} else if r.isRetryResult(ctx, res.ri, res.resp, res.err, &r.policy.FailurePolicy) {
195+
// only the Stat is Done, can be reset
196+
updateRespOpStat(respOp)
192197
continue
193198
}
194199
}
200+
atomic.StoreInt32(respOp, OpDone)
195201
atomic.StoreInt32(&abort, 1)
196202
recordRetryInfo(nonFinishedErrRes.ri, atomic.LoadInt32(&callTimes), callCosts.String())
197203
return nonFinishedErrRes.ri, false, nonFinishedErrRes.err
198204
}
199205
}
200206
}
201207

208+
func updateRespOpStat(respOp *int32) {
209+
// only the Stat is Done, can be reset
210+
if atomic.CompareAndSwapInt32(respOp, OpDone, OpNo) {
211+
return
212+
}
213+
var i, maxTry = 0, 3
214+
for i < maxTry && atomic.LoadInt32(respOp) == OpDoing {
215+
runtime.Gosched()
216+
i++
217+
}
218+
}
219+
202220
// UpdatePolicy implements the Retryer interface.
203221
func (r *mixedRetryer) UpdatePolicy(rp Policy) (err error) {
204222
r.Lock()

0 commit comments

Comments
 (0)