1414
1515"""Module for the LoadComposableNodes action."""
1616
17+ import asyncio
1718from pathlib import Path
1819import threading
20+ import os
1921
2022from typing import List
2123from typing import Optional
2224from typing import Text
2325from typing import Union
2426
27+ import rclpy
2528import composition_interfaces .srv
2629
2730from launch .action import Action
31+ from launch .event import Event
32+ from launch .event_handlers import OnShutdown
2833from launch .frontend import Entity
2934from launch .frontend import expose_action
3035from launch .frontend import Parser
3136from launch .launch_context import LaunchContext
3237import launch .logging
38+ from launch .some_entities_type import SomeEntitiesType
3339from launch .some_substitutions_type import SomeSubstitutionsType
3440from launch .some_substitutions_type import SomeSubstitutionsType_types_tuple
3541from launch .utilities import ensure_argument_type
4450from .lifecycle_transition import LifecycleTransition
4551
4652from ..descriptions import ComposableNode
47- from ..ros_adapters import get_ros_node
53+ from ..ros_adapters import get_ros_node , ROSAdapter
4854from ..utilities import add_node_name
4955from ..utilities import evaluate_parameters
5056from ..utilities import get_node_name_count
@@ -63,6 +69,7 @@ def __init__(
6369 * ,
6470 composable_node_descriptions : List [ComposableNode ],
6571 target_container : Union [SomeSubstitutionsType , ComposableNodeContainer ],
72+ unload_on_shutdown : Union [bool , SomeSubstitutionsType ] = False ,
6673 ** kwargs ,
6774 ) -> None :
6875 """
@@ -85,11 +92,21 @@ def __init__(
8592 'target_container' ,
8693 'LoadComposableNodes'
8794 )
95+ ensure_argument_type (
96+ unload_on_shutdown ,
97+ list (SomeSubstitutionsType_types_tuple ) +
98+ [bool ],
99+ 'unload_on_shutdown' ,
100+ 'LoadComposableNodes'
101+ )
88102 super ().__init__ (** kwargs )
89103 self .__composable_node_descriptions = composable_node_descriptions
90104 self .__target_container = target_container
105+ self .__unload_on_shutdown = unload_on_shutdown
91106 self .__final_target_container_name : Optional [Text ] = None
92107 self .__logger = launch .logging .get_logger (__name__ )
108+ self .__loaded_nodes = {} # Track loaded nodes for unloading
109+ self .__no_shutdown_future : Optional [asyncio .Future ] = None
93110
94111 @classmethod
95112 def parse (cls , entity : Entity , parser : Parser ):
@@ -105,6 +122,9 @@ def parse(cls, entity: Entity, parser: Parser):
105122 composable_node_cls , composable_node_kwargs = ComposableNode .parse (parser , entity )
106123 kwargs ['composable_node_descriptions' ].append (
107124 composable_node_cls (** composable_node_kwargs ))
125+
126+ kwargs ['unload_on_shutdown' ] = parser .parse_if_substitutions (
127+ entity .get_attr ('unload_on_shutdown' , data_type = bool , optional = True ))
108128
109129 return cls , kwargs
110130
@@ -162,6 +182,7 @@ def unblock(future):
162182
163183 node_name = response .full_node_name if response .full_node_name else request .node_name
164184 if response .success :
185+ self .__loaded_nodes [response .unique_id ] = node_name
165186 if node_name is not None :
166187 add_node_name (context , node_name )
167188 node_name_count = get_node_name_count (context , node_name )
@@ -205,6 +226,144 @@ def _load_in_sequence(
205226 )
206227 )
207228
229+ def _unload_node (
230+ self ,
231+ request : composition_interfaces .srv .UnloadNode .Request ,
232+ client : rclpy .client .Client
233+ ) -> None :
234+ """
235+ Unload node synchronously.
236+
237+ :param request: service request to unload a node
238+ :param client: service client to call the service
239+ """
240+ if not client .wait_for_service (timeout_sec = 5.0 ):
241+ self .__logger .warning (
242+ "Abandoning wait for the '{}' service, due to timeout." .format (
243+ client .srv_name
244+ )
245+ )
246+
247+ # Asynchronously wait on service call so that we can periodically check for shutdown
248+ event = threading .Event ()
249+
250+ def unblock (future ):
251+ nonlocal event
252+ event .set ()
253+
254+ self .__logger .debug (
255+ "Calling the '{}' service with request '{}'" .format (
256+ client .srv_name , request
257+ )
258+ )
259+
260+ response_future = client .call_async (request )
261+ response_future .add_done_callback (unblock )
262+
263+ timeout = 10.0
264+ while not event .wait (1.0 ):
265+ timeout -= 1.0
266+ if timeout <= 0.0 :
267+ self .__logger .warning (
268+ "Abandoning wait for the '{}' service response, due to timeout." .format (
269+ client .srv_name ),
270+ )
271+ response_future .cancel ()
272+ return
273+
274+ # Get response
275+ if response_future .exception () is not None :
276+ raise response_future .exception ()
277+ response = response_future .result ()
278+
279+ self .__logger .debug ("Received response '{}'" .format (response ))
280+
281+ node_name = self .__loaded_nodes [request .unique_id ]
282+ if response .success :
283+ del self .__loaded_nodes [request .unique_id ]
284+ self .__logger .info ("Unloaded node '{}' in container '{}'" .format (
285+ node_name , self .__final_target_container_name
286+ ))
287+ else :
288+ self .__logger .error (
289+ "Failed to unload node '{}' in container '{}': {}" .format (
290+ node_name , self .__final_target_container_name ,
291+ response .error_message
292+ )
293+ )
294+
295+ def _unload_in_sequence (
296+ self ,
297+ unload_node_requests : List [composition_interfaces .srv .UnloadNode .Request ],
298+ context : LaunchContext ,
299+ ros_adapter : ROSAdapter = None ,
300+ client : rclpy .client .Client = None
301+ ) -> None :
302+ """
303+ Unload composable nodes sequentially.
304+
305+ :param unload_node_requests: a list of LoadNode service requests to execute
306+ :param context: current launch context
307+ :param ros_adapter: the ROS adapter to use for unloading
308+ :param client: the service client to use for unloading
309+ """
310+ # If this is the first call, a ROS node has to be created.
311+ # We need to create it ourselves, since the global ROS adapter also uses OnShutdown
312+ # to terminate, which may happen before our handler is called.
313+ if ros_adapter is None :
314+ # Init an auxiliary ROS adapter solely for processing the unloading requests.
315+ ros_adapter = ROSAdapter (autostart = False )
316+ ros_adapter .start (custom_name = 'launch_ros_{}_unload_nodes_{}' .format (os .getpid (), id (self )))
317+ # Create the service client.
318+ client = ros_adapter .ros_node .create_client (
319+ composition_interfaces .srv .UnloadNode , '{}/_container/unload_node' .format (
320+ self .__final_target_container_name
321+ )
322+ )
323+
324+ # Process the first request and schedule the rest.
325+ next_unload_node_request = unload_node_requests [0 ]
326+ unload_node_requests = unload_node_requests [1 :]
327+ self ._unload_node (next_unload_node_request , client )
328+ if len (unload_node_requests ) > 0 :
329+ context .add_completion_future (
330+ context .asyncio_loop .run_in_executor (
331+ None , self ._unload_in_sequence , unload_node_requests , context , ros_adapter , client
332+ )
333+ )
334+ else :
335+ # If this was the last request, stop the ROS node.
336+ ros_adapter .shutdown ()
337+
338+ def __on_shutdown (self , event : Event , context : LaunchContext ) -> Optional [SomeEntitiesType ]:
339+ """
340+ Unload all nodes when the launch context is shutdown.
341+ This handler is only registered if `unload_on_shutdown` is True.
342+ """
343+ # Generate unload requests
344+ unload_node_requests = []
345+ for unique_id , node_name in self .__loaded_nodes .items ():
346+ request = composition_interfaces .srv .UnloadNode .Request ()
347+ request .unique_id = unique_id
348+ unload_node_requests .append (request )
349+
350+ # Process the unload requests
351+ if unload_node_requests :
352+ context .add_completion_future (
353+ context .asyncio_loop .run_in_executor (
354+ None , self ._unload_in_sequence , unload_node_requests , context
355+ )
356+ )
357+
358+ # Cancel the dummy coroutine so that the launch process can exit
359+ # The exit will happen once the futures from _unload_in_sequence complete
360+ if self .__no_shutdown_future is not None :
361+ self .__no_shutdown_future .cancel ()
362+
363+ def get_asyncio_future (self ) -> Optional [asyncio .Future ]:
364+ """Return the future used to track completion."""
365+ return self .__no_shutdown_future
366+
208367 def execute (
209368 self ,
210369 context : LaunchContext
@@ -275,6 +434,22 @@ def execute(
275434 if len (autostart_actions ) != 0 :
276435 return autostart_actions
277436
437+ if self .__unload_on_shutdown :
438+ # Register a shutdown event handler to unload the nodes when the launch context is shutdown.
439+ context .register_event_handler (
440+ OnShutdown (on_shutdown = self .__on_shutdown )
441+ )
442+
443+ # Start dummy coroutine so that the launch process does not exit
444+ async def dummy_coroutine ():
445+ try :
446+ while True :
447+ await asyncio .sleep (1.0 )
448+ except asyncio .CancelledError :
449+ pass
450+ self .__no_shutdown_future = context .asyncio_loop .create_task (
451+ dummy_coroutine ()
452+ )
278453
279454def get_composable_node_load_request (
280455 composable_node_description : ComposableNode ,
0 commit comments