-
Notifications
You must be signed in to change notification settings - Fork 87
Expand file tree
/
Copy pathhello_helicopter.py
More file actions
133 lines (95 loc) · 4.75 KB
/
hello_helicopter.py
File metadata and controls
133 lines (95 loc) · 4.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""
Copyright (C) Microsoft Corporation. All rights reserved.
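Demonstrates viewing the camera sensor feeds of a helicopter in the basic
helicopter scene. No flight commands are issued, because the JSBSim robot
does not currently support controlling the vehicle at runtime.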
"""
import asyncio
from projectairsim import ProjectAirSimClient, Drone, World
from projectairsim.utils import projectairsim_log
from projectairsim.image_utils import ImageDisplay
# Async main function to wrap async drone commands
async def main(display_duration_sec: float = 300):
    """Load the helicopter scene and show its camera feeds in pop-up windows.

    Connects to the simulation, loads "scene_basic_helicopter.jsonc", subscribes
    to the chase camera and to the downward-facing camera's RGB and depth
    streams, and keeps the display windows open for a fixed time. No flight
    commands are sent.

    Any exception raised while setting up or waiting is logged with its
    traceback and not re-raised. The client is always disconnected and the
    image display always stopped before returning.

    Args:
        display_duration_sec: How long, in seconds, to keep the camera windows
            open before shutting down. Defaults to 300 (5 minutes).
    """
    # Create a Project AirSim client
    client = ProjectAirSimClient()

    # Initialize an ImageDisplay object to display camera sub-windows
    image_display = ImageDisplay()

    try:
        # Connect to simulation environment
        client.connect()

        # Create a World object to interact with the sim world and load a scene
        world = World(client, "scene_basic_helicopter.jsonc", delay_after_load_sec=2)

        # Create a Drone object to interact with a drone in the loaded sim world
        drone = Drone(client, world, "Drone1")

        # ----------------------------------------------------------------------
        # Subscribe to chase camera sensor as a client-side pop-up window
        chase_cam_window = "ChaseCam"
        image_display.add_chase_cam(chase_cam_window)
        client.subscribe(
            drone.sensors["Chase"]["scene_camera"],
            lambda _, chase: image_display.receive(chase, chase_cam_window),
        )

        # Subscribe to the downward-facing camera sensor's RGB and Depth images
        rgb_name = "RGB-Image"
        image_display.add_image(rgb_name, subwin_idx=0)
        client.subscribe(
            drone.sensors["DownCamera"]["scene_camera"],
            lambda _, rgb: image_display.receive(rgb, rgb_name),
        )

        depth_name = "Depth-Image"
        image_display.add_image(depth_name, subwin_idx=2)
        client.subscribe(
            drone.sensors["DownCamera"]["depth_camera"],
            lambda _, depth: image_display.receive(depth, depth_name),
        )

        image_display.start()

        # ----------------------------------------------------------------------
        # NOTE: The JSBSim robot currently does not support controlling the
        # vehicle at runtime, so the usual flight sequence is intentionally
        # omitted here: enable_api_control() / arm(), takeoff_async(),
        # move_by_velocity_async() up and down, land_async(), and finally
        # disarm() / disable_api_control().

        # ----------------------------------------------------------------------
        # Sleep (5 minutes by default) to continue seeing the camera images
        await asyncio.sleep(display_duration_sec)

    # Log the exception (with traceback) on the console
    except Exception as err:
        projectairsim_log().error(f"Exception occurred: {err}", exc_info=True)

    finally:
        # Always disconnect from the simulation environment to allow next
        # connection. The nested try/finally guarantees the display windows are
        # torn down even if disconnecting itself raises.
        try:
            client.disconnect()
        finally:
            image_display.stop()
if __name__ == "__main__":
    # Script entry point: run the async main() to completion on a new event loop
    asyncio.run(main())