Skip to content
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
35d243c
move proto to/from conversions to referenceframe
DTCurrie Jul 10, 2025
58bea5e
remove point proto
DTCurrie Jul 17, 2025
a109126
cleanup
DTCurrie Jul 17, 2025
167e93b
Merge branch 'main' of ssh://github.com/viamrobotics/rdk into APP-877…
DTCurrie Jul 17, 2025
1b1dac5
cleanup
DTCurrie Jul 17, 2025
ed9e15d
Merge branch 'main' of ssh://github.com/viamrobotics/rdk into APP-877…
DTCurrie Aug 12, 2025
6c2e826
pr comments
DTCurrie Aug 12, 2025
c67adc6
fixture cleanup
DTCurrie Aug 12, 2025
18ab6c6
fix linting issue in test files
DTCurrie Aug 12, 2025
16abc5b
fix test
DTCurrie Aug 12, 2025
3353dac
Merge branch 'main' of ssh://github.com/viamrobotics/rdk into APP-877…
DTCurrie Aug 12, 2025
c0b21cf
Merge branch 'main' of ssh://github.com/viamrobotics/rdk into APP-877…
DTCurrie Aug 20, 2025
e1662b3
Merge branch 'main' of ssh://github.com/viamrobotics/rdk into APP-877…
DTCurrie Aug 20, 2025
1037ceb
Merge branch 'main' of ssh://github.com/viamrobotics/rdk into APP-877…
DTCurrie Aug 26, 2025
6fd968a
undo breaking changes from API and simplify
DTCurrie Aug 26, 2025
daaed79
Merge branch 'main' of ssh://github.com/viamrobotics/rdk into APP-877…
DTCurrie Aug 26, 2025
5f42c75
add world state store service impl
DTCurrie Aug 26, 2025
ada9502
rename
DTCurrie Aug 27, 2025
bc4e0f3
update fake
DTCurrie Aug 27, 2025
e33470f
bump api
DTCurrie Aug 27, 2025
e9432ae
Merge branch 'main' of ssh://github.com/viamrobotics/rdk into APP-877…
DTCurrie Aug 27, 2025
d124815
Merge branch 'APP-8776-move-pointcloud-to-spatialmath' into RSDK-1042…
DTCurrie Aug 27, 2025
bebd69a
lint
DTCurrie Aug 27, 2025
e602452
Merge branch 'main' of ssh://github.com/viamrobotics/rdk into RSDK-10…
DTCurrie Aug 28, 2025
24d28cf
cleanup
DTCurrie Sep 3, 2025
ca73e11
update registered resources in entrypoint test
DTCurrie Sep 3, 2025
dd66b9f
cleanup
DTCurrie Sep 4, 2025
c728da2
Add readlock and configurable FPS, defaults to 10
DTCurrie Sep 4, 2025
95019b6
add tests
DTCurrie Sep 4, 2025
5dd282e
Update services/worldstatestore/fake/fake.go
DTCurrie Sep 4, 2025
94b3d04
cleanup and test fixes
DTCurrie Sep 4, 2025
4c703a4
Merge branch 'RSDK-10422-add-world-object-store-service' of github.co…
DTCurrie Sep 4, 2025
be2c61b
lint
DTCurrie Sep 4, 2025
a923535
fps -> float
DTCurrie Sep 4, 2025
647fc2c
return struct instead of stream
DTCurrie Sep 5, 2025
ed44bb2
lint
DTCurrie Sep 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions services/register/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ import (
_ "go.viam.com/rdk/services/shell/register"
_ "go.viam.com/rdk/services/slam/register"
_ "go.viam.com/rdk/services/vision/register"
_ "go.viam.com/rdk/services/worldstatestore/register"
)
1 change: 1 addition & 0 deletions services/register_apis/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ import (
_ "go.viam.com/rdk/services/navigation"
_ "go.viam.com/rdk/services/shell"
_ "go.viam.com/rdk/services/slam"
_ "go.viam.com/rdk/services/worldstatestore"
)
156 changes: 156 additions & 0 deletions services/worldstatestore/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package worldstatestore

import (
"context"
"errors"
"io"

"go.opencensus.io/trace"
commonPB "go.viam.com/api/common/v1"
pb "go.viam.com/api/service/worldstatestore/v1"
"go.viam.com/utils/protoutils"
"go.viam.com/utils/rpc"

"go.viam.com/rdk/logging"
rprotoutils "go.viam.com/rdk/protoutils"
"go.viam.com/rdk/resource"
)

type client struct {
resource.Named
resource.TriviallyReconfigurable
resource.TriviallyCloseable
name string
client pb.WorldStateStoreServiceClient
logger logging.Logger
}

// NewClientFromConn constructs a new Client from the connection passed in.
func NewClientFromConn(
ctx context.Context,
conn rpc.ClientConn,
remoteName string,
name resource.Name,
logger logging.Logger,
) (Service, error) {
grpcClient := pb.NewWorldStateStoreServiceClient(conn)
c := &client{
Named: name.PrependRemote(remoteName).AsNamed(),
name: name.ShortName(),
client: grpcClient,
logger: logger,
}
return c, nil
}

// ListUUIDs lists all UUIDs in the world state store.
func (c *client) ListUUIDs(ctx context.Context, extra map[string]interface{}) ([][]byte, error) {
ctx, span := trace.StartSpan(ctx, "worldstatestore::client::ListUUIDs")
defer span.End()
ext, err := protoutils.StructToStructPb(extra)
if err != nil {
return nil, err
}

req := &pb.ListUUIDsRequest{Name: c.name, Extra: ext}
resp, err := c.client.ListUUIDs(ctx, req)
if err != nil {
return nil, err
}
uuids := resp.GetUuids()
if uuids == nil {
return nil, ErrNilResponse
}

return uuids, nil
}

// GetTransform gets the transform for a given UUID.
func (c *client) GetTransform(ctx context.Context, uuid []byte, extra map[string]interface{}) (*commonPB.Transform, error) {
ctx, span := trace.StartSpan(ctx, "worldstatestore::client::GetTransform")
defer span.End()
ext, err := protoutils.StructToStructPb(extra)
if err != nil {
return nil, err
}

req := &pb.GetTransformRequest{Name: c.name, Uuid: uuid, Extra: ext}
resp, err := c.client.GetTransform(ctx, req)
if err != nil {
return nil, err
}
obj := resp.GetTransform()
if obj == nil {
return nil, ErrNilResponse
}

return obj, nil
}

// StreamTransformChanges streams transform changes.
func (c *client) StreamTransformChanges(ctx context.Context, extra map[string]interface{}) (<-chan TransformChange, error) {
ctx, span := trace.StartSpan(ctx, "worldstatestore::client::StreamTransformChanges")
defer span.End()

ext, err := protoutils.StructToStructPb(extra)
if err != nil {
return nil, err
}

req := &pb.StreamTransformChangesRequest{Name: c.name, Extra: ext}
stream, err := c.client.StreamTransformChanges(ctx, req)
if err != nil {
return nil, err
}
// Check the initial response immediately to catch early errors.
_, err = stream.Recv()
if err != nil {
return nil, err
}

changesChan := make(chan TransformChange, 1024)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can buy into the fact that this should be a buffered channel (not block on every send until a receive occurs). Why 1024? I assume that's just some arbitrary value? Any idea what the rate of production/consumption will look like? Point being that you may have to mess with this value to get good performance, although I have very little context on what the data even is here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mostly arbitrary, based off some quick googling and looking around at what we use elsewhere. I believe the input componet uses a similar size so I ran with that, but very much open to input on this as it is outside my wheelhouse.

As for the data, it will be completely up to the user and how they decide to emit changes. For example, in the fake I run an animation loop at about 10fps that triggers 3-4 updates each frame.

Maybe the best choice here is to not define the size?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not define the size

I think you'll certainly want some buffer, as defining no buffer will mean that any send to the channel blocks until there is a receive, which will not allow the client to store transform changes from the server while the user is not pulling from changesChan. 1024 seems like a reasonable starting value here, and if your "fake" setup seems to be streaming changes well enough, then I think it's a fine place to start. I realize we probably don't have a lot of data/intuition on exactly how this API is going to be used yet, but the channel size here is an internal detail that can certainly be changed after release.


go func() {
defer close(changesChan)

for {
resp, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
return
}
if ctx.Err() != nil || errors.Is(err, context.Canceled) {
c.logger.Debug(err)
return
}
c.logger.Errorw("failed to receive from stream", "error", err)
return
}

change := TransformChange{
ChangeType: resp.ChangeType,
Transform: resp.Transform,
}

if resp.UpdatedFields != nil {
change.UpdatedFields = resp.UpdatedFields.Paths
}

select {
case changesChan <- change:
case <-ctx.Done():
return
}
}
}()

return changesChan, nil
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benjirewis, you don't need to review everything, but mind taking a look at how we are managing this stream to make sure there are no glaring issues? I tried a few things, and it is shutting down cleanly and working e2e.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took a look here and in the server code and compared it to some server-streaming code I've seen; I see no glaring issues. Returning a channel seems pretty ergonomic to me, but I suppose you could also return a struct that implements Next() (TransformChange, error).

Also cc @viamrobotics/netcode since a whole new service type is being added here with a non-trivial amount of code. We'll have to change some small pieces of this (ShortName()) @jmatth.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, let me try that out.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, returning a stream struct looks a little cleaner than returning the channel directly.


// DoCommand handles arbitrary commands.
func (c *client) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) {
ctx, span := trace.StartSpan(ctx, "worldstatestore::client::DoCommand")
defer span.End()

return rprotoutils.DoFromResourceClient(ctx, c.client, c.name, cmd)
}
Loading
Loading