-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtelemetry_stream.py
More file actions
121 lines (99 loc) · 4.1 KB
/
telemetry_stream.py
File metadata and controls
121 lines (99 loc) · 4.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# telemetry_stream.py
import asyncio
from datetime import datetime, timezone
from mavsdk import System
class ConnectionManager:
    """Mutable holder for one drone handle and its connection flag.

    Attributes are plain and public; code elsewhere in this module assigns
    ``drone`` and flips ``is_connected`` directly.
    """

    def __init__(self):
        # No vehicle handle exists until a caller assigns one.
        self.drone = None
        # A fresh manager always starts out disconnected.
        self.is_connected = False
# Module-wide cache of the most recent sample for each telemetry stream.
# Every slot starts as None; the consume_* coroutines overwrite their slot
# and publish_at_interval reads all four.
telemetry_data = dict.fromkeys(
    ("position", "velocity", "altitude", "odometry")
)
async def stream_position(queue, retry_delay=1.0, system_address="udp://:14540"):
    """Stream telemetry data from drone to queue with connection management.

    Runs an endless connect / stream / reconnect cycle. Each attempt creates
    a new ``System``, connects to ``system_address``, starts a connection
    monitor, requests 10 Hz telemetry rates, and launches the consumer and
    publisher tasks. When the monitor reports the link as down (or any step
    raises), every task started during the attempt is cancelled and awaited,
    and a new attempt begins after ``retry_delay`` seconds.

    Args:
        queue: Queue passed through to ``publish_at_interval``.
        retry_delay: Seconds to sleep between connection attempts.
        system_address: Address handed to ``System.connect()``. Defaults to
            the previously hard-coded ``"udp://:14540"``.

    This coroutine never returns normally; cancel its task to stop it.
    """
    conn = ConnectionManager()

    async def handle_connection():
        """Mirror the drone's connection state into ``conn.is_connected``."""
        previous_state = None
        async for state in conn.drone.core.connection_state():
            if state.is_connected != previous_state:
                conn.is_connected = state.is_connected
                if conn.is_connected:
                    print("Connection established!")
                else:
                    print("Connection lost!")
                    # Stop monitoring; the outer loop sees the cleared flag
                    # and tears this attempt down.
                    break
            previous_state = state.is_connected

    while True:  # Infinite retry loop
        # Reset per attempt so the cleanup below only ever sees tasks created
        # during THIS attempt, and never hits an unbound name when connect()
        # or the rate setup fails before any consumer was started.
        tasks = []
        try:
            conn.drone = System()
            print("Attempting to connect to drone...")
            await conn.drone.connect(system_address=system_address)

            # Start connection monitoring and register it immediately, so it
            # is cancelled even if a later setup step raises.
            connection_task = asyncio.create_task(handle_connection())
            tasks.append(connection_task)

            # Set telemetry rates
            # NOTE(review): the velocity consumer reads
            # position_velocity_ned() while the rate is set for velocity_ned,
            # and no consumer in this file reads attitude_quaternion --
            # confirm these are the intended streams.
            await asyncio.gather(
                conn.drone.telemetry.set_rate_position(10.0),
                conn.drone.telemetry.set_rate_velocity_ned(10.0),
                conn.drone.telemetry.set_rate_altitude(10.0),
                conn.drone.telemetry.set_rate_odometry(10.0),
                conn.drone.telemetry.set_rate_attitude_quaternion(10.0),
            )

            # Start telemetry consumers
            tasks += [
                asyncio.create_task(consume_position(conn)),
                asyncio.create_task(consume_velocity(conn)),
                asyncio.create_task(consume_altitude(conn)),
                asyncio.create_task(consume_odometry(conn)),
                asyncio.create_task(publish_at_interval(queue, conn)),
            ]

            # Wait until connection is lost.
            # NOTE(review): if the monitor has not reported a connected state
            # by now, this loop exits at once and a reconnect follows.
            while conn.is_connected:
                await asyncio.sleep(1)
        except Exception as e:
            print(f"Connection error: {e}")
        finally:
            # Cleanup: stop everything this attempt started and wait for the
            # cancellations to finish before reconnecting.
            conn.is_connected = False
            for task in tasks:
                task.cancel()
            if tasks:
                await asyncio.gather(*tasks, return_exceptions=True)

        print(f"Reconnecting in {retry_delay} seconds...")
        await asyncio.sleep(retry_delay)
async def consume_position(conn):
    """Keep ``telemetry_data["position"]`` updated from the position stream.

    Iterates ``conn.drone.telemetry.position()`` and stores a sample only
    while ``conn.is_connected`` is true and the sample's ``latitude_deg`` is
    truthy (so a latitude of exactly 0 is skipped).
    """
    stream = conn.drone.telemetry.position()
    async for fix in stream:
        if not conn.is_connected:
            continue
        if not fix.latitude_deg:
            continue
        telemetry_data["position"] = fix
async def consume_velocity(conn):
    """Keep ``telemetry_data["velocity"]`` updated from the velocity stream.

    Iterates ``conn.drone.telemetry.position_velocity_ned()`` and stores each
    item as-is, ignoring anything received while ``conn.is_connected`` is
    false.
    """
    stream = conn.drone.telemetry.position_velocity_ned()
    async for sample in stream:
        if not conn.is_connected:
            continue
        telemetry_data["velocity"] = sample
async def consume_altitude(conn):
    """Keep ``telemetry_data["altitude"]`` updated from the altitude stream.

    Iterates ``conn.drone.telemetry.altitude()`` and stores each item,
    ignoring anything received while ``conn.is_connected`` is false.
    """
    stream = conn.drone.telemetry.altitude()
    async for sample in stream:
        if not conn.is_connected:
            continue
        telemetry_data["altitude"] = sample
async def consume_odometry(conn):
    """Keep ``telemetry_data["odometry"]`` updated from the odometry stream.

    Iterates ``conn.drone.telemetry.odometry()`` and stores each item,
    ignoring anything received while ``conn.is_connected`` is false.
    """
    stream = conn.drone.telemetry.odometry()
    async for sample in stream:
        if not conn.is_connected:
            continue
        telemetry_data["odometry"] = sample
async def publish_at_interval(queue, conn, interval_sec=1.0):
    """Periodically push the latest telemetry snapshot onto ``queue``.

    On each pass, if ``conn.is_connected`` is true and every value in the
    module-level ``telemetry_data`` is truthy, a
    ``(position, velocity, altitude, odometry)`` tuple is put on the queue.
    The coroutine then sleeps ``interval_sec`` seconds and repeats. It loops
    until its task is cancelled or an awaited call raises.
    """
    fields = ("position", "velocity", "altitude", "odometry")
    while True:
        ready = conn.is_connected and all(telemetry_data.values())
        if ready:
            snapshot = tuple(telemetry_data[name] for name in fields)
            await queue.put(snapshot)
        await asyncio.sleep(interval_sec)