-
Notifications
You must be signed in to change notification settings - Fork 599
Expand file tree
/
Copy pathworld_object.py
More file actions
339 lines (280 loc) · 16.6 KB
/
world_object.py
File metadata and controls
339 lines (280 loc) · 16.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""For clients to use the world object service"""
from bosdyn.api import geometry_pb2 as geom
from bosdyn.api import world_object_pb2
from bosdyn.api import world_object_service_pb2_grpc as world_object_service
from bosdyn.client.common import BaseClient, common_header_errors
from bosdyn.client.frame_helpers import *
from bosdyn.client.robot_command import NoTimeSyncError
from bosdyn.client.time_sync import update_time_filter, update_timestamp_filter
from bosdyn.util import now_timestamp
class WorldObjectClient(BaseClient):
"""Client for World Object service."""
default_service_name = 'world-objects'
service_type = 'bosdyn.api.WorldObjectService'
def __init__(self):
super(WorldObjectClient, self).__init__(world_object_service.WorldObjectServiceStub)
self._timesync_endpoint = None
def update_from(self, other):
super(WorldObjectClient, self).update_from(other)
# Grab a timesync endpoint if it is available.
try:
self._timesync_endpoint = other.time_sync.endpoint
except AttributeError:
pass # other doesn't have a time_sync accessor
@property
def timesync_endpoint(self):
"""Accessor for timesync-endpoint that is grabbed via 'update_from()'.
Raises:
bosdyn.client.robot_command.NoTimeSyncError: Could not find the timesync endpoint for
the robot.
"""
if not self._timesync_endpoint:
raise NoTimeSyncError("[world object service] No timesync endpoint set for the robot")
return self._timesync_endpoint
def list_world_objects(self, object_type=None, time_start_point=None, **kwargs):
"""Get a list of World Objects.
Args:
object_type (list of bosdyn.api.WorldObjectType): Specific types to include in the
response, all other types will be
filtered out.
time_start_point (float): A client timestamp to filter objects in the response. All objects
will have a timestamp after this time.
Returns:
The response message, which includes the filtered list of all world objects.
Raises:
RpcError: Problem communicating with the robot.
bosdyn.client.robot_command.NoTimeSyncError: Couldn't convert the timestamp into robot
time.
"""
if time_start_point is not None:
time_start_point = update_time_filter(self, time_start_point, self.timesync_endpoint)
req = world_object_pb2.ListWorldObjectRequest(object_type=object_type,
timestamp_filter=time_start_point)
return self.call(self._stub.ListWorldObjects, req,
value_from_response=_get_world_object_value,
error_from_response=common_header_errors, copy_request=False, **kwargs)
def list_world_objects_async(self, object_type=None, time_start_point=None, **kwargs):
"""Async version of list_world_objects()."""
if time_start_point is not None:
time_start_point = update_time_filter(self, time_start_point, self.timesync_endpoint)
req = world_object_pb2.ListWorldObjectRequest(object_type=object_type,
timestamp_filter=time_start_point)
return self.call_async(self._stub.ListWorldObjects, req,
value_from_response=_get_world_object_value,
error_from_response=common_header_errors, copy_request=False,
**kwargs)
def mutate_world_objects(self, mutation_req, **kwargs):
"""Mutate (add, change, delete) world objects.
Args:
mutation_req (world_object_pb2.MutateWorldObjectRequest): The request including
the object to be mutated and the
type of mutation.
Returns:
The response, which includes the id of the mutated object.
Raises:
RpcError: Problem communicating with the robot.
bosdyn.client.robot_command.NoTimeSyncError: Couldn't convert the timestamp into robot
time.
"""
if mutation_req.mutation.object.HasField("acquisition_time"):
# Ensure the mutation request's object's time of detection is in robot time.
client_timestamp = mutation_req.mutation.object.acquisition_time
mutation_req.mutation.object.acquisition_time.CopyFrom(
update_timestamp_filter(self, client_timestamp, self.timesync_endpoint))
return self.call(self._stub.MutateWorldObjects, mutation_req,
value_from_response=_get_status, error_from_response=common_header_errors,
**kwargs)
def mutate_world_objects_async(self, mutation_req, **kwargs):
"""Async version of mutate_world_objects()."""
if mutation_req.mutation.object.HasField("acquisition_time"):
# Ensure the mutation request's object's time of detection is in robot time.
client_timestamp = mutation_req.mutation.object.acquisition_time
mutation_req.mutation.object.acquisition_time.CopyFrom(
update_timestamp_filter(self, client_timestamp, self.timesync_endpoint))
return self.call_async(self._stub.MutateWorldObjects, mutation_req,
value_from_response=_get_status,
error_from_response=common_header_errors, **kwargs)
def draw_sphere(self, name, x_rt_frame_name, y_rt_frame_name, z_rt_frame_name, frame_name,
radius=0.05, rgba=(255, 0, 0, 1), list_objects_now=True):
"""Create a drawable sphere world object that will be sent to the world object service
with a mutation request.
Args:
name (string): The human-readable name of the world object.
x_rt_frame_name,y_rt_frame_name,z_rt_frame_name (int): The coordinate position (x,y,z) of
the drawable sphere.
frame_name (string): the frame in which the sphere's position is described.
radius (float): The radius for the drawn sphere.
rgba (4 valued tuple): The RGBA color, where RGB are int values in [0,255] and A is a float in [0,1].
list_objects_now (boolean): Should the ListWorldObjects request be made after creating
the sphere world object.
Returns:
The MutateWorldObjectResponse for the addition of the sphere world object.
"""
vision_tform_drawable = geom.SE3Pose(
position=geom.Vec3(x=x_rt_frame_name, y=y_rt_frame_name, z=z_rt_frame_name),
rotation=geom.Quaternion(w=1, x=0, y=0, z=0))
# Create a map between the child frame name and the parent frame name/SE3Pose parent_tform_child
edges = {}
# Create an edge in the frame tree snapshot that includes vision_tform_drawable
drawable_frame_name = name
edges = add_edge_to_tree(edges, vision_tform_drawable, frame_name, drawable_frame_name)
snapshot = geom.FrameTreeSnapshot(child_to_parent_edge_map=edges)
# Set the acquisition time for the sphere using a function to get google.protobuf.Timestamp of the current system time.
time_now = now_timestamp()
# Create the sphere drawable object
sphere = world_object_pb2.DrawableSphere(radius=radius)
draw_color = world_object_pb2.DrawableProperties.Color(r=rgba[0], g=rgba[1], b=rgba[2],
a=rgba[3])
sphere_drawable_prop = world_object_pb2.DrawableProperties(
color=draw_color, label=name, wireframe=False, sphere=sphere,
frame_name_drawable=drawable_frame_name)
# Create the complete world object with transform information, a unique name, and the drawable sphere properties.
sphere_to_add = world_object_pb2.WorldObject(name=name, transforms_snapshot=snapshot,
acquisition_time=time_now,
drawable_properties=[sphere_drawable_prop])
# Add the sphere to the robot's world object service
add_sphere = make_add_world_object_req(sphere_to_add)
resp = self.mutate_world_objects(mutation_req=add_sphere)
if list_objects_now:
# Request a listing of the world objects so that the sphere shows up in the log.
self.list_world_objects()
return resp
def draw_oriented_bounding_box(self, name, drawable_box_frame_name, frame_name,
frame_name_tform_drawable_box, size_ewrt_box_vec3,
rgba=(255, 0, 0, 1), wireframe=True, list_objects_now=False):
"""Create a drawable 3D box world object that will be sent to the world object service
with a mutation request.
Args:
name (string): The human-readable name of the world object.
drawable_box_frame_name (string): The frame name for the drawable box frame.
frame_name (string): The frame name which the drawable box is described relative to.
frame_name_tform_drawable_box (geometry_pb2.SE3Pose): the SE3 pose of the drawable box relative to frame name.
size_ewrt_box_vec3 (float): The size of the box (x,y,z) expressed with respect to the
drawable box frame.
rgba (4 valued tuple): The RGBA color, where RGB are int values in [0,255] and A is a float in [0,1].
wireframe (boolean): Should this be drawn as a wireframe [wireframe=true] or a solid object [wireframe=false].
list_objects_now (boolean): Should the ListWorldObjects request be made after creating
the sphere world object.
Returns:
The MutateWorldObjectResponse for the addition of the sphere world object.
"""
# Create a map between the child frame name and the parent frame name/SE3Pose parent_tform_child
edges = {}
# Create an edge in the frame tree snapshot that includes frame_tform_box
drawable_frame_name = name
edges = add_edge_to_tree(edges, frame_name_tform_drawable_box, frame_name,
drawable_frame_name)
snapshot = geom.FrameTreeSnapshot(child_to_parent_edge_map=edges)
# Set the acquisition time for the box using a function to get google.protobuf.Timestamp of the current system time.
time_now = now_timestamp()
# Create the box drawable object
box = world_object_pb2.DrawableBox(size=size_ewrt_box_vec3)
draw_color = world_object_pb2.DrawableProperties.Color(r=rgba[0], g=rgba[1], b=rgba[2],
a=rgba[3])
box_drawable_prop = world_object_pb2.DrawableProperties(
color=draw_color, label=name, wireframe=wireframe, box=box,
frame_name_drawable=drawable_box_frame_name)
# Create the complete world object with transform information, a unique name, and the drawable box properties.
box_to_add = world_object_pb2.WorldObject(name=name, transforms_snapshot=snapshot,
acquisition_time=time_now,
drawable_properties=[box_drawable_prop])
# Add the box to the robot's world object service
add_box = make_add_world_object_req(box_to_add)
resp = self.mutate_world_objects(mutation_req=add_box)
if list_objects_now:
# Request a listing of the world objects so that the box shows up in the log.
self.list_world_objects()
return resp
def _get_world_object_value(response):
return response
def _get_status(response):
if (response.status != world_object_pb2.MutateWorldObjectResponse.STATUS_OK):
if (response.status == world_object_pb2.MutateWorldObjectResponse.STATUS_INVALID_MUTATION_ID
):
print("Object id not found, and could not be mutated.")
if (response.status == world_object_pb2.MutateWorldObjectResponse.STATUS_NO_PERMISSION):
print(
"Cannot change/delete objects detected by Spot's perception system, only client objects."
)
return response
'''
Static helper methods for constructing and sending mutation requests for a given world object.
'''
def make_add_world_object_req(world_obj):
"""Add a world object to the scene.
Args:
world_obj (WorldObject): The world object to be added into the robot's perception scene.
Returns:
A MutateWorldObjectRequest where the action is to "add" the object to the scene.
"""
add_obj = world_object_pb2.MutateWorldObjectRequest.Mutation(
action=world_object_pb2.MutateWorldObjectRequest.ACTION_ADD, object=world_obj)
req = world_object_pb2.MutateWorldObjectRequest(mutation=add_obj)
return req
def make_delete_world_object_req(world_obj):
"""Delete a world object from the scene.
Args:
world_obj (WorldObject): The world object to be deleted in the robot's perception scene. The
object must be a client-added object and have the correct world object
id returned by the service after adding the object.
Returns:
A MutateWorldObjectRequest where the action is to "delete" the object to the scene.
"""
del_obj = world_object_pb2.MutateWorldObjectRequest.Mutation(
action=world_object_pb2.MutateWorldObjectRequest.ACTION_DELETE, object=world_obj)
req = world_object_pb2.MutateWorldObjectRequest(mutation=del_obj)
return req
def make_change_world_object_req(world_obj):
"""Change/update an existing world object in the scene.
Args:
world_obj (WorldObject): The world object to be changed/updated in the robot's perception scene.
The object must be a client-added object and have the correct world object
id returned by the service after adding the object.
Returns:
A MutateWorldObjectRequest where the action is to "change" the object to the scene.
"""
change_obj = world_object_pb2.MutateWorldObjectRequest.Mutation(
action=world_object_pb2.MutateWorldObjectRequest.ACTION_CHANGE, object=world_obj)
req = world_object_pb2.MutateWorldObjectRequest(mutation=change_obj)
return req
def send_add_mutation_requests(world_object_client, world_object_array):
"""
Create and send an "add" mutation request for each world object in an array. Return a matching
array of the object id's that are assigned when the object is created, so that each object we add
can be identified and removed individually (if desired) later.
Args:
world_object_client (WorldObjectClient): Client for World Object service.
world_object_array (List): List of objects to add.
Returns:
A List containing the object ids associated with the objects created.
"""
obj_id = [-1] * len(world_object_array)
for i, obj in enumerate(world_object_array):
add_req = make_add_world_object_req(obj)
add_resp = world_object_client.mutate_world_objects(mutation_req=add_req)
obj_id[i] = add_resp.mutated_object_id
return obj_id
def send_delete_mutation_requests(world_object_client, delete_object_id_array):
"""
Create and send a "delete" mutation request for each world object successfully identified from a
given list of object id's.
Args:
world_object_client (WorldObjectClient): Client for World Object service.
delete_object_id_array (List): List of object id's to send delete requests for.
"""
world_objects = world_object_client.list_world_objects().world_objects
for obj in world_objects:
this_object_id = obj.id
if isinstance(delete_object_id_array, int):
delete_object_id_array = [delete_object_id_array]
for i in range(len(delete_object_id_array)):
delete_id = delete_object_id_array[i]
if this_object_id == delete_id:
del_req = make_delete_world_object_req(obj)
del_resp = world_object_client.mutate_world_objects(mutation_req=del_req)
continue