1717import unittest
1818
1919from action_msgs .msg import GoalStatus
20+ from ament_index_python .packages import get_package_prefix
2021from geometry_msgs .msg import TransformStamped , Twist , TwistStamped
2122from launch import LaunchDescription
23+ from launch .actions import (
24+ LogInfo ,
25+ SetEnvironmentVariable ,
26+ )
2227from launch_ros .actions import Node
2328import launch_testing
29+ import launch_testing .actions
30+ import launch_testing .asserts
31+ import launch_testing .markers
32+ import launch_testing .util
2433from nav2_msgs .action import DockRobot , NavigateToPose , UndockRobot
2534import pytest
2635import rclpy
27- import rclpy .time
28- import rclpy .duration
2936from rclpy .action .client import ActionClient
3037from rclpy .action .server import ActionServer
38+ import rclpy .duration
39+ import rclpy .time
3140from sensor_msgs .msg import BatteryState
32- from tf2_ros import TransformBroadcaster , Buffer , TransformListener
41+ from tf2_ros import Buffer , TransformBroadcaster , TransformListener
3342
3443
3544# This test can be run standalone with:
3645# python3 -u -m pytest test_docking_server.py -s
3746
3847@pytest .mark .rostest
48+ @launch_testing .markers .keep_alive
3949def generate_test_description ():
40-
41- return LaunchDescription ([
42- Node (
50+ tmux_gdb_prefix = (
51+ 'tmux split-window '
52+ + get_package_prefix ('nav2_bringup' )
53+ + '/lib/nav2_bringup/gdb_tmux_splitwindow.sh'
54+ )
55+ logme = LogInfo (msg = f'tmux_gdb_prefix={ tmux_gdb_prefix } ' )
56+ docking_server = Node (
4357 package = 'opennav_docking' ,
4458 executable = 'opennav_docking' ,
4559 name = 'docking_server' ,
60+ # arguments=['--ros-args', '--log-level', 'debug'],
4661 parameters = [{'wait_charge_timeout' : 1.0 ,
4762 'controller' : {
4863 'use_collision_detection' : False ,
@@ -59,17 +74,42 @@ def generate_test_description():
5974 'pose' : [10.0 , 0.0 , 0.0 ]
6075 }}],
6176 output = 'screen' ,
62- ),
63- Node (
77+ )
78+ lifecycle_manager_navigation = Node (
6479 package = 'nav2_lifecycle_manager' ,
6580 executable = 'lifecycle_manager' ,
6681 name = 'lifecycle_manager_navigation' ,
6782 output = 'screen' ,
83+ # prefix=tmux_gdb_prefix,
6884 parameters = [{'autostart' : True },
6985 {'node_names' : ['docking_server' ]}]
70- ),
86+ )
87+
88+ return LaunchDescription ([
89+ SetEnvironmentVariable ('RCUTILS_LOGGING_BUFFERED_STREAM' , '1' ),
90+ SetEnvironmentVariable ('RCUTILS_LOGGING_USE_STDOUT' , '1' ),
91+ logme ,
92+ docking_server ,
93+ lifecycle_manager_navigation ,
94+ # In tests where all of the procs under tests terminate themselves, it's necessary
95+ # to add a dummy process not under test to keep the launch alive. launch_test
96+ # provides a simple launch action that does this:
97+ launch_testing .util .KeepAliveProc (),
7198 launch_testing .actions .ReadyToTest (),
72- ])
99+ ]), locals ()
100+
101+
102+ @launch_testing .post_shutdown_test ()
103+ class TestPostShutdown (unittest .TestCase ):
104+
105+ def test_action_graceful_shutdown (self ,
106+ proc_info ,
107+ docking_server ,
108+ lifecycle_manager_navigation ):
109+ """Test graceful shutdown."""
110+ launch_testing .asserts .assertExitCodes (proc_info , process = docking_server )
111+ launch_testing .asserts .assertExitCodes (proc_info ,
112+ process = lifecycle_manager_navigation )
73113
74114
75115class TestDockingServer (unittest .TestCase ):
@@ -146,7 +186,7 @@ def nav_execute_callback(self, goal_handle):
146186 return result
147187
148188 def check_transform (self , timeout_sec , source_frame = 'odom' , target_frame = 'base_link' ):
149- """Try to look up the transform we're publishing"""
189+ """Try to look up the transform we're publishing. """
150190 if self .transform_available :
151191 return True
152192
@@ -162,8 +202,8 @@ def check_transform(self, timeout_sec, source_frame='odom', target_frame='base_l
162202
163203 self .node .get_logger ().info ('Successfully found transform:' )
164204 self .node .get_logger ().info (f'Translation: [{ transform .transform .translation .x } , '
165- f'{ transform .transform .translation .y } , '
166- f'{ transform .transform .translation .z } ]' )
205+ f'{ transform .transform .translation .y } , '
206+ f'{ transform .transform .translation .z } ]' )
167207 self .transform_available = True
168208 except Exception as e :
169209 self .node .get_logger ().error (f'Error looking up transform: { str (e )} ' )
@@ -179,7 +219,6 @@ def test_docking_server(self):
179219 self .tf_listener = TransformListener (self .tf_buffer , self .node )
180220 self .transform_available = False
181221
182-
183222 # Create a timer to run "control loop" at 20hz
184223 self .timer = self .node .create_timer (0.05 , self .timer_callback )
185224
@@ -222,7 +261,7 @@ def test_docking_server(self):
222261 # Test docking action
223262 self .action_result = []
224263 assert self .dock_action_client .wait_for_server (timeout_sec = 5.0 ), \
225- 'dock_robot service not available'
264+ 'dock_robot service not available'
226265
227266 goal = DockRobot .Goal ()
228267 goal .use_dock_id = True
0 commit comments