-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtrajectory_viz.py
More file actions
104 lines (81 loc) · 3.17 KB
/
trajectory_viz.py
File metadata and controls
104 lines (81 loc) · 3.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env python3
"""
Subscribes to /vo/odometry and plots the estimated trajectory in real time.
When the publisher finishes (no message for 3 s), saves the final plot to
/workspace/results/trajectory.png and exits.
Usage:
mkdir -p /workspace/results
python3 /workspace/trajectory_viz.py
"""
import os
import time
import matplotlib
matplotlib.use("Agg") # no display needed inside container
import matplotlib.pyplot as plt
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
SAVE_PATH = "/workspace/results/trajectory.png"
TIMEOUT_SEC = 3.0 # save + exit after this many idle seconds
class TrajectoryViz(Node):
def __init__(self):
super().__init__("trajectory_viz")
self.xs_: list[float] = []
self.zs_: list[float] = []
self.last_msg_time_ = time.time()
self.sub_ = self.create_subscription(
Odometry, "/vo/odometry", self.odom_cb, 50)
# Watchdog timer: checks every second if the publisher has gone quiet
self.watchdog_ = self.create_timer(1.0, self.watchdog_cb)
os.makedirs(os.path.dirname(SAVE_PATH), exist_ok=True)
self.get_logger().info(
f"Listening on /vo/odometry — will save to {SAVE_PATH} "
f"when stream stops for {TIMEOUT_SEC}s ...")
def odom_cb(self, msg: Odometry):
self.xs_.append(msg.pose.pose.position.x)
self.zs_.append(msg.pose.pose.position.z)
self.last_msg_time_ = time.time()
if len(self.xs_) % 100 == 0:
self.get_logger().info(
f" {len(self.xs_)} poses received | "
f"latest x={self.xs_[-1]:.1f} z={self.zs_[-1]:.1f}")
def watchdog_cb(self):
if not self.xs_:
return # haven't received anything yet
if time.time() - self.last_msg_time_ > TIMEOUT_SEC:
self.get_logger().info("Stream ended — saving trajectory plot ...")
self._save_plot()
raise SystemExit(0)
def _save_plot(self):
fig, ax = plt.subplots(figsize=(10, 8))
ax.plot(self.xs_, self.zs_,
color="#E24B4A", linewidth=1.8, linestyle="--",
label="Estimated VO (C++)")
# Mark start and end
ax.scatter(self.xs_[0], self.zs_[0], s=80,
color="#3B8BD4", zorder=5, label="Start")
ax.scatter(self.xs_[-1], self.zs_[-1], s=80,
color="#1D9E75", zorder=5, label="End")
ax.set_title("Visual Odometry — C++ / ROS2 (KITTI Sequence 00)",
fontsize=14)
ax.set_xlabel("X (unit scale)", fontsize=11)
ax.set_ylabel("Z (unit scale)", fontsize=11)
ax.legend(fontsize=11)
ax.grid(True, alpha=0.4)
ax.set_aspect("equal", adjustable="datalim")
fig.tight_layout()
fig.savefig(SAVE_PATH, dpi=150)
plt.close(fig)
self.get_logger().info(f"Saved → {SAVE_PATH} ({len(self.xs_)} poses)")
def main():
rclpy.init()
node = TrajectoryViz()
try:
rclpy.spin(node)
except SystemExit:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()