-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathstart.py
More file actions
71 lines (60 loc) · 2.24 KB
/
start.py
File metadata and controls
71 lines (60 loc) · 2.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from dotenv import load_dotenv
from os import environ
from DigitalTwin.PayloadModels import ControllerPayload, SimulatorPayload
load_dotenv()
import sys
from os import environ, path
import logging
from os.path import exists
from DigitalTwin.SimulationFactory import new_simulation
from DigitalTwin.error_codes import *
from DigitalTwin.ExitHandler import ExitHandler
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO)
def check_exported_px4():
local_px4 = 'Dependencies/PX4-Autopilot'
if not 'PX4_ROOT_FOLDER' in environ:
if path.exists(local_px4):
environ['PX4_ROOT_FOLDER'] = local_px4
else:
logger.error(f'You must export your local copy of the CAELUS px4 fork folder (export PX4_ROOT_FOLDER=<px4 folder>)')
exit(-1)
def start_with_payload(payload, headless=False):
check_exported_px4()
print('Staring simulation...')
exit_handler: ExitHandler = ExitHandler.shared()
try:
sim_payload = SimulatorPayload(payload)
controller_payload = ControllerPayload(payload)
gui, controller, sstack = new_simulation(sim_payload, controller_payload, headless=headless)
sstack.start()
if gui is not None:
gui.start()
except Exception as e:
logging.getLogger().error(f'Digital Twin Top Level Error:')
logging.getLogger().error(e)
import traceback
traceback.print_exc()
finally:
code, msg = exit_handler.block_until_exit()
if msg is not None:
logging.getLogger().error(f'Exiting with error message: {msg}')
sys.exit(code)
import json
import sys
if __name__ == '__main__':
if len(sys.argv) == 1 and 'PAYLOAD' not in environ:
print("Usage `python3 start.py <mission_payload_json>`")
print("or")
print("Export 'PAYLOAD={...}' and then issue `python3 start.py`")
exit(-1)
if 'PAYLOAD' in environ:
json_payload = environ['PAYLOAD']
else:
_, json_payload = sys.argv
headless = True if 'IN_DOCKER' in environ else False
try:
start_with_payload(json.loads(json_payload), headless=headless)
except Exception as e:
print(f'Failed in reading json payload ({e})')
exit(JSON_READ_EC)