Skip to content

Conversation

@DTCurrie
Copy link
Member

@DTCurrie DTCurrie commented Aug 26, 2025

@DTCurrie DTCurrie self-assigned this Aug 26, 2025
@viambot viambot added the safe to test This pull request is marked safe to test from a trusted zone label Aug 26, 2025
@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Aug 27, 2025
@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Aug 27, 2025
@viambot viambot removed the safe to test This pull request is marked safe to test from a trusted zone label Aug 27, 2025
Comment on lines 91 to 148
func (c *client) StreamTransformChanges(ctx context.Context, extra map[string]interface{}) (<-chan TransformChange, error) {
ctx, span := trace.StartSpan(ctx, "worldstatestore::client::StreamTransformChanges")
defer span.End()

ext, err := protoutils.StructToStructPb(extra)
if err != nil {
return nil, err
}

req := &pb.StreamTransformChangesRequest{Name: c.name, Extra: ext}
stream, err := c.client.StreamTransformChanges(ctx, req)
if err != nil {
return nil, err
}
// Check the initial response immediately to catch early errors.
_, err = stream.Recv()
if err != nil {
return nil, err
}

changesChan := make(chan TransformChange, 1024)

go func() {
defer close(changesChan)

for {
resp, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
return
}
if ctx.Err() != nil || errors.Is(err, context.Canceled) {
c.logger.Debug(err)
return
}
c.logger.Errorw("failed to receive from stream", "error", err)
return
}

change := TransformChange{
ChangeType: resp.ChangeType,
Transform: resp.Transform,
}

if resp.UpdatedFields != nil {
change.UpdatedFields = resp.UpdatedFields.Paths
}

select {
case changesChan <- change:
case <-ctx.Done():
return
}
}
}()

return changesChan, nil
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benjirewis, you don't need to review everything, but mind taking a look at how we are managing this stream to make sure there are no glaring issues? I tried a few things, and it is shutting down cleanly and working e2e.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a look here and in the server code and compared it to some server-streaming code I've seen; I see no glaring issues. Returning a channel seems pretty ergonomic to me, but I suppose you could also return a struct that implements Next() (TransformChange, error).

Also cc @viamrobotics/netcode since a whole new service type is being added here with a non-trivial amount of code. We'll have to change some small pieces of this (ShortName()) @jmatth.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, let me try that out.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, returning a stream struct looks a little cleaner than returning the channel directly.

@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Sep 4, 2025
@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Sep 4, 2025
…m:DTCurrie/viam-rdk into RSDK-10422-add-world-object-store-service
@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Sep 4, 2025
@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Sep 4, 2025
@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Sep 4, 2025
return nil, err
}

changesChan := make(chan TransformChange, 1024)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can buy into the fact that this should be a buffered channel (not block on every send until a receive occurs). Why 1024? I assume that's just some arbitrary value? Any idea what the rate of production/consumption will look like? Point being that you may have to mess with this value to get good performance, although I have very little context on what the data even is here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly arbitrary, based off some quick googling and looking around at what we use elsewhere. I believe the input componet uses a similar size so I ran with that, but very much open to input on this as it is outside my wheelhouse.

As for the data, it will be completely up to the user and how they decide to emit changes. For example, in the fake I run an animation loop at about 10fps that triggers 3-4 updates each frame.

Maybe the best choice here is to not define the size?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not define the size

I think you'll certainly want some buffer, as defining no buffer will mean that any send to the channel blocks until there is a receive, which will not allow the client to store transform changes from the server while the user is not pulling from changesChan. 1024 seems like a reasonable starting value here, and if your "fake" setup seems to be streaming changes well enough, then I think it's a fine place to start. I realize we probably don't have a lot of data/intuition on exactly how this API is going to be used yet, but the channel size here is an internal detail that can certainly be changed after release.

Comment on lines 91 to 148
func (c *client) StreamTransformChanges(ctx context.Context, extra map[string]interface{}) (<-chan TransformChange, error) {
ctx, span := trace.StartSpan(ctx, "worldstatestore::client::StreamTransformChanges")
defer span.End()

ext, err := protoutils.StructToStructPb(extra)
if err != nil {
return nil, err
}

req := &pb.StreamTransformChangesRequest{Name: c.name, Extra: ext}
stream, err := c.client.StreamTransformChanges(ctx, req)
if err != nil {
return nil, err
}
// Check the initial response immediately to catch early errors.
_, err = stream.Recv()
if err != nil {
return nil, err
}

changesChan := make(chan TransformChange, 1024)

go func() {
defer close(changesChan)

for {
resp, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
return
}
if ctx.Err() != nil || errors.Is(err, context.Canceled) {
c.logger.Debug(err)
return
}
c.logger.Errorw("failed to receive from stream", "error", err)
return
}

change := TransformChange{
ChangeType: resp.ChangeType,
Transform: resp.Transform,
}

if resp.UpdatedFields != nil {
change.UpdatedFields = resp.UpdatedFields.Paths
}

select {
case changesChan <- change:
case <-ctx.Done():
return
}
}
}()

return changesChan, nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a look here and in the server code and compared it to some server-streaming code I've seen; I see no glaring issues. Returning a channel seems pretty ergonomic to me, but I suppose you could also return a struct that implements Next() (TransformChange, error).

Also cc @viamrobotics/netcode since a whole new service type is being added here with a non-trivial amount of code. We'll have to change some small pieces of this (ShortName()) @jmatth.

@viambot viambot added safe to test This pull request is marked safe to test from a trusted zone and removed safe to test This pull request is marked safe to test from a trusted zone labels Sep 5, 2025
@DTCurrie DTCurrie merged commit 6f1386f into viamrobotics:main Sep 5, 2025
18 checks passed
danielbotros pushed a commit to danielbotros/rdk that referenced this pull request Sep 8, 2025
danielbotros pushed a commit to danielbotros/rdk that referenced this pull request Sep 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

safe to test This pull request is marked safe to test from a trusted zone

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants