@@ -774,25 +774,43 @@ func TestReplicaRangefeedErrors(t *testing.T) {
774
774
return tc , rangeID
775
775
}
776
776
777
+ assertRangefeedRetryErr := func (
778
+ t * testing.T , pErr error , expReason kvpb.RangeFeedRetryError_Reason ,
779
+ ) {
780
+ t .Helper ()
781
+ expErr := kvpb .NewRangeFeedRetryError (expReason )
782
+ if pErr == nil {
783
+ t .Fatalf ("got nil error for RangeFeed: expecting %v" , expErr )
784
+ }
785
+ rfErr , ok := kvpb .NewError (pErr ).GetDetail ().(* kvpb.RangeFeedRetryError )
786
+ if ! ok {
787
+ t .Fatalf ("got incorrect error for RangeFeed: %v; expecting %v" , pErr , expErr )
788
+ }
789
+ if rfErr .Reason != expReason {
790
+ t .Fatalf ("got incorrect RangeFeedRetryError reason for RangeFeed: %v; expecting %v" ,
791
+ rfErr .Reason , expReason )
792
+ }
793
+ }
794
+
777
795
waitForInitialCheckpointAcrossSpan := func (
778
- t * testing.T , stream * testStream , streamErrC <- chan error , span roachpb.Span ,
796
+ t * testing.T , stream * testStream , streamErrC <- chan error , span roachpb.Span , allowedError * kvpb. RangeFeedRetryError_Reason ,
779
797
) {
780
798
t .Helper ()
781
799
var events []* kvpb.RangeFeedEvent
782
800
testutils .SucceedsSoon (t , func () error {
783
801
if len (streamErrC ) > 0 {
784
- // Break if the error channel is already populated.
785
- return nil
802
+ if allowedError == nil {
803
+ t .Fatalf ("unexpected error from stream: %v" , <- streamErrC )
804
+ }
805
+ streamErr := <- streamErrC
806
+ assertRangefeedRetryErr (t , streamErr , * allowedError )
786
807
}
787
808
events = stream .Events ()
788
809
if len (events ) < 1 {
789
810
return errors .Errorf ("too few events: %v" , events )
790
811
}
791
812
return nil
792
813
})
793
- if len (streamErrC ) > 0 {
794
- t .Fatalf ("unexpected error from stream: %v" , <- streamErrC )
795
- }
796
814
797
815
var lastTS hlc.Timestamp
798
816
for _ , evt := range events {
@@ -803,24 +821,6 @@ func TestReplicaRangefeedErrors(t *testing.T) {
803
821
}
804
822
}
805
823
806
- assertRangefeedRetryErr := func (
807
- t * testing.T , pErr error , expReason kvpb.RangeFeedRetryError_Reason ,
808
- ) {
809
- t .Helper ()
810
- expErr := kvpb .NewRangeFeedRetryError (expReason )
811
- if pErr == nil {
812
- t .Fatalf ("got nil error for RangeFeed: expecting %v" , expErr )
813
- }
814
- rfErr , ok := kvpb .NewError (pErr ).GetDetail ().(* kvpb.RangeFeedRetryError )
815
- if ! ok {
816
- t .Fatalf ("got incorrect error for RangeFeed: %v; expecting %v" , pErr , expErr )
817
- }
818
- if rfErr .Reason != expReason {
819
- t .Fatalf ("got incorrect RangeFeedRetryError reason for RangeFeed: %v; expecting %v" ,
820
- rfErr .Reason , expReason )
821
- }
822
- }
823
-
824
824
t .Run (kvpb .RangeFeedRetryError_REASON_REPLICA_REMOVED .String (), func (t * testing.T ) {
825
825
const removeStore = 2
826
826
tc , rangeID := setup (t , base.TestingKnobs {})
@@ -848,7 +848,7 @@ func TestReplicaRangefeedErrors(t *testing.T) {
848
848
}()
849
849
850
850
// Wait for the first checkpoint event.
851
- waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan )
851
+ waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan , nil )
852
852
853
853
// Remove the replica from the range.
854
854
tc .RemoveVotersOrFatal (t , startKey , tc .Target (removeStore ))
@@ -883,7 +883,7 @@ func TestReplicaRangefeedErrors(t *testing.T) {
883
883
}()
884
884
885
885
// Wait for the first checkpoint event.
886
- waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan )
886
+ waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan , nil )
887
887
888
888
// Split the range.
889
889
tc .SplitRangeOrFatal (t , mkKey ("m" ))
@@ -944,8 +944,8 @@ func TestReplicaRangefeedErrors(t *testing.T) {
944
944
}()
945
945
946
946
// Wait for the first checkpoint event on each stream.
947
- waitForInitialCheckpointAcrossSpan (t , streamLeft , streamLeftErrC , rangefeedLeftSpan )
948
- waitForInitialCheckpointAcrossSpan (t , streamRight , streamRightErrC , rangefeedRightSpan )
947
+ waitForInitialCheckpointAcrossSpan (t , streamLeft , streamLeftErrC , rangefeedLeftSpan , nil )
948
+ waitForInitialCheckpointAcrossSpan (t , streamRight , streamRightErrC , rangefeedRightSpan , nil )
949
949
950
950
// Merge the ranges back together
951
951
mergeArgs := adminMergeArgs (startKey )
@@ -1002,7 +1002,7 @@ func TestReplicaRangefeedErrors(t *testing.T) {
1002
1002
}()
1003
1003
1004
1004
// Wait for the first checkpoint event.
1005
- waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan )
1005
+ waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan , nil )
1006
1006
1007
1007
// Force the leader off the replica on partitionedStore. If it's the
1008
1008
// leader, this test will fall over when it cuts the replica off from
@@ -1094,6 +1094,21 @@ func TestReplicaRangefeedErrors(t *testing.T) {
1094
1094
conf .RangefeedEnabled = false
1095
1095
return conf
1096
1096
},
1097
+ // This test has been failing with REASON_LOGICAL_OPS_MISSING (#146566)
1098
+ // coming from waitForInitialCheckpointAcrossSpan, which could indicate
1099
+ // that some other request to this range is racing with the enabling of
1100
+ // the rangefeed and hitting an error in handleLogicalOpLogRaftMuLocked.
1101
+ // To help debug this, we intercept all requests to this range. If the
1102
+ // test stops flaking, we can remove this filter; it's not needed for
1103
+ // the test functionality.
1104
+ TestingRequestFilter : func (ctx context.Context , ba * kvpb.BatchRequest ) * kvpb.Error {
1105
+ for _ , req := range ba .Requests {
1106
+ if req .GetInner ().Header ().Key .Equal (startKey ) {
1107
+ log .Infof (ctx , "intercepting request %+v" , req .GetInner ())
1108
+ }
1109
+ }
1110
+ return nil
1111
+ },
1097
1112
},
1098
1113
}
1099
1114
tc , _ := setup (t , knobs )
@@ -1104,17 +1119,19 @@ func TestReplicaRangefeedErrors(t *testing.T) {
1104
1119
if err != nil {
1105
1120
t .Fatal (err )
1106
1121
}
1107
- // Split the range so that the RHS is not a system range and thus will
1108
- // respect the rangefeed_enabled cluster setting.
1122
+ // Split off a small range to ensure it's not a system range and thus will
1123
+ // respect the rangefeed_enabled cluster setting. It also helps ensure no
1124
+ // one else writes to it.
1109
1125
tc .SplitRangeOrFatal (t , startKey )
1126
+ endKey := startKey .Next ()
1127
+ tc .SplitRangeOrFatal (t , endKey )
1110
1128
1111
1129
rightRangeID := store .LookupReplica (roachpb .RKey (startKey )).RangeID
1112
1130
1113
1131
// Establish a rangefeed.
1114
1132
stream := newTestStream ()
1115
1133
streamErrC := make (chan error , 1 )
1116
1134
1117
- endKey := keys .ScratchRangeMax
1118
1135
rangefeedSpan := roachpb.Span {Key : startKey , EndKey : endKey }
1119
1136
go func () {
1120
1137
req := kvpb.RangeFeedRequest {
@@ -1129,8 +1146,12 @@ func TestReplicaRangefeedErrors(t *testing.T) {
1129
1146
streamErrC <- waitRangeFeed (t , store , & req , stream )
1130
1147
}()
1131
1148
1132
- // Wait for the first checkpoint event.
1133
- waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan )
1149
+ // Wait for the first checkpoint event. It's possible to get a
1150
+ // REASON_LOGICAL_OPS_MISSING error here if an earlier proposal raced in
1151
+ // with enabling the rangefeed. See the comment in
1152
+ // handleLogicalOpLogRaftMuLocked.
1153
+ allowedError := kvpb .RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING
1154
+ waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan , & allowedError )
1134
1155
1135
1156
// Disable rangefeeds, which stops logical op logs from being provided
1136
1157
// with Raft commands.
@@ -1225,7 +1246,7 @@ func TestReplicaRangefeedErrors(t *testing.T) {
1225
1246
}()
1226
1247
1227
1248
// Wait for the first checkpoint event.
1228
- waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan )
1249
+ waitForInitialCheckpointAcrossSpan (t , stream , streamErrC , rangefeedSpan , nil )
1229
1250
})
1230
1251
t .Run ("multiple-origin-ids" , func (t * testing.T ) {
1231
1252
tc , rangeID := setup (t , base.TestingKnobs {})
0 commit comments