-
Notifications
You must be signed in to change notification settings - Fork 599
Expand file tree
/
Copy pathfault.py
More file actions
133 lines (103 loc) · 5.89 KB
/
fault.py
File metadata and controls
133 lines (103 loc) · 5.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""For clients to use the fault service."""
import collections
from bosdyn.api import fault_service_pb2_grpc, service_fault_pb2
from bosdyn.client.common import (BaseClient, error_factory, error_pair,
handle_common_header_errors, handle_unset_status_error)
from .exceptions import ResponseError
class FaultResponseError(ResponseError):
"""General class of errors for directory registration responses."""
class ServiceFaultAlreadyExistsError(FaultResponseError):
"""The specified service fault id already exists as an active fault on the robot."""
class ServiceFaultDoesNotExistError(FaultResponseError):
"""The specified service fault id does not match any active service faults on the robot."""
class FaultClient(BaseClient):
"""Client for the Fault service."""
default_service_name = 'fault'
service_type = 'bosdyn.api.FaultService'
def __init__(self):
super(FaultClient, self).__init__(fault_service_pb2_grpc.FaultServiceStub)
def trigger_service_fault(self, service_fault, **kwargs):
"""Broadcast a new service fault through the robot.
Args:
service_fault (bosdyn.api.ServiceFault): Populated fault message to broadcast.
Returns:
An instance of bosdyn.api.TriggerServiceFaultResponse
Raises:
RpcError: Problem communicating with the robot.
ServiceFaultAlreadyExistsError: The service fault already exists.
FaultResponseError: Something went wrong during the fault trigger.
"""
req = service_fault_pb2.TriggerServiceFaultRequest()
req.fault.CopyFrom(service_fault)
return self.call(self._stub.TriggerServiceFault, req, None,
error_from_response=_trigger_service_fault_error, copy_request=False,
**kwargs)
def trigger_service_fault_async(self, service_fault, **kwargs):
"""Async version of trigger_service_fault()"""
req = service_fault_pb2.TriggerServiceFaultRequest()
req.fault.CopyFrom(service_fault)
return self.call_async(self._stub.TriggerServiceFault, req, None,
error_from_response=_trigger_service_fault_error, copy_request=False,
**kwargs)
def clear_service_fault(self, service_fault_id, clear_all_service_faults=False,
clear_all_payload_faults=False, **kwargs):
"""Clear a service fault from the robot state.
Args:
service_fault_id (bosdyn.api.ServiceFaultId): ServiceFault to clear.
clear_all_service_faults (bool): Clear all faults associated with the service name.
clear_all_payload_faults (bool): Clear all faults associated with the payload guid.
Returns:
An instance of bosdyn.api.ClearServiceFaultResponse
Raises:
RpcError: Problem communicating with the robot.
ServiceFaultDoesNotExistError: The service fault does not exist in active service faults.
FaultResponseError: Something went wrong during the fault clear.
"""
req = service_fault_pb2.ClearServiceFaultRequest()
req.fault_id.CopyFrom(service_fault_id)
req.clear_all_service_faults = clear_all_service_faults
req.clear_all_payload_faults = clear_all_payload_faults
return self.call(self._stub.ClearServiceFault, req, None,
error_from_response=_clear_service_fault_error, copy_request=False,
**kwargs)
def clear_service_fault_async(self, service_fault_id, clear_all_service_faults=False,
clear_all_payload_faults=False, **kwargs):
"""Async version of clear_service_fault()"""
req = service_fault_pb2.ClearServiceFaultRequest()
req.fault_id.CopyFrom(service_fault_id)
req.clear_all_service_faults = clear_all_service_faults
req.clear_all_payload_faults = clear_all_payload_faults
return self.call_async(self._stub.ClearServiceFault, req, None,
error_from_response=_clear_service_fault_error, copy_request=False,
**kwargs)
_TRIGGER_STATUS_TO_ERROR = collections.defaultdict(lambda: (FaultResponseError, None))
_TRIGGER_STATUS_TO_ERROR.update({
service_fault_pb2.TriggerServiceFaultResponse.STATUS_OK: (None, None),
service_fault_pb2.TriggerServiceFaultResponse.STATUS_FAULT_ALREADY_ACTIVE:
error_pair(ServiceFaultAlreadyExistsError),
})
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _trigger_service_fault_error(response):
"""Return an exception based on response from Trigger RPC, None if no error."""
return error_factory(response, response.status,
status_to_string=service_fault_pb2.TriggerServiceFaultResponse.Status.Name,
status_to_error=_TRIGGER_STATUS_TO_ERROR)
_CLEAR_STATUS_TO_ERROR = collections.defaultdict(lambda: (FaultResponseError, None))
_CLEAR_STATUS_TO_ERROR.update({
service_fault_pb2.ClearServiceFaultResponse.STATUS_OK: (None, None),
service_fault_pb2.ClearServiceFaultResponse.STATUS_FAULT_NOT_ACTIVE:
error_pair(ServiceFaultDoesNotExistError),
})
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _clear_service_fault_error(response):
"""Return an exception based on response from Clear RPC, None if no error."""
return error_factory(response, response.status,
status_to_string=service_fault_pb2.ClearServiceFaultResponse.Status.Name,
status_to_error=_CLEAR_STATUS_TO_ERROR)