@@ -774,25 +774,43 @@ func TestReplicaRangefeedErrors(t *testing.T) {
774774 return tc , rangeID
775775 }
776776
777+ assertRangefeedRetryErr := func (
778+ t * testing.T , pErr error , expReason kvpb.RangeFeedRetryError_Reason ,
779+ ) {
780+ t .Helper ()
781+ expErr := kvpb .NewRangeFeedRetryError (expReason )
782+ if pErr == nil {
783+ t .Fatalf ("got nil error for RangeFeed: expecting %v" , expErr )
784+ }
785+ rfErr , ok := kvpb .NewError (pErr ).GetDetail ().(* kvpb.RangeFeedRetryError )
786+ if ! ok {
787+ t .Fatalf ("got incorrect error for RangeFeed: %v; expecting %v" , pErr , expErr )
788+ }
789+ if rfErr .Reason != expReason {
790+ t .Fatalf ("got incorrect RangeFeedRetryError reason for RangeFeed: %v; expecting %v" ,
791+ rfErr .Reason , expReason )
792+ }
793+ }
794+
777795 waitForInitialCheckpointAcrossSpan := func (
778- t * testing.T , stream * testStream , streamErrC <- chan error , span roachpb.Span ,
796+ t * testing.T , stream * testStream , streamErrC <- chan error , span roachpb.Span , allowedError * kvpb. RangeFeedRetryError_Reason ,
779797 ) {
780798 t .Helper ()
781799 var events []* kvpb.RangeFeedEvent
782800 testutils .SucceedsSoon (t , func () error {
783801 if len (streamErrC ) > 0 {
784- // Break if the error channel is already populated.
785- return nil
802+ if allowedError == nil {
803+ t .Fatalf ("unexpected error from stream: %v" , <- streamErrC )
804+ }
805+ streamErr := <- streamErrC
806+ assertRangefeedRetryErr (t , streamErr , * allowedError )
786807 }
787808 events = stream .Events ()
788809 if len (events ) < 1 {
789810 return errors .Errorf ("too few events: %v" , events )
790811 }
791812 return nil
792813 })
793- if len (streamErrC ) > 0 {
794- t .Fatalf ("unexpected error from stream: %v" , <- streamErrC )
795- }
796814
797815 var lastTS hlc.Timestamp
798816 for _ , evt := range events {
@@ -803,24 +821,6 @@ func TestReplicaRangefeedErrors(t *testing.T) {
803821 }
804822 }
805823
806- assertRangefeedRetryErr := func (
807- t * testing.T , pErr error , expReason kvpb.RangeFeedRetryError_Reason ,
808- ) {
809- t .Helper ()
810- expErr := kvpb .NewRangeFeedRetryError (expReason )
811- if pErr == nil {
812- t .Fatalf ("got nil error for RangeFeed: expecting %v" , expErr )
813- }
814- rfErr , ok := kvpb .NewError (pErr ).GetDetail ().(* kvpb.RangeFeedRetryError )
815- if ! ok {
816- t .Fatalf ("got incorrect error for RangeFeed: %v; expecting %v" , pErr , expErr )
817- }
818- if rfErr .Reason != expReason {
819- t .Fatalf ("got incorrect RangeFeedRetryError reason for RangeFeed: %v; expecting %v" ,
820- rfErr .Reason , expReason )
821- }
822- }
823-
824824 t .Run (kvpb .RangeFeedRetryError_REASON_REPLICA_REMOVED .String (), func (t * testing.T ) {
825825 const removeStore = 2
826826 tc , rangeID := setup (t , base.TestingKnobs {})
@@ -848,7 +848,7 @@ func TestReplicaRangefeedErrors(t *testing.T) {
848848 }()
849849
850850 // Wait for the first checkpoint event.
851- waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan )
851+ waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan , nil )
852852
853853 // Remove the replica from the range.
854854 tc .RemoveVotersOrFatal (t , startKey , tc .Target (removeStore ))
@@ -883,7 +883,7 @@ func TestReplicaRangefeedErrors(t *testing.T) {
883883 }()
884884
885885 // Wait for the first checkpoint event.
886- waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan )
886+ waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan , nil )
887887
888888 // Split the range.
889889 tc .SplitRangeOrFatal (t , mkKey ("m" ))
@@ -944,8 +944,8 @@ func TestReplicaRangefeedErrors(t *testing.T) {
944944 }()
945945
946946 // Wait for the first checkpoint event on each stream.
947- waitForInitialCheckpointAcrossSpan (t , streamLeft , streamLeftErrC , rangefeedLeftSpan )
948- waitForInitialCheckpointAcrossSpan (t , streamRight , streamRightErrC , rangefeedRightSpan )
947+ waitForInitialCheckpointAcrossSpan (t , streamLeft , streamLeftErrC , rangefeedLeftSpan , nil )
948+ waitForInitialCheckpointAcrossSpan (t , streamRight , streamRightErrC , rangefeedRightSpan , nil )
949949
950950 // Merge the ranges back together
951951 mergeArgs := adminMergeArgs (startKey )
@@ -1002,7 +1002,7 @@ func TestReplicaRangefeedErrors(t *testing.T) {
10021002 }()
10031003
10041004 // Wait for the first checkpoint event.
1005- waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan )
1005+ waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan , nil )
10061006
10071007 // Force the leader off the replica on partitionedStore. If it's the
10081008 // leader, this test will fall over when it cuts the replica off from
@@ -1094,6 +1094,21 @@ func TestReplicaRangefeedErrors(t *testing.T) {
10941094 conf .RangefeedEnabled = false
10951095 return conf
10961096 },
1097+ // This test has been failing with REASON_LOGICAL_OPS_MISSING (#146566)
1098+ // coming from waitForInitialCheckpointAcrossSpan, which could indicate
1099+ // that some other request to this range is racing with the enabling of
1100+ // the rangefeed and hitting an error in handleLogicalOpLogRaftMuLocked.
1101+ // To help debug this, we log every request whose key equals startKey.
1102+ // If the test stops flaking, we can remove this filter; it's not
1103+ // needed for the test functionality.
1104+ TestingRequestFilter : func (ctx context.Context , ba * kvpb.BatchRequest ) * kvpb.Error {
1105+ for _ , req := range ba .Requests {
1106+ if req .GetInner ().Header ().Key .Equal (startKey ) {
1107+ log .Infof (ctx , "intercepting request %+v" , req .GetInner ())
1108+ }
1109+ }
1110+ return nil
1111+ },
10971112 },
10981113 }
10991114 tc , _ := setup (t , knobs )
@@ -1104,17 +1119,19 @@ func TestReplicaRangefeedErrors(t *testing.T) {
11041119 if err != nil {
11051120 t .Fatal (err )
11061121 }
1107- // Split the range so that the RHS is not a system range and thus will
1108- // respect the rangefeed_enabled cluster setting.
1122+ // Split off a small range, [startKey, startKey.Next()), so that it is not
1123+ // a system range and thus respects the rangefeed_enabled cluster setting.
1124+ // Keeping the range this small also helps ensure nothing else writes to it.
11091125 tc .SplitRangeOrFatal (t , startKey )
1126+ endKey := startKey .Next ()
1127+ tc .SplitRangeOrFatal (t , endKey )
11101128
11111129 rightRangeID := store .LookupReplica (roachpb .RKey (startKey )).RangeID
11121130
11131131 // Establish a rangefeed.
11141132 stream := newTestStream ()
11151133 streamErrC := make (chan error , 1 )
11161134
1117- endKey := keys .ScratchRangeMax
11181135 rangefeedSpan := roachpb.Span {Key : startKey , EndKey : endKey }
11191136 go func () {
11201137 req := kvpb.RangeFeedRequest {
@@ -1129,8 +1146,12 @@ func TestReplicaRangefeedErrors(t *testing.T) {
11291146 streamErrC <- waitRangeFeed (t , store , & req , stream )
11301147 }()
11311148
1132- // Wait for the first checkpoint event.
1133- waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan )
1149+ // Wait for the first checkpoint event. A REASON_LOGICAL_OPS_MISSING
1150+ // error is tolerated here because it can occur if an earlier proposal
1151+ // raced with enabling the rangefeed. See the comment in
1152+ // handleLogicalOpLogRaftMuLocked.
1153+ allowedError := kvpb .RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING
1154+ waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan , & allowedError )
11341155
11351156 // Disable rangefeeds, which stops logical op logs from being provided
11361157 // with Raft commands.
@@ -1225,7 +1246,7 @@ func TestReplicaRangefeedErrors(t *testing.T) {
12251246 }()
12261247
12271248 // Wait for the first checkpoint event.
1228- waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan )
1249+ waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan , nil )
12291250 })
12301251 t .Run ("multiple-origin-ids" , func (t * testing.T ) {
12311252 tc , rangeID := setup (t , base.TestingKnobs {})
0 commit comments