-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathspot_controls_toggle.py
More file actions
executable file
·187 lines (155 loc) · 6.09 KB
/
spot_controls_toggle.py
File metadata and controls
executable file
·187 lines (155 loc) · 6.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
#!/usr/bin/python3
import os
import subprocess
import signal
import time
import uuid
import libtmux
import traceback
import gi
import threading
gi.require_version('AppIndicator3', '0.1')
from gi.repository import AppIndicator3 as appindicator, Gtk
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
home_dir = os.path.expanduser("~")
script_dir = os.path.dirname(os.path.abspath(__file__))
def load_env_file(path):
with open(path) as f:
for line in f:
line = line.strip()
if line.startswith("export "):
key, _, value = line[len("export "):].partition("=")
os.environ.setdefault(key.strip(), value.strip())
log_path = os.path.join(script_dir, "crash_log.txt")
def log(msg):
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
with open(log_path, "a") as f:
f.write(f"[{timestamp}] {msg}\n")
try:
load_env_file(os.path.join(script_dir, "humble"))
MAX_USER = os.environ["MAX_USER"]
MAX_IP = os.environ["MAX_IP"]
except Exception as e:
log(f"FATAL: {e}")
raise
workspace_path = "ros2_ws/src/alert_dashboard_rqt"
pkg_path = os.path.join(home_dir, workspace_path)
estop_perspective = f"{pkg_path}/resource/estop_rqt.perspective"
TMUX_SESSION = "steamdeck"
KONSOLE_TITLE = "spot_ssh"
# ---------------------------------------------------------------------------
# Window Management Logic
# ---------------------------------------------------------------------------
def snap_windows_to_desktops():
"""
Background thread to move specific windows back to Desktop 1.
wmctrl index: 0 = Desktop 1
"""
targets = [
("RViz", 0), # Push RViz back to Desktop 1
("estop", 0), # Push E-Stop back to Desktop 1
]
start_time = time.time()
# Poll for 15 seconds to catch windows as they spawn
while time.time() - start_time < 15:
found_all = True
for title_part, desktop_idx in targets:
try:
# -r: target by title substring, -t: move to desktop index
result = subprocess.run(
["wmctrl", "-r", title_part, "-t", str(desktop_idx)],
capture_output=True
)
if result.returncode != 0:
found_all = False
except Exception:
found_all = False
if found_all:
break
time.sleep(1.0)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def get_unique_perspective(original_path):
if not os.path.exists(original_path):
raise FileNotFoundError(f"Perspective file not found: {original_path}")
unique_id = str(uuid.uuid4())[:8]
tmp_dir = "/tmp/rqt_perspectives"
os.makedirs(tmp_dir, exist_ok=True)
base, ext = os.path.splitext(os.path.basename(original_path))
unique_path = os.path.join(tmp_dir, f"{base}_{unique_id}{ext}")
if os.path.exists(unique_path):
os.remove(unique_path)
os.symlink(original_path, unique_path)
return unique_path
def get_session():
server = libtmux.Server()
sessions = server.sessions.filter(session_name=TMUX_SESSION)
return sessions[0] if sessions else None
# ---------------------------------------------------------------------------
# Launch Logic
# ---------------------------------------------------------------------------
def open_ros_apps():
# Force shift to Desktop 2 immediately
try:
subprocess.run(["qdbus", "org.kde.KWin", "/KWin", "org.kde.KWin.setCurrentDesktop", "2"])
except Exception as e:
log(f"Initial desktop shift failed: {e}")
server = libtmux.Server()
session = get_session()
if session:
session.kill()
time.sleep(0.3)
session = server.new_session(session_name=TMUX_SESSION, window_name="init", detach=True)
estop_unique = get_unique_perspective(estop_perspective)
commands = {
"dashboard": "ros2 run rqt_gui rqt_gui --standalone alert_dashboard_rqt.alert_dashboard_rqt.DashboardRqtPlugin",
"console": f"konsole --title {KONSOLE_TITLE} -e bash -c 'while true; do ssh {MAX_USER}@{MAX_IP} -t \"sleep 5; tmux new -A -s spot_session\"; sleep 5; done'",
"estop": f"rqt --force-discover --perspective-file {estop_unique} --lock-perspective",
"rviz2": "rviz2",
"controller": "ros2 launch spot_driver_plus controller_launch.py",
}
for i, (win_key, cmd) in enumerate(commands.items()):
if i == 0:
window = session.windows[0]
window.rename_window(win_key)
else:
window = session.new_window(window_name=win_key)
window.panes[0].send_keys(cmd, enter=True)
# Start the thread to push RViz and E-Stop back to Desktop 1
threading.Thread(target=snap_windows_to_desktops, daemon=True).start()
def close_ros_apps():
session = get_session()
if session:
session.kill()
# ---------------------------------------------------------------------------
# UI Setup
# ---------------------------------------------------------------------------
def toggle_ros_apps(_):
try:
if get_session():
close_ros_apps()
else:
open_ros_apps()
except Exception as e:
log(f"ERROR in toggle:\n{traceback.format_exc()}")
spot_icon = os.path.join(home_dir, "steam_deck_ros2/spot.png")
indicator = appindicator.Indicator.new(
"Spot Control View",
spot_icon,
appindicator.IndicatorCategory.SYSTEM_SERVICES
)
indicator.set_status(appindicator.IndicatorStatus.ACTIVE)
menu = Gtk.Menu()
toggle_item = Gtk.ImageMenuItem.new_with_label("Toggle Control View")
toggle_item.set_image(Gtk.Image.new_from_file(spot_icon))
toggle_item.set_always_show_image(True)
toggle_item.connect('activate', toggle_ros_apps)
menu.append(toggle_item)
menu.show_all()
indicator.set_menu(menu)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal.SIG_DFL)
Gtk.main()