-
Notifications
You must be signed in to change notification settings - Fork 599
Expand file tree
/
Copy pathdata_service.py
More file actions
130 lines (94 loc) · 5.08 KB
/
data_service.py
File metadata and controls
130 lines (94 loc) · 5.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Client for the data-service.
"""
import bosdyn.api.data_index_pb2 as data_index_protos
import bosdyn.api.data_service_pb2_grpc as data_service
from bosdyn.client.common import BaseClient, common_header_errors
from bosdyn.client.exceptions import Error
class InvalidArgument(Error):
"""A given argument could not be used."""
class DataServiceClient(BaseClient):
"""A client for adding to robot data buffer."""
default_service_name = 'data'
service_type = 'bosdyn.api.DataService'
def __init__(self):
super(DataServiceClient, self).__init__(data_service.DataServiceStub)
self.log_tick_schemas = {}
self._timesync_endpoint = None
def update_from(self, other):
super(DataServiceClient, self).update_from(other)
# Grab a timesync endpoint if it is available.
try:
self._timesync_endpoint = other.time_sync.endpoint
except AttributeError:
pass # other doesn't have a time_sync accessor
def get_data_index(self, query, **kwargs):
"""Query for data index
Args:
query: DataQuery
Raises:
RpcError: Problem communicating with the robot.
"""
return self._do_get_data_index(self.call, query, **kwargs)
def get_data_index_async(self, query, **kwargs):
"""Async version of get_data_index."""
return self._do_get_data_index(self.call_async, query, **kwargs)
def _do_get_data_index(self, func, query, **kwargs):
"""Internal get_data_index RPC stub call."""
request = data_index_protos.GetDataIndexRequest(data_query=query)
return func(self._stub.GetDataIndex, request, value_from_response=None,
error_from_response=common_header_errors, **kwargs)
def get_data_pages(self, time_range, **kwargs):
return self._do_get_data_pages(self.call, time_range, **kwargs)
def get_data_pages_async(self, time_range, **kwargs):
return self._do_get_data_pages(self.call_async, time_range, **kwargs)
def _do_get_data_pages(self, func, time_range, **kwargs):
"""Internal get_data_pages RPC stub call."""
request = data_index_protos.GetDataPagesRequest(time_range=time_range)
return func(self._stub.GetDataPages, request, value_from_response=None,
error_from_response=common_header_errors, **kwargs)
def delete_data_pages(self, time_range, page_ids, **kwargs):
return self._do_delete_data_pages(self.call, time_range, page_ids, **kwargs)
def delete_data_pages_async(self, time_range, page_ids, **kwargs):
return self._do_delete_data_pages(self.call_async, time_range, page_ids, **kwargs)
def _do_delete_data_pages(self, func, time_range, page_ids, **kwargs):
"""Internal delete_data_pages RPC stub call."""
request = data_index_protos.DeleteDataPagesRequest(time_range=time_range, page_ids=page_ids)
return func(self._stub.DeleteDataPages, request, value_from_response=None,
error_from_response=common_header_errors, **kwargs)
def get_events_comments(self, query, **kwargs):
"""Query for operator comments and events
Args:
query: EventsCommentsSpec
Raises:
RpcError: Problem communicating with the robot.
"""
return self._do_get_events_comments(self.call, query, **kwargs)
def get_events_comments_async(self, query, **kwargs):
"""Async version of get_data_index."""
return self._do_get_events_comments(self.call_async, query, **kwargs)
def _do_get_events_comments(self, func, query, **kwargs):
"""Internal get_data_index RPC stub call."""
request = data_index_protos.GetEventsCommentsRequest(event_comment_request=query)
return func(self._stub.GetEventsComments, request, value_from_response=None,
error_from_response=common_header_errors, **kwargs)
def get_data_buffer_status(self, get_blob_specs=False, **kwargs):
"""Query for operator comments and events
Args:
get_blob_specs (bool): whether to list message series.
Raises:
RpcError: Problem communicating with the robot.
"""
return self._do_get_data_buffer_status(self.call, get_blob_specs, **kwargs)
def get_data_buffer_status_async(self, get_blob_specs=False, **kwargs):
"""Async version of get_data_index."""
return self._do_get_data_buffer_status(self.call_async, get_blob_specs, **kwargs)
def _do_get_data_buffer_status(self, func, get_blob_specs, **kwargs):
"""Internal get_data_buffer_status RPC stub call."""
request = data_index_protos.GetDataBufferStatusRequest(get_blob_specs=get_blob_specs)
return func(self._stub.GetDataBufferStatus, request, value_from_response=None,
error_from_response=common_header_errors, **kwargs)