diff --git a/ur_robot_driver/CMakeLists.txt b/ur_robot_driver/CMakeLists.txt
index 89492f54f..6069c5ace 100644
--- a/ur_robot_driver/CMakeLists.txt
+++ b/ur_robot_driver/CMakeLists.txt
@@ -103,8 +103,13 @@ add_executable(controller_stopper_node
)
ament_target_dependencies(controller_stopper_node ${${PROJECT_NAME}_EXPORTED_TARGETS} ${THIS_PACKAGE_INCLUDE_DEPENDS})
+add_executable(urscript_interface
+ src/urscript_interface.cpp
+)
+ament_target_dependencies(urscript_interface ${${PROJECT_NAME}_EXPORTED_TARGETS} ${THIS_PACKAGE_INCLUDE_DEPENDS})
+
install(
- TARGETS dashboard_client ur_ros2_control_node controller_stopper_node
+ TARGETS dashboard_client ur_ros2_control_node controller_stopper_node urscript_interface
DESTINATION lib/${PROJECT_NAME}
)
@@ -182,5 +187,9 @@ if(BUILD_TESTING)
TIMEOUT
500
)
+ add_launch_test(test/urscript_interface.py
+ TIMEOUT
+ 500
+ )
endif()
endif()
diff --git a/ur_robot_driver/doc/usage.rst b/ur_robot_driver/doc/usage.rst
index b6e03cb22..f031ddfe7 100644
--- a/ur_robot_driver/doc/usage.rst
+++ b/ur_robot_driver/doc/usage.rst
@@ -240,3 +240,61 @@ Then start
ros2 launch ur_robot_driver ur_control.launch.py ur_type:=ur5e robot_ip:=yyy.yyy.yyy.yyy use_fake_hardware:=true launch_rviz:=false initial_joint_controller:=joint_trajectory_controller
# and in another shell
ros2 launch ur_moveit_config ur_moveit.launch.py ur_type:=ur5e launch_rviz:=true
+
+Custom URScript commands
+------------------------
+
+The driver's package contains a ``urscript_interface`` node that allows sending URScript snippets
+directly to the robot. It gets started in the driver's launchfiles by default. To use it, simply
+publish a message to its interface:
+
+.. code-block:: bash
+
+ # simple popup
+ ros2 topic pub /urscript_interface/script_command std_msgs/msg/String '{data: popup("hello")}' --once
+
+Be aware, that running a program on this interface (meaning publishing script code to that interface) stops any running program on the robot.
+Thus, the motion-interpreting program that is started by the driver gets stopped and has to be
+restarted again. Depending whether you use headless mode or not, you'll have to call the
+``resend_program`` service or press the ``play`` button on the teach panel to start the
+external_control program again.
+
+.. note::
+ Currently, there is no feedback on the code's correctness. If the code sent to the
+ robot is incorrect, it will silently not get executed. Make sure that you send valid URScript code!
+
+Multi-line programs
+^^^^^^^^^^^^^^^^^^^
+
+When you want to define multi-line programs, make sure to check that newlines are correctly
+interpreted from your message. For this purpose the driver prints the program as it is being sent to
+the robot. When sending a multi-line program from the command line, you can use an empty line
+between each statement:
+
+.. code-block:: bash
+
+ ros2 topic pub --once /urscript_interface/script_command std_msgs/msg/String '{data:
+ "def my_prog():
+
+ set_digital_out(1, True)
+
+ movej(p[0.2, 0.3, 0.8, 0, 0, 3.14], a=1.2, v=0.25, r=0)
+
+ textmsg(\"motion finished\")
+
+ end"}'
+
+Non-interrupting programs
+^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To prevent interrupting the main program, you can send certain commands as `secondary programs
+`_.
+
+.. code-block:: bash
+
+ ros2 topic pub --once /urscript_interface/script_command std_msgs/msg/String '{data:
+ "sec my_prog():
+
+ textmsg(\"This is a log message\")
+
+ end"}'
diff --git a/ur_robot_driver/launch/ur_control.launch.py b/ur_robot_driver/launch/ur_control.launch.py
index 1e3c1c87f..e974b448b 100644
--- a/ur_robot_driver/launch/ur_control.launch.py
+++ b/ur_robot_driver/launch/ur_control.launch.py
@@ -243,6 +243,13 @@ def launch_setup(context, *args, **kwargs):
],
)
+ urscript_interface = Node(
+ package="ur_robot_driver",
+ executable="urscript_interface",
+ parameters=[{"robot_ip": robot_ip}],
+ output="screen",
+ )
+
controller_stopper_node = Node(
package="ur_robot_driver",
executable="controller_stopper_node",
@@ -341,6 +348,7 @@ def controller_spawner(name, active=True):
dashboard_client_node,
tool_communication_node,
controller_stopper_node,
+ urscript_interface,
robot_state_publisher_node,
rviz_node,
initial_joint_controller_spawner_stopped,
diff --git a/ur_robot_driver/src/urscript_interface.cpp b/ur_robot_driver/src/urscript_interface.cpp
new file mode 100644
index 000000000..757e76fb8
--- /dev/null
+++ b/ur_robot_driver/src/urscript_interface.cpp
@@ -0,0 +1,92 @@
+// Copyright 2023, FZI Forschungszentrum Informatik, Created on behalf of Universal Robots A/S
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+//
+// * Neither the name of the {copyright_holder} nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+// POSSIBILITY OF SUCH DAMAGE.
+
+//----------------------------------------------------------------------
+/*!\file
+ *
+ * \author Felix Exner exner@fzi.de
+ * \date 2023-06-20
+ *
+ */
+//----------------------------------------------------------------------
+
+#include
+#include
+
+#include
+
+#include
+#include
+
+class URScriptInterface : public rclcpp::Node
+{
+public:
+ URScriptInterface()
+ : Node("urscript_interface")
+ , m_script_sub(this->create_subscription(
+ "~/script_command", 1, [this](const std_msgs::msg::String::SharedPtr msg) {
+ auto program_with_newline = msg->data + '\n';
+
+ RCLCPP_INFO_STREAM(this->get_logger(), program_with_newline);
+
+ size_t len = program_with_newline.size();
+ const auto* data = reinterpret_cast(program_with_newline.c_str());
+ size_t written;
+
+ if (m_secondary_stream->write(data, len, written)) {
+ URCL_LOG_INFO("Sent program to robot:\n%s", program_with_newline.c_str());
+ return true;
+ }
+ URCL_LOG_ERROR("Could not send program to robot");
+ return false;
+ }))
+ {
+ this->declare_parameter("robot_ip", rclcpp::PARAMETER_STRING);
+ m_secondary_stream = std::make_unique>(
+ this->get_parameter("robot_ip").as_string(), urcl::primary_interface::UR_SECONDARY_PORT);
+ m_secondary_stream->connect();
+
+ auto program_with_newline = std::string("textmsg(\"urscript_interface connected\")\n");
+ size_t len = program_with_newline.size();
+ const auto* data = reinterpret_cast(program_with_newline.c_str());
+ size_t written;
+ m_secondary_stream->write(data, len, written);
+ }
+
+private:
+ rclcpp::Subscription::SharedPtr m_script_sub;
+ std::unique_ptr> m_secondary_stream;
+};
+
+int main(int argc, char** argv)
+{
+ rclcpp::init(argc, argv);
+ rclcpp::spin(std::make_unique());
+ rclcpp::shutdown();
+ return 0;
+}
diff --git a/ur_robot_driver/test/urscript_interface.py b/ur_robot_driver/test/urscript_interface.py
new file mode 100755
index 000000000..461d04699
--- /dev/null
+++ b/ur_robot_driver/test/urscript_interface.py
@@ -0,0 +1,293 @@
+#!/usr/bin/env python
+# Copyright 2023, FZI Forschungszentrum Informatik
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# * Neither the name of the {copyright_holder} nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+import pytest
+import time
+import unittest
+
+import rclpy
+import rclpy.node
+from launch import LaunchDescription
+from launch.actions import (
+ DeclareLaunchArgument,
+ ExecuteProcess,
+ IncludeLaunchDescription,
+ RegisterEventHandler,
+)
+from launch.event_handlers import OnProcessExit
+from launch.launch_description_sources import PythonLaunchDescriptionSource
+from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
+from launch_ros.substitutions import FindPackagePrefix, FindPackageShare
+from launch_testing.actions import ReadyToTest
+from std_srvs.srv import Trigger
+from std_msgs.msg import String as StringMsg
+from ur_dashboard_msgs.msg import RobotMode
+from ur_dashboard_msgs.srv import (
+ GetLoadedProgram,
+ GetProgramState,
+ GetRobotMode,
+ IsProgramRunning,
+ Load,
+)
+from ur_msgs.msg import IOStates
+from controller_manager_msgs.srv import ListControllers
+
+
+ROBOT_IP = "192.168.56.101"
+TIMEOUT_WAIT_SERVICE = 10
+# If we download the docker image simultaneously to the tests, it can take quite some time until the
+# dashboard server is reachable and usable.
+TIMEOUT_WAIT_SERVICE_INITIAL = 120
+
+
+@pytest.mark.launch_test
+def generate_test_description():
+ declared_arguments = []
+
+ declared_arguments.append(
+ DeclareLaunchArgument(
+ "ur_type",
+ default_value="ur5e",
+ description="Type/series of used UR robot.",
+ choices=["ur3", "ur3e", "ur5", "ur5e", "ur10", "ur10e", "ur16e"],
+ )
+ )
+
+ ur_type = LaunchConfiguration("ur_type")
+
+ robot_driver = IncludeLaunchDescription(
+ PythonLaunchDescriptionSource(
+ PathJoinSubstitution(
+ [FindPackageShare("ur_robot_driver"), "launch", "ur_control.launch.py"]
+ )
+ ),
+ launch_arguments={
+ "robot_ip": "192.168.56.101",
+ "ur_type": ur_type,
+ "launch_rviz": "false",
+ "controller_spawner_timeout": str(TIMEOUT_WAIT_SERVICE_INITIAL),
+ "initial_joint_controller": "scaled_joint_trajectory_controller",
+ "headless_mode": "true",
+ "launch_dashboard_client": "false",
+ "start_joint_controller": "false",
+ }.items(),
+ )
+
+ ursim = ExecuteProcess(
+ cmd=[
+ PathJoinSubstitution(
+ [
+ FindPackagePrefix("ur_robot_driver"),
+ "lib",
+ "ur_robot_driver",
+ "start_ursim.sh",
+ ]
+ ),
+ " ",
+ "-m ",
+ ur_type,
+ ],
+ name="start_ursim",
+ output="screen",
+ )
+
+ wait_dashboard_server = ExecuteProcess(
+ cmd=[
+ PathJoinSubstitution(
+ [FindPackagePrefix("ur_robot_driver"), "bin", "wait_dashboard_server.sh"]
+ )
+ ],
+ name="wait_dashboard_server",
+ output="screen",
+ )
+
+ driver_starter = RegisterEventHandler(
+ OnProcessExit(target_action=wait_dashboard_server, on_exit=robot_driver)
+ )
+
+ return LaunchDescription(
+ declared_arguments + [ReadyToTest(), wait_dashboard_server, driver_starter, ursim]
+ )
+
+
+class URScriptInterfaceTest(unittest.TestCase):
+ @classmethod
+ def setUpClass(cls):
+ # Initialize the ROS context
+ rclpy.init()
+ cls.node = rclpy.node.Node("urscript_interface_test")
+ cls.init_robot(cls)
+
+ @classmethod
+ def tearDownClass(cls):
+ # Shutdown the ROS context
+ cls.node.destroy_node()
+ rclpy.shutdown()
+
+ def init_robot(self):
+ # We wait longer for the first client, as the robot is still starting up
+ power_on_client = waitForService(
+ self.node, "/dashboard_client/power_on", Trigger, timeout=TIMEOUT_WAIT_SERVICE_INITIAL
+ )
+
+ # Connect to all other expected services
+ dashboard_interfaces = {
+ "/dashboard_client/power_off": Trigger,
+ "/dashboard_client/brake_release": Trigger,
+ "/dashboard_client/unlock_protective_stop": Trigger,
+ "/dashboard_client/restart_safety": Trigger,
+ "/dashboard_client/get_robot_mode": GetRobotMode,
+ "/dashboard_client/load_installation": Load,
+ "/dashboard_client/load_program": Load,
+ "/dashboard_client/close_popup": Trigger,
+ "/dashboard_client/get_loaded_program": GetLoadedProgram,
+ "/dashboard_client/program_state": GetProgramState,
+ "/dashboard_client/program_running": IsProgramRunning,
+ "/dashboard_client/play": Trigger,
+ "/dashboard_client/stop": Trigger,
+ }
+ self.service_clients = {
+ srv_name: waitForService(self.node, f"{srv_name}", srv_type)
+ for (srv_name, srv_type) in dashboard_interfaces.items()
+ }
+
+ self.service_clients["/controller_manager/list_controllers"] = waitForService(
+ self.node,
+ "/controller_manager/list_controllers",
+ ListControllers,
+ timeout=TIMEOUT_WAIT_SERVICE_INITIAL,
+ )
+
+ # Add first client to dict
+ self.service_clients["/dashboard_client/power_on"] = power_on_client
+
+ self.urscript_pub = self.node.create_publisher(
+ StringMsg, "/urscript_interface/script_command", 1
+ )
+
+ def setUp(self):
+ # Start robot
+ empty_req = Trigger.Request()
+ self.call_service("/dashboard_client/power_on", empty_req)
+ self.call_service("/dashboard_client/brake_release", empty_req)
+
+ time.sleep(1)
+ robot_mode_resp = self.call_service(
+ "/dashboard_client/get_robot_mode", GetRobotMode.Request()
+ )
+ self.assertEqual(robot_mode_resp.robot_mode.mode, RobotMode.RUNNING)
+ self.call_service("/dashboard_client/stop", empty_req)
+ time.sleep(1)
+
+ io_controller_running = False
+
+ while not io_controller_running:
+ time.sleep(1)
+ response = self.call_service(
+ "/controller_manager/list_controllers", ListControllers.Request()
+ )
+ for controller in response.controller:
+ if controller.name == "io_and_status_controller":
+ io_controller_running = controller.state == "active"
+
+ def test_set_io(self):
+ """Test setting an IO using a direct program call."""
+ self.io_states_sub = self.node.create_subscription(
+ IOStates,
+ "/io_and_status_controller/io_states",
+ self.io_msg_cb,
+ rclpy.qos.qos_profile_system_default,
+ )
+
+ self.set_digout_checked(0, True)
+ time.sleep(1)
+ self.set_digout_checked(0, False)
+
+ self.io_msg = None
+ self.io_states_sub = self.node.create_subscription(
+ IOStates,
+ "/io_and_status_controller/io_states",
+ self.io_msg_cb,
+ rclpy.qos.qos_profile_system_default,
+ )
+
+ script_msg = StringMsg(
+ data="sec my_program():\n set_digital_out(0, False)\n set_digital_out(1,True)\nend"
+ )
+ self.urscript_pub.publish(script_msg)
+ self.check_pin_states([0, 1], [False, True])
+
+ time.sleep(1)
+
+ script_msg = StringMsg(
+ data="sec my_program():\n set_digital_out(0, True)\n set_digital_out(1,False)\nend"
+ )
+ self.urscript_pub.publish(script_msg)
+ self.check_pin_states([0, 1], [True, False])
+
+ def io_msg_cb(self, msg):
+ self.io_msg = msg
+
+ def set_digout_checked(self, pin, state):
+ self.io_msg = None
+
+ script_msg = StringMsg(data=f"set_digital_out({pin}, {state})")
+ self.urscript_pub.publish(script_msg)
+
+ self.check_pin_states([pin], [state])
+
+ def check_pin_states(self, pins, states):
+ pin_states = [not x for x in states]
+ end_time = time.time() + 50
+ while pin_states != states and time.time() < end_time:
+ rclpy.spin_once(self.node, timeout_sec=0.1)
+ if self.io_msg is not None:
+ for i, pin_id in enumerate(pins):
+ pin_states[i] = self.io_msg.digital_out_states[pin_id].state
+ self.assertIsNotNone(self.io_msg, "Did not receive an IO state in requested time.")
+ self.assertEqual(pin_states, states)
+
+ def call_service(self, srv_name, request):
+ self.node.get_logger().info(f"Calling service '{srv_name}' with request {request}")
+ future = self.service_clients[srv_name].call_async(request)
+ rclpy.spin_until_future_complete(self.node, future)
+ if future.result() is not None:
+ self.node.get_logger().info(f"Received result {future.result()}")
+ return future.result()
+ else:
+ raise Exception(f"Exception while calling service: {future.exception()}")
+
+
+def waitForService(node, srv_name, srv_type, timeout=TIMEOUT_WAIT_SERVICE):
+ client = node.create_client(srv_type, srv_name)
+ if client.wait_for_service(timeout) is False:
+ raise Exception(f"Could not reach service '{srv_name}' within timeout of {timeout}")
+
+ node.get_logger().info(f"Successfully connected to service '{srv_name}'")
+ return client