-
Notifications
You must be signed in to change notification settings - Fork 599
Expand file tree
/
Copy pathlog_status.py
More file actions
346 lines (272 loc) · 15.1 KB
/
log_status.py
File metadata and controls
346 lines (272 loc) · 15.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
# Boston Dynamics, Inc. Confidential Information.
# Copyright 2025. All Rights Reserved.
"""Client for the log-status service.
This allows client code to start, extend or terminate experiment logs and start retro logs.
"""
import collections
import re
import time
import bosdyn.util
from bosdyn.api.log_status import log_status_pb2 as log_status
from bosdyn.api.log_status import log_status_service_pb2_grpc as log_status_service
from bosdyn.client.common import (BaseClient, error_factory, error_pair,
handle_common_header_errors, handle_unset_status_error)
from bosdyn.client.exceptions import ResponseError
class LogStatusResponseError(ResponseError):
    """Base class for errors reported in a log-status RPC response."""
class ExperimentAlreadyRunningError(LogStatusResponseError):
    """The requested log could not be started because an experiment is already running."""
class RequestIdDoesNotExistError(LogStatusResponseError):
    """The request id supplied by the caller is invalid or does not exist."""
class InactiveLogError(LogStatusResponseError):
    """The log cannot be updated because it has already terminated."""
class ConcurrencyLimitReachedError(LogStatusResponseError):
    """A new log cannot be started: the limit of concurrent retro logs has been reached."""
class NoDataForEventError(LogStatusResponseError):
    """A log cannot be started because the provided event has no data available."""
class LogStatusClient(BaseClient):
    """Client for the robot's log-status service.

    Each RPC is exposed twice: a blocking method that goes through ``self.call`` and an
    ``*_async`` twin that goes through ``self.call_async``. Both variants build the same
    request message and use the same error-mapping function.
    """

    # Name under which the service is typically listed in the robot's directory.
    default_service_name = 'log-status'
    # Fully-qualified gRPC service type implemented by this service.
    service_type = 'bosdyn.api.log_status.LogStatusService'

    def __init__(self):
        super().__init__(log_status_service.LogStatusServiceStub)

    # ---------------------------------------------------------------------------------------
    # Request builders shared by the blocking and asynchronous variants of each RPC.
    # ---------------------------------------------------------------------------------------

    @staticmethod
    def _get_log_status_request(id):
        """Build a GetLogStatusRequest for the given log id."""
        request = log_status.GetLogStatusRequest()
        request.id = id
        return request

    @staticmethod
    def _start_experiment_log_request(seconds, past_textlog_duration):
        """Build a StartExperimentLogRequest with both durations converted from seconds."""
        request = log_status.StartExperimentLogRequest()
        request.keep_alive.CopyFrom(bosdyn.util.seconds_to_duration(seconds))
        request.past_textlog_duration.CopyFrom(
            bosdyn.util.seconds_to_duration(past_textlog_duration))
        return request

    @staticmethod
    def _start_retro_log_request(seconds):
        """Build a StartRetroLogRequest for the given number of seconds."""
        request = log_status.StartRetroLogRequest()
        # NOTE(review): the value is negated before conversion; confirm that this sign is what
        # the service expects for past_duration.
        request.past_duration.CopyFrom(bosdyn.util.seconds_to_duration(-seconds))
        return request

    @staticmethod
    def _start_concurrent_log_request(duration_seconds, event):
        """Build a StartConcurrentLogRequest; the event is copied only when it is truthy."""
        request = log_status.StartConcurrentLogRequest()
        request.keep_alive.CopyFrom(bosdyn.util.seconds_to_duration(duration_seconds))
        if event:
            request.event.CopyFrom(event)
        return request

    @staticmethod
    def _update_experiment_request(id, seconds):
        """Build an UpdateExperimentLogRequest for the given log id and keep-alive."""
        request = log_status.UpdateExperimentLogRequest()
        request.id = id
        request.keep_alive.CopyFrom(bosdyn.util.seconds_to_duration(seconds))
        return request

    @staticmethod
    def _terminate_log_request(id):
        """Build a TerminateLogRequest for the given log id."""
        request = log_status.TerminateLogRequest()
        request.id = id
        return request

    # ---------------------------------------------------------------------------------------
    # Public API.
    # ---------------------------------------------------------------------------------------

    def get_log_status(self, id, **kwargs):
        """Synchronously fetch the status of one log.

        Args:
          id (string): Id of the log to look up
          kwargs: Extra keyword arguments forwarded to ``self.call``

        Raises:
          RequestIdDoesNotExistError: Id was not found on robot
        """
        request = self._get_log_status_request(id)
        return self.call(self._stub.GetLogStatus, request,
                         error_from_response=get_log_status_error, copy_request=False, **kwargs)

    def get_log_status_async(self, id, **kwargs):
        """Asynchronous variant of get_log_status; same arguments, sent via call_async."""
        request = self._get_log_status_request(id)
        return self.call_async(self._stub.GetLogStatus, request,
                               error_from_response=get_log_status_error, copy_request=False,
                               **kwargs)

    def get_active_log_statuses(self, **kwargs):
        """Synchronously fetch the statuses of the active logs."""
        request = log_status.GetActiveLogStatusesRequest()
        return self.call(self._stub.GetActiveLogStatuses, request,
                         error_from_response=get_active_log_statuses_error, copy_request=False,
                         **kwargs)

    def get_active_log_statuses_async(self, **kwargs):
        """Asynchronous variant of get_active_log_statuses, sent via call_async."""
        request = log_status.GetActiveLogStatusesRequest()
        return self.call_async(self._stub.GetActiveLogStatuses, request,
                               error_from_response=get_active_log_statuses_error,
                               copy_request=False, **kwargs)

    def start_experiment_log(self, seconds, past_textlog_duration=0, **kwargs):
        """Start an experiment log that runs for the given duration.

        Args:
          seconds: Number of seconds to gather data for the experiment log
          past_textlog_duration: Seconds, converted to a duration and sent as the request's
            past_textlog_duration field (presumably how much earlier text log to include --
            confirm against the service definition)
          kwargs: Extra keyword arguments forwarded to ``self.call``

        Raises:
          ExperimentAlreadyRunningError: Only 1 experiment log can be run at a time
        """
        request = self._start_experiment_log_request(seconds, past_textlog_duration)
        return self.call(self._stub.StartExperimentLog, request,
                         error_from_response=start_experiment_log_error, copy_request=False,
                         **kwargs)

    def start_experiment_log_async(self, seconds, past_textlog_duration=0, **kwargs):
        """Asynchronous variant of start_experiment_log; same arguments, sent via call_async."""
        request = self._start_experiment_log_request(seconds, past_textlog_duration)
        return self.call_async(self._stub.StartExperimentLog, request,
                               error_from_response=start_experiment_log_error,
                               copy_request=False, **kwargs)

    def start_retro_log(self, seconds, **kwargs):
        """Start a retro log covering the given duration.

        Args:
          seconds: Number of seconds to gather data for the retro log
          kwargs: Extra keyword arguments forwarded to ``self.call``

        Raises:
          ExperimentAlreadyRunningError: Retro logs cannot be run while an experiment log is
            running
          ConcurrencyLimitReachedError: Maximum number of retro logs are already running,
            another cannot be started
        """
        request = self._start_retro_log_request(seconds)
        return self.call(self._stub.StartRetroLog, request,
                         error_from_response=start_retro_log_error, copy_request=False, **kwargs)

    def start_retro_log_async(self, seconds, **kwargs):
        """Asynchronous variant of start_retro_log; same arguments, sent via call_async."""
        request = self._start_retro_log_request(seconds)
        return self.call_async(self._stub.StartRetroLog, request,
                               error_from_response=start_retro_log_error, copy_request=False,
                               **kwargs)

    def start_concurrent_log(self, duration_seconds, event=None, **kwargs):
        """Start an experiment log that allows concurrency.

        The log runs based on a particular data_set, as derived from the recipe corresponding
        to the provided event. An event must be provided!

        Args:
          duration_seconds: Seconds, converted to a duration and sent as the keep-alive
          event: Event message copied into the request. Documented as required, although the
            request is still sent without it when the value is falsy
          kwargs: Extra keyword arguments forwarded to ``self.call``
        """
        request = self._start_concurrent_log_request(duration_seconds, event)
        return self.call(self._stub.StartConcurrentLog, request,
                         error_from_response=start_concurrent_log_error, copy_request=False,
                         **kwargs)

    def start_concurrent_log_async(self, duration_seconds, data_set_names=None, properties=None,
                                   event=None, **kwargs):
        """Asynchronous variant of start_concurrent_log, sent via call_async.

        An event must be provided! ``data_set_names`` and ``properties`` are accepted in the
        signature but are not read by this method.
        """
        request = self._start_concurrent_log_request(duration_seconds, event)
        return self.call_async(self._stub.StartConcurrentLog, request,
                               error_from_response=start_concurrent_log_error,
                               copy_request=False, **kwargs)

    def update_experiment(self, id, seconds, **kwargs):
        """Update an experiment log to run for the given duration.

        Args:
          id (string): Id of the log to update
          seconds (float): Number of seconds to gather data for the experiment log
          kwargs: Extra keyword arguments forwarded to ``self.call``

        Raises:
          RequestIdDoesNotExistError: Id was not found on robot
          InactiveLogError: Cannot update log, it is already terminated
        """
        request = self._update_experiment_request(id, seconds)
        return self.call(self._stub.UpdateExperimentLog, request,
                         error_from_response=update_experiment_log_error, copy_request=False,
                         **kwargs)

    def update_experiment_async(self, id, seconds, **kwargs):
        """Asynchronous variant of update_experiment; same arguments, sent via call_async."""
        request = self._update_experiment_request(id, seconds)
        return self.call_async(self._stub.UpdateExperimentLog, request,
                               error_from_response=update_experiment_log_error,
                               copy_request=False, **kwargs)

    def terminate_log(self, id, **kwargs):
        """Terminate an experiment log.

        Args:
          id (string): Id of log to terminate
          kwargs: Extra keyword arguments forwarded to ``self.call``

        Raises:
          RequestIdDoesNotExistError: Id was not found on robot
        """
        request = self._terminate_log_request(id)
        return self.call(self._stub.TerminateLog, request,
                         error_from_response=terminate_log_error, copy_request=False, **kwargs)

    def terminate_log_async(self, id, **kwargs):
        """Asynchronous variant of terminate_log; same arguments, sent via call_async."""
        request = self._terminate_log_request(id)
        return self.call_async(self._stub.TerminateLog, request,
                               error_from_response=terminate_log_error, copy_request=False,
                               **kwargs)
# Status-to-error lookup tables, one per RPC. Each table is handed to error_factory as its
# status_to_error argument. A status without an explicit entry falls back to the generic
# LogStatusResponseError pair supplied by the defaultdict factory.

_GET_LOG_STATUS_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (LogStatusResponseError, None),
    {
        log_status.GetLogStatusResponse.STATUS_OK: (None, None),
        log_status.GetLogStatusResponse.STATUS_ID_NOT_FOUND:
            error_pair(RequestIdDoesNotExistError),
    },
)

_GET_ACTIVE_LOG_STATUSES_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (LogStatusResponseError, None),
    {
        log_status.GetActiveLogStatusesResponse.STATUS_OK: (None, None),
    },
)

_START_EXPERIMENT_LOG_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (LogStatusResponseError, None),
    {
        log_status.StartExperimentLogResponse.STATUS_OK: (None, None),
        log_status.StartExperimentLogResponse.STATUS_EXPERIMENT_LOG_RUNNING:
            error_pair(ExperimentAlreadyRunningError),
    },
)

_START_RETRO_LOG_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (LogStatusResponseError, None),
    {
        log_status.StartRetroLogResponse.STATUS_OK: (None, None),
        log_status.StartRetroLogResponse.STATUS_EXPERIMENT_LOG_RUNNING:
            error_pair(ExperimentAlreadyRunningError),
        log_status.StartRetroLogResponse.STATUS_CONCURRENCY_LIMIT_REACHED:
            error_pair(ConcurrencyLimitReachedError),
    },
)

_START_CONCURRENT_LOG_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (LogStatusResponseError, None),
    {
        log_status.StartConcurrentLogResponse.STATUS_OK: (None, None),
        log_status.StartConcurrentLogResponse.STATUS_EXPERIMENT_LOG_RUNNING:
            error_pair(ExperimentAlreadyRunningError),
        log_status.StartConcurrentLogResponse.STATUS_CONCURRENCY_LIMIT_REACHED:
            error_pair(ConcurrencyLimitReachedError),
        log_status.StartConcurrentLogResponse.STATUS_NO_DATA_FOR_EVENT:
            error_pair(NoDataForEventError),
    },
)

_UPDATE_EXPERIMENT_LOG_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (LogStatusResponseError, None),
    {
        log_status.UpdateExperimentLogResponse.STATUS_OK: (None, None),
        log_status.UpdateExperimentLogResponse.STATUS_ID_NOT_FOUND:
            error_pair(RequestIdDoesNotExistError),
        log_status.UpdateExperimentLogResponse.STATUS_LOG_TERMINATED:
            error_pair(InactiveLogError),
    },
)

_TERMINATE_LOG_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (LogStatusResponseError, None),
    {
        log_status.TerminateLogResponse.STATUS_OK: (None, None),
        log_status.TerminateLogResponse.STATUS_ID_NOT_FOUND:
            error_pair(RequestIdDoesNotExistError),
    },
)
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def get_log_status_error(response):
    """Translate a GetLogStatus response into a custom exception, or None if no error."""
    status_name = log_status.GetLogStatusResponse.Status.Name
    return error_factory(
        response,
        response.status,
        status_to_string=status_name,
        status_to_error=_GET_LOG_STATUS_STATUS_TO_ERROR,
    )
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def get_active_log_statuses_error(response):
    """Translate a GetActiveLogStatuses response into a custom exception, or None if no error."""
    status_name = log_status.GetActiveLogStatusesResponse.Status.Name
    return error_factory(
        response,
        response.status,
        status_to_string=status_name,
        status_to_error=_GET_ACTIVE_LOG_STATUSES_STATUS_TO_ERROR,
    )
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def start_experiment_log_error(response):
    """Translate a StartExperimentLog response into a custom exception, or None if no error."""
    status_name = log_status.StartExperimentLogResponse.Status.Name
    return error_factory(
        response,
        response.status,
        status_to_string=status_name,
        status_to_error=_START_EXPERIMENT_LOG_STATUS_TO_ERROR,
    )
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def start_retro_log_error(response):
    """Translate a StartRetroLog response into a custom exception, or None if no error."""
    status_name = log_status.StartRetroLogResponse.Status.Name
    return error_factory(
        response,
        response.status,
        status_to_string=status_name,
        status_to_error=_START_RETRO_LOG_STATUS_TO_ERROR,
    )
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def start_concurrent_log_error(response):
    """Translate a StartConcurrentLog response into a custom exception, or None if no error."""
    status_name = log_status.StartConcurrentLogResponse.Status.Name
    return error_factory(
        response,
        response.status,
        status_to_string=status_name,
        status_to_error=_START_CONCURRENT_LOG_STATUS_TO_ERROR,
    )
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def update_experiment_log_error(response):
    """Translate an UpdateExperimentLog response into a custom exception, or None if no error."""
    status_name = log_status.UpdateExperimentLogResponse.Status.Name
    return error_factory(
        response,
        response.status,
        status_to_string=status_name,
        status_to_error=_UPDATE_EXPERIMENT_LOG_STATUS_TO_ERROR,
    )
@handle_common_header_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def terminate_log_error(response):
    """Translate a TerminateLog response into a custom exception, or None if no error."""
    status_name = log_status.TerminateLogResponse.Status.Name
    return error_factory(
        response,
        response.status,
        status_to_string=status_name,
        status_to_error=_TERMINATE_LOG_STATUS_TO_ERROR,
    )