-
Notifications
You must be signed in to change notification settings - Fork 599
Expand file tree
/
Copy pathspot_check.py
More file actions
370 lines (254 loc) · 14.3 KB
/
spot_check.py
File metadata and controls
370 lines (254 loc) · 14.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
import collections
import time
from bosdyn.api.spot import spot_check_pb2, spot_check_service_pb2_grpc
from bosdyn.client.common import (BaseClient, error_factory, handle_common_header_errors,
handle_lease_use_result_errors, handle_unset_status_error)
from bosdyn.client.exceptions import LeaseUseError, ResponseError, TimedOutError
from bosdyn.util import now_sec
class SpotCheckError(ResponseError):
    """General class of errors for SpotCheck service.

    Common base of the spot check and camera calibration response errors defined in this module.
    """
class SpotCheckResponseError(SpotCheckError):
    """General class of errors for spot check routines.

    The concrete subclasses are paired with spot check feedback error codes in
    `_SC_ERROR_TO_ERROR`.
    """
class SpotCheckUnexpectedPowerChangeError(SpotCheckResponseError):
    """Power error occurred while running spot check."""
    # NOTE: this docstring is also stored, via __doc__, next to the class in _SC_ERROR_TO_ERROR,
    # so rewording it changes that runtime value.
class SpotCheckImuCheckError(SpotCheckResponseError):
    """IMU reports robot is not on flat ground."""
    # NOTE: this docstring is also stored, via __doc__, next to the class in _SC_ERROR_TO_ERROR,
    # so the spelling fix ("round" -> "ground") also corrects that runtime value.
class SpotCheckNotSittingError(SpotCheckResponseError):
    """Robot not started in sitting configuration."""
    # NOTE: this docstring is also stored, via __doc__, next to the class in _SC_ERROR_TO_ERROR,
    # so rewording it changes that runtime value.
class SpotCheckLoadcellTimeoutError(SpotCheckResponseError):
    """Internal time out during spot check loadcell cal."""
    # NOTE: this docstring is also stored, via __doc__, next to the class in _SC_ERROR_TO_ERROR,
    # so rewording it changes that runtime value.
class SpotCheckPowerOnFailure(SpotCheckResponseError):
    """Power on error occurred while running spot check."""
    # NOTE: this docstring is also stored, via __doc__, next to the class in _SC_ERROR_TO_ERROR,
    # so rewording it changes that runtime value.
class SpotCheckEndstopTimeoutError(SpotCheckResponseError):
    """Internal time out during spot check endstop cal."""
    # NOTE: this docstring is also stored, via __doc__, next to the class in _SC_ERROR_TO_ERROR,
    # so rewording it changes that runtime value.
class SpotCheckStandFailureError(SpotCheckResponseError):
    """Robot failed to stand during spotcheck."""
    # NOTE: this docstring is also stored, via __doc__, next to the class in _SC_ERROR_TO_ERROR,
    # so rewording it changes that runtime value.
class SpotCheckCameraTimeoutError(SpotCheckResponseError):
    """Internal time out during spot check camera check."""
    # NOTE: this docstring is also stored, via __doc__, next to the class in _SC_ERROR_TO_ERROR,
    # so rewording it changes that runtime value.
class SpotCheckGroundCheckError(SpotCheckResponseError):
    """Robot failed flat ground check."""
    # NOTE: this docstring is also stored, via __doc__, next to the class in _SC_ERROR_TO_ERROR,
    # so rewording it changes that runtime value.
class SpotCheckTimedOutError(Exception):
    """Timed out waiting for SUCCESS response from spot check.

    Raised by run_spot_check() when its timeout elapses before the feedback reports a completed
    state. Derives directly from Exception, not from SpotCheckError.
    """
class CameraSpotCheckTimedOutError(Exception):
    """Timed out waiting for SUCCESS response from camera spot check.

    Derives directly from Exception, not from SpotCheckError.
    NOTE(review): nothing in the visible part of this module raises it -- confirm it is still
    used by callers before relying on it.
    """
class CameraSpotCheckFeedbackError(Exception):
    """General class of errors for camera spot check feedback.

    Derives directly from Exception, not from SpotCheckError.
    NOTE(review): nothing in the visible part of this module raises it -- confirm it is still
    used by callers before relying on it.
    """
class CameraCalibrationResponseError(SpotCheckError):
    """General class of errors for camera calibration routines.

    The concrete subclasses are paired with camera calibration feedback statuses in
    `_CAL_STATUS_TO_ERROR`.
    """
class CameraCalibrationUserCanceledError(CameraCalibrationResponseError):
    """API client canceled calibration."""
    # NOTE: this docstring is also stored, via __doc__, next to the class in _CAL_STATUS_TO_ERROR,
    # so rewording it changes that runtime value.
class CameraCalibrationPowerError(CameraCalibrationResponseError):
    """The robot is not powered on."""
    # NOTE: this docstring is also stored, via __doc__, next to the class in _CAL_STATUS_TO_ERROR,
    # so rewording it changes that runtime value.
class CameraCalibrationTargetNotCenteredError(CameraCalibrationResponseError):
    """Invalid starting configuration of robot."""
    # NOTE: this docstring is also stored, via __doc__, next to the class in _CAL_STATUS_TO_ERROR,
    # so rewording it changes that runtime value.
class CameraCalibrationRobotCommandError(CameraCalibrationResponseError):
    """Robot command error occurred while running calibration."""
    # NOTE: this docstring is also stored, via __doc__, next to the class in _CAL_STATUS_TO_ERROR,
    # so rewording it changes that runtime value.
class CameraCalibrationCalibrationError(CameraCalibrationResponseError):
    """Calibration algorithm failure occurred."""
    # NOTE: this docstring is also stored, via __doc__, next to the class in _CAL_STATUS_TO_ERROR,
    # so rewording it changes that runtime value.
class CameraCalibrationInternalError(CameraCalibrationResponseError):
    """Internal error occurred."""
    # NOTE: this docstring is also stored, via __doc__, next to the class in _CAL_STATUS_TO_ERROR,
    # so removing the stray space before the period also corrects that runtime value.
class CameraCalibrationTimedOutError(Exception):
    """Timed out waiting for SUCCESS response from calibration.

    Raised by run_camera_calibration() when its timeout elapses before the feedback reports
    STATUS_SUCCESS. Derives directly from Exception, not from SpotCheckError.
    """
class SpotCheckClient(BaseClient):
    """Client for the SpotCheck service, used to verify robot health and run calibrations.

    Every RPC is offered twice: a method that goes through `self.call` and an `*_async`
    counterpart that goes through `self.call_async`. No value extractor is passed to either, only
    the error-mapping function that belongs to that RPC.
    """
    default_service_name = 'spot-check'
    service_type = 'bosdyn.api.spot.SpotCheckService'

    def __init__(self):
        stub_creator = spot_check_service_pb2_grpc.SpotCheckServiceStub
        super(SpotCheckClient, self).__init__(stub_creator)

    def spot_check_command(self, request, **kwargs):
        """Send a spot check command request to the robot.

        Args:
            request: The command request message to send (run_spot_check() passes a
                SpotCheckCommandRequest).
            **kwargs: Forwarded unchanged to `self.call`.

        Raises:
            Error on header error or lease use result error.
        """
        rpc = self._stub.SpotCheckCommand
        return self.call(rpc, request, None, _spotcheck_command_error_from_response, **kwargs)

    def spot_check_command_async(self, request, **kwargs):
        """Same as spot_check_command(), but issued through `self.call_async`."""
        rpc = self._stub.SpotCheckCommand
        return self.call_async(rpc, request, None, _spotcheck_command_error_from_response,
                               **kwargs)

    def spot_check_feedback(self, request, **kwargs):
        """Query the current status of spot check.

        Args:
            request: The feedback request message to send (run_spot_check() passes a
                SpotCheckFeedbackRequest).
            **kwargs: Forwarded unchanged to `self.call`.

        Raises:
            SpotCheckResponseError on any feedback error.
        """
        rpc = self._stub.SpotCheckFeedback
        return self.call(rpc, request, None, _spotcheck_feedback_error_from_response, **kwargs)

    def spot_check_feedback_async(self, request, **kwargs):
        """Same as spot_check_feedback(), but issued through `self.call_async`."""
        rpc = self._stub.SpotCheckFeedback
        return self.call_async(rpc, request, None, _spotcheck_feedback_error_from_response,
                               **kwargs)

    def camera_calibration_command(self, request, **kwargs):
        """Send a camera calibration command request to the robot.

        Args:
            request: The command request message to send (run_camera_calibration() passes a
                CameraCalibrationCommandRequest).
            **kwargs: Forwarded unchanged to `self.call`.

        Raises:
            Error on header error or lease use result error.
        """
        rpc = self._stub.CameraCalibrationCommand
        return self.call(rpc, request, None, _calibration_command_error_from_response, **kwargs)

    def camera_calibration_command_async(self, request, **kwargs):
        """Same as camera_calibration_command(), but issued through `self.call_async`."""
        rpc = self._stub.CameraCalibrationCommand
        return self.call_async(rpc, request, None, _calibration_command_error_from_response,
                               **kwargs)

    def camera_calibration_feedback(self, request, **kwargs):
        """Query the current status of camera calibration.

        Args:
            request: The feedback request message to send (run_camera_calibration() passes a
                CameraCalibrationFeedbackRequest).
            **kwargs: Forwarded unchanged to `self.call`.

        Raises:
            CameraCalibrationResponseError on any feedback error.
        """
        rpc = self._stub.CameraCalibrationFeedback
        return self.call(rpc, request, None, _calibration_feedback_error_from_response, **kwargs)

    def camera_calibration_feedback_async(self, request, **kwargs):
        """Same as camera_calibration_feedback(), but issued through `self.call_async`."""
        rpc = self._stub.CameraCalibrationFeedback
        return self.call_async(rpc, request, None, _calibration_feedback_error_from_response,
                               **kwargs)
def run_spot_check(spot_check_client, lease, timeout_sec=212, update_frequency=0.25, verbose=False):
    """Run full spot check routine and block until it completes or the timeout elapses.

    The robot should be sitting on flat ground when this routine is started. This routine
    calibrates robot joints and checks camera health.

    Args:
        spot_check_client (SpotCheckClient): client for calling calibration service.
        lease (Lease): An active lease. Spot check can be overridden at any time with another
            command.
        timeout_sec (float): Max time this function will block for.
        update_frequency (float): Feedback polling rate in Hz; feedback is queried every
            1 / update_frequency seconds. Must be greater than zero.
        verbose (bool): Periodically log status through the client's logger.

    Returns:
        SpotCheckFeedbackResponse: Joint and camera check and cal results.

    Raises:
        ValueError: update_frequency is not greater than zero.
        SpotCheckTimedOutError: The routine did not complete within timeout_sec.
        bosdyn.client.exceptions.Error: Throws on any error failure.
    """
    # Reject a bad polling rate before anything is sent to the robot. Otherwise a negative rate
    # would only fail inside time.sleep(), after the routine has already been started.
    if not update_frequency > 0:
        raise ValueError(
            "update_frequency must be greater than zero, got {!r}".format(update_frequency))
    update_time = 1.0 / update_frequency

    # The deadline is fixed before the start command, so the command RPC counts against it.
    end_time = now_sec() + timeout_sec

    # Start spot check procedure.
    req = spot_check_pb2.SpotCheckCommandRequest()
    req.command = spot_check_pb2.SpotCheckCommandRequest.COMMAND_START
    req.lease.CopyFrom(lease.lease_proto)
    spot_check_client.spot_check_command(req)

    # Check spot check feedback.
    feedback_req = spot_check_pb2.SpotCheckFeedbackRequest()
    feedback_type = spot_check_pb2.SpotCheckFeedbackResponse
    completed_states = (feedback_type.STATE_WAITING_FOR_COMMAND, feedback_type.STATE_FINISHED)
    while True:
        remaining = end_time - now_sec()
        if remaining <= 0:
            break
        # Never sleep past the deadline, so the call blocks for at most about timeout_sec.
        time.sleep(min(update_time, remaining))
        res = spot_check_client.spot_check_feedback(feedback_req)
        if res.state in completed_states:
            if verbose:
                spot_check_client.logger.info("Spot check routine complete!")
            return res
        if verbose:
            spot_check_client.logger.info("SpotCheck {:.2f}% complete!".format(res.progress * 100))
    raise SpotCheckTimedOutError
def run_camera_calibration(spot_check_client, lease, timeout_sec=1200, update_frequency=0.25,
                           verbose=False):
    """Run full camera calibration routine for robot.

    This function blocks until calibration has completed. This function should be called once the
    robot is powered on and standing in the configuration described in user documentation.

    Args:
        spot_check_client (SpotCheckClient): client for calling calibration service.
        lease (Lease): An active lease, used by calibration routine to issue robot commands. Lease
            keep alive internally managed by service. Revoke lease to end routine at any time.
        timeout_sec (float): Max time this function will block for.
        update_frequency (float): Feedback polling rate in Hz; feedback is queried every
            1 / update_frequency seconds. Must be greater than zero.
        verbose (bool): Periodically log status through the client's logger.

    Returns:
        None, once the feedback reports STATUS_SUCCESS.

    Raises:
        ValueError: update_frequency is not greater than zero.
        CameraCalibrationTimedOutError: Calibration did not succeed within timeout_sec.
        bosdyn.client.exceptions.Error: Throws on any calibration failure.
    """
    # Reject a bad polling rate before anything is sent to the robot. Otherwise a negative rate
    # would only fail inside time.sleep(), after the routine has already been started.
    if not update_frequency > 0:
        raise ValueError(
            "update_frequency must be greater than zero, got {!r}".format(update_frequency))
    update_time = 1.0 / update_frequency

    # The deadline is fixed before the start command, so the command RPC counts against it.
    end_time = now_sec() + timeout_sec

    # Start camera calibration procedure.
    req = spot_check_pb2.CameraCalibrationCommandRequest()
    req.command = spot_check_pb2.CameraCalibrationCommandRequest.COMMAND_START
    req.lease.CopyFrom(lease.lease_proto)
    spot_check_client.camera_calibration_command(req)

    # Check camera calibration feedback.
    feedback_req = spot_check_pb2.CameraCalibrationFeedbackRequest()
    feedback_type = spot_check_pb2.CameraCalibrationFeedbackResponse
    while True:
        remaining = end_time - now_sec()
        if remaining <= 0:
            break
        # Never sleep past the deadline, so the call blocks for at most about timeout_sec.
        time.sleep(min(update_time, remaining))
        res = spot_check_client.camera_calibration_feedback(feedback_req)
        if res.status == feedback_type.STATUS_SUCCESS:
            if verbose:
                spot_check_client.logger.info("Camera calibration success!")
            return
        if res.status == feedback_type.STATUS_PROCESSING:
            if verbose:
                spot_check_client.logger.info("Camera calibration {:.2f}% complete!".format(
                    res.progress * 100))
    raise CameraCalibrationTimedOutError
# Spot check error handlers.
@handle_common_header_errors
@handle_lease_use_result_errors
def _spotcheck_command_error_from_response(response):
    """Error mapper for SpotCheckCommand responses.

    The function body reports no error of its own; any error detection is left to the two
    decorators applied above.
    """
    no_error = None
    return no_error
@handle_common_header_errors
def _spotcheck_feedback_error_from_response(response):
    """Error mapper for SpotCheckFeedback responses.

    Delegates to `_spot_check_error_from_response` and returns whatever it yields.
    """
    feedback_error = _spot_check_error_from_response(response)
    return feedback_error
# Maps a SpotCheckFeedbackResponse error code to an (exception class, message) pair. Codes that
# are not listed fall back to (ResponseError, None) through the defaultdict factory.
_SC_ERROR_TO_ERROR = collections.defaultdict(lambda: (ResponseError, None))
_SC_ERROR_TO_ERROR[spot_check_pb2.SpotCheckFeedbackResponse.ERROR_NONE] = (None, None)
_SC_ERROR_TO_ERROR.update({
    # Each listed exception class uses its own docstring as the message.
    error_code: (error_class, error_class.__doc__)
    for error_code, error_class in (
        (spot_check_pb2.SpotCheckFeedbackResponse.ERROR_UNEXPECTED_POWER_CHANGE,
         SpotCheckUnexpectedPowerChangeError),
        (spot_check_pb2.SpotCheckFeedbackResponse.ERROR_INIT_IMU_CHECK, SpotCheckImuCheckError),
        (spot_check_pb2.SpotCheckFeedbackResponse.ERROR_INIT_NOT_SITTING,
         SpotCheckNotSittingError),
        (spot_check_pb2.SpotCheckFeedbackResponse.ERROR_LOADCELL_TIMEOUT,
         SpotCheckLoadcellTimeoutError),
        (spot_check_pb2.SpotCheckFeedbackResponse.ERROR_POWER_ON_FAILURE,
         SpotCheckPowerOnFailure),
        (spot_check_pb2.SpotCheckFeedbackResponse.ERROR_ENDSTOP_TIMEOUT,
         SpotCheckEndstopTimeoutError),
        (spot_check_pb2.SpotCheckFeedbackResponse.ERROR_FAILED_STAND,
         SpotCheckStandFailureError),
        (spot_check_pb2.SpotCheckFeedbackResponse.ERROR_CAMERA_TIMEOUT,
         SpotCheckCameraTimeoutError),
        (spot_check_pb2.SpotCheckFeedbackResponse.ERROR_GROUND_CHECK,
         SpotCheckGroundCheckError),
    )
})
@handle_unset_status_error(unset='STATE_UNKNOWN', field='state',
                           statustype=spot_check_pb2.SpotCheckFeedbackResponse)
def _spot_check_error_from_response(response):
    """Build the exception matching the response's `error` field, or None when there is none.

    The lookup goes through `error_factory` with `_SC_ERROR_TO_ERROR` as the code-to-exception
    table.
    """
    error_code = response.error
    code_to_name = spot_check_pb2.SpotCheckFeedbackResponse.Error.Name
    return error_factory(response, error_code, status_to_string=code_to_name,
                         status_to_error=_SC_ERROR_TO_ERROR)
# Camera calibration error handlers.
@handle_common_header_errors
@handle_lease_use_result_errors
def _calibration_command_error_from_response(response):
    """Error mapper for CameraCalibrationCommand responses.

    The function body reports no error of its own; any error detection is left to the two
    decorators applied above.
    """
    no_error = None
    return no_error
@handle_common_header_errors
def _calibration_feedback_error_from_response(response):
    """Error mapper for CameraCalibrationFeedback responses.

    STATUS_LEASE_ERROR is turned into a LeaseUseError directly; every other status is resolved
    by `_cal_status_error_from_response`.
    """
    lease_error_status = spot_check_pb2.CameraCalibrationFeedbackResponse.STATUS_LEASE_ERROR
    if response.status != lease_error_status:
        return _cal_status_error_from_response(response)
    # Special handling of lease case.
    return LeaseUseError(response, None)
# Maps a CameraCalibrationFeedbackResponse status to an (exception class, message) pair. Statuses
# that are not listed fall back to (ResponseError, None) through the defaultdict factory.
_CAL_STATUS_TO_ERROR = collections.defaultdict(lambda: (ResponseError, None))
# Neither a finished nor an in-progress calibration is an error.
_CAL_STATUS_TO_ERROR[spot_check_pb2.CameraCalibrationFeedbackResponse.STATUS_SUCCESS] = (None,
                                                                                         None)
_CAL_STATUS_TO_ERROR[spot_check_pb2.CameraCalibrationFeedbackResponse.STATUS_PROCESSING] = (None,
                                                                                            None)
_CAL_STATUS_TO_ERROR.update({
    # Each listed exception class uses its own docstring as the message.
    status_code: (error_class, error_class.__doc__)
    for status_code, error_class in (
        (spot_check_pb2.CameraCalibrationFeedbackResponse.STATUS_USER_CANCELED,
         CameraCalibrationUserCanceledError),
        (spot_check_pb2.CameraCalibrationFeedbackResponse.STATUS_POWER_ERROR,
         CameraCalibrationPowerError),
        (spot_check_pb2.CameraCalibrationFeedbackResponse.STATUS_TARGET_NOT_CENTERED,
         CameraCalibrationTargetNotCenteredError),
        (spot_check_pb2.CameraCalibrationFeedbackResponse.STATUS_ROBOT_COMMAND_ERROR,
         CameraCalibrationRobotCommandError),
        (spot_check_pb2.CameraCalibrationFeedbackResponse.STATUS_CALIBRATION_ERROR,
         CameraCalibrationCalibrationError),
        (spot_check_pb2.CameraCalibrationFeedbackResponse.STATUS_INTERNAL_ERROR,
         CameraCalibrationInternalError),
    )
})
@handle_unset_status_error(unset='STATUS_UNKNOWN',
                           statustype=spot_check_pb2.CameraCalibrationFeedbackResponse)
def _cal_status_error_from_response(response):
    """Build the exception matching the response's `status` field, or None when there is none.

    The lookup goes through `error_factory` with `_CAL_STATUS_TO_ERROR` as the
    status-to-exception table.
    """
    status_code = response.status
    status_to_name = spot_check_pb2.CameraCalibrationFeedbackResponse.Status.Name
    return error_factory(response, status_code, status_to_string=status_to_name,
                         status_to_error=_CAL_STATUS_TO_ERROR)