-
Notifications
You must be signed in to change notification settings - Fork 599
Expand file tree
/
Copy pathpoint_cloud.py
More file actions
206 lines (151 loc) · 8.42 KB
/
point_cloud.py
File metadata and controls
206 lines (151 loc) · 8.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Client for the point cloud service.
This allows client code to read from a point cloud service.
"""
import collections
import logging
import bosdyn.api.point_cloud_pb2 as point_cloud_protos
import bosdyn.api.point_cloud_service_pb2_grpc as point_cloud_service
from bosdyn.client.common import (common_header_errors, error_factory, error_pair,
handle_common_header_errors)
from bosdyn.client.exceptions import ResponseError, UnsetStatusError
from .common import BaseClient
LOGGER = logging.getLogger('point_cloud_client')
class PointCloudResponseError(ResponseError):
    """Base class for the errors reported by the PointCloud service."""
class UnknownPointCloudSourceError(PointCloudResponseError):
    """The requested point cloud source name could not be found by the system."""
class SourceDataError(PointCloudResponseError):
    """The system is currently unable to generate the PointCloudSource."""
class PointCloudDataError(PointCloudResponseError):
    """The system is currently unable to generate point cloud data."""
class PointCloudTypeError(PointCloudResponseError):
    """The system cannot generate a point cloud of the requested cloud_type."""
# Lookup table from a PointCloudResponse status to the error entry handed to
# error_factory. Statuses that are not listed fall back to the generic
# (PointCloudResponseError, None) entry via the default factory.
_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (PointCloudResponseError, None),
    {
        point_cloud_protos.PointCloudResponse.STATUS_OK: (None, None),
        point_cloud_protos.PointCloudResponse.STATUS_UNKNOWN_SOURCE:
            error_pair(UnknownPointCloudSourceError),
        point_cloud_protos.PointCloudResponse.STATUS_SOURCE_DATA_ERROR:
            error_pair(SourceDataError),
        point_cloud_protos.PointCloudResponse.STATUS_UNKNOWN:
            error_pair(UnsetStatusError),
        point_cloud_protos.PointCloudResponse.STATUS_POINT_CLOUD_DATA_ERROR:
            error_pair(PointCloudDataError),
        point_cloud_protos.PointCloudResponse.STATUS_UNSUPPORTED_CLOUD_TYPE:
            error_pair(PointCloudTypeError),
    },
)
@handle_common_header_errors
def _error_from_response(response):
    """Build the exception for the first failing entry of a point cloud response.

    Args:
        response: Response message whose point_cloud_responses are checked in order.

    Returns:
        The first non-None result of error_factory for an entry's status, or None
        when no entry yields an error.
    """
    status_name = point_cloud_protos.PointCloudResponse.Status.Name
    # Lazy generator: error_factory is only invoked until the first error shows up.
    candidates = (error_factory(response, entry.status, status_to_string=status_name,
                                status_to_error=_STATUS_TO_ERROR)
                  for entry in response.point_cloud_responses)
    return next((error for error in candidates if error is not None), None)
class PointCloudClient(BaseClient):
    """Client for reading point clouds from the point cloud service."""

    default_service_name = 'point-cloud'
    service_type = 'bosdyn.api.PointCloudService'

    def __init__(self):
        stub_creator = point_cloud_service.PointCloudServiceStub
        super(PointCloudClient, self).__init__(stub_creator)

    def list_point_cloud_sources(self, **kwargs):
        """Query the service for its available PointCloudSources.

        Args:
            kwargs: Extra arguments forwarded to the underlying call.

        Returns:
            The point_cloud_sources field of the ListPointCloudSources response.

        Raises:
            RpcError: Problem communicating with the robot.
        """
        list_request = self._get_list_point_cloud_source_request()
        return self.call(self._stub.ListPointCloudSources, list_request,
                         _list_point_cloud_sources_value, common_header_errors,
                         copy_request=False, **kwargs)

    def list_point_cloud_sources_async(self, **kwargs):
        """Async counterpart of list_point_cloud_sources(); issued through call_async."""
        list_request = self._get_list_point_cloud_source_request()
        return self.call_async(self._stub.ListPointCloudSources, list_request,
                               _list_point_cloud_sources_value, common_header_errors,
                               copy_request=False, **kwargs)

    def get_point_cloud_from_sources(self, point_cloud_sources, **kwargs):
        """Fetch point clouds for the named sources with default request parameters.

        Args:
            point_cloud_sources (list of strings): Names of the sources to query.
            kwargs: Extra arguments forwarded to get_point_cloud().

        Returns:
            A list of point cloud responses for each of the requested sources.

        Raises:
            RpcError: Problem communicating with the robot.
            UnknownPointCloudSourceError: Provided point cloud source was invalid or not found.
            SourceDataError: Failed to fill out PointCloudSource. All other fields are not
                             filled.
            UnsetStatusError: An internal PointCloudService issue has happened.
            PointCloudDataError: Problem with the point cloud data. Only PointCloudSource is
                                 filled.
            PointCloudTypeError: The requested cloud_type is not supported.
        """
        # One default-parameter request per source name.
        source_requests = list(map(build_pc_request, point_cloud_sources))
        return self.get_point_cloud(source_requests, **kwargs)

    def get_point_cloud_from_sources_async(self, point_cloud_sources, **kwargs):
        """Async counterpart of get_point_cloud_from_sources().

        Builds one default-parameter request per source name and forwards them to
        get_point_cloud_async().
        """
        source_requests = list(map(build_pc_request, point_cloud_sources))
        return self.get_point_cloud_async(source_requests, **kwargs)

    def get_point_cloud(self, point_cloud_requests, **kw_args):
        """Fetch the most recent point cloud for each request.

        Args:
            point_cloud_requests (list of PointCloudRequest): PointCloudRequest protobuf
                messages describing which point clouds to collect.
            kw_args: Extra arguments to pass to grpc call invocation.

        Returns:
            A list of point cloud responses for each of the requested sources.

        Raises:
            RpcError: Problem communicating with the robot.
            UnknownPointCloudSourceError: Provided point cloud source was invalid or not found.
            SourceDataError: Failed to fill out PointCloudSource. All other fields are not
                             filled.
            UnsetStatusError: An internal PointCloudService issue has happened.
            PointCloudDataError: Problem with the point cloud data. Only PointCloudSource is
                                 filled.
            PointCloudTypeError: The requested cloud_type is not supported.
        """
        rpc_request = self._get_point_cloud_request(point_cloud_requests)
        return self.call(self._stub.GetPointCloud, rpc_request,
                         value_from_response=_get_point_cloud_value,
                         error_from_response=_error_from_response, **kw_args)

    def get_point_cloud_async(self, point_cloud_requests, **kw_args):
        """Async counterpart of get_point_cloud(); issued through call_async.

        Args:
            point_cloud_requests (list of PointCloudRequest): PointCloudRequest protobuf
                messages describing which point clouds to collect.
            kw_args: Extra arguments to pass to grpc call invocation.

        Returns:
            The result of call_async. Presumably a future resolving to the list of point
            cloud responses -- confirm against BaseClient.call_async.

        Raises:
            RpcError: Problem communicating with the robot.
            UnknownPointCloudSourceError: Provided point cloud source was invalid or not found.
            SourceDataError: Failed to fill out PointCloudSource. All other fields are not
                             filled.
            UnsetStatusError: An internal PointCloudService issue has happened.
            PointCloudDataError: Problem with the point cloud data. Only PointCloudSource is
                                 filled.
            PointCloudTypeError: The requested cloud_type is not supported.
        """
        rpc_request = self._get_point_cloud_request(point_cloud_requests)
        return self.call_async(self._stub.GetPointCloud, rpc_request,
                               value_from_response=_get_point_cloud_value,
                               error_from_response=_error_from_response, **kw_args)

    @staticmethod
    def _get_point_cloud_request(point_cloud_requests):
        """Wrap the individual requests in a GetPointCloudRequest message."""
        return point_cloud_protos.GetPointCloudRequest(
            point_cloud_requests=point_cloud_requests)

    @staticmethod
    def _get_list_point_cloud_source_request():
        """Create an empty ListPointCloudSourcesRequest message."""
        return point_cloud_protos.ListPointCloudSourcesRequest()
def build_pc_request(point_cloud_source_name):
    """Create a PointCloudRequest that targets a single named source.

    Args:
        point_cloud_source_name (string): Name of the point cloud source to query.

    Returns:
        A PointCloudRequest protobuf message with point_cloud_source_name set.
    """
    pc_request = point_cloud_protos.PointCloudRequest(
        point_cloud_source_name=point_cloud_source_name)
    return pc_request
def _list_point_cloud_sources_value(response):
    """Pull the point_cloud_sources field out of a ListPointCloudSources response."""
    sources = response.point_cloud_sources
    return sources
def _get_point_cloud_value(response):
    """Pull the point_cloud_responses field out of a GetPointCloud response."""
    responses = response.point_cloud_responses
    return responses