From 487a55642ebe90804bdfa54067f44ffbf8353a72 Mon Sep 17 00:00:00 2001 From: URJala Date: Fri, 14 Feb 2025 14:07:03 +0000 Subject: [PATCH 1/2] The tests in robot_driver.py run, and initialisation is done entirely in conftest and test_common --- ur_robot_driver/test/conftest.py | 119 +++++ ur_robot_driver/test/pytest.ini | 3 + ur_robot_driver/test/robot_driver.py | 717 ++++++++++++--------------- ur_robot_driver/test/test_common.py | 48 +- 4 files changed, 491 insertions(+), 396 deletions(-) create mode 100644 ur_robot_driver/test/conftest.py create mode 100644 ur_robot_driver/test/pytest.ini diff --git a/ur_robot_driver/test/conftest.py b/ur_robot_driver/test/conftest.py new file mode 100644 index 000000000..da0402c6f --- /dev/null +++ b/ur_robot_driver/test/conftest.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 +# Copyright 2025, Universal Robots A/S +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# * Neither the name of the {copyright_holder} nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +from test_common import ( + interfaces, + generate_driver_test_description, +) +import pytest +import launch_pytest +import logging +import rclpy.node + +# fixture scope for all fixtures +scope = "function" + + +# Fixtures for use with the integration tests. +@pytest.fixture(scope=scope, params=["scaled_joint_trajectory_controller"]) +def initial_joint_controller(request): + return request.param + + +@pytest.fixture(scope=scope, params=["", "my_ur_"]) +def tf_prefix(request): + return request.param + + +@pytest.fixture(scope=scope, params=["false", "true"]) +def mock_hardware(request): + return request.param + + +@pytest.fixture(scope=scope, params=[0, 1]) +def params(request): + params = [ + {"tf_prefix": "", "mock_hardware": "false"}, + {"tf_prefix": "my_ur_", "mock_hardware": "true"}, + ] + return params[request.param] + + +# Could also be passed like this, but that result in each test being executed 4 times, instead of 2, which seems excessive: +# @launch_pytest.fixture(scope=scope, autouse=True, auto_shutdown=True) +# def launch_description(tf_prefix, mock_hardware): +# return generate_driver_test_description(tf_prefix=tf_prefix, mock_hardware=mock_hardware) + + +# This only gives 2 exeecutions of each test +@launch_pytest.fixture(scope=scope, autouse=True, auto_shutdown=True) +def launch_description(params): + print("-------------LAUNCHING DRIVER -------------------------------") + tf_prefix = params["tf_prefix"] + mock_hardware = params["mock_hardware"] + return generate_driver_test_description(tf_prefix=tf_prefix, mock_hardware=mock_hardware) + + +@pytest.fixture(scope=scope) +def rclpy_init(): + """ + Initializes and finalizes rclpy. + + This ensures that rclpy.init() has been called before, but is not called more than once. + """ + logging.info("Initializing rclpy") + rclpy.init() + + yield + + logging.info("Shutting down rclpy") + rclpy.shutdown() + + +@pytest.fixture(scope=scope) +def node(rclpy_init): + """ + Creates a node with a given name. + + The name needs to be passed in as a parameter + """ + logging.info("Creating testing node") + rclpy_node = rclpy.node.Node("robot_driver_test") + + yield rclpy_node + + logging.info("Destroying testing node") + rclpy_node.destroy_node() + + +@pytest.fixture(scope=scope) +def robot(node, params): + interface = interfaces(node, params) + interface.init_interfaces() + yield interface diff --git a/ur_robot_driver/test/pytest.ini b/ur_robot_driver/test/pytest.ini new file mode 100644 index 000000000..c24fe5bb9 --- /dev/null +++ b/ur_robot_driver/test/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +filterwarnings = + ignore::DeprecationWarning diff --git a/ur_robot_driver/test/robot_driver.py b/ur_robot_driver/test/robot_driver.py index b0b2a4c84..36516736c 100755 --- a/ur_robot_driver/test/robot_driver.py +++ b/ur_robot_driver/test/robot_driver.py @@ -30,410 +30,341 @@ import os import sys import time -import unittest - -import launch_testing -import pytest import rclpy from builtin_interfaces.msg import Duration from control_msgs.action import FollowJointTrajectory from control_msgs.msg import JointTolerance from controller_manager_msgs.srv import SwitchController -from rclpy.node import Node from sensor_msgs.msg import JointState from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint from ur_msgs.msg import IOStates sys.path.append(os.path.dirname(__file__)) -from test_common import ( # noqa: E402 - ActionInterface, - ControllerManagerInterface, - DashboardInterface, - IoStatusInterface, - ConfigurationInterface, - generate_driver_test_description, - ROBOT_JOINTS, -) - -TIMEOUT_EXECUTE_TRAJECTORY = 30 - - -@pytest.mark.launch_test -@launch_testing.parametrize( - "tf_prefix", - [(""), ("my_ur_")], -) -def generate_test_description(tf_prefix): - return generate_driver_test_description(tf_prefix=tf_prefix) - - -class RobotDriverTest(unittest.TestCase): - @classmethod - def setUpClass(cls): - # Initialize the ROS context - rclpy.init() - cls.node = Node("robot_driver_test") - time.sleep(1) - cls.init_robot(cls) - - @classmethod - def tearDownClass(cls): - # Shutdown the ROS context - cls.node.destroy_node() - rclpy.shutdown() - - def init_robot(self): - self._dashboard_interface = DashboardInterface(self.node) - self._controller_manager_interface = ControllerManagerInterface(self.node) - self._io_status_controller_interface = IoStatusInterface(self.node) - self._configuration_controller_interface = ConfigurationInterface(self.node) - - self._scaled_follow_joint_trajectory = ActionInterface( - self.node, - "/scaled_joint_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) - self._passthrough_forward_joint_trajectory = ActionInterface( - self.node, - "/passthrough_trajectory_controller/follow_joint_trajectory", - FollowJointTrajectory, - ) - - def setUp(self): - self._dashboard_interface.start_robot() - time.sleep(1) - self.assertTrue(self._io_status_controller_interface.resend_robot_program().success) - - # - # Test functions - # - - def test_get_robot_software_version(self): - self.assertNotEqual( - self._configuration_controller_interface.get_robot_software_version().major, 0 - ) - - def test_start_scaled_jtc_controller(self): - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - - def test_start_passthrough_controller(self): - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=["passthrough_trajectory_controller"], - deactivate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - - def test_set_io(self): - """Test to set an IO and check whether it has been set.""" - # Create io callback to verify result - io_msg = None - - def io_msg_cb(msg): - nonlocal io_msg - io_msg = msg - - io_states_sub = self.node.create_subscription( - IOStates, - "/io_and_status_controller/io_states", - io_msg_cb, - rclpy.qos.qos_profile_system_default, - ) - - # Set pin 0 to 1.0 - test_pin = 0 - - logging.info("Setting pin %d to 1.0", test_pin) - self._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=1.0) - - # Wait until the pin state has changed - pin_state = False - end_time = time.time() + 5 - while not pin_state and time.time() < end_time: - rclpy.spin_once(self.node, timeout_sec=0.1) - if io_msg is not None: - pin_state = io_msg.digital_out_states[test_pin].state - - self.assertEqual(pin_state, 1.0) - - # Set pin 0 to 0.0 - logging.info("Setting pin %d to 0.0", test_pin) - self._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=0.0) - - # Wait until the pin state has changed back - end_time = time.time() + 5 - while pin_state and time.time() < end_time: - rclpy.spin_once(self.node, timeout_sec=0.1) - if io_msg is not None: - pin_state = io_msg.digital_out_states[test_pin].state - - self.assertEqual(pin_state, 0.0) - - # Clean up io subscription - self.node.destroy_subscription(io_states_sub) - - def test_trajectory(self, tf_prefix): - """Test robot movement.""" - # Construct test trajectory - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=9, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), - (Duration(sec=12, nanosec=0), [-1.0 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Sending trajectory goal - logging.info("Sending simple goal") - goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) - self.assertTrue(goal_handle.accepted) - - # Verify execution - result = self._scaled_follow_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - - def test_illegal_trajectory(self, tf_prefix): - """ - Test trajectory server. - - This is more of a validation test that the testing suite does the right thing - """ - # Construct test trajectory, the second point wrongly starts before the first - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=3, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Send illegal goal - logging.info("Sending illegal goal") - goal_handle = self._scaled_follow_joint_trajectory.send_goal( - trajectory=trajectory, - ) - - # Verify the failure is correctly detected - self.assertFalse(goal_handle.accepted) - - def test_trajectory_scaled(self, tf_prefix): - """Test robot movement.""" - # Construct test trajectory - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - (Duration(sec=6, nanosec=500000000), [-1.0 for j in ROBOT_JOINTS]), - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - # Execute trajectory - logging.info("Sending goal for robot to follow") - goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) - self.assertTrue(goal_handle.accepted) - - # Verify execution - result = self._scaled_follow_joint_trajectory.get_result( - goal_handle, - TIMEOUT_EXECUTE_TRAJECTORY, - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - - def test_trajectory_scaled_aborts_on_violation(self, tf_prefix): - """Test that the robot correctly aborts the trajectory when the constraints are violated.""" - # Construct test trajectory - test_trajectory = [ - (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), - ( - Duration(sec=6, nanosec=50000000), - [-1.0 for j in ROBOT_JOINTS], - ), # physically unfeasible - ( - Duration(sec=8, nanosec=0), - [-1.5 for j in ROBOT_JOINTS], - ), # physically unfeasible - ] - - trajectory = JointTrajectory( - joint_names=[tf_prefix + joint for joint in ROBOT_JOINTS], - points=[ - JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) - for (test_time, test_pos) in test_trajectory - ], - ) - - last_joint_state = None - - def js_cb(msg): - nonlocal last_joint_state - last_joint_state = msg - - joint_state_sub = self.node.create_subscription(JointState, "/joint_states", js_cb, 1) - joint_state_sub # prevent warning about unused variable - - # Send goal - logging.info("Sending goal for robot to follow") - goal_handle = self._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) - self.assertTrue(goal_handle.accepted) - - # Get result - result = self._scaled_follow_joint_trajectory.get_result( - goal_handle, - TIMEOUT_EXECUTE_TRAJECTORY, - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.PATH_TOLERANCE_VIOLATED) - - state_when_aborted = last_joint_state - - # This section is to make sure the robot stopped moving once the trajectory was aborted - time.sleep(2.0) - # Ugly workaround since we want to wait for a joint state in the same thread - while last_joint_state == state_when_aborted: - rclpy.spin_once(self.node) - state_after_sleep = last_joint_state - - logging.info("Joint states before sleep:\t %s", state_when_aborted.position.tolist()) - logging.info("Joint states after sleep:\t %s", state_after_sleep.position.tolist()) - - self.assertTrue( - all( - [ - abs(a - b) < 0.01 - for a, b in zip(state_after_sleep.position, state_when_aborted.position) - ] - ) - ) - - # TODO: uncomment when JTC starts taking into account goal_time_tolerance from goal message - # see https://github.com/ros-controls/ros2_controllers/issues/249 - # Now do the same again, but with a goal time constraint - # self.node.get_logger().info("Sending scaled goal with time restrictions") - # - # goal.goal_time_tolerance = Duration(nanosec=10000000) - # goal_response = self.call_action("/scaled_joint_trajectory_controller/follow_joint_trajectory", goal) - # - # self.assertEqual(goal_response.accepted, True) - # - # if goal_response.accepted: - # result = self.get_result("/scaled_joint_trajectory_controller/follow_joint_trajectory", goal_response, TIMEOUT_EXECUTE_TRAJECTORY) - # self.assertEqual(result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED) - # self.node.get_logger().info("Received result GOAL_TOLERANCE_VIOLATED") - - def test_passthrough_trajectory(self, tf_prefix): - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - activate_controllers=["passthrough_trajectory_controller"], - deactivate_controllers=["scaled_joint_trajectory_controller"], - ).ok - ) - waypts = [ - [-1.58, -1.692, -1.4311, -0.0174, 1.5882, 0.0349], - [-3, -1.692, -1.4311, -0.0174, 1.5882, 0.0349], - [-1.58, -1.692, -1.4311, -0.0174, 1.5882, 0.0349], - ] - time_vec = [ - Duration(sec=4, nanosec=0), +from test_common import ROBOT_JOINTS, TIMEOUT_EXECUTE_TRAJECTORY # noqa: E402 + + +def test_get_robot_software_version(robot): + if not robot.mock_hardware: + assert robot._configuration_controller_interface.get_robot_software_version().major != 0 + else: + assert robot._configuration_controller_interface.get_robot_software_version().major == 0 + + +def test_start_scaled_jtc_controller(robot): + assert robot._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + + +def test_start_passthrough_controller(robot): + assert robot._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + assert robot._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok + + +def test_set_io(robot): + """Test to set an IO and check whether it has been set.""" + # Create io callback to verify result + if robot.mock_hardware: + return True + io_msg = None + + def io_msg_cb(msg): + nonlocal io_msg + io_msg = msg + + io_states_sub = robot.node.create_subscription( + IOStates, + "/io_and_status_controller/io_states", + io_msg_cb, + rclpy.qos.qos_profile_system_default, + ) + + # Set pin 0 to 1.0 + test_pin = 0 + + logging.info("Setting pin %d to 1.0", test_pin) + robot._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=1.0) + + # Wait until the pin state has changed + pin_state = False + end_time = time.time() + 5 + while not pin_state and time.time() < end_time: + rclpy.spin_once(robot.node, timeout_sec=0.1) + if io_msg is not None: + pin_state = io_msg.digital_out_states[test_pin].state + + assert pin_state == 1.0 + + # Set pin 0 to 0.0 + logging.info("Setting pin %d to 0.0", test_pin) + robot._io_status_controller_interface.set_io(fun=1, pin=test_pin, state=0.0) + + # Wait until the pin state has changed back + end_time = time.time() + 5 + while pin_state and time.time() < end_time: + rclpy.spin_once(robot.node, timeout_sec=0.1) + if io_msg is not None: + pin_state = io_msg.digital_out_states[test_pin].state + + assert pin_state == 0.0 + + # Clean up io subscription + robot.node.destroy_subscription(io_states_sub) + + +def test_trajectory(robot): + """Test robot movement.""" + # Construct test trajectory + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=9, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), + (Duration(sec=12, nanosec=0), [-1.0 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[robot.tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Sending trajectory goal + logging.info("Sending simple goal") + goal_handle = robot._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) + assert goal_handle.accepted + + # Verify execution + result = robot._scaled_follow_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY + ) + assert result.error_code == FollowJointTrajectory.Result.SUCCESSFUL + + +def test_illegal_trajectory(robot): + """ + Test trajectory server. + + This is more of a validation test that the testing suite does the right thing + """ + # Construct test trajectory, the second point wrongly starts before the first + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=3, nanosec=0), [-0.5 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[robot.tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Send illegal goal + logging.info("Sending illegal goal") + goal_handle = robot._scaled_follow_joint_trajectory.send_goal( + trajectory=trajectory, + ) + + # Verify the failure is correctly detected + assert not goal_handle.accepted + + +def test_trajectory_scaled(robot): + """Test robot movement.""" + # Construct test trajectory + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + (Duration(sec=6, nanosec=500000000), [-1.0 for j in ROBOT_JOINTS]), + ] + + trajectory = JointTrajectory( + joint_names=[robot.tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + # Execute trajectory + logging.info("Sending goal for robot to follow") + goal_handle = robot._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) + assert goal_handle.accepted + + # Verify execution + result = robot._scaled_follow_joint_trajectory.get_result( + goal_handle, + TIMEOUT_EXECUTE_TRAJECTORY, + ) + assert result.error_code == FollowJointTrajectory.Result.SUCCESSFUL + + +def test_trajectory_scaled_aborts_on_violation(robot): + """Test that the robot correctly aborts the trajectory when the constraints are violated.""" + # Construct test trajectory + if robot.mock_hardware: + return True + test_trajectory = [ + (Duration(sec=6, nanosec=0), [0.0 for j in ROBOT_JOINTS]), + ( + Duration(sec=6, nanosec=50000000), + [-1.0 for j in ROBOT_JOINTS], + ), # physically unfeasible + ( Duration(sec=8, nanosec=0), - Duration(sec=12, nanosec=0), - ] - goal_tolerance = [ - JointTolerance(position=0.01, name=tf_prefix + ROBOT_JOINTS[i]) - for i in range(len(ROBOT_JOINTS)) - ] - goal_time_tolerance = Duration(sec=1, nanosec=0) - test_trajectory = zip(time_vec, waypts) - trajectory = JointTrajectory( - points=[ - JointTrajectoryPoint(positions=pos, time_from_start=times) - for (times, pos) in test_trajectory - ], - joint_names=[tf_prefix + ROBOT_JOINTS[i] for i in range(len(ROBOT_JOINTS))], - ) - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, - ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual(result.error_code, FollowJointTrajectory.Result.SUCCESSFUL) - # Test impossible goal tolerance, should fail. - goal_tolerance = [ - JointTolerance(position=0.000000001, name=tf_prefix + ROBOT_JOINTS[i]) - for i in range(len(ROBOT_JOINTS)) - ] - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, + [-1.5 for j in ROBOT_JOINTS], + ), # physically unfeasible + ] + + trajectory = JointTrajectory( + joint_names=[robot.tf_prefix + joint for joint in ROBOT_JOINTS], + points=[ + JointTrajectoryPoint(positions=test_pos, time_from_start=test_time) + for (test_time, test_pos) in test_trajectory + ], + ) + + last_joint_state = None + + def js_cb(msg): + nonlocal last_joint_state + last_joint_state = msg + + joint_state_sub = robot.node.create_subscription(JointState, "/joint_states", js_cb, 1) + joint_state_sub # prevent warning about unused variable + + # Send goal + logging.info("Sending goal for robot to follow") + goal_handle = robot._scaled_follow_joint_trajectory.send_goal(trajectory=trajectory) + assert goal_handle.accepted + + # Get result + result = robot._scaled_follow_joint_trajectory.get_result( + goal_handle, + TIMEOUT_EXECUTE_TRAJECTORY, + ) + assert result.error_code == FollowJointTrajectory.Result.PATH_TOLERANCE_VIOLATED + + state_when_aborted = last_joint_state + + # This section is to make sure the robot stopped moving once the trajectory was aborted + time.sleep(2.0) + # Ugly workaround since we want to wait for a joint state in the same thread + while last_joint_state == state_when_aborted: + rclpy.spin_once(robot.node) + state_after_sleep = last_joint_state + + logging.info("Joint states before sleep:\t %s", state_when_aborted.position.tolist()) + logging.info("Joint states after sleep:\t %s", state_after_sleep.position.tolist()) + + assert all( + [abs(a - b) < 0.01 for a, b in zip(state_after_sleep.position, state_when_aborted.position)] + ) + + # # TODO: uncomment when JTC starts taking into account goal_time_tolerance from goal message + # # see https://github.com/ros-controls/ros2_controllers/issues/249 + # # Now do the same again, but with a goal time constraint + # # self.node.get_logger().info("Sending scaled goal with time restrictions") + # # + # # goal.goal_time_tolerance = Duration(nanosec=10000000) + # # goal_response = self.call_action("/scaled_joint_trajectory_controller/follow_joint_trajectory", goal) + # # + # # self.assertEqual(goal_response.accepted, True) + # # + # # if goal_response.accepted: + # # result = self.get_result("/scaled_joint_trajectory_controller/follow_joint_trajectory", goal_response, TIMEOUT_EXECUTE_TRAJECTORY) + # # self.assertEqual(result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED) + # # self.node.get_logger().info("Received result GOAL_TOLERANCE_VIOLATED") + + +def test_passthrough_trajectory(robot): + if robot.mock_hardware: + return True + assert robot._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + activate_controllers=["passthrough_trajectory_controller"], + deactivate_controllers=["scaled_joint_trajectory_controller"], + ).ok + waypts = [ + [-1.58, -1.692, -1.4311, -0.0174, 1.5882, 0.0349], + [-3, -1.692, -1.4311, -0.0174, 1.5882, 0.0349], + [-1.58, -1.692, -1.4311, -0.0174, 1.5882, 0.0349], + ] + time_vec = [ + Duration(sec=4, nanosec=0), + Duration(sec=8, nanosec=0), + Duration(sec=12, nanosec=0), + ] + goal_tolerance = [ + JointTolerance(position=0.01, name=robot.tf_prefix + ROBOT_JOINTS[i]) + for i in range(len(ROBOT_JOINTS)) + ] + goal_time_tolerance = Duration(sec=1, nanosec=0) + test_trajectory = zip(time_vec, waypts) + trajectory = JointTrajectory( + points=[ + JointTrajectoryPoint(positions=pos, time_from_start=times) + for (times, pos) in test_trajectory + ], + joint_names=[robot.tf_prefix + ROBOT_JOINTS[i] for i in range(len(ROBOT_JOINTS))], + ) + goal_handle = robot._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + assert goal_handle.accepted + if goal_handle.accepted: + result = robot._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual( - result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - ) - - # Test impossible goal time - goal_tolerance = [ - JointTolerance(position=0.01, name=tf_prefix + ROBOT_JOINTS[i]) for i in range(6) - ] - goal_time_tolerance.sec = 0 - goal_time_tolerance.nanosec = 10 - goal_handle = self._passthrough_forward_joint_trajectory.send_goal( - trajectory=trajectory, - goal_time_tolerance=goal_time_tolerance, - goal_tolerance=goal_tolerance, + assert result.error_code == FollowJointTrajectory.Result.SUCCESSFUL + # Test impossible goal tolerance, should fail. + goal_tolerance = [ + JointTolerance(position=0.000000001, name=robot.tf_prefix + ROBOT_JOINTS[i]) + for i in range(len(ROBOT_JOINTS)) + ] + goal_handle = robot._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + assert goal_handle.accepted + if goal_handle.accepted: + result = robot._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY ) - self.assertTrue(goal_handle.accepted) - if goal_handle.accepted: - result = self._passthrough_forward_joint_trajectory.get_result( - goal_handle, TIMEOUT_EXECUTE_TRAJECTORY - ) - self.assertEqual( - result.error_code, FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED - ) - self.assertTrue( - self._controller_manager_interface.switch_controller( - strictness=SwitchController.Request.BEST_EFFORT, - deactivate_controllers=["passthrough_trajectory_controller"], - activate_controllers=["scaled_joint_trajectory_controller"], - ).ok + assert result.error_code == FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + + # Test impossible goal time + goal_tolerance = [ + JointTolerance(position=0.01, name=robot.tf_prefix + ROBOT_JOINTS[i]) for i in range(6) + ] + goal_time_tolerance.sec = 0 + goal_time_tolerance.nanosec = 10 + goal_handle = robot._passthrough_forward_joint_trajectory.send_goal( + trajectory=trajectory, + goal_time_tolerance=goal_time_tolerance, + goal_tolerance=goal_tolerance, + ) + assert goal_handle.accepted + if goal_handle.accepted: + result = robot._passthrough_forward_joint_trajectory.get_result( + goal_handle, TIMEOUT_EXECUTE_TRAJECTORY ) + assert result.error_code == FollowJointTrajectory.Result.GOAL_TOLERANCE_VIOLATED + assert robot._controller_manager_interface.switch_controller( + strictness=SwitchController.Request.BEST_EFFORT, + deactivate_controllers=["passthrough_trajectory_controller"], + activate_controllers=["scaled_joint_trajectory_controller"], + ).ok diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index 927542676..36445dfa5 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -46,7 +46,9 @@ from launch.launch_description_sources import PythonLaunchDescriptionSource from launch.substitutions import LaunchConfiguration, PathJoinSubstitution from launch_ros.substitutions import FindPackagePrefix, FindPackageShare -from launch_testing.actions import ReadyToTest + +# from launch_testing.actions import ReadyToTest +from launch_pytest.actions import ReadyToTest from rclpy.action import ActionClient from std_srvs.srv import Trigger from ur_dashboard_msgs.msg import RobotMode @@ -57,11 +59,13 @@ IsProgramRunning, Load, ) +from control_msgs.action import FollowJointTrajectory from ur_msgs.srv import SetIO, GetRobotSoftwareVersion, SetForceMode TIMEOUT_WAIT_SERVICE = 10 TIMEOUT_WAIT_SERVICE_INITIAL = 120 # If we download the docker image simultaneously to the tests, it can take quite some time until the dashboard server is reachable and usable. TIMEOUT_WAIT_ACTION = 10 +TIMEOUT_EXECUTE_TRAJECTORY = 30 ROBOT_JOINTS = [ "elbow_joint", @@ -234,6 +238,11 @@ def _check_call(self, result): raise Exception("Service call not successful") +class MockDashboard: + def start_robot(self): + return True + + class ControllerManagerInterface( _ServiceInterface, namespace="/controller_manager", @@ -283,6 +292,35 @@ class ForceModeInterface( pass +class interfaces: + def __init__(self, node, params): + self.node = node + # Save launch parameters in case they could be used later + self.mock_hardware = params["mock_hardware"] == "true" + self.tf_prefix = params["tf_prefix"] + + def init_interfaces(self): + print("Initialising interfaces") + if not self.mock_hardware: + self._dashboard_interface = DashboardInterface(self.node) + else: + self._dashboard_interface = MockDashboard() + self._controller_manager_interface = ControllerManagerInterface(self.node) + self._io_status_controller_interface = IoStatusInterface(self.node) + self._configuration_controller_interface = ConfigurationInterface(self.node) + + self._scaled_follow_joint_trajectory = ActionInterface( + self.node, + "/scaled_joint_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + self._passthrough_forward_joint_trajectory = ActionInterface( + self.node, + "/passthrough_trajectory_controller/follow_joint_trajectory", + FollowJointTrajectory, + ) + + def _declare_launch_arguments(): declared_arguments = [] @@ -341,7 +379,10 @@ def generate_dashboard_test_description(): def generate_driver_test_description( - tf_prefix="", controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL + tf_prefix="", + initial_joint_controller="scaled_joint_trajectory_controller", + mock_hardware="false", + controller_spawner_timeout=TIMEOUT_WAIT_SERVICE_INITIAL, ): ur_type = LaunchConfiguration("ur_type") @@ -350,10 +391,11 @@ def generate_driver_test_description( "ur_type": ur_type, "launch_rviz": "false", "controller_spawner_timeout": str(controller_spawner_timeout), - "initial_joint_controller": "scaled_joint_trajectory_controller", + "initial_joint_controller": initial_joint_controller, "headless_mode": "true", "launch_dashboard_client": "true", "start_joint_controller": "false", + "use_mock_hardware": mock_hardware, } if tf_prefix: launch_arguments["tf_prefix"] = tf_prefix From 7de2247678f40c225c4eb4c927644e9f3e75d462 Mon Sep 17 00:00:00 2001 From: URJala Date: Tue, 4 Mar 2025 09:14:26 +0000 Subject: [PATCH 2/2] Changes for local testing (should be reverted) --- ur_robot_driver/test/test_common.py | 38 ++++++++++++++--------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/ur_robot_driver/test/test_common.py b/ur_robot_driver/test/test_common.py index 36445dfa5..7198d7b62 100644 --- a/ur_robot_driver/test/test_common.py +++ b/ur_robot_driver/test/test_common.py @@ -40,9 +40,10 @@ DeclareLaunchArgument, ExecuteProcess, IncludeLaunchDescription, - RegisterEventHandler, + # RegisterEventHandler, ) -from launch.event_handlers import OnProcessExit + +# from launch.event_handlers import OnProcessExit from launch.launch_description_sources import PythonLaunchDescriptionSource from launch.substitutions import LaunchConfiguration, PathJoinSubstitution from launch_ros.substitutions import FindPackagePrefix, FindPackageShare @@ -387,7 +388,7 @@ def generate_driver_test_description( ur_type = LaunchConfiguration("ur_type") launch_arguments = { - "robot_ip": "192.168.56.101", + "robot_ip": "172.17.0.3", "ur_type": ur_type, "launch_rviz": "false", "controller_spawner_timeout": str(controller_spawner_timeout), @@ -408,20 +409,17 @@ def generate_driver_test_description( ), launch_arguments=launch_arguments.items(), ) - wait_dashboard_server = ExecuteProcess( - cmd=[ - PathJoinSubstitution( - [FindPackagePrefix("ur_robot_driver"), "bin", "wait_dashboard_server.sh"] - ) - ], - name="wait_dashboard_server", - output="screen", - ) - driver_starter = RegisterEventHandler( - OnProcessExit(target_action=wait_dashboard_server, on_exit=robot_driver) - ) - - return LaunchDescription( - _declare_launch_arguments() - + [ReadyToTest(), wait_dashboard_server, _ursim_action(), driver_starter] - ) + # wait_dashboard_server = ExecuteProcess( + # cmd=[ + # PathJoinSubstitution( + # [FindPackagePrefix("ur_robot_driver"), "bin", "wait_dashboard_server.sh"] + # ) + # ], + # name="wait_dashboard_server", + # output="screen", + # ) + # driver_starter = RegisterEventHandler( + # OnProcessExit(target_action=wait_dashboard_server, on_exit=robot_driver) + # ) + + return LaunchDescription(_declare_launch_arguments() + [ReadyToTest(), robot_driver])