@@ -46,6 +46,7 @@ def __init__(self):
4646 self .websocket : Optional [websockets .WebSocketClientProtocol ] = None
4747 self .connection_active = False
4848 self .reconnect_task : Optional [asyncio .Task ] = None
49+ self .websocket_loop : Optional [asyncio .AbstractEventLoop ] = None # Store event loop reference
4950
5051 # Callback group for async operations
5152 self .callback_group = ReentrantCallbackGroup ()
@@ -109,12 +110,14 @@ def _run_websocket_client(self):
109110 """Run WebSocket client in separate thread with its own event loop"""
110111 loop = asyncio .new_event_loop ()
111112 asyncio .set_event_loop (loop )
113+ self .websocket_loop = loop # Store loop reference for thread-safe calls
112114
113115 try :
114116 loop .run_until_complete (self ._maintain_connection ())
115117 except Exception as e :
116118 self .get_logger ().error (f"WebSocket client error: { e } " )
117119 finally :
120+ self .websocket_loop = None # Clear reference
118121 loop .close ()
119122
120123 async def _maintain_connection (self ):
@@ -209,10 +212,13 @@ def handle_virtual_request(self, msg: String):
209212 }
210213
211214 # Send to voice agent
212- asyncio .run_coroutine_threadsafe (
213- self ._send_to_voice_agent (command ),
214- self .websocket_thread ._target .__globals__ .get ('loop' ) if hasattr (self .websocket_thread , '_target' ) else None
215- )
215+ if self .websocket_loop and not self .websocket_loop .is_closed ():
216+ asyncio .run_coroutine_threadsafe (
217+ self ._send_to_voice_agent (command ),
218+ self .websocket_loop
219+ )
220+ else :
221+ self .get_logger ().warn ("Cannot send virtual request - WebSocket event loop not available" )
216222
217223 self .get_logger ().info (f"Forwarded virtual request: { request_data .get ('request_type' )} " )
218224
@@ -236,10 +242,13 @@ def handle_command(self, msg: String):
236242 }
237243
238244 # Send to voice agent
239- asyncio .run_coroutine_threadsafe (
240- self ._send_to_voice_agent (command ),
241- self .websocket_thread ._target .__globals__ .get ('loop' ) if hasattr (self .websocket_thread , '_target' ) else None
242- )
245+ if self .websocket_loop and not self .websocket_loop .is_closed ():
246+ asyncio .run_coroutine_threadsafe (
247+ self ._send_to_voice_agent (command ),
248+ self .websocket_loop
249+ )
250+ else :
251+ self .get_logger ().warn ("Cannot send command - WebSocket event loop not available" )
243252
244253 self .get_logger ().info (f"Forwarded command: { command_data .get ('action' )} " )
245254
0 commit comments