Skip to content

Commit 6f1386f

Browse files
authored
RSDK-10422 Add world object store service (#5243)
1 parent 3ce4a9b commit 6f1386f

File tree

12 files changed

+1753
-2
lines changed

12 files changed

+1753
-2
lines changed

services/register/all.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ import (
1010
_ "go.viam.com/rdk/services/shell/register"
1111
_ "go.viam.com/rdk/services/slam/register"
1212
_ "go.viam.com/rdk/services/vision/register"
13+
_ "go.viam.com/rdk/services/worldstatestore/register"
1314
)

services/register_apis/all.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ import (
1212
_ "go.viam.com/rdk/services/navigation"
1313
_ "go.viam.com/rdk/services/shell"
1414
_ "go.viam.com/rdk/services/slam"
15+
_ "go.viam.com/rdk/services/worldstatestore"
1516
)

services/worldstatestore/client.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package worldstatestore
2+
3+
import (
4+
"context"
5+
"errors"
6+
"io"
7+
8+
"go.opencensus.io/trace"
9+
commonPB "go.viam.com/api/common/v1"
10+
pb "go.viam.com/api/service/worldstatestore/v1"
11+
"go.viam.com/utils/protoutils"
12+
"go.viam.com/utils/rpc"
13+
14+
"go.viam.com/rdk/logging"
15+
rprotoutils "go.viam.com/rdk/protoutils"
16+
"go.viam.com/rdk/resource"
17+
)
18+
19+
type client struct {
20+
resource.Named
21+
resource.TriviallyReconfigurable
22+
resource.TriviallyCloseable
23+
name string
24+
client pb.WorldStateStoreServiceClient
25+
logger logging.Logger
26+
}
27+
28+
// NewClientFromConn constructs a new Client from the connection passed in.
29+
func NewClientFromConn(
30+
ctx context.Context,
31+
conn rpc.ClientConn,
32+
remoteName string,
33+
name resource.Name,
34+
logger logging.Logger,
35+
) (Service, error) {
36+
grpcClient := pb.NewWorldStateStoreServiceClient(conn)
37+
c := &client{
38+
Named: name.PrependRemote(remoteName).AsNamed(),
39+
name: name.ShortName(),
40+
client: grpcClient,
41+
logger: logger,
42+
}
43+
return c, nil
44+
}
45+
46+
// ListUUIDs lists all UUIDs in the world state store.
47+
func (c *client) ListUUIDs(ctx context.Context, extra map[string]interface{}) ([][]byte, error) {
48+
ctx, span := trace.StartSpan(ctx, "worldstatestore::client::ListUUIDs")
49+
defer span.End()
50+
ext, err := protoutils.StructToStructPb(extra)
51+
if err != nil {
52+
return nil, err
53+
}
54+
55+
req := &pb.ListUUIDsRequest{Name: c.name, Extra: ext}
56+
resp, err := c.client.ListUUIDs(ctx, req)
57+
if err != nil {
58+
return nil, err
59+
}
60+
uuids := resp.GetUuids()
61+
if uuids == nil {
62+
return nil, ErrNilResponse
63+
}
64+
65+
return uuids, nil
66+
}
67+
68+
// GetTransform gets the transform for a given UUID.
69+
func (c *client) GetTransform(ctx context.Context, uuid []byte, extra map[string]interface{}) (*commonPB.Transform, error) {
70+
ctx, span := trace.StartSpan(ctx, "worldstatestore::client::GetTransform")
71+
defer span.End()
72+
ext, err := protoutils.StructToStructPb(extra)
73+
if err != nil {
74+
return nil, err
75+
}
76+
77+
req := &pb.GetTransformRequest{Name: c.name, Uuid: uuid, Extra: ext}
78+
resp, err := c.client.GetTransform(ctx, req)
79+
if err != nil {
80+
return nil, err
81+
}
82+
obj := resp.GetTransform()
83+
if obj == nil {
84+
return nil, ErrNilResponse
85+
}
86+
87+
return obj, nil
88+
}
89+
90+
// StreamTransformChanges streams transform changes.
91+
func (c *client) StreamTransformChanges(ctx context.Context, extra map[string]interface{}) (*TransformChangeStream, error) {
92+
ctx, span := trace.StartSpan(ctx, "worldstatestore::client::StreamTransformChanges")
93+
defer span.End()
94+
95+
ext, err := protoutils.StructToStructPb(extra)
96+
if err != nil {
97+
return nil, err
98+
}
99+
100+
req := &pb.StreamTransformChangesRequest{Name: c.name, Extra: ext}
101+
stream, err := c.client.StreamTransformChanges(ctx, req)
102+
if err != nil {
103+
return nil, err
104+
}
105+
// Check the initial response immediately to catch early errors.
106+
_, err = stream.Recv()
107+
if err != nil {
108+
return nil, err
109+
}
110+
111+
iter := &TransformChangeStream{
112+
next: func() (TransformChange, error) {
113+
resp, err := stream.Recv()
114+
if err != nil {
115+
if errors.Is(err, io.EOF) {
116+
return TransformChange{}, io.EOF
117+
}
118+
if ctx.Err() != nil || errors.Is(err, context.Canceled) {
119+
return TransformChange{}, ctx.Err()
120+
}
121+
return TransformChange{}, err
122+
}
123+
change := TransformChange{
124+
ChangeType: resp.ChangeType,
125+
Transform: resp.Transform,
126+
}
127+
if resp.UpdatedFields != nil {
128+
change.UpdatedFields = resp.UpdatedFields.Paths
129+
}
130+
return change, nil
131+
},
132+
}
133+
134+
return iter, nil
135+
}
136+
137+
// DoCommand handles arbitrary commands.
138+
func (c *client) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) {
139+
ctx, span := trace.StartSpan(ctx, "worldstatestore::client::DoCommand")
140+
defer span.End()
141+
142+
return rprotoutils.DoFromResourceClient(ctx, c.client, c.name, cmd)
143+
}

0 commit comments

Comments
 (0)