-
Notifications
You must be signed in to change notification settings - Fork 598
Expand file tree
/
Copy pathutils.py
More file actions
350 lines (297 loc) · 14.8 KB
/
utils.py
File metadata and controls
350 lines (297 loc) · 14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Utility functions for bosdyn-orbit"""
import datetime
import hashlib
import hmac
import json
import os
import secrets
import shutil
import sys
import time
from typing import Dict, List
from bosdyn.orbit.exceptions import WebhookSignatureVerificationError
# Name of the environment variable that get_api_token() reads the API token from.
API_TOKEN_ENV_VAR = "BOSDYN_ORBIT_CLIENT_API_TOKEN"
# Default for validate_webhook_payload()'s max_age_ms argument: 5 minutes, expressed in milliseconds.
DEFAULT_MAX_MESSAGE_AGE_MS = 5 * 60 * 1000
def get_api_token() -> str:
    """ Fetches the API token, preferring the environment over standard input

        The token is looked up in the environment variable named by API_TOKEN_ENV_VAR. When that
        variable is unset or empty, a line is read from standard input instead.

        Returns
            api_token: the API token taken from the environment variable or from standard input
    """
    token_from_env = os.environ.get(API_TOKEN_ENV_VAR)
    if token_from_env:
        return token_from_env
    # The prompt is only shown when stdin is a terminal, and it is written to stderr rather
    # than stdout.
    if sys.stdin.isatty():
        print('API Token: ', end='', file=sys.stderr)
    return input()
def _parse_verify_argument(value: str):
    """ Converts the raw text given to --verify into a Boolean or a CA bundle path

        Args:
            value: the text supplied on the command line
        Returns:
            True or False when the text spells "true"/"false" (case-insensitive, surrounding
            whitespace ignored); otherwise the text unchanged, to be used as a CA bundle path
    """
    normalized = value.strip().lower()
    if normalized == 'true':
        return True
    if normalized == 'false':
        return False
    return value


def add_base_arguments(parser):
    """ Adds the most common arguments to the parser

        This includes the hostname, verify, and cert arguments.

        Args:
            parser: the argument parser
    """
    parser.add_argument('--hostname', help='IP address associated with the Orbit instance',
                        required=True, type=str)
    parser.add_argument(
        '--verify',
        help=
        "verify(path to a CA bundle or Boolean): controls whether we verify the server's TLS certificate",
        default=True,
        # Without a converter "--verify False" would be stored as the truthy string 'False'
        # instead of the Boolean the help text promises.
        type=_parse_verify_argument,
    )
    parser.add_argument(
        '--cert', help=
        "a client certificate file for authentication (a .pem file containing the certificate and "
        "key pair, or two separate files containing the certificate and key respectively and in "
        "that order)", nargs='+', default=None)
def get_latest_created_at_for_run_events(client: 'bosdyn.orbit.client.Client',
                                         params: Dict = None) -> datetime.datetime:
    """ Given a dictionary of query params, returns the max created at time for run events

        The request asks for a single run event ordered by '-created_at'; entries in params
        override these defaults. When no run event is returned, the time reported by
        client.get_system_time() is used instead.

        Args:
            client: the client for the web API
            params: the query params associated with the get request (None means no extra params)
        Raises:
            RequestExceptions: exceptions thrown by the Requests library
            UnauthenticatedClientError: indicates that the client is not authenticated properly
        Returns:
            The max created at time for run events in datetime
    """
    # Start from the defaults so that caller-supplied params win; the caller's dict is never modified.
    query_params = {'limit': 1, 'orderBy': '-created_at'}
    query_params.update(params or {})
    resources = client.get_run_events(params=query_params).json()["resources"]
    if resources:
        return datetime_from_isostring(resources[0]["createdAt"])
    ms_since_epoch = int(client.get_system_time().json()["msSinceEpoch"])
    # Same naive UTC value that the deprecated datetime.utcfromtimestamp() produced.
    return datetime.datetime.fromtimestamp(ms_since_epoch / 1000,
                                           tz=datetime.timezone.utc).replace(tzinfo=None)
def get_latest_run_capture_resources(client: 'bosdyn.orbit.client.Client',
                                     params: Dict = None) -> List:
    """ Given a dictionary of query params, returns the latest run capture resources in json format

        The request is ordered by '-created_at' unless params overrides 'orderBy'.

        Args:
            client: the client for Orbit web API
            params: the query params associated with the get request (None means no extra params)
        Raises:
            RequestExceptions: exceptions thrown by the Requests library
            UnauthenticatedClientError: indicates that the client is not authenticated properly
        Returns:
            A list of resources obtained from a RESTful endpoint
    """
    # Start from the defaults so that caller-supplied params win; the caller's dict is never modified.
    query_params = {'orderBy': '-created_at'}
    query_params.update(params or {})
    return client.get_run_captures(params=query_params).json()["resources"]
def get_latest_created_at_for_run_captures(client: 'bosdyn.orbit.client.Client',
                                           params: Dict = None) -> datetime.datetime:
    """ Given a dictionary of query params, returns the max created at time for run captures

        The request asks for a single run capture ordered by '-created_at'; entries in params
        override these defaults. When no run capture is returned, the time reported by
        client.get_system_time() is used instead.

        Args:
            client: the client for Orbit web API
            params: the query params associated with the get request (None means no extra params)
        Raises:
            RequestExceptions: exceptions thrown by the Requests library
            UnauthenticatedClientError: indicates that the client is not authenticated properly
        Returns:
            The max created at time for run captures in datetime
    """
    # Start from the defaults so that caller-supplied params win; the caller's dict is never modified.
    query_params = {'limit': 1, 'orderBy': '-created_at'}
    query_params.update(params or {})
    resources = client.get_run_captures(params=query_params).json()["resources"]
    if resources:
        return datetime_from_isostring(resources[0]["createdAt"])
    ms_since_epoch = int(client.get_system_time().json()["msSinceEpoch"])
    # Same naive UTC value that the deprecated datetime.utcfromtimestamp() produced.
    return datetime.datetime.fromtimestamp(ms_since_epoch / 1000,
                                           tz=datetime.timezone.utc).replace(tzinfo=None)
def get_latest_run_resource(client: 'bosdyn.orbit.client.Client', params: Dict = None) -> Dict:
    """ Given a dictionary of query params, returns the latest run resource in json format

        The request asks for a single run ordered by 'newest'; entries in params override
        these defaults.

        Args:
            client: the client for Orbit web API
            params: the query params associated with the get request (None means no extra params)
        Raises:
            RequestExceptions: exceptions thrown by the Requests library
            UnauthenticatedClientError: indicates that the client is not authenticated properly
        Returns:
            The first run resource returned by the RESTful endpoint, or None when the response
            contains no run resources
    """
    # Start from the defaults so that caller-supplied params win; the caller's dict is never modified.
    query_params = {'limit': 1, 'orderBy': 'newest'}
    query_params.update(params or {})
    runs = client.get_runs(params=query_params).json()['resources']
    return runs[0] if runs else None
def get_latest_run_in_progress(client: 'bosdyn.orbit.client.Client', params: Dict = None) -> Dict:
    """ Given a dictionary of query params, returns the latest running resource in json format

        Runs are requested ordered by 'newest' (params may override this). The first returned
        run whose "missionStatus" is not one of SUCCESS, FAILURE, ERROR, STOPPED, NONE or
        UNKNOWN is treated as the run in progress.

        Args:
            client: the client for Orbit web API
            params: the query params associated with the get request (None means no extra params)
        Raises:
            RequestExceptions: exceptions thrown by the Requests library
            UnauthenticatedClientError: indicates that the client is not authenticated properly
        Returns:
            The first run resource that is still in progress, or None when every returned run
            has one of the statuses listed above
    """
    finished_statuses = ("SUCCESS", "FAILURE", "ERROR", "STOPPED", "NONE", "UNKNOWN")
    # Start from the defaults so that caller-supplied params win; the caller's dict is never modified.
    query_params = {'orderBy': 'newest'}
    query_params.update(params or {})
    runs = client.get_runs(params=query_params).json()["resources"]
    return next((run for run in runs if run["missionStatus"] not in finished_statuses), None)
def get_latest_end_time_for_runs(client: 'bosdyn.orbit.client.Client',
                                 params: Dict = None) -> datetime.datetime:
    """ Given a dictionary of query params, returns the max end time for runs

        The request asks for a single run ordered by 'newest'; entries in params override
        these defaults. When no run is returned, or the returned run has an empty "endTime",
        the time reported by client.get_system_time() is used instead.

        Args:
            client: the client for Orbit web API
            params: the query params associated with the get request (None means no extra params)
        Raises:
            RequestExceptions: exceptions thrown by the Requests library
            UnauthenticatedClientError: indicates that the client is not authenticated properly
        Returns:
            The max end time for runs in datetime
    """
    # Start from the defaults so that caller-supplied params win; the caller's dict is never modified.
    query_params = {'limit': 1, 'orderBy': 'newest'}
    query_params.update(params or {})
    runs = client.get_runs(params=query_params).json().get("resources")
    if runs:
        end_time = runs[0]["endTime"]
        if end_time:
            return datetime_from_isostring(end_time)
    ms_since_epoch = int(client.get_system_time().json()["msSinceEpoch"])
    # Same naive UTC value that the deprecated datetime.utcfromtimestamp() produced.
    return datetime.datetime.fromtimestamp(ms_since_epoch / 1000,
                                           tz=datetime.timezone.utc).replace(tzinfo=None)
def write_image(img_raw, image_fp: str) -> None:
    """ Given a raw image and a desired output filepath, writes the image to a file

        Missing parent directories of image_fp are created first. An existing file at
        image_fp is overwritten.

        Args:
            img_raw(Raw image object): the input raw image; it is streamed with
                shutil.copyfileobj, so it must be a readable binary file-like object
            image_fp: the output filepath for the image
    """
    parent_dir = os.path.dirname(image_fp)
    # A bare filename has no directory part, and os.makedirs('') raises FileNotFoundError.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(image_fp, 'wb') as out_file:
        shutil.copyfileobj(img_raw, out_file)
def data_capture_urls_from_run_events(client: 'bosdyn.orbit.client.Client', run_events: Dict,
                                      list_of_channel_names: List = None) -> List:
    """ Given run events and list of desired channel names, returns the list of data capture urls

        Each url is 'https://<client._hostname>' followed by the capture's "dataUrl". Urls are
        returned in the order they are first encountered and each url appears only once.

        Args:
            client: the client for Orbit web API
            run_events: a json representation of run events obtained from a RESTful endpoint; its
                "resources" entries each carry a "dataCaptures" list
            list_of_channel_names: a list of channel names associated with the desired data captures.
                                   Defaults to None which returns all the available channels.
        Returns:
            data_urls: a list of urls
    """
    data_urls = []
    seen_urls = set()
    for resource in run_events["resources"]:
        for data_capture in resource["dataCaptures"]:
            # With no channel filter every capture qualifies and "channelName" is not consulted.
            if (list_of_channel_names is not None and
                    data_capture["channelName"] not in list_of_channel_names):
                continue
            url = f'https://{client._hostname}' + data_capture["dataUrl"]
            # Compare the url itself; the channel-name list is never an element of data_urls.
            if url not in seen_urls:
                seen_urls.add(url)
                data_urls.append(url)
    return data_urls
def data_capture_url_from_run_capture_resources(client: 'bosdyn.orbit.client.Client',
                                                run_capture_resources: List,
                                                list_of_channel_names: List = None) -> List:
    """ Given run capture resources and list of desired channel names, returns the list of data capture urls

        Each url is 'https://<client._hostname>' followed by the capture's "dataUrl". Urls are
        returned in the order they are first encountered and each url appears only once.

        Args:
            client: the client for Orbit web API
            run_capture_resources: a list of resources obtained from a RESTful endpoint
            list_of_channel_names: a list of channel names associated with the desired data captures.
                                   Defaults to None which returns all the available channels.
        Returns:
            data_urls: a list of urls
    """
    data_urls = []
    seen_urls = set()
    for data_capture in run_capture_resources:
        # With no channel filter every capture qualifies and "channelName" is not consulted.
        if (list_of_channel_names is not None and
                data_capture["channelName"] not in list_of_channel_names):
            continue
        url = f'https://{client._hostname}' + data_capture["dataUrl"]
        # Compare the url itself; the channel-name list is never an element of data_urls.
        if url not in seen_urls:
            seen_urls.add(url)
            data_urls.append(url)
    return data_urls
def get_action_names_from_run_events(run_events: Dict) -> List:
    """ Collects the action name of every run event, in the order the events are listed

        Args:
            run_events: a representation of run events obtained from a RESTful endpoint; the
                events are read from its "resources" entry
        Returns:
            action_names: the "actionName" value of each run event
    """
    return [event["actionName"] for event in run_events["resources"]]
def datetime_from_isostring(datetime_isostring: str) -> datetime.datetime:
    """ Returns the datetime representation of the iso string representation of time

        Accepted layouts are "YYYY-MM-DDTHH:MM:SS" with optional fractional seconds, followed by
        "Z", by a "+..." offset, or by nothing.

        NOTE: a "+..." offset is cut off and NOT applied, so the returned naive datetime carries
        the wall-clock fields exactly as written in the string.

        Args:
            datetime_isostring: the iso string representation of time
        Raises:
            ValueError: the string does not match any of the accepted layouts
        Returns:
            The (timezone-naive) datetime representation of the iso string representation of time
    """
    if "Z" in datetime_isostring:
        timestamp, suffix = datetime_isostring, "Z"
    elif "+" in datetime_isostring:
        timestamp, suffix = datetime_isostring[:datetime_isostring.index("+")], ""
    else:
        # No designator at all: parse the fields as they are instead of silently returning None.
        timestamp, suffix = datetime_isostring, ""
    # Try the fractional-seconds layout first, then the layout without a fraction.
    for seconds_format in ("%S.%f", "%S"):
        try:
            return datetime.datetime.strptime(timestamp,
                                              f"%Y-%m-%dT%H:%M:{seconds_format}{suffix}")
        except ValueError:
            continue
    raise ValueError(f"Unsupported ISO 8601 timestamp: {datetime_isostring!r}")
def validate_webhook_payload(payload: Dict, signature_header: str, secret: str,
                             max_age_ms: int = DEFAULT_MAX_MESSAGE_AGE_MS) -> None:
    """ Verifies a webhook payload against the HMAC signature sent in its signature header

        The header is a comma separated list of key=value entries; "t" holds the send time in
        milliseconds since the epoch and "v1" holds the hex HMAC-SHA256 of
        "<t>.<compact JSON of payload>" keyed with the hex-decoded secret.

        Args:
            payload: the JSON body of the webhooks req
            signature_header: the value of the signature header
            secret: the configured secret value for this webhook (a hex string)
            max_age_ms: the maximum age of the message before it's considered invalid (default is 5 minutes)
        Raises:
            bosdyn.orbit.exceptions.WebhookSignatureVerificationError: thrown if the webhook signature is invalid
            ValueError: thrown by bytes.fromhex if secret is not a valid hex string
    """
    if not signature_header:
        raise WebhookSignatureVerificationError("Signature header cannot be empty")

    header_components = {}
    for entry in signature_header.split(','):
        key, separator, value = entry.partition('=')
        if not separator:
            # An entry without "=" used to escape as a bare ValueError from dict().
            raise WebhookSignatureVerificationError("Malformed signature header")
        header_components[key.strip()] = value.strip()

    send_time = header_components.get('t')
    # isdecimal() only accepts characters that int() can parse (isdigit() also accepts e.g. superscripts).
    send_time_ms = int(send_time) if send_time is not None and send_time.isdecimal() else None
    received_hmac = header_components.get('v1')
    if not send_time_ms or not received_hmac:
        raise WebhookSignatureVerificationError(
            "Missing either send time or HMAC in signature header")

    # time.time() is in seconds; scale before truncating so the sub-second part is kept.
    current_time_ms = int(time.time() * 1000)
    time_diff_ms = current_time_ms - send_time_ms
    if time_diff_ms > max_age_ms:
        raise WebhookSignatureVerificationError(
            f"The payload is {time_diff_ms}ms old, which is greater than the maximum age {max_age_ms}ms"
        )

    # The signed string uses the send time exactly as received and JSON without any whitespace.
    serialized_payload = json.dumps(payload, separators=(",", ":"))
    full_payload_string = f'{send_time}.{serialized_payload}'
    calculated_hmac = hmac.new(bytes.fromhex(secret), full_payload_string.encode('utf-8'),
                               hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest raises TypeError for str arguments containing non-ASCII characters.
    time_safe_equal = secrets.compare_digest(received_hmac.encode('utf-8'),
                                             calculated_hmac.encode('utf-8'))
    if not time_safe_equal:
        raise WebhookSignatureVerificationError(
            "The received HMAC did not match the expected value")
def print_json_response(response: 'requests.Response') -> bool:
    """ Prints the JSON body of a response, or a message explaining why it could not be printed

        Args:
            response: the response whose outcome is reported
        Returns:
            True when response.ok is set and the body was decoded as JSON and printed;
            False when the request failed or the body is not JSON.
    """
    if not response.ok:
        print(f"Request failed with status : {response.text}")
        return False
    try:
        print(f"JSON Response: {response.json()}")
    except ValueError:
        print("Response is ok but not in JSON format.")
        return False
    return True