-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmission_generator.py
More file actions
191 lines (169 loc) · 6.82 KB
/
mission_generator.py
File metadata and controls
191 lines (169 loc) · 6.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import enum
import os
from uuid import uuid4
from PySmartSkies.Models.Operation import Operation
from dotenv import load_dotenv
from PySmartSkies.DIS_API import DIS_API
from PySmartSkies.CVMS_API import CVMS_API
from PySmartSkies.Credentials import DIS_Credentials, CVMS_Credentials
from PySmartSkies.Session import Session
import json
from matplotlib.style import available
from Dependencies.CAELUS_SmartSkies.PySmartSkies.Models.Product import Product
from start import start_with_payload
import time
import pickle
from datetime import datetime, timedelta
def authenticate(cvms_credentials, dis_credentials):
session = Session(cvms_credentials, dis_credentials)
dis_api = DIS_API(session)
dis_api.authenticate()
cvms_api = CVMS_API(session)
cvms_api.authenticate()
return cvms_api, dis_api
def pp_vendors_and_products(vendors, products):
vids = {str(v.id):v for v in vendors}
for v_id, ps in products.items():
if len(ps) == 0:
print(f'No products for vendor {v_id}')
print(f'Vendor: {vids[v_id].name} (id: {v_id})')
for p in ps:
print(f"\t> {p}")
def get_possible_vendors_and_products(dis_api: DIS_API, cvms_api: CVMS_API):
VENDORS_PICKLE_F = 'vendors.pickle'
try:
with open(VENDORS_PICKLE_F, 'rb') as f:
vs_ps = pickle.loads(f.read())
return vs_ps['vendors'], vs_ps['products']
except Exception as e:
print(e)
vendors = cvms_api.get_vendor_list(items_per_page=400)
products = {
str(v.id):cvms_api.get_product_list_from_vendor(v.id) for v in vendors
}
with open(VENDORS_PICKLE_F, 'wb') as f:
s = pickle.dumps({'vendors':vendors, 'products':products})
f.write(s)
return vendors, products
def make_order(cvms_api: CVMS_API, product, vendor):
res = cvms_api.place_order(vendor, [product])
o = cvms_api.checkout_orders(res)
return o, product
def get_drone_base_weight():
while True:
try:
kg = float(input("Drone base weight (kg): "))
if kg <= 0:
raise Exception("Can't be lower or equal to zero")
return kg
except:
pass
def prompt_selection(options: dict):
keys = list(options.keys())
while True:
try:
for i,k in enumerate(keys):
print(f'{i}) {k}')
selection = int(input(f'(0-{len(keys)-1}): '))
if selection >= len(keys):
raise Exception()
return options[keys[selection]]
except Exception as e:
print(e)
import os
def drone_selection():
AVAILABLE_DRONES_DIR = './available_drones'
drone_files = os.listdir(AVAILABLE_DRONES_DIR)
while True:
try:
print("Choose one of the available drones:")
for i,drone in enumerate(drone_files):
print(f'{i}) {drone.split(".json")[0]}')
return f'{drone_files[int(input(f"(0-{len(drone_files)}): "))]}'
except:
print(f"Please choose a number between 0 and {len(drone_files)}")
pass
def make_operation(dis_api: DIS_API, product: Product):
deliveries, pilots, drones, control_areas = dis_api.get_requested_deliveries()
deliveries = sorted(deliveries, key=
lambda d: datetime.strptime(d.submit_time, '%Y-%m-%dT%H:%M:%S')
, reverse=True)
if len(deliveries) == 0:
print('Smartskies failed in creating delivery. Hospitals may be too far apart!')
exit(-1)
drone = drones[-1]
# Operation begin in smartskies request
effective_time_begin = datetime.utcnow()
effective_time_begin += timedelta(minutes=1)
# Orchestrator effective time start
time_begin_unix = time.time() + 2 * 60 + 60
ops = dis_api.create_operation(deliveries[-1], drone, control_areas[-1], effective_time_begin.isoformat())
op_details = dis_api.get_operation_details_with_delivery_id(deliveries[-1].id)
op: Operation = ops[0]
wps = op.get_waypoints()
payload_mass = product.per_item_weight
drone_config_file = drone_selection()
# Takeoff location must be rounded up / increased slightly
# To prevent collision of the drone with the bottom of the
# takeoff volume
initial_lon_lat_alt_corrected = list(op.get_takeoff_location())
initial_lon_lat_alt_corrected[-1] += 0.5
# --------
group_id = input("Group id (blank for auto generated ID): ")
group_id = group_id if group_id != "" else str(uuid4())
payload = {
'waypoints': wps,
"operation_id": op_details.operation_id,
"control_area_id": control_areas[-1].control_area_id,
"operation_reference_number": ops[-1].reference_number,
"drone_id": drone.drone_id,
"drone_registration_number": drone.registration_number,
"dis_auth_token": dis_api._session.get_dis_token(),
"dis_refresh_token": dis_api._session.get_dis_refresh_token(),
"cvms_auth_token": dis_api._session.get_cvms_token(),
"delivery_id": deliveries[-1].id,
"thermal_model_timestep": 1,
"aeroacoustic_model_timestep": 0.004,
"drone_config_file":drone_config_file,
"payload_mass":payload_mass,
"g_acceleration": 9.81,
"group_id":group_id,
"effective_start_time": time_begin_unix,
"initial_lon_lat_alt": initial_lon_lat_alt_corrected,
"final_lon_lat_alt":op.get_landing_location()
}
return json.dumps(payload)
def parse_product_and_vendor(s, vendors, products):
vendor_id, product_id = s.split('-')
print(f'Chosen vendor: {vendor_id}, product: {product_id}')
return list(filter(lambda v: str(v.id) == vendor_id, vendors))[0], list(filter(lambda p: str(p.id) == product_id, products[vendor_id]))[0]
def mission_generator(cvms, dis):
try:
vs, ps = get_possible_vendors_and_products(dis, cvms)
pp_vendors_and_products(vs, ps)
s = input("Vendor number - Product number: ")
chosen_vendor, chosen_product = parse_product_and_vendor(s, vs, ps)
print(f'Chosen vendor: {chosen_vendor}')
order, product = make_order(cvms,chosen_product, chosen_vendor)
if order is not None and order['result']:
print(f"Order successful ({order})")
payload = make_operation(dis, product)
print(payload)
except Exception as e:
print(e)
input("Press enter to continue")
if __name__ == '__main__':
load_dotenv()
dis_credentials = DIS_Credentials(
os.environ['DIS_GRANT_TYPE'],
os.environ['DIS_CLIENT_ID'],
os.environ['DIS_USERNAME'],
os.environ['DIS_PASSWORD']
)
cvms_credentials = CVMS_Credentials(
os.environ['CVMS_PHONE'],
os.environ['CVMS_PASSWORD'],
os.environ['CVMS_DEVICE_ID']
)
cvms, dis = authenticate(cvms_credentials, dis_credentials)
mission_generator(cvms, dis)