Skip to content

Commit e9081a2

Browse files
committed
Add test for urscript interface
1 parent de6d275 commit e9081a2

File tree

2 files changed

+268
-0
lines changed

2 files changed

+268
-0
lines changed

ur_robot_driver/CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,5 +187,9 @@ if(BUILD_TESTING)
187187
TIMEOUT
188188
500
189189
)
190+
add_launch_test(test/urscript_interface.py
191+
TIMEOUT
192+
500
193+
)
190194
endif()
191195
endif()
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
#!/usr/bin/env python
2+
# Copyright 2023, FZI Forschungszentrum Informatik
3+
#
4+
# Redistribution and use in source and binary forms, with or without
5+
# modification, are permitted provided that the following conditions are met:
6+
#
7+
# * Redistributions of source code must retain the above copyright
8+
# notice, this list of conditions and the following disclaimer.
9+
#
10+
# * Redistributions in binary form must reproduce the above copyright
11+
# notice, this list of conditions and the following disclaimer in the
12+
# documentation and/or other materials provided with the distribution.
13+
#
14+
# * Neither the name of the {copyright_holder} nor the names of its
15+
# contributors may be used to endorse or promote products derived from
16+
# this software without specific prior written permission.
17+
#
18+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21+
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
22+
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23+
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24+
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25+
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26+
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27+
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28+
# POSSIBILITY OF SUCH DAMAGE.
29+
30+
import pytest
31+
import time
32+
import unittest
33+
34+
import rclpy
35+
import rclpy.node
36+
from launch import LaunchDescription
37+
from launch.actions import (
38+
DeclareLaunchArgument,
39+
ExecuteProcess,
40+
IncludeLaunchDescription,
41+
RegisterEventHandler,
42+
)
43+
from launch.event_handlers import OnProcessExit
44+
from launch.launch_description_sources import PythonLaunchDescriptionSource
45+
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
46+
from launch_ros.substitutions import FindPackagePrefix, FindPackageShare
47+
from launch_testing.actions import ReadyToTest
48+
from std_srvs.srv import Trigger
49+
from std_msgs.msg import String as StringMsg
50+
from ur_dashboard_msgs.srv import (
51+
GetLoadedProgram,
52+
GetProgramState,
53+
GetRobotMode,
54+
IsProgramRunning,
55+
Load,
56+
)
57+
from ur_msgs.msg import IOStates
58+
59+
60+
ROBOT_IP = "192.168.56.101"
61+
TIMEOUT_WAIT_SERVICE = 10
62+
# If we download the docker image simultaneously to the tests, it can take quite some time until the
63+
# dashboard server is reachable and usable.
64+
TIMEOUT_WAIT_SERVICE_INITIAL = 120
65+
66+
67+
@pytest.mark.launch_test
68+
def generate_test_description():
69+
declared_arguments = []
70+
71+
declared_arguments.append(
72+
DeclareLaunchArgument(
73+
"ur_type",
74+
default_value="ur5e",
75+
description="Type/series of used UR robot.",
76+
choices=["ur3", "ur3e", "ur5", "ur5e", "ur10", "ur10e", "ur16e"],
77+
)
78+
)
79+
80+
ur_type = LaunchConfiguration("ur_type")
81+
82+
robot_driver = IncludeLaunchDescription(
83+
PythonLaunchDescriptionSource(
84+
PathJoinSubstitution(
85+
[FindPackageShare("ur_robot_driver"), "launch", "ur_control.launch.py"]
86+
)
87+
),
88+
launch_arguments={
89+
"robot_ip": "192.168.56.101",
90+
"ur_type": ur_type,
91+
"launch_rviz": "false",
92+
"controller_spawner_timeout": str(TIMEOUT_WAIT_SERVICE_INITIAL),
93+
"initial_joint_controller": "scaled_joint_trajectory_controller",
94+
"headless_mode": "true",
95+
"launch_dashboard_client": "false",
96+
"start_joint_controller": "false",
97+
}.items(),
98+
)
99+
100+
ursim = ExecuteProcess(
101+
cmd=[
102+
PathJoinSubstitution(
103+
[
104+
FindPackagePrefix("ur_robot_driver"),
105+
"lib",
106+
"ur_robot_driver",
107+
"start_ursim.sh",
108+
]
109+
),
110+
" ",
111+
"-m ",
112+
ur_type,
113+
],
114+
name="start_ursim",
115+
output="screen",
116+
)
117+
118+
wait_dashboard_server = ExecuteProcess(
119+
cmd=[
120+
PathJoinSubstitution(
121+
[FindPackagePrefix("ur_robot_driver"), "bin", "wait_dashboard_server.sh"]
122+
)
123+
],
124+
name="wait_dashboard_server",
125+
output="screen",
126+
)
127+
128+
driver_starter = RegisterEventHandler(
129+
OnProcessExit(target_action=wait_dashboard_server, on_exit=robot_driver)
130+
)
131+
132+
return LaunchDescription(
133+
declared_arguments + [ReadyToTest(), wait_dashboard_server, driver_starter, ursim]
134+
)
135+
136+
137+
class URScriptInterfaceTest(unittest.TestCase):
138+
@classmethod
139+
def setUpClass(cls):
140+
# Initialize the ROS context
141+
rclpy.init()
142+
cls.node = rclpy.node.Node("urscript_interface_test")
143+
cls.init_robot(cls)
144+
145+
@classmethod
146+
def tearDownClass(cls):
147+
# Shutdown the ROS context
148+
cls.node.destroy_node()
149+
rclpy.shutdown()
150+
151+
def init_robot(self):
152+
# We wait longer for the first client, as the robot is still starting up
153+
power_on_client = waitForService(
154+
self.node, "/dashboard_client/power_on", Trigger, timeout=TIMEOUT_WAIT_SERVICE_INITIAL
155+
)
156+
157+
# Connect to all other expected services
158+
dashboard_interfaces = {
159+
"power_off": Trigger,
160+
"brake_release": Trigger,
161+
"unlock_protective_stop": Trigger,
162+
"restart_safety": Trigger,
163+
"get_robot_mode": GetRobotMode,
164+
"load_installation": Load,
165+
"load_program": Load,
166+
"close_popup": Trigger,
167+
"get_loaded_program": GetLoadedProgram,
168+
"program_state": GetProgramState,
169+
"program_running": IsProgramRunning,
170+
"play": Trigger,
171+
"stop": Trigger,
172+
}
173+
self.dashboard_clients = {
174+
srv_name: waitForService(self.node, f"/dashboard_client/{srv_name}", srv_type)
175+
for (srv_name, srv_type) in dashboard_interfaces.items()
176+
}
177+
178+
# Add first client to dict
179+
self.dashboard_clients["power_on"] = power_on_client
180+
181+
self.urscript_pub = self.node.create_publisher(
182+
StringMsg, "/urscript_interface/script_command", 1
183+
)
184+
185+
def setUp(self):
186+
# Start robot
187+
empty_req = Trigger.Request()
188+
self.dashboard_call("power_on", empty_req)
189+
self.dashboard_call("brake_release", empty_req)
190+
191+
def test_set_io(self):
192+
"""Test setting an IO using a direct program call."""
193+
self.io_states_sub = self.node.create_subscription(
194+
IOStates,
195+
"/io_and_status_controller/io_states",
196+
self.io_msg_cb,
197+
rclpy.qos.qos_profile_system_default,
198+
)
199+
200+
self.set_digout_checked(0, True)
201+
time.sleep(1)
202+
self.set_digout_checked(0, False)
203+
204+
def io_msg_cb(self, msg):
205+
self.io_msg = msg
206+
207+
def set_digout_checked(self, pin, state):
208+
self.io_msg = None
209+
210+
script_msg = StringMsg(data=f"set_digital_out({pin}, {state})")
211+
self.urscript_pub.publish(script_msg)
212+
213+
self.check_pin_states([pin], [state])
214+
215+
def check_pin_states(self, pins, states):
216+
pin_states = [not x for x in states]
217+
end_time = time.time() + 5
218+
while pin_states != states and time.time() < end_time:
219+
rclpy.spin_once(self.node, timeout_sec=0.1)
220+
if self.io_msg is not None:
221+
for i, pin_id in enumerate(pins):
222+
pin_states[i] = self.io_msg.digital_out_states[pin_id].state
223+
self.assertEqual(pin_states, states)
224+
225+
def test_multiline_script(self):
226+
"""Tests sending a multiline script as secondary program."""
227+
self.io_msg = None
228+
self.io_states_sub = self.node.create_subscription(
229+
IOStates,
230+
"/io_and_status_controller/io_states",
231+
self.io_msg_cb,
232+
rclpy.qos.qos_profile_system_default,
233+
)
234+
235+
script_msg = StringMsg(
236+
data="sec my_program():\n set_digital_out(0, False)\n set_digital_out(1,True)\nend"
237+
)
238+
self.urscript_pub.publish(script_msg)
239+
self.check_pin_states([0, 1], [False, True])
240+
241+
script_msg = StringMsg(
242+
data="sec my_program():\n set_digital_out(0, True)\n set_digital_out(1,False)\nend"
243+
)
244+
self.urscript_pub.publish(script_msg)
245+
self.check_pin_states([0, 1], [True, False])
246+
247+
def dashboard_call(self, srv_name, request):
248+
self.node.get_logger().info(f"Calling service '{srv_name}' with request {request}")
249+
future = self.dashboard_clients[srv_name].call_async(request)
250+
rclpy.spin_until_future_complete(self.node, future)
251+
if future.result() is not None:
252+
self.node.get_logger().info(f"Received result {future.result()}")
253+
return future.result()
254+
else:
255+
raise Exception(f"Exception while calling service: {future.exception()}")
256+
257+
258+
def waitForService(node, srv_name, srv_type, timeout=TIMEOUT_WAIT_SERVICE):
259+
client = node.create_client(srv_type, srv_name)
260+
if client.wait_for_service(timeout) is False:
261+
raise Exception(f"Could not reach service '{srv_name}' within timeout of {timeout}")
262+
263+
node.get_logger().info(f"Successfully connected to service '{srv_name}'")
264+
return client

0 commit comments

Comments
 (0)