@@ -12,6 +12,8 @@ package kvcoord_test
 
 import (
 	"context"
+	"math/rand"
+	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -629,7 +631,7 @@ func TestMuxRangeCatchupScanQuotaReleased(t *testing.T) {
 	enoughErrors := make(chan struct{})
 	closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, noValuesExpected, true,
 		kvcoord.TestingWithOnRangefeedEvent(
-			func(_ context.Context, _ roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
+			func(_ context.Context, _ roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
 				*event = transientErrEvent
 				if numErrors.Add(1) == numErrsToReturn {
 					close(enoughErrors)
@@ -727,7 +729,7 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
 	closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, startTime, ignoreValues, useMux,
 		kvcoord.TestingWithRangeFeedMetrics(&metrics),
 		kvcoord.TestingWithOnRangefeedEvent(
-			func(ctx context.Context, s roachpb.Span, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
+			func(ctx context.Context, s roachpb.Span, _ int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
 				switch t := event.GetValue().(type) {
 				case *kvpb.RangeFeedValue:
 					// If we previously arranged for the range to be skipped (stuck catchup scan),
@@ -802,3 +804,135 @@ func TestRangeFeedMetricsManagement(t *testing.T) {
 		require.EqualValues(t, numCatchupToBlock, metrics.RangefeedCatchupRanges.Value())
 	})
 }
+
+// TestMuxRangeFeedCanCloseStream verifies that individual streams in a mux
+// rangefeed can be closed, and that closed streams restart while the
+// rangefeed continues to make progress.
+func TestMuxRangeFeedCanCloseStream(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	ts := tc.Server(0)
+	sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+
+	// Insert 1000 rows, and split them into 10 ranges.
+	sqlDB.ExecMultiple(t,
+		`SET CLUSTER SETTING kv.rangefeed.enabled = true`,
+		`SET CLUSTER SETTING kv.closed_timestamp.target_duration='100ms'`,
+		`ALTER DATABASE defaultdb CONFIGURE ZONE USING num_replicas = 1`,
+		`CREATE TABLE foo (key INT PRIMARY KEY)`,
+		`INSERT INTO foo (key) SELECT * FROM generate_series(1, 1000)`,
+		`ALTER TABLE foo SPLIT AT (SELECT * FROM generate_series(100, 900, 100))`,
+	)
+
+	fooDesc := desctestutils.TestingGetPublicTableDescriptor(
+		ts.DB(), keys.SystemSQLCodec, "defaultdb", "foo")
+	fooSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec)
+
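+	// Track resolved timestamp progress across fooSpan with a span frontier.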
+	frontier, err := span.MakeFrontier(fooSpan)
+	require.NoError(t, err)
+
+	expectFrontierAdvance := func() {
+		t.Helper()
+		// Closed timestamps for the ranges advance every 100ms. We'll require
+		// the frontier to advance a bit more than that.
+		threshold := frontier.Frontier().AddDuration(250*time.Millisecond)
+		testutils.SucceedsWithin(t, func() error {
+			if frontier.Frontier().Less(threshold) {
+				return errors.Newf("waiting for frontier advance to at least %s", threshold)
+			}
+			return nil
+		}, 10*time.Second)
+	}
+
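+	// observedStreams records the ID of every stream that has delivered a
+	// checkpoint; capturedSender holds the testing hook through which we can
+	// send requests on the mux stream.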
+	var observedStreams sync.Map
+	var capturedSender atomic.Value
+
+	ignoreValues := func(event kvcoord.RangeFeedMessage) {}
+	var numRestartStreams atomic.Int32
+
+	closeFeed := rangeFeed(ts.DistSenderI(), fooSpan, ts.Clock().Now(), ignoreValues, true,
+		kvcoord.WithMuxRangeFeed(),
+		kvcoord.TestingWithMuxRangeFeedRequestSenderCapture(
+			// We expect a single mux sender since we have 1 node in this test.
+			func(nodeID roachpb.NodeID, capture func(request *kvpb.RangeFeedRequest) error) {
+				capturedSender.Store(capture)
+			},
+		),
+		kvcoord.TestingWithOnRangefeedEvent(
+			func(ctx context.Context, s roachpb.Span, streamID int64, event *kvpb.RangeFeedEvent) (skip bool, _ error) {
+				switch t := event.GetValue().(type) {
+				case *kvpb.RangeFeedCheckpoint:
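+					// Remember this stream: the cancellation loop below picks
+					// its targets from the set of observed streams.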
+					observedStreams.Store(streamID, nil)
+					_, err := frontier.Forward(t.Span, t.ResolvedTS)
+					if err != nil {
+						return true, err
+					}
+				case *kvpb.RangeFeedError:
+					// Keep track of mux errors due to RangeFeedRetryError_REASON_RANGEFEED_CLOSED.
+					// Those result when we issue a CloseStream request.
+					err := t.Error.GoError()
+					log.Infof(ctx, "Got err: %v", err)
+					var retryErr *kvpb.RangeFeedRetryError
+					if ok := errors.As(err, &retryErr); ok && retryErr.Reason == kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED {
+						numRestartStreams.Add(1)
+					}
+				}
+
+				return false, nil
+			}),
+	)
+	defer closeFeed()
+
+	// Wait until we capture the mux rangefeed request sender. There should
+	// only be one, since this test runs a single node.
+	var muxRangeFeedRequestSender func(req *kvpb.RangeFeedRequest) error
+	testutils.SucceedsWithin(t, func() error {
+		v, ok := capturedSender.Load().(func(request *kvpb.RangeFeedRequest) error)
+		if ok {
+			muxRangeFeedRequestSender = v
+			return nil
+		}
+		return errors.New("waiting to capture mux rangefeed request sender")
+	}, 10*time.Second)
+
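+	// Repeatedly close a few random streams; the mux rangefeed should restart
+	// them, and the frontier should keep advancing.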
+	cancelledStreams := make(map[int64]struct{})
+	for i := 0; i < 5; i++ {
+		// Wait for the test frontier to advance. Once it advances, we know the
+		// rangefeed has started and all ranges are running.
+		expectFrontierAdvance()
+
+		// Pick some number of streams to close. Since sync.Map iteration order
+		// is non-deterministic, we'll pick a few random streams.
+		initialClosed := numRestartStreams.Load()
+		numToCancel := 1 + rand.Int31n(3)
+		var numCancelled int32 = 0
+		observedStreams.Range(func(key any, _ any) bool {
+			streamID := key.(int64)
+			if _, wasCancelled := cancelledStreams[streamID]; wasCancelled {
+				return true // Try another stream.
+			}
+			numCancelled++
+			cancelledStreams[streamID] = struct{}{}
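+			// Ask the server to close this stream. The resulting
+			// REASON_RANGEFEED_CLOSED error is counted by the event callback above.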
+			req, err := kvcoord.NewCloseStreamRequest(ctx, ts.ClusterSettings(), streamID)
+			require.NoError(t, err)
+			require.NoError(t, muxRangeFeedRequestSender(req))
+			return numCancelled < numToCancel
+		})
+
+		// Observe numToCancel errors.
+		testutils.SucceedsWithin(t, func() error {
+			numRestarted := numRestartStreams.Load()
+			if numRestarted == initialClosed+numCancelled {
+				return nil
+			}
+			return errors.Newf("waiting for %d streams to be closed (%d so far)", numCancelled, numRestarted-initialClosed)
+		}, 10*time.Second)
+
+		// When we close the stream(s), the rangefeed server responds with a
+		// retryable error. Mux rangefeed should retry, and thus we expect the
+		// frontier to keep advancing.
+	}
+}