Skip to content

Commit 301cc3e

Browse files
craig[bot]pav-kv
andcommitted
Merge #148965
148965: kvserver: do not cancel while in HandleRaftResponse r=tbg a=pav-kv If the context is canceled in the middle of `HandleRaftResponse` (after `processQueue` returns), a `Replica` can be left in a broken state, e.g. its destruction can be partially completed. This can brick a `RangeID`, e.g. cause a subsequent call to `getOrCreateReplica` to be stuck in an infinite loop. This commit allows the `HandleRaftResponse` loop to terminate gracefully, by using a higher-level context which is not cancelable. Fixes #140958, #145030 Co-authored-by: Pavel Kalinnikov <[email protected]>
2 parents ddfad6b + 1f63750 commit 301cc3e

File tree

1 file changed

+17
-13
lines changed

1 file changed

+17
-13
lines changed

pkg/kv/kvserver/raft_transport.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -597,12 +597,25 @@ func (t *RaftTransport) StopOutgoingMessage(storeID roachpb.StoreID) {
597597
// lost and a new instance of processQueue will be started by the next message
598598
// to be sent.
599599
func (t *RaftTransport) processQueue(
600-
q *raftSendQueue, stream RPCMultiRaft_RaftMessageBatchClient, _ rpcbase.ConnectionClass,
600+
ctx context.Context, q *raftSendQueue, client RPCMultiRaftClient, _ rpcbase.ConnectionClass,
601601
) error {
602-
errCh := make(chan error, 1)
602+
streamCtx, cancel := context.WithCancel(ctx)
603+
defer cancel()
604+
stream, err := client.RaftMessageBatch(streamCtx) // closed via cancellation
605+
if err != nil {
606+
return errors.Wrapf(err, "creating batch client")
607+
}
603608

604-
ctx := stream.Context()
609+
errCh := make(chan error, 1)
605610

611+
// NB: the stream context is canceled when this func returns, and causes the
612+
// response handling loop to terminate asynchronously. Note though that the
613+
// task uses the parent context for HandleRaftResponse calls, to let it finish
614+
// gracefully. Previously, context cancellation inside HandleRaftResponse
615+
// could cause incorrect / half-done Replica destruction (see #140958).
616+
//
617+
// TODO(pav-kv): wait for the task termination to prevent subsequent
618+
// processQueue calls from piling up concurrent tasks.
606619
goCtx, hdl, err := t.stopper.GetHandle(ctx, stop.TaskOpts{
607620
TaskName: "storage.RaftTransport: processing queue",
608621
})
@@ -902,16 +915,7 @@ func (t *RaftTransport) startProcessNewQueue(
902915
// DialNode already logs sufficiently, so just return.
903916
return
904917
}
905-
batchCtx, cancel := context.WithCancel(ctx)
906-
defer cancel()
907-
908-
stream, err := client.RaftMessageBatch(batchCtx) // closed via cancellation
909-
if err != nil {
910-
log.Warningf(ctx, "creating batch client for node %d failed: %+v", toNodeID, err)
911-
return
912-
}
913-
914-
if err := t.processQueue(q, stream, class); err != nil {
918+
if err := t.processQueue(ctx, q, client, class); err != nil {
915919
log.Warningf(ctx, "while processing outgoing Raft queue to node %d: %s:", toNodeID, err)
916920
}
917921
}

0 commit comments

Comments
 (0)