@@ -7,7 +7,6 @@ package rangefeed
77
88import (
99 "context"
10- "sync/atomic"
1110 "testing"
1211
1312 "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -22,73 +21,6 @@ import (
2221 "github.com/stretchr/testify/require"
2322)
2423
25- // TestBufferedSenderWithSendBufferedError tests that BufferedSender can handle stream
26- // disconnects properly including context canceled, metrics updates, rangefeed
27- // cleanup.
28- func TestBufferedSenderDisconnectStream (t * testing.T ) {
29- defer leaktest .AfterTest (t )()
30- defer log .Scope (t ).Close (t )
31- ctx := context .Background ()
32-
33- stopper := stop .NewStopper ()
34- defer stopper .Stop (ctx )
35- testServerStream := newTestServerStream ()
36- smMetrics := NewStreamManagerMetrics ()
37- st := cluster .MakeTestingClusterSettings ()
38- bs := NewBufferedSender (testServerStream , st , NewBufferedSenderMetrics ())
39- sm := NewStreamManager (bs , smMetrics )
40- require .NoError (t , sm .Start (ctx , stopper ))
41- defer sm .Stop (ctx )
42-
43- var sid int64
44- nextStreamID := func () int64 {
45- sid ++
46- return sid
47- }
48-
49- err := kvpb .NewError (kvpb .NewRangeFeedRetryError (kvpb .RangeFeedRetryError_REASON_NO_LEASEHOLDER ))
50- errEvent := func (streamID int64 ) * kvpb.MuxRangeFeedEvent {
51- return makeMuxRangefeedErrorEvent (streamID , 1 , err )
52- }
53-
54- t .Run ("basic operation" , func (t * testing.T ) {
55- var num atomic.Int32
56- streamID := nextStreamID ()
57- sm .RegisteringStream (streamID )
58- sm .AddStream (streamID , & cancelCtxDisconnector {
59- cancel : func () {
60- num .Add (1 )
61- require .NoError (t , sm .sender .sendBuffered (errEvent (streamID ), nil ))
62- },
63- })
64- require .Equal (t , int64 (1 ), smMetrics .ActiveMuxRangeFeed .Value ())
65- require .Equal (t , 0 , bs .len ())
66- sm .DisconnectStream (streamID , err )
67- testServerStream .waitForEvent (t , errEvent (streamID ))
68- require .Equal (t , int32 (1 ), num .Load ())
69- require .Equal (t , 1 , testServerStream .totalEventsSent ())
70- waitForRangefeedCount (t , smMetrics , 0 )
71- testServerStream .reset ()
72- })
73- t .Run ("disconnect stream on the same stream is idempotent" , func (t * testing.T ) {
74- streamID := nextStreamID ()
75- sm .RegisteringStream (streamID )
76- sm .AddStream (streamID , & cancelCtxDisconnector {
77- cancel : func () {
78- require .NoError (t , sm .sender .sendBuffered (errEvent (streamID ), nil ))
79- },
80- })
81- require .Equal (t , int64 (1 ), smMetrics .ActiveMuxRangeFeed .Value ())
82- sm .DisconnectStream (streamID , err )
83- testServerStream .waitForEvent (t , errEvent (streamID ))
84- sm .DisconnectStream (streamID , err )
85- require .NoError (t , bs .waitForEmptyBuffer (ctx ))
86- require .Equalf (t , 1 , testServerStream .totalEventsSent (),
87- "expected only 1 error event in %s" , testServerStream .String ())
88- waitForRangefeedCount (t , smMetrics , 0 )
89- })
90- }
91-
9224func TestBufferedSenderReturnsErrorAfterManagerStop (t * testing.T ) {
9325 defer leaktest .AfterTest (t )()
9426 defer log .Scope (t ).Close (t )
0 commit comments