-
Notifications
You must be signed in to change notification settings - Fork 599
Expand file tree
/
Copy patharea_callback.py
More file actions
136 lines (104 loc) · 6.12 KB
/
area_callback.py
File metadata and controls
136 lines (104 loc) · 6.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
import collections
from bosdyn.api.graph_nav import area_callback_pb2, area_callback_service_pb2_grpc
from bosdyn.client.common import (BaseClient, common_header_errors, error_factory, error_pair,
handle_common_header_errors, handle_custom_params_errors,
handle_lease_use_result_errors, handle_unset_status_error)
from bosdyn.client.exceptions import LeaseUseError, ResponseError
class AreaCallbackClient(BaseClient):
service_type = 'bosdyn.api.graph_nav.AreaCallbackService'
default_service_name = None
def __init__(self):
super(AreaCallbackClient,
self).__init__(area_callback_service_pb2_grpc.AreaCallbackServiceStub)
def area_callback_information(self, request=None, **kwargs):
request = request or area_callback_pb2.AreaCallbackInformationRequest()
return self.call(self._stub.AreaCallbackInformation, request, value_from_response=None,
error_from_response=common_header_errors, copy_request=False, **kwargs)
def begin_callback(self, request, **kwargs):
return self.call(self._stub.BeginCallback, request, value_from_response=None,
error_from_response=_begin_callback_error, copy_request=False, **kwargs)
def begin_control(self, request, **kwargs):
return self.call(self._stub.BeginControl, request, value_from_response=None,
error_from_response=_begin_control_error, copy_request=False, **kwargs)
def update_callback(self, request, **kwargs):
return self.call(self._stub.UpdateCallback, request, value_from_response=None,
error_from_response=_update_callback_error, copy_request=False, **kwargs)
def end_callback(self, request, **kwargs):
return self.call(self._stub.EndCallback, request, value_from_response=None,
error_from_response=_end_callback_error, copy_request=False, **kwargs)
class AreaCallbackResponseError(ResponseError):
"""General class of errors for AreaCallback service."""
class InvalidCommandIdError(AreaCallbackResponseError):
"""Provided command id does not match the current command id."""
class InvalidConfigError(AreaCallbackResponseError):
"""The provided configuration does not provide the necessary data."""
class ExpiredEndTimeError(AreaCallbackResponseError):
"""The provided end time has already expired."""
class MissingLeaseResourcesError(AreaCallbackResponseError):
"""A required lease resource was not provided."""
class ShutdownCallbackFailedError(AreaCallbackResponseError):
"""The callback failed to shut down properly."""
# yapf: disable
_BEGIN_CALLBACK_TO_ERROR = collections.defaultdict(
lambda: (AreaCallbackResponseError, None))
_BEGIN_CALLBACK_TO_ERROR.update({
area_callback_pb2.BeginCallbackResponse.STATUS_OK: (None, None),
area_callback_pb2.BeginCallbackResponse.STATUS_INVALID_CONFIGURATION: error_pair(InvalidConfigError),
area_callback_pb2.BeginCallbackResponse.STATUS_EXPIRED_END_TIME: error_pair(ExpiredEndTimeError),
})
@handle_common_header_errors
@handle_custom_params_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _begin_callback_error(response):
return error_factory(response, response.status,
status_to_string=area_callback_pb2.BeginCallbackResponse.Status.Name,
status_to_error=_BEGIN_CALLBACK_TO_ERROR)
_BEGIN_CONTROL_TO_ERROR = collections.defaultdict(
lambda: (AreaCallbackResponseError, None))
_BEGIN_CONTROL_TO_ERROR.update({
area_callback_pb2.BeginControlResponse.STATUS_OK: (None, None),
area_callback_pb2.BeginControlResponse.STATUS_INVALID_COMMAND_ID: error_pair(InvalidCommandIdError),
area_callback_pb2.BeginControlResponse.STATUS_MISSING_LEASE_RESOURCES: error_pair(MissingLeaseResourcesError),
# This one generally shouldn't happen because it should get first caught
# by @handle_lease_use_result_errors
area_callback_pb2.BeginControlResponse.STATUS_LEASE_ERROR: error_pair(LeaseUseError),
})
@handle_common_header_errors
@handle_lease_use_result_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _begin_control_error(response):
return error_factory(response, response.status,
status_to_string=area_callback_pb2.BeginControlResponse.Status.Name,
status_to_error=_BEGIN_CONTROL_TO_ERROR)
_UPDATE_CALLBACK_TO_ERROR = collections.defaultdict(
lambda: (AreaCallbackResponseError, None))
_UPDATE_CALLBACK_TO_ERROR.update({
area_callback_pb2.UpdateCallbackResponse.STATUS_OK: (None, None),
area_callback_pb2.UpdateCallbackResponse.STATUS_INVALID_COMMAND_ID: error_pair(InvalidCommandIdError),
area_callback_pb2.UpdateCallbackResponse.STATUS_EXPIRED_END_TIME: error_pair(ExpiredEndTimeError),
})
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _update_callback_error(response):
return error_factory(response, response.status,
status_to_string=area_callback_pb2.UpdateCallbackResponse.Status.Name,
status_to_error=_UPDATE_CALLBACK_TO_ERROR)
_END_CALLBACK_TO_ERROR = collections.defaultdict(
lambda: (AreaCallbackResponseError, None))
_END_CALLBACK_TO_ERROR.update({
area_callback_pb2.EndCallbackResponse.STATUS_OK: (None, None),
area_callback_pb2.EndCallbackResponse.STATUS_INVALID_COMMAND_ID: error_pair(InvalidCommandIdError),
area_callback_pb2.EndCallbackResponse.STATUS_SHUTDOWN_CALLBACK_FAILED: error_pair(ShutdownCallbackFailedError),
})
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _end_callback_error(response):
return error_factory(response, response.status,
status_to_string=area_callback_pb2.EndCallbackResponse.Status.Name,
status_to_error=_END_CALLBACK_TO_ERROR)
# yapf: enable