diff --git a/launch_ros/launch_ros/actions/load_composable_nodes.py b/launch_ros/launch_ros/actions/load_composable_nodes.py index ba4f6e21c..17dc6e2b7 100644 --- a/launch_ros/launch_ros/actions/load_composable_nodes.py +++ b/launch_ros/launch_ros/actions/load_composable_nodes.py @@ -14,22 +14,28 @@ """Module for the LoadComposableNodes action.""" +import asyncio from pathlib import Path import threading +import os from typing import List from typing import Optional from typing import Text from typing import Union +import rclpy import composition_interfaces.srv from launch.action import Action +from launch.event import Event +from launch.event_handlers import OnShutdown from launch.frontend import Entity from launch.frontend import expose_action from launch.frontend import Parser from launch.launch_context import LaunchContext import launch.logging +from launch.some_entities_type import SomeEntitiesType from launch.some_substitutions_type import SomeSubstitutionsType from launch.some_substitutions_type import SomeSubstitutionsType_types_tuple from launch.utilities import ensure_argument_type @@ -44,7 +50,7 @@ from .lifecycle_transition import LifecycleTransition from ..descriptions import ComposableNode -from ..ros_adapters import get_ros_node +from ..ros_adapters import get_ros_node, ROSAdapter from ..utilities import add_node_name from ..utilities import evaluate_parameters from ..utilities import get_node_name_count @@ -63,6 +69,7 @@ def __init__( *, composable_node_descriptions: List[ComposableNode], target_container: Union[SomeSubstitutionsType, ComposableNodeContainer], + unload_on_shutdown: Union[bool, SomeSubstitutionsType] = False, **kwargs, ) -> None: """ @@ -85,11 +92,21 @@ def __init__( 'target_container', 'LoadComposableNodes' ) + ensure_argument_type( + unload_on_shutdown, + list(SomeSubstitutionsType_types_tuple) + + [bool], + 'unload_on_shutdown', + 'LoadComposableNodes' + ) super().__init__(**kwargs) self.__composable_node_descriptions = composable_node_descriptions self.__target_container = target_container + self.__unload_on_shutdown = unload_on_shutdown self.__final_target_container_name: Optional[Text] = None self.__logger = launch.logging.get_logger(__name__) + self.__loaded_nodes = {} # Track loaded nodes for unloading + self.__no_shutdown_future: Optional[asyncio.Future] = None @classmethod def parse(cls, entity: Entity, parser: Parser): @@ -105,6 +122,9 @@ def parse(cls, entity: Entity, parser: Parser): composable_node_cls, composable_node_kwargs = ComposableNode.parse(parser, entity) kwargs['composable_node_descriptions'].append( composable_node_cls(**composable_node_kwargs)) + + kwargs['unload_on_shutdown'] = parser.parse_if_substitutions( + entity.get_attr('unload_on_shutdown', data_type=bool, optional=True)) return cls, kwargs @@ -162,6 +182,7 @@ def unblock(future): node_name = response.full_node_name if response.full_node_name else request.node_name if response.success: + self.__loaded_nodes[response.unique_id] = node_name if node_name is not None: add_node_name(context, node_name) node_name_count = get_node_name_count(context, node_name) @@ -205,6 +226,146 @@ def _load_in_sequence( ) ) + def _unload_node( + self, + request: composition_interfaces.srv.UnloadNode.Request, + client: rclpy.client.Client + ) -> None: + """ + Unload node synchronously. + + :param request: service request to unload a node + :param client: service client to call the service + """ + if not client.service_is_ready(): + self.__logger.info( + "Could not unload node '{}'. Is '{}' down?".format( + self.__loaded_nodes[request.unique_id], self.__final_target_container_name, + client.srv_name + ) + ) + return + + # Asynchronously wait on service call so that we can periodically check for shutdown + event = threading.Event() + + def unblock(future): + nonlocal event + event.set() + + self.__logger.debug( + "Calling the '{}' service with request '{}'".format( + client.srv_name, request + ) + ) + + response_future = client.call_async(request) + response_future.add_done_callback(unblock) + + timeout = 10.0 + while not event.wait(1.0): + timeout -= 1.0 + if timeout <= 0.0: + self.__logger.warning( + "Abandoning wait for the '{}' service response, due to timeout.".format( + client.srv_name), + ) + response_future.cancel() + return + + # Get response + if response_future.exception() is not None: + raise response_future.exception() + response = response_future.result() + + self.__logger.debug("Received response '{}'".format(response)) + + node_name = self.__loaded_nodes[request.unique_id] + if response.success: + del self.__loaded_nodes[request.unique_id] + self.__logger.info("Unloaded node '{}' in container '{}'".format( + node_name, self.__final_target_container_name + )) + else: + self.__logger.error( + "Failed to unload node '{}' in container '{}': {}".format( + node_name, self.__final_target_container_name, + response.error_message + ) + ) + + def _unload_in_sequence( + self, + unload_node_requests: List[composition_interfaces.srv.UnloadNode.Request], + context: LaunchContext, + ros_adapter: ROSAdapter = None, + client: rclpy.client.Client = None + ) -> None: + """ + Unload composable nodes sequentially. + + :param unload_node_requests: a list of LoadNode service requests to execute + :param context: current launch context + :param ros_adapter: the ROS adapter to use for unloading + :param client: the service client to use for unloading + """ + # If this is the first call, a ROS node has to be created. + # We need to create it ourselves, since the global ROS adapter also uses OnShutdown + # to terminate, which may happen before our handler is called. + if ros_adapter is None: + # Init an auxiliary ROS adapter solely for processing the unloading requests. + ros_adapter = ROSAdapter(autostart=False) + ros_adapter.start(custom_name='launch_ros_{}_unload_nodes_{}'.format(os.getpid(), id(self))) + # Create the service client. + client = ros_adapter.ros_node.create_client( + composition_interfaces.srv.UnloadNode, '{}/_container/unload_node'.format( + self.__final_target_container_name + ) + ) + + # Process the first request and schedule the rest. + next_unload_node_request = unload_node_requests[0] + unload_node_requests = unload_node_requests[1:] + self._unload_node(next_unload_node_request, client) + if len(unload_node_requests) > 0: + context.add_completion_future( + context.asyncio_loop.run_in_executor( + None, self._unload_in_sequence, unload_node_requests, context, ros_adapter, client + ) + ) + else: + # If this was the last request, stop the ROS node. + ros_adapter.shutdown() + + def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeEntitiesType]: + """ + Unload all nodes when the launch context is shutdown. + This handler is only registered if `unload_on_shutdown` is True. + """ + # Generate unload requests + unload_node_requests = [] + for unique_id, node_name in self.__loaded_nodes.items(): + request = composition_interfaces.srv.UnloadNode.Request() + request.unique_id = unique_id + unload_node_requests.append(request) + + # Process the unload requests + if unload_node_requests: + context.add_completion_future( + context.asyncio_loop.run_in_executor( + None, self._unload_in_sequence, unload_node_requests, context + ) + ) + + # Cancel the dummy coroutine so that the launch process can exit + # The exit will happen once the futures from _unload_in_sequence complete + if self.__no_shutdown_future is not None: + self.__no_shutdown_future.cancel() + + def get_asyncio_future(self) -> Optional[asyncio.Future]: + """Return the future used to track completion.""" + return self.__no_shutdown_future + def execute( self, context: LaunchContext @@ -275,6 +436,22 @@ def execute( if len(autostart_actions) != 0: return autostart_actions + if self.__unload_on_shutdown: + # Register a shutdown event handler to unload the nodes when the launch context is shutdown. + context.register_event_handler( + OnShutdown(on_shutdown=self.__on_shutdown) + ) + + # Start dummy coroutine so that the launch process does not exit + async def dummy_coroutine(): + try: + while True: + await asyncio.sleep(1.0) + except asyncio.CancelledError: + pass + self.__no_shutdown_future = context.asyncio_loop.create_task( + dummy_coroutine() + ) def get_composable_node_load_request( composable_node_description: ComposableNode, diff --git a/launch_ros/launch_ros/ros_adapters.py b/launch_ros/launch_ros/ros_adapters.py index 926644bea..c000e2911 100644 --- a/launch_ros/launch_ros/ros_adapters.py +++ b/launch_ros/launch_ros/ros_adapters.py @@ -51,14 +51,18 @@ def __init__( if autostart: self.start() - def start(self): - """Start ROS adapter.""" + def start(self, custom_name: Optional[str] = None): + """ + Start ROS adapter. + + :param: custom_name Custom name for the ROS node. + """ if self.__is_running: raise RuntimeError('Cannot start a ROS adapter that is already running') self.__ros_context = rclpy.Context() rclpy.init(args=self.__argv, context=self.__ros_context) self.__ros_node = rclpy.create_node( - 'launch_ros_{}'.format(os.getpid()), + 'launch_ros_{}'.format(os.getpid()) if custom_name is None else custom_name, context=self.__ros_context ) self.__ros_executor = SingleThreadedExecutor(context=self.__ros_context)