Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 178 additions & 1 deletion launch_ros/launch_ros/actions/load_composable_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,28 @@

"""Module for the LoadComposableNodes action."""

import asyncio
from pathlib import Path
import threading
import os

from typing import List
from typing import Optional
from typing import Text
from typing import Union

import rclpy
import composition_interfaces.srv

from launch.action import Action
from launch.event import Event
from launch.event_handlers import OnShutdown
from launch.frontend import Entity
from launch.frontend import expose_action
from launch.frontend import Parser
from launch.launch_context import LaunchContext
import launch.logging
from launch.some_entities_type import SomeEntitiesType
from launch.some_substitutions_type import SomeSubstitutionsType
from launch.some_substitutions_type import SomeSubstitutionsType_types_tuple
from launch.utilities import ensure_argument_type
Expand All @@ -44,7 +50,7 @@
from .lifecycle_transition import LifecycleTransition

from ..descriptions import ComposableNode
from ..ros_adapters import get_ros_node
from ..ros_adapters import get_ros_node, ROSAdapter
from ..utilities import add_node_name
from ..utilities import evaluate_parameters
from ..utilities import get_node_name_count
Expand All @@ -63,6 +69,7 @@ def __init__(
*,
composable_node_descriptions: List[ComposableNode],
target_container: Union[SomeSubstitutionsType, ComposableNodeContainer],
unload_on_shutdown: Union[bool, SomeSubstitutionsType] = False,
**kwargs,
) -> None:
"""
Expand All @@ -85,11 +92,21 @@ def __init__(
'target_container',
'LoadComposableNodes'
)
ensure_argument_type(
unload_on_shutdown,
list(SomeSubstitutionsType_types_tuple) +
[bool],
'unload_on_shutdown',
'LoadComposableNodes'
)
super().__init__(**kwargs)
self.__composable_node_descriptions = composable_node_descriptions
self.__target_container = target_container
self.__unload_on_shutdown = unload_on_shutdown
self.__final_target_container_name: Optional[Text] = None
self.__logger = launch.logging.get_logger(__name__)
self.__loaded_nodes = {} # Track loaded nodes for unloading
self.__no_shutdown_future: Optional[asyncio.Future] = None

@classmethod
def parse(cls, entity: Entity, parser: Parser):
Expand All @@ -105,6 +122,9 @@ def parse(cls, entity: Entity, parser: Parser):
composable_node_cls, composable_node_kwargs = ComposableNode.parse(parser, entity)
kwargs['composable_node_descriptions'].append(
composable_node_cls(**composable_node_kwargs))

kwargs['unload_on_shutdown'] = parser.parse_if_substitutions(
entity.get_attr('unload_on_shutdown', data_type=bool, optional=True))

return cls, kwargs

Expand Down Expand Up @@ -162,6 +182,7 @@ def unblock(future):

node_name = response.full_node_name if response.full_node_name else request.node_name
if response.success:
self.__loaded_nodes[response.unique_id] = node_name
if node_name is not None:
add_node_name(context, node_name)
node_name_count = get_node_name_count(context, node_name)
Expand Down Expand Up @@ -205,6 +226,146 @@ def _load_in_sequence(
)
)

def _unload_node(
self,
request: composition_interfaces.srv.UnloadNode.Request,
client: rclpy.client.Client
) -> None:
"""
Unload node synchronously.

:param request: service request to unload a node
:param client: service client to call the service
"""
if not client.service_is_ready():
self.__logger.info(
"Could not unload node '{}'. Is '{}' down?".format(
self.__loaded_nodes[request.unique_id], self.__final_target_container_name,
client.srv_name
)
)
return

# Asynchronously wait on service call so that we can periodically check for shutdown
event = threading.Event()

def unblock(future):
nonlocal event
event.set()

self.__logger.debug(
"Calling the '{}' service with request '{}'".format(
client.srv_name, request
)
)

response_future = client.call_async(request)
response_future.add_done_callback(unblock)

timeout = 10.0
while not event.wait(1.0):
timeout -= 1.0
if timeout <= 0.0:
self.__logger.warning(
"Abandoning wait for the '{}' service response, due to timeout.".format(
client.srv_name),
)
response_future.cancel()
return

# Get response
if response_future.exception() is not None:
raise response_future.exception()
response = response_future.result()

self.__logger.debug("Received response '{}'".format(response))

node_name = self.__loaded_nodes[request.unique_id]
if response.success:
del self.__loaded_nodes[request.unique_id]
self.__logger.info("Unloaded node '{}' in container '{}'".format(
node_name, self.__final_target_container_name
))
else:
self.__logger.error(
"Failed to unload node '{}' in container '{}': {}".format(
node_name, self.__final_target_container_name,
response.error_message
)
)

def _unload_in_sequence(
self,
unload_node_requests: List[composition_interfaces.srv.UnloadNode.Request],
context: LaunchContext,
ros_adapter: ROSAdapter = None,
client: rclpy.client.Client = None
) -> None:
"""
Unload composable nodes sequentially.

:param unload_node_requests: a list of LoadNode service requests to execute
:param context: current launch context
:param ros_adapter: the ROS adapter to use for unloading
:param client: the service client to use for unloading
"""
# If this is the first call, a ROS node has to be created.
# We need to create it ourselves, since the global ROS adapter also uses OnShutdown
# to terminate, which may happen before our handler is called.
if ros_adapter is None:
# Init an auxiliary ROS adapter solely for processing the unloading requests.
ros_adapter = ROSAdapter(autostart=False)
ros_adapter.start(custom_name='launch_ros_{}_unload_nodes_{}'.format(os.getpid(), id(self)))
# Create the service client.
client = ros_adapter.ros_node.create_client(
composition_interfaces.srv.UnloadNode, '{}/_container/unload_node'.format(
self.__final_target_container_name
)
)

# Process the first request and schedule the rest.
next_unload_node_request = unload_node_requests[0]
unload_node_requests = unload_node_requests[1:]
self._unload_node(next_unload_node_request, client)
if len(unload_node_requests) > 0:
context.add_completion_future(
context.asyncio_loop.run_in_executor(
None, self._unload_in_sequence, unload_node_requests, context, ros_adapter, client
)
)
else:
# If this was the last request, stop the ROS node.
ros_adapter.shutdown()

def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeEntitiesType]:
"""
Unload all nodes when the launch context is shutdown.
This handler is only registered if `unload_on_shutdown` is True.
"""
# Generate unload requests
unload_node_requests = []
for unique_id, node_name in self.__loaded_nodes.items():
request = composition_interfaces.srv.UnloadNode.Request()
request.unique_id = unique_id
unload_node_requests.append(request)

# Process the unload requests
if unload_node_requests:
context.add_completion_future(
context.asyncio_loop.run_in_executor(
None, self._unload_in_sequence, unload_node_requests, context
)
)

# Cancel the dummy coroutine so that the launch process can exit
# The exit will happen once the futures from _unload_in_sequence complete
if self.__no_shutdown_future is not None:
self.__no_shutdown_future.cancel()

def get_asyncio_future(self) -> Optional[asyncio.Future]:
"""Return the future used to track completion."""
return self.__no_shutdown_future

def execute(
self,
context: LaunchContext
Expand Down Expand Up @@ -275,6 +436,22 @@ def execute(
if len(autostart_actions) != 0:
return autostart_actions

if self.__unload_on_shutdown:
# Register a shutdown event handler to unload the nodes when the launch context is shutdown.
context.register_event_handler(
OnShutdown(on_shutdown=self.__on_shutdown)
)

# Start dummy coroutine so that the launch process does not exit
async def dummy_coroutine():
try:
while True:
await asyncio.sleep(1.0)
except asyncio.CancelledError:
pass
self.__no_shutdown_future = context.asyncio_loop.create_task(
dummy_coroutine()
)

def get_composable_node_load_request(
composable_node_description: ComposableNode,
Expand Down
10 changes: 7 additions & 3 deletions launch_ros/launch_ros/ros_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,18 @@ def __init__(
if autostart:
self.start()

def start(self):
"""Start ROS adapter."""
def start(self, custom_name: Optional[str] = None):
"""
Start ROS adapter.

:param: custom_name Custom name for the ROS node.
"""
if self.__is_running:
raise RuntimeError('Cannot start a ROS adapter that is already running')
self.__ros_context = rclpy.Context()
rclpy.init(args=self.__argv, context=self.__ros_context)
self.__ros_node = rclpy.create_node(
'launch_ros_{}'.format(os.getpid()),
'launch_ros_{}'.format(os.getpid()) if custom_name is None else custom_name,
context=self.__ros_context
)
self.__ros_executor = SingleThreadedExecutor(context=self.__ros_context)
Expand Down