Skip to content

Conversation

stevendanna
Copy link
Collaborator

Since we do have a window after an error where the unbufferedRegistration may send events, here we move to explicit initialization of the stream status to avoid leaking any streamStatus structs.

Epic: none
Release note: None

@stevendanna stevendanna requested a review from a team as a code owner October 4, 2025 08:41
Copy link

blathers-crl bot commented Oct 4, 2025

It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR?

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@cockroach-teamcity
Copy link
Member

This change is Reviewable

Since we do have a window after an error where the
unbufferedRegistration may send events, here we move to explicit
initialization of the stream status to avoid leaking any streamStatus
structs.

Epic: none
Release note: None
@stevendanna stevendanna force-pushed the ssd/more-explicit-stream-status branch from bbd09cf to e9963de Compare October 4, 2025 09:58
@stevendanna
Copy link
Collaborator Author

Easier to review without whitespace: https://github.com/cockroachdb/cockroach/pull/154828/files?diff=unified&w=1

Copy link
Contributor

@wenyihu6 wenyihu6 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained


pkg/kv/kvserver/rangefeed/buffered_sender.go line 153 at r1 (raw file):
Can you help me understand what exactly this change gives us - does it allow more assertions? Or that it helps us avoid adding a stream that’s already erroring to the byStream map?

Such events are possible while the registration publishes the catch up scan buffer.

Is it just for an error event? Do you mind specifying the scenario in the comment? We shouldn’t set the catch-up buffer to nil while publishing, so normal events should still go to the buffer, is that right?


pkg/server/node.go line 2310 at r1 (raw file):

			// stream manager. If rangefeed disconnects with an error after being
			// successfully registered, it calls streamSink.SendError.
			sm.RegisteringStream(req.StreamID)

I remember we spent some time avoiding adding a RegisterCleanUp method to the Stream interface, which would be called when a registration is added to the processor. With both RegisteringStream and AddStream in play now, I’m starting to think if adding another method to the Stream interface might help unify them.


pkg/kv/kvserver/rangefeed/buffered_sender.go line 269 at r1 (raw file):

	defer bs.queueMu.Unlock()
	if _, ok := bs.queueMu.byStream[streamID]; !ok {
		bs.queueMu.byStream[streamID] = streamStatus{}

Is it possible for the stream to already exist here? Can we panic if it does when buildutil.CrdbTestBuild is set? (I know some panics caused a bunch of test failures earlier, but it helped me understand the system better, so I still like using panic in general.)

Copy link
Collaborator Author

@stevendanna stevendanna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @wenyihu6)


pkg/kv/kvserver/rangefeed/buffered_sender.go line 153 at r1 (raw file):

Previously, wenyihu6 (Wenyi Hu) wrote…

Can you help me understand what exactly this change gives us - does it allow more assertions? Or that it helps us avoid adding a stream that’s already erroring to the byStream map?

Such events are possible while the registration publishes the catch up scan buffer.

Is it just for an error event? Do you mind specifying the scenario in the comment? We shouldn’t set the catch-up buffer to nil while publishing, so normal events should still go to the buffer, is that right?

It isn't just error events. Here is the screnario (I hope to write a test that demonstrates some of this):

  1. Catch-up scan finishes, we have a non-empty catchUpBuf
  2. We begin sending the catchUpBuf to the buffered sender not under lock.
  3. The processor delivers an error to the registration, disconnecting us.
  4. The loop publishing the catchUpBuf only learns of this via a context cancellation that may not be seen immediately. (In another branch I'm making it more likely we see the cancellation with priority, but the basic issiue will still exist).

pkg/kv/kvserver/rangefeed/buffered_sender.go line 269 at r1 (raw file):

Previously, wenyihu6 (Wenyi Hu) wrote…

Is it possible for the stream to already exist here? Can we panic if it does when buildutil.CrdbTestBuild is set? (I know some panics caused a bunch of test failures earlier, but it helped me understand the system better, so I still like using panic in general.)

Yes, we should be able to add such an assertion. I'll do so in my next push and stress it for a bit. I agree, we want as many of these test only assertions as possible.


pkg/server/node.go line 2310 at r1 (raw file):

Previously, wenyihu6 (Wenyi Hu) wrote…

I remember we spent some time avoiding adding a RegisterCleanUp method to the Stream interface, which would be called when a registration is added to the processor. With both RegisteringStream and AddStream in play now, I’m starting to think if adding another method to the Stream interface might help unify them.

I definitely think we should revisit the various components. I like this a bit more than a callback we send down the stack because it is all very local. We are about to register the stream so we tell the stream manager, then we inform it of the success for failure right here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants