From e9963de2901e1c9bd1c0b6e95e8d39f362a12e6a Mon Sep 17 00:00:00 2001
From: Steven Danna
Date: Sat, 4 Oct 2025 09:39:26 +0100
Subject: [PATCH] rangefeed: explicitly initialize stream status

Since there is a window after an error in which the
unbufferedRegistration may still send events, move to explicit
initialization of the stream status to avoid leaking streamStatus
structs.

Epic: none
Release note: None
---
 pkg/kv/kvserver/rangefeed/buffered_sender.go  | 83 +++++++++++--------
 .../rangefeed/buffered_sender_test.go         | 22 +++--
 pkg/kv/kvserver/rangefeed/stream_manager.go   | 10 ++-
 .../kvserver/rangefeed/stream_manager_test.go | 12 ++-
 .../rangefeed/unbuffered_registration_test.go |  2 +
 .../kvserver/rangefeed/unbuffered_sender.go   |  3 +
 pkg/server/node.go                            |  1 +
 7 files changed, 84 insertions(+), 49 deletions(-)

diff --git a/pkg/kv/kvserver/rangefeed/buffered_sender.go b/pkg/kv/kvserver/rangefeed/buffered_sender.go
index 3dace785aa95..41abd6f9ce40 100644
--- a/pkg/kv/kvserver/rangefeed/buffered_sender.go
+++ b/pkg/kv/kvserver/rangefeed/buffered_sender.go
@@ -147,47 +147,50 @@ func (bs *BufferedSender) sendBuffered(
 	// registration. This error should be the next event that is sent to
 	// stream.
 	//
-	// NB: The zero-value of streamStatus is the valid state of a newly seen
-	// stream.
-	status := bs.queueMu.byStream[ev.StreamID]
-	switch status.state {
-	case streamActive:
-		if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity {
+	// NB: We don't error if the stream status is not found, as this may be an
+	// event for an already closed stream. Such events are possible while the
+	// registration publishes the catch-up scan buffer.
+	status, ok := bs.queueMu.byStream[ev.StreamID]
+	if ok {
+		switch status.state {
+		case streamActive:
+			if bs.queueMu.perStreamCapacity > 0 && status.queueItems == bs.queueMu.perStreamCapacity {
+				if ev.Error != nil {
+					// If _this_ event is an error, no use sending another error. This stream
+					// is going down. Admit this error and mark the stream as overflowed.
+					status.state = streamOverflowed
+				} else {
+					// This stream is at capacity; return an error to the registration that it
+					// should send back to us after cleaning up.
+					status.state = streamOverflowing
+					return newRetryErrBufferCapacityExceeded()
+				}
+			}
+		case streamOverflowing:
+			// The unbufferedRegistration is the only component that sends non-error
+			// events to our stream. In response to the error we return when moving to
+			// streamOverflowing, it should immediately send us an error and mark itself
+			// as disconnected.
+			//
+			// The only unfortunate exception is if we get disconnected while flushing
+			// the catch-up scan buffer.
 			if ev.Error != nil {
-				// If _this_ event is an error, no use sending another error. This stream
-				// is going down. Admit this error and mark the stream as overflowed.
 				status.state = streamOverflowed
-			} else {
-				// This stream is at capacity, return an error to the registration that it
-				// should send back to us after cleaning up.
-				status.state = streamOverflowing
-				return newRetryErrBufferCapacityExceeded()
 			}
+		case streamOverflowed:
+			// If we are overflowed, we don't expect any further events because the
+			// registration should have disconnected in response to the error.
+			//
+			// TODO(ssd): Consider adding an assertion here.
+ return nil + default: + panic(fmt.Sprintf("unhandled stream state: %v", status.state)) } - case streamOverflowing: - // The unbufferedRegistration is the only component that sends non-error - // events to our stream. In response to the error we return when moving to - // stateOverflowing, it should immediately send us an error and mark itself - // as disconnected. - // - // The only unfortunate exception is if we get disconnected while flushing - // the catch-up scan buffer. - if ev.Error != nil { - status.state = streamOverflowed - } - case streamOverflowed: - // If we are overflowed, we don't expect any further events because the - // registration should have disconnected in response to the error. - // - // TODO(ssd): Consider adding an assertion here. - return nil - default: - panic(fmt.Sprintf("unhandled stream state: %v", status.state)) - } + // We are admitting this event. + status.queueItems++ + bs.queueMu.byStream[ev.StreamID] = status - // We are admitting this event. - status.queueItems++ - bs.queueMu.byStream[ev.StreamID] = status + } // TODO(wenyihu6): pass an actual context here alloc.Use(context.Background()) @@ -259,6 +262,14 @@ func (bs *BufferedSender) popFront() (e sharedMuxEvent, success bool) { return event, ok } +func (bs *BufferedSender) addStream(streamID int64) { + bs.queueMu.Lock() + defer bs.queueMu.Unlock() + if _, ok := bs.queueMu.byStream[streamID]; !ok { + bs.queueMu.byStream[streamID] = streamStatus{} + } +} + func (bs *BufferedSender) removeStream(streamID int64) { bs.queueMu.Lock() defer bs.queueMu.Unlock() diff --git a/pkg/kv/kvserver/rangefeed/buffered_sender_test.go b/pkg/kv/kvserver/rangefeed/buffered_sender_test.go index 987054470e53..50ff548b9263 100644 --- a/pkg/kv/kvserver/rangefeed/buffered_sender_test.go +++ b/pkg/kv/kvserver/rangefeed/buffered_sender_test.go @@ -182,14 +182,17 @@ func TestBufferedSenderOnOverflow(t *testing.T) { st := cluster.MakeTestingClusterSettings() queueCap := int64(24) + streamID := int64(1) + RangefeedSingleBufferedSenderQueueMaxPerReg.Override(ctx, &st.SV, queueCap) bs := NewBufferedSender(testServerStream, st, NewBufferedSenderMetrics()) + bs.addStream(streamID) require.Equal(t, queueCap, bs.queueMu.perStreamCapacity) val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} ev1 := new(kvpb.RangeFeedEvent) ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1}) - muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: 1} + muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: streamID} for range queueCap { require.NoError(t, bs.sendBuffered(muxEv, nil)) @@ -235,12 +238,13 @@ func TestBufferedSenderOnOverflowMultiStream(t *testing.T) { p, h, pStopper := newTestProcessor(t, withRangefeedTestType(scheduledProcessorWithBufferedSender)) defer pStopper.Stop(ctx) - streamID := int64(42) + streamID1 := int64(42) + streamID2 := streamID1 + 1 val1 := roachpb.Value{RawBytes: []byte("val"), Timestamp: hlc.Timestamp{WallTime: 1}} ev1 := new(kvpb.RangeFeedEvent) ev1.MustSetValue(&kvpb.RangeFeedValue{Key: keyA, Value: val1}) - muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: streamID} + muxEv := &kvpb.MuxRangeFeedEvent{RangeFeedEvent: *ev1, RangeID: 0, StreamID: streamID1} // Block the stream so that we can overflow later. unblock := testServerStream.BlockSend() @@ -256,18 +260,20 @@ func TestBufferedSenderOnOverflowMultiStream(t *testing.T) { } // Add our stream to the stream manager. 
+	sm.RegisteringStream(streamID1)
 	registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{},
 		nil, /* catchUpIter */
 		false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
-		sm.NewStream(streamID, 1 /*rangeID*/))
+		sm.NewStream(streamID1, 1 /*rangeID*/))
 	require.True(t, registered)
-	sm.AddStream(streamID, d)
+	sm.AddStream(streamID1, d)
 
 	// Add a second stream to the stream manager.
+	sm.RegisteringStream(streamID2)
 	registered, d, _ = p.Register(ctx, h.span, hlc.Timestamp{},
 		nil, /* catchUpIter */
 		false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery,
-		sm.NewStream(streamID+1, 1 /*rangeID*/))
+		sm.NewStream(streamID2, 1 /*rangeID*/))
 	require.True(t, registered)
-	sm.AddStream(streamID+1, d)
+	sm.AddStream(streamID2, d)
 
 	// At this point we actually have sent 2 events, one for each checkpoint sent
 	// by the registrations. One of these should get pulled off the queue and block.
@@ -283,7 +289,7 @@ func TestBufferedSenderOnOverflowMultiStream(t *testing.T) {
 	require.EqualError(t, err, capExceededErrStr)
 
 	// A write to a different stream should be fine
-	muxEv.StreamID = streamID + 2
+	muxEv.StreamID = streamID2
 	err = sm.sender.sendBuffered(muxEv, nil)
 	require.NoError(t, err)
 
diff --git a/pkg/kv/kvserver/rangefeed/stream_manager.go b/pkg/kv/kvserver/rangefeed/stream_manager.go
index 1b6662a5bd64..f105263df508 100644
--- a/pkg/kv/kvserver/rangefeed/stream_manager.go
+++ b/pkg/kv/kvserver/rangefeed/stream_manager.go
@@ -76,7 +76,9 @@ type sender interface {
 	// all streams in StreamManager.
 	run(ctx context.Context, stopper *stop.Stopper, onError func(int64)) error
 
-	// Remove stream is called when an individual stream is being removed.
+	// addStream is called when an individual stream is being added.
+	addStream(streamID int64)
+	// removeStream is called when an individual stream is being removed.
 	removeStream(streamID int64)
 
 	// cleanup is called when the sender is stopped. It is expected to clean up
@@ -139,6 +141,12 @@ func (sm *StreamManager) DisconnectStream(streamID int64, err *kvpb.Error) {
 	}
 }
 
+// RegisteringStream is called just before a stream is registered. After this
+// point, the stream may start to see events.
+func (sm *StreamManager) RegisteringStream(streamID int64) {
+	sm.sender.addStream(streamID)
+}
+
 // AddStream adds a streamID with its disconnector to the StreamManager.
 // StreamManager can use the disconnector to shut down the rangefeed stream
 // later on.
diff --git a/pkg/kv/kvserver/rangefeed/stream_manager_test.go b/pkg/kv/kvserver/rangefeed/stream_manager_test.go
index fbd215088b3a..2fca87008458 100644
--- a/pkg/kv/kvserver/rangefeed/stream_manager_test.go
+++ b/pkg/kv/kvserver/rangefeed/stream_manager_test.go
@@ -211,10 +211,11 @@ func TestStreamManagerErrorHandling(t *testing.T) {
 	t.Run("Fail to register rangefeed with the processor", func(t *testing.T) {
 		p, _, stopper := newTestProcessor(t, withRangefeedTestType(rt))
 		defer stopper.Stop(ctx)
-		sm.NewStream(sID, rID)
-		// We mock failed registration by not calling p.Register.
-		// node.MuxRangefeed would call sendBuffered with error event.
-		require.NoError(t, sm.sender.sendBuffered(makeMuxRangefeedErrorEvent(sID, rID, disconnectErr), nil))
+		streamSink := sm.NewStream(sID, rID)
+		// We mock failed registration by not calling p.Register and calling
+		// SendError just as (*Node).muxRangeFeed does.
+ sm.RegisteringStream(sID) + streamSink.SendError(disconnectErr) expectErrorHandlingInvariance(p) testServerStream.reset() }) @@ -223,6 +224,7 @@ func TestStreamManagerErrorHandling(t *testing.T) { p, h, stopper := newTestProcessor(t, withRangefeedTestType(rt)) defer stopper.Stop(ctx) stream := sm.NewStream(sID, rID) + sm.RegisteringStream(sID) registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */ false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery, stream) @@ -237,6 +239,7 @@ func TestStreamManagerErrorHandling(t *testing.T) { stream := sm.NewStream(sID, rID) p, h, stopper := newTestProcessor(t, withRangefeedTestType(rt)) defer stopper.Stop(ctx) + sm.RegisteringStream(sID) registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */ false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery, stream) @@ -252,6 +255,7 @@ func TestStreamManagerErrorHandling(t *testing.T) { stream := sm.NewStream(sID, rID) p, h, stopper := newTestProcessor(t, withRangefeedTestType(rt)) defer stopper.Stop(ctx) + sm.RegisteringStream(sID) registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */ false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery, stream) diff --git a/pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go b/pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go index 0f418d4e2db7..56e64f753b3d 100644 --- a/pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go +++ b/pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go @@ -48,6 +48,7 @@ func TestUnbufferedRegWithStreamManager(t *testing.T) { }) t.Run("register 50 streams", func(t *testing.T) { for id := int64(0); id < 50; id++ { + sm.RegisteringStream(id) registered, d, _ := p.Register(ctx, h.span, hlc.Timestamp{}, nil, /* catchUpIter */ false /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery, sm.NewStream(id, r1)) @@ -142,6 +143,7 @@ func TestUnbufferedRegCorrectnessOnDisconnect(t *testing.T) { evErr.MustSetValue(&kvpb.RangeFeedError{Error: *discErr}) // Register one stream. + sm.RegisteringStream(s1) registered, d, _ := p.Register(ctx, h.span, startTs, makeCatchUpIterator(catchUpIter, span, startTs), /* catchUpIter */ true /* withDiff */, false /* withFiltering */, false /* withOmitRemote */, noBulkDelivery, diff --git a/pkg/kv/kvserver/rangefeed/unbuffered_sender.go b/pkg/kv/kvserver/rangefeed/unbuffered_sender.go index 17d9a3620f43..91dafba79135 100644 --- a/pkg/kv/kvserver/rangefeed/unbuffered_sender.go +++ b/pkg/kv/kvserver/rangefeed/unbuffered_sender.go @@ -132,6 +132,9 @@ func (ubs *UnbufferedSender) sendUnbuffered(event *kvpb.MuxRangeFeedEvent) error return ubs.sender.Send(event) } +// addStream implements sender. +func (ubs *UnbufferedSender) addStream(int64) {} + // removeStream implements sender. func (ubs *UnbufferedSender) removeStream(int64) {} diff --git a/pkg/server/node.go b/pkg/server/node.go index 132497bf09d8..80b0dc5cebe1 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -2307,6 +2307,7 @@ func (n *Node) muxRangeFeed(muxStream kvpb.RPCInternal_MuxRangeFeedStream) error // Disconnector returned can be used to shut down rangefeed from the // stream manager. If rangefeed disconnects with an error after being // successfully registered, it calls streamSink.SendError. 
+ sm.RegisteringStream(req.StreamID) if disconnector, err := n.stores.RangeFeed(streamCtx, req, streamSink, limiter); err != nil { streamSink.SendError(kvpb.NewError(err)) } else {
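
Note: the leak described in the commit message follows from Go's map
semantics. Reading a missing key yields a zero value, and the unconditional
write-back in sendBuffered would then re-insert a streamStatus for a stream
that removeStream had already deleted. The following standalone sketch uses
hypothetical, simplified types (not the CockroachDB implementation) to show
the difference between the old zero-value approach and explicit
initialization:

package main

import "fmt"

type streamStatus struct{ queueItems int64 }

func main() {
	byStream := map[int64]streamStatus{}

	// Old approach: a late event for a removed stream reads the zero value
	// and writes it back, resurrecting the entry. Nothing deletes it again.
	delete(byStream, 7)
	status := byStream[7]
	status.queueItems++
	byStream[7] = status
	fmt.Println(len(byStream)) // 1: leaked streamStatus

	// New approach: only streams explicitly added via addStream are tracked,
	// so a late event is a no-op.
	byStream = map[int64]streamStatus{}
	if status, ok := byStream[7]; ok {
		status.queueItems++
		byStream[7] = status
	}
	fmt.Println(len(byStream)) // 0: no entry re-created
}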
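
For reference, the three stream states in buffered_sender.go form a small
admission state machine: an active stream at capacity either overflows (for
error events) or bounces the registration (for everything else), and an
overflowed stream admits nothing further. Below is a minimal standalone model
with names local to this sketch; as one simplification, it persists the
overflowing transition before returning the capacity error, which the patch
handles via its own locking and write-back discipline:

package main

import (
	"errors"
	"fmt"
)

type streamState int

const (
	streamActive streamState = iota
	streamOverflowing
	streamOverflowed
)

var errCapacityExceeded = errors.New("buffer capacity exceeded")

type streamStatus struct {
	state      streamState
	queueItems int64
}

type queue struct {
	capacity int64
	byStream map[int64]streamStatus
}

// admit models sendBuffered's bookkeeping: events for unknown streams are
// dropped silently, an active stream at capacity transitions to overflowing
// (non-error events) or overflowed (error events), and an overflowed stream
// admits nothing.
func (q *queue) admit(streamID int64, isErr bool) error {
	st, ok := q.byStream[streamID]
	if !ok {
		return nil // event for an already closed stream
	}
	switch st.state {
	case streamActive:
		if q.capacity > 0 && st.queueItems == q.capacity {
			if isErr {
				st.state = streamOverflowed // admit this final error
			} else {
				st.state = streamOverflowing
				q.byStream[streamID] = st
				return errCapacityExceeded // registration must disconnect
			}
		}
	case streamOverflowing:
		if isErr {
			st.state = streamOverflowed
		}
	case streamOverflowed:
		return nil // expect no further events
	}
	st.queueItems++
	q.byStream[streamID] = st
	return nil
}

func main() {
	q := &queue{capacity: 2, byStream: map[int64]streamStatus{1: {}}}
	fmt.Println(q.admit(1, false)) // <nil>
	fmt.Println(q.admit(1, false)) // <nil>
	fmt.Println(q.admit(1, false)) // buffer capacity exceeded
	fmt.Println(q.admit(1, true))  // <nil>: error admitted, stream overflowed
	fmt.Println(q.admit(1, false)) // <nil>: dropped
}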
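
The call sites in node.go and the tests all follow the same ordering:
RegisteringStream before Register, then either AddStream on success or
SendError on failure. A compact sketch of that ordering, using stand-in types
(the real signatures differ: AddStream takes a disconnector and SendError a
*kvpb.Error):

package main

import "fmt"

// streamManager is a stand-in for rangefeed.StreamManager; only the call
// order matters here.
type streamManager struct{ tracked map[int64]bool }

func (sm *streamManager) RegisteringStream(id int64) { sm.tracked[id] = true }
func (sm *streamManager) AddStream(id int64)         { fmt.Printf("stream %d registered\n", id) }
func (sm *streamManager) SendError(id int64, err error) {
	fmt.Printf("stream %d error: %v\n", id, err)
}

// register stands in for processor registration, which can fail.
func register(ok bool) error {
	if !ok {
		return fmt.Errorf("registration failed")
	}
	return nil
}

func main() {
	sm := &streamManager{tracked: map[int64]bool{}}

	// The stream's status must exist before registration, because the
	// processor may publish events (e.g. the initial checkpoint) as soon as
	// Register returns.
	sm.RegisteringStream(1)
	if err := register(true); err != nil {
		sm.SendError(1, err) // failure path: the error flows through the queue
	} else {
		sm.AddStream(1) // happy path: disconnector handed to the manager
	}
}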