Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 41 additions & 30 deletions rosbridge_server/scripts/rosbridge_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.


import asyncio
import signal
import sys
import threading
import time

import rclpy
Expand All @@ -46,26 +48,18 @@
from rosbridge_library.capabilities.unadvertise_service import UnadvertiseService
from std_msgs.msg import Int32
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.netutil import bind_sockets
from tornado.web import Application

from rosbridge_server import ClientManager, RosbridgeWebSocket


def start_hook():
IOLoop.instance().start()


def shutdown_hook():
IOLoop.instance().stop()


class RosbridgeWebsocketNode(Node):
def __init__(self):
super().__init__("rosbridge_websocket")

RosbridgeWebSocket.node_handle = self
RosbridgeWebSocket.event_loop = asyncio.get_event_loop()

##################################################
# Parameter handling #
Expand Down Expand Up @@ -332,31 +326,48 @@ def protocol_parameter_handling(self):
CallService.services_glob = RosbridgeWebSocket.services_glob


def main(args=None):
if args is None:
args = sys.argv
def start_ros_thread(node, shutdown_event):
def spin_ros():
executor = rclpy.executors.SingleThreadedExecutor()
executor.add_node(node)
while rclpy.ok() and not shutdown_event.is_set():
executor.spin_once(timeout_sec=1.0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worried about this timeout being a hard-coded value, although it is just one spin...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need some timeout to make sure the thread wakes up once in a while to check for shutdown event. Alternatively, I can try running executor.spin() and calling executor.shutdown() from the main thread.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that and now one of the tests is timing out. I'm changing this PR to draft until I'll figure this out

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests are timing out due to bugs in rclpy. I submitted a PR with the fix in ros2/rclpy#1469.

node.destroy_node()
rclpy.shutdown()

ros_thread = threading.Thread(target=spin_ros, daemon=True)
ros_thread.start()
return ros_thread

rclpy.init(args=args)

async def async_main():
rclpy.init(args=sys.argv, signal_handler_options=rclpy.signals.SignalHandlerOptions.NO)
node = RosbridgeWebsocketNode()
shutdown_event = threading.Event()
ros_thread = start_ros_thread(node, shutdown_event)

executor = rclpy.executors.SingleThreadedExecutor()
executor.add_node(node)
loop = asyncio.get_running_loop()
stop_event = asyncio.Event()
signal_handled = False

def spin_ros():
executor.spin_once(timeout_sec=0.01)
if not rclpy.ok():
shutdown_hook()

spin_callback = PeriodicCallback(spin_ros, 1)
spin_callback.start()
try:
start_hook()
node.destroy_node()
rclpy.shutdown()
except KeyboardInterrupt:
def handle_signal():
nonlocal signal_handled
if signal_handled:
return
print("Exiting due to SIGINT")
finally:
shutdown_hook() # shutdown hook to stop the server
shutdown_event.set()
stop_event.set()
signal_handled = True

for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, handle_signal)

await stop_event.wait()
ros_thread.join(timeout=2.0)


def main():
asyncio.run(async_main())


if __name__ == "__main__":
Expand Down
11 changes: 6 additions & 5 deletions rosbridge_server/src/rosbridge_server/websocket_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,19 @@
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

import asyncio
import sys
import threading
import traceback
import uuid
from collections import deque
from functools import partial, wraps
from functools import wraps

from rosbridge_library.rosbridge_protocol import RosbridgeProtocol
from rosbridge_library.util import bson
from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
from tornado.websocket import WebSocketClosedError, WebSocketHandler

_io_loop = IOLoop.instance()


def _log_exception():
"""Log the most recent exception to ROS."""
Expand Down Expand Up @@ -124,6 +122,7 @@ class RosbridgeWebSocket(WebSocketHandler):
unregister_timeout = 10.0 # seconds
bson_only_mode = False
node_handle = None
event_loop = None

@log_exceptions
def open(self):
Expand Down Expand Up @@ -174,14 +173,16 @@ def on_close(self):
self.incoming_queue.finish()

def send_message(self, message, compression="none"):
cls = self.__class__

if isinstance(message, bson.BSON):
binary = True
elif compression in ["cbor", "cbor-raw"]:
binary = True
else:
binary = False

_io_loop.add_callback(partial(self.prewrite_message, message, binary))
asyncio.run_coroutine_threadsafe(self.prewrite_message(message, binary), cls.event_loop)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If event_loop is None (its default value), will this error? Should we check?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably will, the same with node_handle. I'm not happy with passing arguments through class variables but I just tried to adhere to how it is currently done.


async def prewrite_message(self, message, binary):
cls = self.__class__
Expand Down