Skip to content

Commit 965bf36

Browse files
committed
Add restart.
Temp allow action message.
1 parent aacf349 commit 965bf36

File tree

6 files changed

+342
-39
lines changed

6 files changed

+342
-39
lines changed

unilabos/app/main.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919

2020
from unilabos.utils.banner_print import print_status, print_unilab_banner
2121
from unilabos.config.config import load_config, BasicConfig, HTTPConfig
22+
from unilabos.app.utils import cleanup_for_restart
23+
24+
# Global restart flags (used by ws_client and web/server)
25+
_restart_requested: bool = False
26+
_restart_reason: str = ""
2227

2328

2429
def load_config_from_file(config_path):
@@ -503,13 +508,19 @@ def _exit(signum, frame):
503508
time.sleep(1)
504509
else:
505510
start_backend(**args_dict)
506-
start_server(
511+
restart_requested = start_server(
507512
open_browser=not args_dict["disable_browser"],
508513
port=BasicConfig.port,
509514
)
515+
if restart_requested:
516+
print_status("[Main] Restart requested, cleaning up...", "info")
517+
cleanup_for_restart()
518+
return
510519
else:
511520
start_backend(**args_dict)
512-
start_server(
521+
522+
# 启动服务器(默认支持WebSocket触发重启)
523+
restart_requested = start_server(
513524
open_browser=not args_dict["disable_browser"],
514525
port=BasicConfig.port,
515526
)

unilabos/app/utils.py

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
"""
2+
UniLabOS 应用工具函数
3+
4+
提供清理、重启等工具函数
5+
"""
6+
7+
import gc
8+
import os
9+
import threading
10+
import time
11+
12+
from unilabos.utils.banner_print import print_status
13+
14+
15+
def cleanup_for_restart() -> bool:
    """
    Clean up all resources for restart without exiting the process.

    The steps are executed in order and each one is best-effort:

    1. Stop the WebSocket communication client
    2. Destroy ROS nodes, shut down the executor and rclpy
    3. Reset the communication client singleton
    4. Wait for background threads to finish
    5. Force garbage collection

    An unexpected error in the ROS step does not abort the remaining steps:
    the singleton is still reset and garbage collection still runs, so a
    later re-initialization does not pick up a stale, already-stopped client.
    The failure is reported through the return value instead.

    Returns:
        bool: True if cleanup was successful, False if the ROS cleanup step
        raised an unexpected (non-ImportError) exception.
    """
    print_status("[Restart] Starting cleanup for restart...", "info")

    _stop_communication_client()
    ros_ok = _cleanup_ros_nodes()
    _reset_communication_singleton()
    _wait_for_threads()

    # Step 5: Force garbage collection
    print_status("[Restart] Step 5: Running garbage collection...", "info")
    gc.collect()
    gc.collect()  # Run twice for weak references
    print_status("[Restart] Garbage collection complete", "info")

    if not ros_ok:
        print_status("[Restart] Cleanup finished, but ROS cleanup reported errors.", "warning")
        return False

    print_status("[Restart] Cleanup complete. Ready for re-initialization.", "info")
    return True


def _stop_communication_client() -> None:
    """Step 1: stop the WebSocket communication client if one exists."""
    print_status("[Restart] Step 1: Stopping WebSocket client...", "info")
    try:
        from unilabos.app.communication import get_communication_client

        comm_client = get_communication_client()
        if comm_client is not None:
            comm_client.stop()
            print_status("[Restart] WebSocket client stopped", "info")
    except Exception as e:
        print_status(f"[Restart] Error stopping WebSocket: {e}", "warning")


def _cleanup_ros_nodes() -> bool:
    """Step 2: destroy HostNode and device nodes, then shut down the executor and rclpy.

    Returns:
        bool: False only when an unexpected exception escaped the ROS cleanup;
        missing ROS modules (ImportError) are reported as a warning and
        treated as "nothing to clean up".
    """
    print_status("[Restart] Step 2: Cleaning up ROS nodes...", "info")
    try:
        from unilabos.ros.nodes.presets.host_node import HostNode
        import rclpy
        from rclpy.timer import Timer

        host_instance = HostNode.get_instance(timeout=5)
        if host_instance is not None:
            print_status(f"[Restart] Found HostNode: {host_instance.device_id}", "info")

            # Gracefully shutdown background threads
            print_status("[Restart] Shutting down background threads...", "info")
            HostNode.shutdown_background_threads(timeout=5.0)
            print_status("[Restart] Background threads shutdown complete", "info")

            # Stop discovery timer
            if isinstance(getattr(host_instance, "_discovery_timer", None), Timer):
                host_instance._discovery_timer.cancel()
                print_status("[Restart] Discovery timer cancelled", "info")

            # Destroy device nodes; iterate over a snapshot because the
            # mapping is cleared right afterwards.
            device_count = len(host_instance.devices_instances)
            print_status(f"[Restart] Destroying {device_count} device instances...", "info")
            for device_id, device_node in list(host_instance.devices_instances.items()):
                try:
                    if getattr(device_node, "ros_node_instance", None) is not None:
                        device_node.ros_node_instance.destroy_node()
                        print_status(f"[Restart] Device {device_id} destroyed", "info")
                except Exception as e:
                    print_status(f"[Restart] Error destroying device {device_id}: {e}", "warning")

            # Clear devices instances
            host_instance.devices_instances.clear()
            host_instance.devices_names.clear()

            # Destroy host node
            try:
                host_instance.destroy_node()
                print_status("[Restart] HostNode destroyed", "info")
            except Exception as e:
                print_status(f"[Restart] Error destroying HostNode: {e}", "warning")

        # Reset HostNode state.
        # NOTE(review): assumed to run even when no HostNode instance was
        # found (class-level reset) - confirm against the intended nesting.
        HostNode.reset_state()
        print_status("[Restart] HostNode state reset", "info")

        # Shutdown executor first (to stop executor.spin() gracefully).
        # The executor is stored as an ad-hoc attribute on the rclpy module;
        # getattr/setattr with a string keeps the access immune to name
        # mangling should this code ever be moved into a class body.
        executor = getattr(rclpy, "__executor", None)
        if executor is not None:
            try:
                executor.shutdown()
                setattr(rclpy, "__executor", None)  # Clear for restart
                print_status("[Restart] ROS executor shutdown complete", "info")
            except Exception as e:
                print_status(f"[Restart] Error shutting down executor: {e}", "warning")

        # Shutdown rclpy
        if rclpy.ok():
            rclpy.shutdown()
            print_status("[Restart] rclpy shutdown complete", "info")

    except ImportError as e:
        print_status(f"[Restart] ROS modules not available: {e}", "warning")
    except Exception as e:
        print_status(f"[Restart] Error in ROS cleanup: {e}", "warning")
        return False
    return True


def _reset_communication_singleton() -> None:
    """Step 3: drop the cached communication client so the next start creates a new one."""
    print_status("[Restart] Step 3: Resetting singletons...", "info")
    try:
        from unilabos.app import communication

        if hasattr(communication, "_communication_client"):
            communication._communication_client = None
            print_status("[Restart] Communication client singleton reset", "info")
    except Exception as e:
        print_status(f"[Restart] Error resetting communication singleton: {e}", "warning")


def _wait_for_threads(grace_seconds: float = 3.0) -> None:
    """Step 4: give background threads a grace period, then report the ones still alive."""
    print_status("[Restart] Step 4: Waiting for threads to finish...", "info")
    time.sleep(grace_seconds)  # Give threads time to finish

    # Compare by identity rather than by the "MainThread" name, and skip the
    # thread running this cleanup: it is necessarily alive and would otherwise
    # always be reported when cleanup runs on a worker thread.
    main_thread = threading.main_thread()
    current_thread = threading.current_thread()
    remaining_threads = [
        t.name
        for t in threading.enumerate()
        if t is not main_thread and t is not current_thread and t.is_alive()
    ]

    if remaining_threads:
        print_status(
            f"[Restart] Warning: {len(remaining_threads)} threads still running: {remaining_threads}", "warning"
        )
    else:
        print_status("[Restart] All threads stopped", "info")

unilabos/app/web/server.py

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import webbrowser
88

9-
import uvicorn
109
from fastapi import FastAPI, Request
1110
from fastapi.middleware.cors import CORSMiddleware
1211
from starlette.responses import Response
@@ -96,15 +95,22 @@ def setup_server() -> FastAPI:
9695
return app
9796

9897

99-
def start_server(host: str = "0.0.0.0", port: int = 8002, open_browser: bool = True) -> None:
98+
def start_server(host: str = "0.0.0.0", port: int = 8002, open_browser: bool = True) -> bool:
10099
"""
101100
启动服务器
102101
103102
Args:
104103
host: 服务器主机
105104
port: 服务器端口
106105
open_browser: 是否自动打开浏览器
106+
107+
Returns:
108+
bool: True if restart was requested, False otherwise
107109
"""
110+
import threading
111+
import time
112+
from uvicorn import Config, Server
113+
108114
# 设置服务器
109115
setup_server()
110116

@@ -123,7 +129,37 @@ def start_server(host: str = "0.0.0.0", port: int = 8002, open_browser: bool = T
123129

124130
# 启动服务器
125131
info(f"[Web] 启动FastAPI服务器: {host}:{port}")
126-
uvicorn.run(app, host=host, port=port, log_config=log_config)
132+
133+
# 使用支持重启的模式
134+
config = Config(app=app, host=host, port=port, log_config=log_config)
135+
server = Server(config)
136+
137+
# 启动服务器线程
138+
server_thread = threading.Thread(target=server.run, daemon=True, name="uvicorn_server")
139+
server_thread.start()
140+
141+
info("[Web] Server started, monitoring for restart requests...")
142+
143+
# 监控重启标志
144+
import unilabos.app.main as main_module
145+
146+
while server_thread.is_alive():
147+
if hasattr(main_module, "_restart_requested") and main_module._restart_requested:
148+
info(
149+
f"[Web] Restart requested via WebSocket, reason: {getattr(main_module, '_restart_reason', 'unknown')}"
150+
)
151+
main_module._restart_requested = False
152+
153+
# 停止服务器
154+
server.should_exit = True
155+
server_thread.join(timeout=5)
156+
157+
info("[Web] Server stopped, ready for restart")
158+
return True
159+
160+
time.sleep(1)
161+
162+
return False
127163

128164

129165
# 当脚本直接运行时启动服务器

unilabos/app/ws_client.py

Lines changed: 50 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -488,11 +488,16 @@ async def _message_handler(self):
488488
async for message in self.websocket:
489489
try:
490490
data = json.loads(message)
491+
message_type = data.get("action", "")
492+
message_data = data.get("data")
491493
if self.session_id and self.session_id == data.get("edge_session"):
492-
await self._process_message(data)
494+
await self._process_message(message_type, message_data)
493495
else:
494-
logger.trace(f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}")
495-
logger.debug(f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}")
496+
if message_type.endswith("_material"):
497+
logger.trace(f"[MessageProcessor] 收到一条归属 {data.get('edge_session')} 的旧消息:{data}")
498+
logger.debug(f"[MessageProcessor] 跳过了一条归属 {data.get('edge_session')} 的旧消息: {data.get('action')}")
499+
else:
500+
await self._process_message(message_type, message_data)
496501
except json.JSONDecodeError:
497502
logger.error(f"[MessageProcessor] Invalid JSON received: {message}")
498503
except Exception as e:
@@ -558,11 +563,8 @@ async def _send_handler(self):
558563
finally:
559564
logger.debug("[MessageProcessor] Send handler stopped")
560565

561-
async def _process_message(self, data: Dict[str, Any]):
566+
async def _process_message(self, message_type: str, message_data: Dict[str, Any]):
562567
"""处理收到的消息"""
563-
message_type = data.get("action", "")
564-
message_data = data.get("data")
565-
566568
logger.debug(f"[MessageProcessor] Processing message: {message_type}")
567569

568570
try:
@@ -575,16 +577,19 @@ async def _process_message(self, data: Dict[str, Any]):
575577
elif message_type == "cancel_action" or message_type == "cancel_task":
576578
await self._handle_cancel_action(message_data)
577579
elif message_type == "add_material":
580+
# noinspection PyTypeChecker
578581
await self._handle_resource_tree_update(message_data, "add")
579582
elif message_type == "update_material":
583+
# noinspection PyTypeChecker
580584
await self._handle_resource_tree_update(message_data, "update")
581585
elif message_type == "remove_material":
586+
# noinspection PyTypeChecker
582587
await self._handle_resource_tree_update(message_data, "remove")
583588
# elif message_type == "session_id":
584589
# self.session_id = message_data.get("session_id")
585590
# logger.info(f"[MessageProcessor] Session ID: {self.session_id}")
586-
elif message_type == "request_reload":
587-
await self._handle_request_reload(message_data)
591+
elif message_type == "request_restart":
592+
await self._handle_request_restart(message_data)
588593
else:
589594
logger.debug(f"[MessageProcessor] Unknown message type: {message_type}")
590595

@@ -894,19 +899,48 @@ def _notify_resource_tree(dev_id, act, item_list):
894899
)
895900
thread.start()
896901

897-
async def _handle_request_reload(self, data: Dict[str, Any]):
902+
async def _handle_request_restart(self, data: Dict[str, Any]):
    """
    Handle a restart request.

    When LabGo sends ``request_restart``: acknowledge the request, set the
    global restart flags in ``unilabos.app.main``, wait ``delay`` seconds and
    then run ``cleanup_for_restart`` on a daemon thread.

    Args:
        data: Message payload. Recognised keys are ``"reason"`` (default
            ``"unknown"``) and ``"delay"`` in seconds (default 2). A missing
            or non-dict payload is treated as empty, and a delay that is not
            a finite, non-negative number falls back to the default.
    """
    default_delay = 2  # seconds

    # The payload comes straight from the network and may be absent.
    payload = data if isinstance(data, dict) else {}
    reason = payload.get("reason", "unknown")
    delay = payload.get("delay", default_delay)

    # Reject bools, non-numbers, negatives, NaN and infinity so that
    # asyncio.sleep() can neither raise nor block forever.
    is_number = isinstance(delay, (int, float)) and not isinstance(delay, bool)
    if not is_number or not 0 <= delay < float("inf"):
        logger.warning(f"[MessageProcessor] Invalid restart delay {delay!r}, using {default_delay}s")
        delay = default_delay

    logger.info(f"[MessageProcessor] Received restart request, reason: {reason}, delay: {delay}s")

    # Send the acknowledgement
    if self.websocket_client:
        await self.websocket_client.send_message({
            "action": "restart_acknowledged",
            "data": {"reason": reason, "delay": delay}
        })

    # Set the global restart flags.
    # NOTE(review): the flags are raised before the delay elapses, so whoever
    # polls them may start tearing things down earlier than `delay` - confirm
    # that this ordering is intended.
    import unilabos.app.main as main_module
    main_module._restart_requested = True
    main_module._restart_reason = reason

    # Run the cleanup after the delay
    await asyncio.sleep(delay)

    # Run the cleanup on a separate thread so the current event loop is not blocked.
    # NOTE(review): main() appears to call cleanup_for_restart() as well once
    # the server reports a restart request; verify the cleanup cannot run
    # twice concurrently.
    def do_cleanup():
        import time
        time.sleep(0.5)  # Let the current message finish processing
        logger.info(f"[MessageProcessor] Starting cleanup for restart, reason: {reason}")
        try:
            from unilabos.app.utils import cleanup_for_restart
            if cleanup_for_restart():
                logger.info("[MessageProcessor] Cleanup successful, main() will restart")
            else:
                logger.error("[MessageProcessor] Cleanup failed")
        except Exception as e:
            logger.error(f"[MessageProcessor] Error during cleanup: {e}")

    cleanup_thread = threading.Thread(target=do_cleanup, name="RestartCleanupThread", daemon=True)
    cleanup_thread.start()
    logger.info("[MessageProcessor] Restart cleanup scheduled")
910944

911945
async def _send_action_state_response(
912946
self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int

unilabos/ros/main_slave_run.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
23
# from nt import device_encoding
34
import threading
45
import time
@@ -55,7 +56,11 @@ def main(
5556
) -> None:
5657
"""主函数"""
5758

58-
rclpy.init(args=rclpy_init_args)
59+
# Support restart - check if rclpy is already initialized
60+
if not rclpy.ok():
61+
rclpy.init(args=rclpy_init_args)
62+
else:
63+
logger.info("[ROS] rclpy already initialized, reusing context")
5964
executor = rclpy.__executor = MultiThreadedExecutor()
6065
# 创建主机节点
6166
host_node = HostNode(
@@ -88,7 +93,7 @@ def main(
8893
joint_republisher = JointRepublisher("joint_republisher", host_node.resource_tracker)
8994
# lh_joint_pub = LiquidHandlerJointPublisher(
9095
# resources_config=resources_list, resource_tracker=host_node.resource_tracker
91-
# )
96+
# )
9297
executor.add_node(resource_mesh_manager)
9398
executor.add_node(joint_republisher)
9499
# executor.add_node(lh_joint_pub)

0 commit comments

Comments
 (0)