-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwebsocket.py
More file actions
387 lines (318 loc) · 13.6 KB
/
websocket.py
File metadata and controls
387 lines (318 loc) · 13.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
# Written by Leia Hannes, Hannah Loly, Chris Rogers
# Center for Engineering Education and Outreach
# Summer of 2025
import rclpy, time
from rclpy.node import Node
from std_msgs.msg import String
import json
import asyncio
import websockets
import ssl
import json
import threading
import signal
import sys
from rclpy.executors import MultiThreadedExecutor
uri = "wss://chrisrogers.pyscriptapps.com/talking-on-a-channel/api/channels/hackathon"
class WebSocketPublisherNode(Node):
"""Node that publishes incoming WebSocket messages to ROS topics"""
def __init__(self):
super().__init__('websocket_publisher_node')
queue_size = 10
# Publisher for incoming WebSocket data
self.publisher_ = self.create_publisher(String, 'websocket/incoming', queue_size)
# Publisher for connection status
self.status_publisher_ = self.create_publisher(String, 'websocket/status', queue_size)
self.get_logger().info('WebSocket Publisher Node initialized')
def publish_websocket_message(self, data):
"""Publish incoming WebSocket data to ROS topic"""
try:
msg = String()
msg.data = json.dumps(data)
self.publisher_.publish(msg)
self.get_logger().info(f'Published WebSocket message: {data.get("topic", "unknown")}')
except Exception as e:
self.get_logger().error(f'Error publishing WebSocket message: {e}')
def publish_status(self, status):
"""Publish WebSocket connection status"""
try:
msg = String()
msg.data = status
self.status_publisher_.publish(msg)
self.get_logger().info(f'WebSocket status: {status}')
except Exception as e:
self.get_logger().error(f'Error publishing status: {e}')
class WebSocketSubscriberNode(Node):
"""Node that subscribes to ROS topics and sends messages via WebSocket"""
def __init__(self):
super().__init__('websocket_subscriber_node')
queue_size = 10
# Subscriber for outgoing messages
self.subscription = self.create_subscription(
String,
'websocket/outgoing',
self.outgoing_message_callback,
queue_size
)
# Subscriber for WebSocket commands (connect, disconnect, etc.)
self.command_subscription = self.create_subscription(
String,
'websocket/command',
self.command_callback,
queue_size
)
# WebSocket client reference
self.websocket_client = None
self.get_logger().info('WebSocket Subscriber Node initialized')
def set_websocket_client(self, client):
"""Set reference to WebSocket client"""
self.websocket_client = client
def outgoing_message_callback(self, msg):
"""Handle ROS messages that should be sent to WebSocket"""
try:
if self.websocket_client and self.websocket_client.is_connected():
# Parse the ROS message data
try:
message_data = json.loads(msg.data)
except json.JSONDecodeError:
# If not JSON, send as simple message
message_data = {
'type': 'ros_message',
'topic': 'ros/outgoing',
'value': msg.data,
'timestamp': self.get_clock().now().to_msg()._sec
}
# Send to WebSocket
asyncio.run_coroutine_threadsafe(
self.websocket_client.send_message(message_data),
self.websocket_client.loop
)
self.get_logger().info(f'Queued message for WebSocket: {message_data}')
else:
self.get_logger().warn('WebSocket not connected - message not sent')
except Exception as e:
self.get_logger().error(f'Error handling outgoing message: {e}')
def command_callback(self, msg):
"""Handle WebSocket commands (connect, disconnect, etc.)"""
try:
command = msg.data.lower()
self.get_logger().info(f'Received WebSocket command: {command}')
if command == 'reconnect' and self.websocket_client:
asyncio.run_coroutine_threadsafe(
self.websocket_client.reconnect(),
self.websocket_client.loop
)
except Exception as e:
self.get_logger().error(f'Error handling command: {e}')
class WebSocketClient:
"""WebSocket client that communicates with both ROS nodes"""
def __init__(self, publisher_node, subscriber_node):
self.uri = uri
self.publisher_node = publisher_node
self.subscriber_node = subscriber_node
self.websocket = None
self.running = False
self.loop = None
self.send_queue = asyncio.Queue()
self._connected = False
def is_connected(self):
"""Check if WebSocket is connected"""
if not self._connected or not self.websocket:
return False
if hasattr(self.websocket, 'closed'):
return not self.websocket.closed
return True
#return self._connected and self.websocket and not self.websocket.closed
async def send_message(self, message):
"""Queue a message to be sent to WebSocket"""
if self.running:
await self.send_queue.put(message)
async def reconnect(self):
"""Reconnect to WebSocket server"""
self.get_logger().info("Reconnecting to WebSocket...")
self.running = False
if self.websocket:
await self.websocket.close()
await asyncio.sleep(2) # Wait before reconnecting
await self.connect_and_run()
async def connect_and_run(self):
"""Main WebSocket connection and message handling"""
self.loop = asyncio.get_event_loop()
try:
self.publisher_node.publish_status("connecting")
async with websockets.connect(self.uri, ssl=True) as websocket:
self.websocket = websocket
self.running = True
self._connected = True
self.publisher_node.publish_status("connected")
print("WebSocket connected successfully")
# Send initial message
initial_message = {'topic': '/fred/image', 'value': 9}
await websocket.send(json.dumps(initial_message))
print(f"Sent initial message: {initial_message}")
# Run send and receive tasks concurrently
await asyncio.gather(
self.send_handler(),
self.receive_handler(),
return_exceptions=True
)
except Exception as e:
self.publisher_node.publish_status(f"error: {str(e)}")
print(f"WebSocket error: {e}")
finally:
self._connected = False
self.running = False
self.publisher_node.publish_status("disconnected")
async def send_handler(self):
"""Handle outgoing messages from ROS to WebSocket"""
while self.running and self.is_connected():
try:
# Wait for messages to send
message = await asyncio.wait_for(self.send_queue.get(), timeout=1.0)
await self.websocket.send(json.dumps(message))
print(f"Sent to WebSocket: {message}")
except asyncio.TimeoutError:
#print("Send handler timeout (no messages in queue)")
continue
except Exception as e:
print(f"Send handler error: {e}")
break
async def receive_handler(self):
"""Handle incoming WebSocket messages"""
try:
async for message in self.websocket:
try:
data = json.loads(message)
# Filter and process messages
if data.get('type') == 'data' and data.get('payload'):
payload = json.loads(data['payload'])
if '/URarm' in payload['topic']: #'heartbeat' not in payload.get('topic', ''):
# Send to publisher node
self.publisher_node.publish_websocket_message(payload)
print(f'Received from WebSocket: {payload.get("value")}')
except json.JSONDecodeError:
print(f"Non-JSON message received: {message}")
# Still publish as raw text
self.publisher_node.publish_websocket_message({'raw_message': message})
except Exception as e:
print(f"Error processing message: {e}")
except websockets.exceptions.ConnectionClosed:
print("WebSocket connection closed")
except Exception as e:
print(f"Receive handler error: {e}")
def run_websocket_client(publisher_node, subscriber_node, stop_event):
"""Run WebSocket client in asyncio event loop"""
async def websocket_main():
client = WebSocketClient(publisher_node, subscriber_node)
subscriber_node.set_websocket_client(client)
try:
while not stop_event.is_set():
await client.connect_and_run()
if not stop_event.is_set():
print("WebSocket disconnected, attempting to reconnect in 5 seconds...")
await asyncio.sleep(5)
except Exception as e:
print(f"WebSocket client error: {e}")
finally:
print("WebSocket client stopped")
# Create new event loop for this thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(websocket_main())
finally:
loop.close()
def start_websocket():
"""Main function with two separate nodes"""
# Initialize ROS2
rclpy.init()
# Create both nodes
publisher_node = WebSocketPublisherNode()
subscriber_node = WebSocketSubscriberNode()
# Create executor with appropriate number of threads
# 2 threads: one for each node's callbacks
executor = MultiThreadedExecutor(num_threads=2)
executor.add_node(publisher_node)
executor.add_node(subscriber_node)
# Event for coordinating shutdown
stop_event = threading.Event()
# Set up signal handler for graceful shutdown
def signal_handler(signum, frame):
print("\nShutdown signal received...")
stop_event.set()
executor.shutdown()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Start WebSocket client in a separate thread
websocket_thread = threading.Thread(
target=run_websocket_client,
args=(publisher_node, subscriber_node, stop_event),
daemon=True
)
websocket_thread.start()
try:
print("Starting ROS2 MultiThreadedExecutor with WebSocket Publisher and Subscriber nodes...")
print("Topics:")
print(" - websocket/incoming: Incoming WebSocket messages")
print(" - websocket/outgoing: Send messages to WebSocket")
print(" - websocket/status: Connection status updates")
print(" - websocket/command: WebSocket commands (reconnect, etc.)")
# This will block and handle all ROS2 callbacks in multiple threads
executor.spin()
except KeyboardInterrupt:
print("\nKeyboard interrupt received")
finally:
print("Shutting down...")
stop_event.set()
# Clean shutdown
if websocket_thread.is_alive():
websocket_thread.join(timeout=5)
publisher_node.destroy_node()
subscriber_node.destroy_node()
rclpy.shutdown()
print("Shutdown complete")
class WebSubscriber(Node):
def __init__(self):
super().__init__("test_sub")
self.subscription = self.create_subscription(String, '/websocket/incoming', self.callback, 10)
self.states = None
def callback(self, msg):
self.states = msg.data
self.done = True
def read(self):
self.done = False
start = time.monotonic()
while not self.done:
rclpy.spin_once(self, timeout_sec=0.1)
if time.monotonic() - start > 0.1:
return None
time.sleep(0.1)
self.done = False
return self.states
class WebPublisher(Node):
def __init__(self):
super().__init__('websocket_message_sender')
self.publisher_ = self.create_publisher(String, 'websocket/outgoing', 10)
self.states = None
def send(self, topic, value):
message = {
'type': 'ros_message',
'topic': topic,
'value': value,
'timestamp': self.get_clock().now().to_msg().sec
}
msg = String()
msg.data = json.dumps(message)
self.publisher_.publish(msg)
self.get_logger().info(f'Published to websocket/outgoing: {msg.data}')
class Channel_Communication(Node):
def __init__(self):
self.sub = WebSubscriber()
self.pub = WebPublisher()
def read_data(self):
return self.sub.read()
def send_data(self, topic, value):
self.pub.send(topic, value)
def clear_queue(self):
for i in range(10):
self.read_data()