@@ -597,13 +597,27 @@ func (t *RaftTransport) StopOutgoingMessage(storeID roachpb.StoreID) {
597
597
// lost and a new instance of processQueue will be started by the next message
598
598
// to be sent.
599
599
func (t * RaftTransport ) processQueue (
600
- q * raftSendQueue , stream RPCMultiRaft_RaftMessageBatchClient , _ rpcbase.ConnectionClass ,
600
+ ctx context. Context , q * raftSendQueue , client RPCMultiRaftClient , _ rpcbase.ConnectionClass ,
601
601
) error {
602
- errCh := make (chan error , 1 )
602
+ batchCtx , cancel := context .WithCancel (ctx )
603
+ defer cancel ()
604
+ stream , err := client .RaftMessageBatch (batchCtx ) // closed via cancellation
605
+ if err != nil {
606
+ return errors .Wrapf (err , "creating batch client" )
607
+ }
603
608
604
- ctx := stream . Context ( )
609
+ errCh := make ( chan error , 1 )
605
610
606
- goCtx , hdl , err := t .stopper .GetHandle (ctx , stop.TaskOpts {
611
+ // NB: the stream context is canceled when this func returns, which causes the
612
+ // response handling loop to terminate asynchronously.
613
+ //
614
+ // TODO(#140958): the context cancellation in the middle of HandleRaftResponse
615
+ // can lead to broken state, such as a replica being marked as destroyed
616
+ // without this being reflected in storage.
617
+ //
618
+ // TODO(pav-kv): wait for the task termination to prevent subsequent
619
+ // processQueue calls from piling up concurrent tasks.
620
+ goCtx , hdl , err := t .stopper .GetHandle (stream .Context (), stop.TaskOpts {
607
621
TaskName : "storage.RaftTransport: processing queue" ,
608
622
})
609
623
if err != nil {
@@ -902,16 +916,7 @@ func (t *RaftTransport) startProcessNewQueue(
902
916
// DialNode already logs sufficiently, so just return.
903
917
return
904
918
}
905
- batchCtx , cancel := context .WithCancel (ctx )
906
- defer cancel ()
907
-
908
- stream , err := client .RaftMessageBatch (batchCtx ) // closed via cancellation
909
- if err != nil {
910
- log .Warningf (ctx , "creating batch client for node %d failed: %+v" , toNodeID , err )
911
- return
912
- }
913
-
914
- if err := t .processQueue (q , stream , class ); err != nil {
919
+ if err := t .processQueue (ctx , q , client , class ); err != nil {
915
920
log .Warningf (ctx , "while processing outgoing Raft queue to node %d: %s:" , toNodeID , err )
916
921
}
917
922
}
0 commit comments