-
Notifications
You must be signed in to change notification settings - Fork 599
Expand file tree
/
Copy pathkeepalive.py
More file actions
354 lines (280 loc) · 14.6 KB
/
keepalive.py
File metadata and controls
354 lines (280 loc) · 14.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Client implementation of the Keepalive service."""
from __future__ import print_function
import collections
import logging
import threading
import time
from typing import Callable, List, Union
import bosdyn.client.lease
import bosdyn.util
from bosdyn.api import lease_pb2
from bosdyn.api.keepalive import keepalive_pb2, keepalive_service_pb2_grpc
from bosdyn.client.common import (BaseClient, common_header_errors, error_factory, error_pair,
handle_common_header_errors, handle_unset_status_error)
from bosdyn.client.error_callback_result import ErrorCallbackResult
from bosdyn.client.exceptions import ResponseError, RetryableRpcError
from bosdyn.util import now_sec
class KeepaliveResponseError(ResponseError):
    """Base class for errors reported in the response to a Keepalive service RPC."""
class InvalidLeaseError(KeepaliveResponseError):
    """A policy's associated lease was not the same, super, or sub lease of the active lease.

    This is the error registered for ModifyPolicyResponse.STATUS_INVALID_LEASE.
    """
class InvalidPolicyError(KeepaliveResponseError):
    """The specified policy ID was not valid.

    This is the error registered for STATUS_INVALID_POLICY_ID in both ModifyPolicy and CheckIn
    responses.
    """
class Policy():
    """Convenience wrapper used to build up a keepalive_pb2.Policy message.

    The wrapped message is exposed as the `policy_proto` attribute.
    """

    def __init__(self, proto: Union[None, 'keepalive_pb2.Policy'] = None):
        """Wrap the supplied Policy proto, or start from a new empty one."""
        self.policy_proto = proto if proto else keepalive_pb2.Policy()

    @property
    def name(self) -> str:
        """Get or set the name stored in the wrapped Policy proto."""
        return self.policy_proto.name

    @name.setter
    def name(self, _name: str):
        self.policy_proto.name = _name

    def add_associated_lease(self, lease: Union['bosdyn.client.lease.Lease', 'lease_pb2.Lease']):
        """Append a lease to the policy's associated leases.

        A bosdyn.client.lease.Lease is unwrapped to its `lease_proto`; anything else is appended
        as given.
        """
        if isinstance(lease, bosdyn.client.lease.Lease):
            lease = lease.lease_proto
        self.policy_proto.associated_leases.append(lease)

    def add_controlled_motors_off_action(self, after: float):
        """Add a 'controlled motors off' action that triggers after specified time (seconds)."""

        def mark_controlled_motors_off(action):
            # The action carries no payload, so it only needs to be marked as present.
            action.controlled_motors_off.SetInParent()

        self._configure_action(after, mark_controlled_motors_off)

    def add_immediate_robot_off_action(self, after: float):
        """Add an 'immediate robot off' action that triggers after specified time (seconds)."""

        def mark_immediate_robot_off(action):
            # The action carries no payload, so it only needs to be marked as present.
            action.immediate_robot_off.SetInParent()

        self._configure_action(after, mark_immediate_robot_off)

    def add_record_event_action(self, events: List['bosdyn.api.Event'], after: float):
        """Add a 'record event' action that triggers after specified time (seconds)."""

        def attach_events(action):
            # Each event is copied into a freshly added entry of the action's event list.
            for source_event in events:
                new_event = action.record_event.events.add()
                new_event.CopyFrom(source_event)

        self._configure_action(after, attach_events)

    def add_auto_return_action(self, leases: List['bosdyn.client.lease.Lease'],
                               params: 'bosdyn.api.auto_return.Params', after: float):
        """Add an 'auto return' action that triggers after specified time (seconds)."""

        def fill_auto_return(action):
            auto_return = action.auto_return
            auto_return.leases.extend([lease.lease_proto for lease in leases])
            auto_return.params.CopyFrom(params)

        self._configure_action(after, fill_auto_return)

    def add_lease_stale_action(self, leases: List['bosdyn.client.lease.Lease'], after: float):
        """Add a 'mark lease stale' action that triggers after specified time (seconds)."""

        def fill_stale_leases(action):
            action.lease_stale.leases.extend([lease.lease_proto for lease in leases])

        self._configure_action(after, fill_stale_leases)

    def shortest_action_delay(self) -> Union[None, float]:
        """Get the shortest delay on an action, or None if no actions are set.

        For example:
            pol = Policy()
            pol.add_controlled_motors_off_action(2.5)
            pol.add_immediate_robot_off_action(1.2)
            assert pol.shortest_action_delay() == 1.2
        """
        delays = (bosdyn.util.duration_to_seconds(entry.after)
                  for entry in self.policy_proto.actions)
        return min(delays, default=None)

    def _configure_action(self, after: float, set_action: Callable[['keepalive_pb2.ActionAfter'],
                                                                   None]):
        """Append a new action with the given delay, then let `set_action` fill in its details."""
        new_action = self.policy_proto.actions.add()
        new_action.after.CopyFrom(bosdyn.util.seconds_to_duration(after))
        set_action(new_action)
class KeepaliveClient(BaseClient):
    """Client used to talk to the Keepalive service.

    This client is in BETA and may undergo changes in future releases.
    """
    default_service_name = 'keepalive'
    service_type = 'bosdyn.api.keepalive.KeepaliveService'

    def __init__(self):
        super().__init__(keepalive_service_pb2_grpc.KeepaliveServiceStub)
        self._timesync_endpoint = None

    def modify_policy(self, to_add: Union['Policy', 'keepalive_pb2.Policy'] = None,
                      policy_ids_to_remove: List[int] = None, **kwargs):
        """Issue a ModifyPolicy RPC that adds `to_add` and removes the listed policy ids."""
        req = self._modify_policy_request(to_add, policy_ids_to_remove)
        return self.call(self._stub.ModifyPolicy, req, None, modify_policy_error, **kwargs)

    def modify_policy_async(self, to_add: Union['Policy', 'keepalive_pb2.Policy'] = None,
                            policy_ids_to_remove: List[int] = None, **kwargs):
        """Async version of the modify_policy() RPC."""
        req = self._modify_policy_request(to_add, policy_ids_to_remove)
        return self.call_async(self._stub.ModifyPolicy, req, None, modify_policy_error, **kwargs)

    def check_in(self, policy_id: int, **kwargs):
        """Issue a CheckIn RPC for the given policy_id, refreshing that policy's timer."""
        req = self._check_in_request(policy_id)
        return self.call(self._stub.CheckIn, req, None, check_in_error, **kwargs)

    def check_in_async(self, policy_id: int, **kwargs):
        """Async version of the check_in() RPC."""
        req = self._check_in_request(policy_id)
        return self.call_async(self._stub.CheckIn, req, None, check_in_error, **kwargs)

    def get_status(self, **kwargs):
        """Issue a GetStatus RPC to get status on all policies."""
        req = keepalive_pb2.GetStatusRequest()
        return self.call(self._stub.GetStatus, req, None, common_header_errors, **kwargs)

    def get_status_async(self, **kwargs):
        """Async version of the get_status() RPC."""
        req = keepalive_pb2.GetStatusRequest()
        return self.call_async(self._stub.GetStatus, req, None, common_header_errors, **kwargs)

    @staticmethod
    def _modify_policy_request(to_add: Union['Policy', 'keepalive_pb2.Policy'],
                               policy_ids_to_remove):
        """Build a ModifyPolicyRequest, unwrapping a Policy helper to its underlying proto."""
        policy_proto = to_add.policy_proto if isinstance(to_add, Policy) else to_add
        return keepalive_pb2.ModifyPolicyRequest(to_add=policy_proto,
                                                 policy_ids_to_remove=policy_ids_to_remove)

    @staticmethod
    def _check_in_request(policy_id):
        """Build a CheckInRequest for the given policy id."""
        return keepalive_pb2.CheckInRequest(policy_id=policy_id)
# ModifyPolicy status -> error pair. Any status not listed here looks up as (None, None).
_MODIFY_POLICY_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (None, None),
    {
        keepalive_pb2.ModifyPolicyResponse.STATUS_INVALID_LEASE: error_pair(InvalidLeaseError),
        keepalive_pb2.ModifyPolicyResponse.STATUS_INVALID_POLICY_ID:
            error_pair(InvalidPolicyError),
    },
)
# CheckIn status -> error pair. Any status not listed here looks up as (None, None).
_CHECK_IN_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (None, None),
    {
        keepalive_pb2.CheckInResponse.STATUS_INVALID_POLICY_ID: error_pair(InvalidPolicyError),
    },
)
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def modify_policy_error(response):
    """Turn a ModifyPolicy response into an error via error_factory.

    The response status is looked up in _MODIFY_POLICY_STATUS_TO_ERROR.
    """
    status_name = keepalive_pb2.ModifyPolicyResponse.Status.Name
    return error_factory(response, response.status, status_to_string=status_name,
                         status_to_error=_MODIFY_POLICY_STATUS_TO_ERROR)
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def check_in_error(response):
    """Turn a CheckIn response into an error via error_factory.

    The response status is looked up in _CHECK_IN_STATUS_TO_ERROR.
    """
    status_name = keepalive_pb2.CheckInResponse.Status.Name
    return error_factory(response, response.status, status_to_string=status_name,
                         status_to_error=_CHECK_IN_STATUS_TO_ERROR)
#pylint: disable=too-many-instance-attributes
class PolicyKeepalive():
    """Specify a keepalive Policy that should be held to.

    Meant to be used as a context manager. For example:

        client = robot.ensure_client(KeepaliveClient.default_service_name)
        pol = Policy()
        # After 30 seconds of not hearing from this process, turn the robot motors off.
        pol.add_controlled_motors_off_action(30)
        with PolicyKeepalive(client, pol, rpc_interval_seconds=3) as policy_keepalive:
            # A thread will attempt a CheckIn every 3 seconds.
            run_my_code()

    Entering the context adds the policy and starts a daemon thread that checks in
    periodically. Leaving the context stops that thread and, if `remove_policy_on_exit` is set,
    removes the policy again.
    """

    #pylint: disable=too-many-arguments
    def __init__(self, client: KeepaliveClient, policy: Policy, rpc_timeout_seconds: float = None,
                 rpc_interval_seconds: float = None, logger: 'logging.Logger' = None,
                 remove_policy_on_exit: bool = False, initial_retry_seconds: float = 1.0):
        """Set up the keepalive; nothing is sent to the service until __enter__ is called.

        Args:
            client: Client used for the ModifyPolicy and CheckIn calls.
            policy: Policy that is added when the context is entered.
            rpc_timeout_seconds: Passed as `timeout` to every check-in call.
            rpc_interval_seconds: Time between check-ins. If not given (or falsy), a third of the
                policy's shortest action delay is used.
            logger: Logger for messages from the check-in thread. Defaults to the root logger.
            remove_policy_on_exit: Whether __exit__ removes the policy after stopping the thread.
            initial_retry_seconds: First wait used when the error callback requests
                RETRY_WITH_EXPONENTIAL_BACK_OFF. It doubles on each such retry, capped at the
                check-in interval.

        Raises:
            TypeError: No interval was given and the policy has no actions to derive one from.
        """
        self.logger = logger or logging.getLogger()
        self.remove_policy_on_exit = remove_policy_on_exit
        self._client = client
        self._policy = policy
        self._policy_id = None

        # If the interval isn't specified manually, try to get the interval from the policy,
        # assuming the user wants to check in a few times before the earliest action.
        if not rpc_interval_seconds:
            shortest_delay = policy.shortest_action_delay()
            if shortest_delay is None:
                # Previously this surfaced as an opaque "None / 3" TypeError; the exception type
                # is kept so existing handlers still match.
                raise TypeError('rpc_interval_seconds was not specified and the policy has no '
                                'actions to derive a check-in interval from')
            rpc_interval_seconds = shortest_delay / 3
        self._rpc_interval_seconds = rpc_interval_seconds
        self._rpc_timeout_seconds = rpc_timeout_seconds
        self._initial_retry_seconds = initial_retry_seconds

        #: Callable[[Exception], ErrorCallbackResult] | None: Optional callback to be called when
        #: an error occurs in the keepalive thread.
        self.keepalive_error_callback = None

        self._end_check_in_signal = threading.Event()
        self._thread = threading.Thread(target=self._periodic_check_in)
        self._thread.daemon = True

    def __enter__(self):
        """Add this instance's policy and begin checking in."""
        self._policy_id = self._client.modify_policy(to_add=self._policy).added_policy.policy_id
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop checking in, and optionally remove the policy."""
        self.shutdown()
        if self.remove_policy_on_exit:
            self.remove_policy()

    def remove_policy(self):
        """Remove this instance's policy, if it did manage to add one."""
        if self._policy_id:
            self._client.modify_policy(policy_ids_to_remove=[self._policy_id])
            self._policy_id = None

    def start(self):
        """Start the checkin thread."""
        self._thread.start()

    def shutdown(self):
        """Stop the checkin thread and block until it ends.

        Safe to call when the thread was never started or has already finished.
        """
        self._end_periodic_check_in()
        # Thread.join() raises RuntimeError for a thread that was never started, so only join a
        # thread that is actually running.
        if self._thread.is_alive():
            self._thread.join()

    def _check_in(self):
        """Send a single check-in for this instance's policy."""
        self._client.check_in(self._policy_id, timeout=self._rpc_timeout_seconds)

    def _end_periodic_check_in(self):
        """Signal the check-in loop to stop."""
        self._end_check_in_signal.set()

    def _periodic_check_in(self):
        """Thread target: check in repeatedly until the stop signal is set.

        A RetryableRpcError is logged and the loop carries on as normal. Any other exception is
        handed to `keepalive_error_callback`, whose result decides how to proceed; if no callback
        is set the exception is re-raised, which ends the thread.
        """
        retry_interval = self._initial_retry_seconds
        wait_time = self._rpc_interval_seconds

        # Block and wait for the stop signal. If we receive it within the check-in period,
        # leave the loop. Under normal conditions, wait up to self._rpc_interval_seconds seconds,
        # minus the RPC processing time. (values < 0 are OK and unblock immediately)
        while not self._end_check_in_signal.wait(wait_time):
            exec_start = now_sec()
            action = ErrorCallbackResult.RESUME_NORMAL_OPERATION
            try:
                self._check_in()
            except RetryableRpcError as exc:
                self.logger.warning('exception during check-in:\n%s\n', exc)
                self.logger.info('continuing check-in')
            except Exception as exc:  # pylint: disable=broad-except
                if self.keepalive_error_callback is not None:
                    # If the callback itself fails, fall back to the default action.
                    action = ErrorCallbackResult.DEFAULT_ACTION
                    try:
                        action = self.keepalive_error_callback(exc)
                    except Exception:  # pylint: disable=broad-except
                        self.logger.exception(
                            'Exception thrown in the provided keepalive error callback')
                else:
                    raise

            # How long did the RPC and processing of said RPC take?
            exec_seconds = now_sec() - exec_start

            if action == ErrorCallbackResult.ABORT:
                self.logger.warning('Callback directed the keepalive thread to exit.')
                break
            elif action == ErrorCallbackResult.RETRY_IMMEDIATELY:
                wait_time = 0
                continue
            elif action == ErrorCallbackResult.RETRY_WITH_EXPONENTIAL_BACK_OFF:
                wait_time = retry_interval - exec_seconds
                retry_interval = min(2 * retry_interval, self._rpc_interval_seconds)
            else:
                # Success path, or default action (resume normal operation)
                wait_time = self._rpc_interval_seconds - exec_seconds
                retry_interval = self._initial_retry_seconds
        self.logger.info('Policy check-in stopped')
def remove_all_policies(keepalive_client, attempts=1, retry_delay_seconds=0.5):
    """Remove all policies on the robot.

    Optionally do this over a few attempts, in case other things are also removing policies.

    Args:
        keepalive_client: Client providing get_status() and modify_policy().
        attempts: Maximum number of removal attempts. With a value of 0 or less nothing is done.
        retry_delay_seconds: Time to sleep before retrying after a failed attempt.

    Raises:
        InvalidPolicyError: The final attempt still failed with an invalid policy id.
    """
    last_exc = None
    for _ in range(attempts):
        if last_exc is not None:
            # The previous attempt failed; pause before fetching a fresh list of policy ids.
            time.sleep(retry_delay_seconds)
            last_exc = None
        all_policy_ids = [p.policy_id for p in keepalive_client.get_status().status]
        if not all_policy_ids:
            # Nothing left to remove.
            return
        try:
            keepalive_client.modify_policy(policy_ids_to_remove=all_policy_ids)
        except InvalidPolicyError as exc:
            # Remember the failure; it is re-raised only if no attempts remain.
            last_exc = exc
        else:
            return
    if last_exc is not None:
        raise last_exc