-
Notifications
You must be signed in to change notification settings - Fork 126
RSDK-10422 Add world object store service #5243
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 34 commits
35d243c
58bea5e
a109126
167e93b
1b1dac5
ed9e15d
6c2e826
c67adc6
18ab6c6
16abc5b
3353dac
c0b21cf
e1662b3
1037ceb
6fd968a
daaed79
5f42c75
ada9502
bc4e0f3
e33470f
e9432ae
d124815
bebd69a
e602452
24d28cf
ca73e11
dd66b9f
c728da2
95019b6
5dd282e
94b3d04
4c703a4
be2c61b
a923535
647fc2c
ed44bb2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,156 @@ | ||
| package worldstatestore | ||
|
|
||
| import ( | ||
| "context" | ||
| "errors" | ||
| "io" | ||
|
|
||
| "go.opencensus.io/trace" | ||
| commonPB "go.viam.com/api/common/v1" | ||
| pb "go.viam.com/api/service/worldstatestore/v1" | ||
| "go.viam.com/utils/protoutils" | ||
| "go.viam.com/utils/rpc" | ||
|
|
||
| "go.viam.com/rdk/logging" | ||
| rprotoutils "go.viam.com/rdk/protoutils" | ||
| "go.viam.com/rdk/resource" | ||
| ) | ||
|
|
||
| type client struct { | ||
| resource.Named | ||
| resource.TriviallyReconfigurable | ||
| resource.TriviallyCloseable | ||
| name string | ||
| client pb.WorldStateStoreServiceClient | ||
| logger logging.Logger | ||
| } | ||
|
|
||
| // NewClientFromConn constructs a new Client from the connection passed in. | ||
| func NewClientFromConn( | ||
| ctx context.Context, | ||
| conn rpc.ClientConn, | ||
| remoteName string, | ||
| name resource.Name, | ||
| logger logging.Logger, | ||
| ) (Service, error) { | ||
| grpcClient := pb.NewWorldStateStoreServiceClient(conn) | ||
| c := &client{ | ||
| Named: name.PrependRemote(remoteName).AsNamed(), | ||
| name: name.ShortName(), | ||
| client: grpcClient, | ||
| logger: logger, | ||
| } | ||
| return c, nil | ||
| } | ||
|
|
||
| // ListUUIDs lists all UUIDs in the world state store. | ||
| func (c *client) ListUUIDs(ctx context.Context, extra map[string]interface{}) ([][]byte, error) { | ||
| ctx, span := trace.StartSpan(ctx, "worldstatestore::client::ListUUIDs") | ||
| defer span.End() | ||
| ext, err := protoutils.StructToStructPb(extra) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| req := &pb.ListUUIDsRequest{Name: c.name, Extra: ext} | ||
| resp, err := c.client.ListUUIDs(ctx, req) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| uuids := resp.GetUuids() | ||
| if uuids == nil { | ||
| return nil, ErrNilResponse | ||
| } | ||
|
|
||
| return uuids, nil | ||
| } | ||
|
|
||
| // GetTransform gets the transform for a given UUID. | ||
| func (c *client) GetTransform(ctx context.Context, uuid []byte, extra map[string]interface{}) (*commonPB.Transform, error) { | ||
| ctx, span := trace.StartSpan(ctx, "worldstatestore::client::GetTransform") | ||
| defer span.End() | ||
| ext, err := protoutils.StructToStructPb(extra) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| req := &pb.GetTransformRequest{Name: c.name, Uuid: uuid, Extra: ext} | ||
| resp, err := c.client.GetTransform(ctx, req) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| obj := resp.GetTransform() | ||
| if obj == nil { | ||
| return nil, ErrNilResponse | ||
| } | ||
|
|
||
| return obj, nil | ||
| } | ||
|
|
||
| // StreamTransformChanges streams transform changes. | ||
| func (c *client) StreamTransformChanges(ctx context.Context, extra map[string]interface{}) (<-chan TransformChange, error) { | ||
| ctx, span := trace.StartSpan(ctx, "worldstatestore::client::StreamTransformChanges") | ||
| defer span.End() | ||
|
|
||
| ext, err := protoutils.StructToStructPb(extra) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| req := &pb.StreamTransformChangesRequest{Name: c.name, Extra: ext} | ||
| stream, err := c.client.StreamTransformChanges(ctx, req) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| // Check the initial response immediately to catch early errors. | ||
| _, err = stream.Recv() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| changesChan := make(chan TransformChange, 1024) | ||
|
|
||
| go func() { | ||
| defer close(changesChan) | ||
|
|
||
| for { | ||
| resp, err := stream.Recv() | ||
| if err != nil { | ||
| if errors.Is(err, io.EOF) { | ||
| return | ||
| } | ||
| if ctx.Err() != nil || errors.Is(err, context.Canceled) { | ||
| c.logger.Debug(err) | ||
| return | ||
| } | ||
| c.logger.Errorw("failed to receive from stream", "error", err) | ||
| return | ||
| } | ||
|
|
||
| change := TransformChange{ | ||
| ChangeType: resp.ChangeType, | ||
| Transform: resp.Transform, | ||
| } | ||
|
|
||
| if resp.UpdatedFields != nil { | ||
| change.UpdatedFields = resp.UpdatedFields.Paths | ||
| } | ||
|
|
||
| select { | ||
| case changesChan <- change: | ||
| case <-ctx.Done(): | ||
| return | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| return changesChan, nil | ||
| } | ||
|
||
|
|
||
| // DoCommand handles arbitrary commands. | ||
| func (c *client) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { | ||
| ctx, span := trace.StartSpan(ctx, "worldstatestore::client::DoCommand") | ||
| defer span.End() | ||
|
|
||
| return rprotoutils.DoFromResourceClient(ctx, c.client, c.name, cmd) | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can buy into the fact that this should be a buffered channel (not block on every send until a receive occurs). Why 1024? I assume that's just some arbitrary value? Any idea what the rate of production/consumption will look like? Point being that you may have to mess with this value to get good performance, although I have very little context on what the data even is here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly arbitrary, based off some quick googling and looking around at what we use elsewhere. I believe the input componet uses a similar size so I ran with that, but very much open to input on this as it is outside my wheelhouse.
As for the data, it will be completely up to the user and how they decide to emit changes. For example, in the fake I run an animation loop at about 10fps that triggers 3-4 updates each frame.
Maybe the best choice here is to not define the size?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you'll certainly want some buffer, as defining no buffer will mean that any send to the channel blocks until there is a receive, which will not allow the client to store transform changes from the server while the user is not pulling from
changesChan. 1024 seems like a reasonable starting value here, and if your "fake" setup seems to be streaming changes well enough, then I think it's a fine place to start. I realize we probably don't have a lot of data/intuition on exactly how this API is going to be used yet, but the channel size here is an internal detail that can certainly be changed after release.