@@ -29,7 +29,6 @@ import (
2929 "github.com/cockroachdb/cockroach/pkg/testutils/skip"
3030 "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
3131 "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
32- "github.com/cockroachdb/cockroach/pkg/util"
3332 "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
3433 "github.com/cockroachdb/cockroach/pkg/util/hlc"
3534 "github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -129,14 +128,19 @@ func rangeFeed(
129128 startFrom hlc.Timestamp ,
130129 onValue func (event kvcoord.RangeFeedMessage ),
131130 opts ... kvcoord.RangeFeedOption ,
132- ) func () {
131+ ) ( chan error , func () ) {
133132 ds := dsI .(* kvcoord.DistSender )
134133 events := make (chan kvcoord.RangeFeedMessage )
135134 ctx , cancel := context .WithCancel (context .WithValue (context .Background (), testFeedCtxKey {}, struct {}{}))
136135
136+ errCh := make (chan error , 1 )
137137 g := ctxgroup .WithContext (ctx )
138- g .GoCtx (func (ctx context.Context ) (err error ) {
139- return ds .RangeFeed (ctx , []kvcoord.SpanTimePair {{Span : sp , StartAfter : startFrom }}, events , opts ... )
138+ g .GoCtx (func (ctx context.Context ) error {
139+ err := ds .RangeFeed (ctx , []kvcoord.SpanTimePair {{Span : sp , StartAfter : startFrom }}, events , opts ... )
140+ if err != nil {
141+ errCh <- err
142+ }
143+ return err
140144 })
141145 g .GoCtx (func (ctx context.Context ) error {
142146 for {
@@ -149,7 +153,7 @@ func rangeFeed(
149153 }
150154 })
151155
152- return func () {
156+ return errCh , func () {
153157 cancel ()
154158 _ = g .Wait ()
155159 }
@@ -176,19 +180,17 @@ func observeNValues(n int) (chan struct{}, func(ev kvcoord.RangeFeedMessage)) {
176180 }
177181}
178182
179- func channelWaitWithTimeout (t * testing.T , ch chan struct {}) {
183+ func channelWaitWithTimeout (t * testing.T , ch chan struct {}, errCh chan error ) {
180184 t .Helper ()
181- timeOut := 30 * time .Second
182- if util .RaceEnabled {
183- timeOut *= 10
184- }
185- if syncutil .DeadlockEnabled {
186- timeOut = 2 * deadlock .Opts .DeadlockTimeout
187- }
185+ timeOut := testutils .SucceedsSoonDuration ()
188186 select {
189187 case <- ch :
188+ case err := <- errCh :
189+ if err != nil {
190+ t .Fatalf ("unexpected error while waiting on channel: %v" , err )
191+ }
190192 case <- time .After (timeOut ):
191- t .Fatal ("test timed out" )
193+ t .Fatalf ("test timed out after %s" , timeOut )
192194 }
193195}
194196
@@ -220,7 +222,7 @@ func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) {
220222 // test cluster nodes.
221223 sqlDB .ExecMultiple (t ,
222224 `SET CLUSTER SETTING kv.rangefeed.enabled = true` ,
223- `ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1` ,
225+ `ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1` ,
224226 `CREATE TABLE foo (key INT PRIMARY KEY)` ,
225227 `INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)` ,
226228 `ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(100, 900, 100))` ,
@@ -241,9 +243,9 @@ func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) {
241243 fooSpan := fooDesc .PrimaryIndexSpan (keys .SystemSQLCodec )
242244
243245 allSeen , onValue := observeNValues (1000 )
244- closeFeed := rangeFeed (ts .DistSenderI (), fooSpan , startTime , onValue )
246+ errCh , closeFeed := rangeFeed (ts .DistSenderI (), fooSpan , startTime , onValue )
245247 defer closeFeed ()
246- channelWaitWithTimeout (t , allSeen )
248+ channelWaitWithTimeout (t , allSeen , errCh )
247249 closeFeed () // Explicitly shutdown the feed to make sure counters no longer change.
248250
249251 // Verify we connected to each node once.
@@ -254,7 +256,7 @@ func TestMuxRangeFeedConnectsToNodeOnce(t *testing.T) {
254256 }
255257}
256258
257- func TestMuxRangeCatchupScanQuotaReleased (t * testing.T ) {
259+ func TestMuxRangeFeedCatchupScanQuotaReleased (t * testing.T ) {
258260 defer leaktest .AfterTest (t )()
259261 defer log .Scope (t ).Close (t )
260262
@@ -289,20 +291,23 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) {
289291 noValuesExpected := func (event kvcoord.RangeFeedMessage ) {
290292 panic ("received value when none expected" )
291293 }
292- const numErrsToReturn = 100
294+ const numErrsToReturn = 42
293295 var numErrors atomic.Int32
294296 enoughErrors := make (chan struct {})
295- closeFeed := rangeFeed (ts .DistSenderI (), fooSpan , startTime , noValuesExpected ,
297+ errCh , closeFeed := rangeFeed (ts .DistSenderI (), fooSpan , startTime , noValuesExpected ,
296298 kvcoord .TestingWithOnRangefeedEvent (
297299 func (_ context.Context , _ roachpb.Span , _ int64 , event * kvpb.RangeFeedEvent ) (skip bool , _ error ) {
300+ if event .Error != nil {
301+ return false , nil
302+ }
298303 * event = transientErrEvent
299304 if numErrors .Add (1 ) == numErrsToReturn {
300305 close (enoughErrors )
301306 }
302307 return false , nil
303308 }))
304309 defer closeFeed ()
305- channelWaitWithTimeout (t , enoughErrors )
310+ channelWaitWithTimeout (t , enoughErrors , errCh )
306311}
307312
308313// TestMuxRangeFeedDoesNotStallOnError tests that the mux rangefeed
@@ -413,14 +418,14 @@ func TestMuxRangeFeedDoesNotStallOnError(t *testing.T) {
413418
414419 shouldError .Store (true )
415420 allSeen , onValue := observeNValues (100 )
416- closeFeed := rangeFeed (ts .DistSenderI (), fooSpan , startFrom , onValue )
421+ errCh , closeFeed := rangeFeed (ts .DistSenderI (), fooSpan , startFrom , onValue )
417422 defer closeFeed ()
418- channelWaitWithTimeout (t , allSeen )
423+ channelWaitWithTimeout (t , allSeen , errCh )
419424}
420425
421426// Test to make sure the various metrics used by rangefeed are correctly
422427// updated during the lifetime of the rangefeed and when the rangefeed completes.
423- func TestRangeFeedMetricsManagement (t * testing.T ) {
428+ func TestMuxRangeFeedMetricsManagement (t * testing.T ) {
424429 defer leaktest .AfterTest (t )()
425430 defer log .Scope (t ).Close (t )
426431
@@ -434,6 +439,9 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
434439 sqlDB := sqlutils .MakeSQLRunner (tc .ServerConn (0 ))
435440 startTime := ts .Clock ().Now ()
436441
442+ kvserver .RangefeedEnabled .Override (
443+ context .Background (), & tc .SystemLayer (0 ).ClusterSettings ().SV , true )
444+
437445 // Insert 1000 rows, and split them into 10 ranges.
438446 const numRanges = 10
439447 sqlDB .ExecMultiple (t ,
@@ -502,8 +510,9 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
502510 return skipSet .stuck .Contains (k )
503511 }
504512
513+ frontierAdvanced := make (chan struct {}, 1 )
505514 ignoreValues := func (event kvcoord.RangeFeedMessage ) {}
506- closeFeed := rangeFeed (ts .DistSenderI (), fooSpan , startTime , ignoreValues ,
515+ errCh , closeFeed := rangeFeed (ts .DistSenderI (), fooSpan , startTime , ignoreValues ,
507516 kvcoord .TestingWithRangeFeedMetrics (& metrics ),
508517 kvcoord .TestingWithOnRangefeedEvent (
509518 func (ctx context.Context , s roachpb.Span , _ int64 , event * kvpb.RangeFeedEvent ) (skip bool , _ error ) {
@@ -535,10 +544,16 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
535544 return false , nil
536545 }
537546
538- _ , err := frontier .Forward (checkpoint .Span , checkpoint .ResolvedTS )
547+ advanced , err := frontier .Forward (checkpoint .Span , checkpoint .ResolvedTS )
539548 if err != nil {
540549 return false , err
541550 }
551+ if advanced {
552+ select {
553+ case frontierAdvanced <- struct {}{}:
554+ default :
555+ }
556+ }
542557
543558 if numCatchupBlocked .Add (1 ) <= numCatchupToBlock {
544559 // Mux rangefeed can't block single range, so just skip this event
@@ -558,12 +573,7 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
558573
559574 // Wait for the test frontier to advance. Once it advances,
560575 // we know the rangefeed is started, all ranges are running (even if some of them are blocked).
561- testutils .SucceedsSoon (t , func () error {
562- if frontier .Frontier ().IsEmpty () {
563- return errors .Newf ("waiting for frontier advance: %s" , frontier .String ())
564- }
565- return nil
566- })
576+ channelWaitWithTimeout (t , frontierAdvanced , errCh )
567577
568578 // At this point, we know the rangefeed for all ranges are running.
569579 require .EqualValues (t , numRanges , metrics .RangefeedRanges .Value (), frontier .String ())
@@ -575,9 +585,9 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
575585 require .EqualValues (t , numCatchupToBlock , metrics .RangefeedCatchupRanges .Value ())
576586}
577587
578- // TestRangefeedRangeObserver ensures the kvcoord.WithRangeObserver option
588+ // TestMuxRangefeedRangeObserver ensures the kvcoord.WithRangeObserver option
579589// works correctly.
580- func TestRangefeedRangeObserver (t * testing.T ) {
590+ func TestMuxRangefeedRangeObserver (t * testing.T ) {
581591 defer leaktest .AfterTest (t )()
582592 defer log .Scope (t ).Close (t )
583593
@@ -640,7 +650,7 @@ func TestRangefeedRangeObserver(t *testing.T) {
640650 })
641651 }
642652
643- closeFeed := rangeFeed (ts .DistSenderI (), fooSpan , ts .Clock ().Now (), ignoreValues ,
653+ _ , closeFeed := rangeFeed (ts .DistSenderI (), fooSpan , ts .Clock ().Now (), ignoreValues ,
644654 kvcoord .WithRangeObserver (observer ))
645655 defer closeFeed ()
646656
@@ -735,7 +745,7 @@ func TestMuxRangeFeedCanCloseStream(t *testing.T) {
735745 ignoreValues := func (event kvcoord.RangeFeedMessage ) {}
736746 var numRestartStreams atomic.Int32
737747
738- closeFeed := rangeFeed (ts .DistSenderI (), fooSpan , ts .Clock ().Now (), ignoreValues ,
748+ _ , closeFeed := rangeFeed (ts .DistSenderI (), fooSpan , ts .Clock ().Now (), ignoreValues ,
739749 kvcoord .TestingWithMuxRangeFeedRequestSenderCapture (
740750 // We expect a single mux sender since we have 1 node in this test.
741751 func (nodeID roachpb.NodeID , capture func (request * kvpb.RangeFeedRequest ) error ) {
@@ -863,13 +873,13 @@ func TestMuxRangeFeedDoesNotDeadlockWithLocalStreams(t *testing.T) {
863873 fooSpan := fooDesc .PrimaryIndexSpan (keys .SystemSQLCodec )
864874
865875 allSeen , onValue := observeNValues (1000 )
866- closeFeed := rangeFeed (ts .DistSenderI (), fooSpan , startFrom , onValue ,
876+ _ , closeFeed := rangeFeed (ts .DistSenderI (), fooSpan , startFrom , onValue ,
867877 kvcoord .TestingWithBeforeSendRequest (func () {
868878 // Prior to sending rangefeed request, block for just a bit
869879 // to make deadlock more likely.
870880 time .Sleep (100 * time .Millisecond )
871881 }),
872882 )
873883 defer closeFeed ()
874- channelWaitWithTimeout (t , allSeen )
884+ channelWaitWithTimeout (t , allSeen , nil )
875885}
0 commit comments