-
Notifications
You must be signed in to change notification settings - Fork 599
Expand file tree
/
Copy pathaccess_controlled_door_util.py
More file actions
206 lines (172 loc) · 8.62 KB
/
access_controlled_door_util.py
File metadata and controls
206 lines (172 loc) · 8.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
import json
from pathlib import Path
from string import Template
import requests
from bosdyn.client.url_validation_util import safe_api_call
# Timeout handed to safe_api_call for every request issued by
# make_access_control_system_api_call.
# NOTE(review): presumably seconds, following the `requests` convention -- confirm
# against safe_api_call.
API_TIMEOUT_DEFAULT = 30
def file_to_json(file_path):
    """
    Helper to read and parse JSON files.

    The file is always decoded as UTF-8 (the encoding JSON interchange uses) rather than
    the platform's default encoding, so the result does not depend on the host locale.

    Args:
        file_path (Path | str): Path to JSON file.

    Returns:
        dict: Parsed JSON data (or whatever top-level JSON value the file contains).

    Raises:
        OSError: If the file cannot be read.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    # Path() is a no-op for Path inputs and lets callers pass plain strings as well.
    return json.loads(Path(file_path).read_text(encoding="utf-8"))
# Functions to handle api calls to access control systems
def _apply_substitutions(request_data, substitutions):
    """Return a copy of request_data with $variable / ${variable} placeholders filled in.

    Strings at the top level and strings inside one level of nested dicts (e.g. headers)
    are templated; every other value is passed through unchanged.
    """

    def fill(value):
        # safe_substitute leaves unknown placeholders untouched instead of raising.
        if isinstance(value, str):
            return Template(value).safe_substitute(substitutions)
        return value

    templated = {}
    for key, value in request_data.items():
        if isinstance(value, dict):  # Handle nested dicts, like headers
            templated[key] = {inner_key: fill(inner) for inner_key, inner in value.items()}
        else:
            templated[key] = fill(value)
    return templated


def door_action(api_calls, door_id, action, path_to_cert=None, is_robot=True):
    """Executes a sequence of API calls required to perform an action (e.g., open or close) on a
    specified door, handling data substitutions and certificate verification as needed.

    Calls are executed in order. Values extracted from the response of one call are made
    available as substitutions for the calls that follow it. Processing stops at the first
    call that fails.

    Args:
        api_calls (list of dict): List of API call specifications, where each dict contains
            information such as 'method', 'url', 'action', 'sni_hostname', 'route',
            'request_data', and 'responses'.
        door_id (str): Identifier of the door to perform the action on. Available to the
            call templates as $door_id.
        action (list of str | str): The action(s) to perform (e.g., "open", "close"). Only calls
            matching the specified action(s) will be executed. A single string is treated as
            one action.
        path_to_cert (str, optional): Path to a certificate file for SSL verification. If None,
            no certificate is used.
        is_robot (bool, optional): Indicates if the API calls are being made on behalf of a
            robot. Defaults to True.

    Returns:
        dict: An error message dictionary containing details about any error encountered during
            the API calls ('extra_message' for configuration/processing problems, 'action' and
            'api_error' for a failed request). If all calls succeed, returns an empty dictionary.
    """
    # `x in "open"` would do substring matching (and raise for a call without an action),
    # so normalize a bare string into a one-element list.
    actions = [action] if isinstance(action, str) else action

    error_msg = {}
    cross_call_substitutions = {"door_id": door_id}

    for call_data in api_calls:
        if call_data.get("action") not in actions:
            continue

        method = call_data.get("method")
        url = call_data.get("url")
        sni_hostname = call_data.get("sni_hostname")
        route = call_data.get("route")
        # `or {}` also covers a config that sets "request_data" explicitly to null.
        request_data = call_data.get("request_data") or {}
        responses = call_data.get("responses", {})
        call_action = call_data.get("action", "")

        if not method or not url:
            error_msg["extra_message"] = "API call error: missing method or url"
            break

        # Format URL and data. Only values formatted like $variable or ${variable} are
        # replaced, and the placeholder name must match a substitution key exactly
        # (the lookup is case-sensitive).
        try:
            url = Template(url).safe_substitute(cross_call_substitutions)
            templated_request_data = _apply_substitutions(request_data,
                                                          cross_call_substitutions)
        except Exception as e:
            # Malformed configuration (e.g. non-string url, non-dict request_data). Do not
            # continue: the request data would be undefined or left over from a previous call.
            print(f"API call error: couldn't make substitutions, got exception {e}")
            error_msg["extra_message"] = "API call error: couldn't make substitutions"
            break

        # Check if cert file was included at the configuration, if yes use it
        if path_to_cert:
            templated_request_data["verify"] = path_to_cert

        result_of_call, api_error_msg = make_access_control_system_api_call(
            method, url, templated_request_data, responses, sni_hostname=sni_hostname,
            is_robot=is_robot, route=route)

        if api_error_msg:
            error_msg["action"] = call_action
            error_msg["api_error"] = api_error_msg
            break

        # If the call returned data, store it for subsequent calls
        if result_of_call:
            try:
                for tag, content in ((result[0], result[1]) for result in result_of_call):
                    cross_call_substitutions[tag] = content
            except Exception as e:
                print(f"API call error: couldn't read data, got exception {e}")
                error_msg["extra_message"] = (
                    f"API call error: couldn't read data, got exception {e}")
                break

    return error_msg
def make_access_control_system_api_call(method, url, request_data, store_responses=None,
                                        sni_hostname=None, is_robot=True, route=None):
    """Makes an HTTP request and optionally extracts specific fields from the JSON response.

    Args:
        method (str): HTTP method to use (e.g., 'GET', 'POST')
        url (str): The endpoint URL for the API call
        request_data (dict): Request configuration including headers, body, etc. Forwarded
            to safe_api_call as keyword arguments. None is treated as an empty dict.
        store_responses (Optional[Dict[str, str]]): Dictionary mapping response field names to
            JSON paths. For example:
            {
                "token": "auth.token",  # Store response's auth.token as "token"
                "session_id": "data.session"  # Store response's data.session as "session_id"
            }
            If None, no data will be extracted from the response.
        sni_hostname (str|None): If specified, this parameter provides the hostname declared by
            and expected by the access control server during TLS negotiation. This should only be
            required if the server's hostname is not resolvable via DNS.
        is_robot (bool): Forwarded unchanged to safe_api_call.
        route (str|None): Route type to use ("WIFI", "LTE").
            If None, default interface (WIFI) will be used.

    Returns:
        Tuple[Optional[List[Tuple[str, Any]]], Optional[Dict]]:
            - First element: If store_responses was provided, a list of tuples
              [(field_name, value), ...] for every path that was found in the response
              (possibly empty). None otherwise.
            - Second element: If error occurred, the status message reported by safe_api_call
              or a dict with error details:
                {
                    'status_code': int,
                    'reason': str,
                    'elapsed': float
                }
              None if successful.

    Example:
        >>> store_responses = {"auth_token": "data.token"}
        >>> data, error = make_access_control_system_api_call("POST", "https://api.door/auth",
        ...                                                  {"json": {"key": "value"}},
        ...                                                  store_responses)
        >>> if data:
        ...     # data might be [("auth_token", "abc123")]
        ...     token = dict(data)["auth_token"]
    """
    try:
        response, status_message = safe_api_call(method, url, sni_hostname,
                                                 timeout=API_TIMEOUT_DEFAULT, is_robot=is_robot,
                                                 interface=route, **(request_data or {}))

        if response is None:
            # There was an error during the API call
            return None, status_message

        if response.status_code != 200:
            # If the status code isn't 200, something went wrong
            return None, {
                'status_code': response.status_code,
                'reason': response.reason,
                'elapsed': response.elapsed.total_seconds(),
            }

        if not store_responses:
            # If we don't need to store responses, just return success
            return None, None

        # Only try to parse JSON and extract data if we need to store responses
        try:
            data_json = response.json()
        except ValueError as e:
            # Body is not JSON; report it like any other failed call instead of raising.
            return None, {
                'status_code': response.status_code,
                'reason': f"Response body is not valid JSON: {e}",
                'elapsed': response.elapsed.total_seconds(),
            }

        extracted = []
        for tag, path in store_responses.items():
            value = get_value_by_path(data_json, path)
            # get_value_by_path reports a missing path as False, so both None and False
            # mean "nothing to store" here.
            if value is None or value is False:
                continue
            extracted.append((tag, value))
        return extracted, None

    except requests.exceptions.RequestException as e:
        print(f"API call failed: {str(e)}")
        failed_response = getattr(e, 'response', None)
        elapsed = getattr(failed_response, 'elapsed', None)
        return None, {
            'status_code': getattr(failed_response, 'status_code', None),
            'reason': getattr(failed_response, 'reason', str(e)),
            # Report seconds as a float, matching the non-200 error path above.
            'elapsed': elapsed.total_seconds() if hasattr(elapsed, 'total_seconds') else None,
        }
def get_value_by_path(data_json, path_to_info):
    """Takes in a json representing the data of a call response, and a string representing the
    path through the json to the desired data. Traverses the json and returns the data.

    Args:
        data_json (dict): Parsed JSON data of a call response.
        path_to_info (str): Dot-separated keys leading to the desired value, e.g. "auth.token".

    Returns:
        The value found at the given path. False if the path does not exist, if it passes
        through something that is not a dict-like object, or if the value found is None.
    """
    result = data_json
    for key in path_to_info.split('.'):
        try:
            result = result.get(key)
        except AttributeError:
            # An earlier key was missing (None) or led to a non-dict value such as a list
            # or string, so the remainder of the path cannot exist.
            return False
    return result if result is not None else False