diff --git a/ros2node/package.xml b/ros2node/package.xml
index f101f8f79..7e738b593 100644
--- a/ros2node/package.xml
+++ b/ros2node/package.xml
@@ -19,6 +19,8 @@
rclpy
ros2cli
+ rosidl_runtime_py
+ type_description_interfaces
ament_copyright
ament_flake8
diff --git a/ros2node/ros2node/verb/get_type_description.py b/ros2node/ros2node/verb/get_type_description.py
new file mode 100644
index 000000000..99a4c2d8f
--- /dev/null
+++ b/ros2node/ros2node/verb/get_type_description.py
@@ -0,0 +1,64 @@
+# Copyright 2023 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import rclpy
+from ros2cli.node import NODE_NAME_PREFIX
+from ros2node.api import NodeNameCompleter
+from ros2node.verb import VerbExtension
+from rosidl_runtime_py import message_to_yaml
+from type_description_interfaces.srv import GetTypeDescription
+
+
+class GetTypeDescriptionVerb(VerbExtension):
+ """Call a node's ~/get_type_description service and print the response."""
+
+ def add_arguments(self, parser, cli_name):
+ argument = parser.add_argument(
+ 'node_name',
+ help='Node name to request information')
+ argument.completer = NodeNameCompleter()
+ argument = parser.add_argument(
+ 'type_name',
+ help='ROS interface type name, in PACKAGE/NAMESPACE/TYPENAME format')
+ argument = parser.add_argument(
+ 'type_hash',
+ help='REP-2011 RIHS hash string')
+ argument = parser.add_argument(
+ '--include_type_sources', default=False, action='store_true',
+ help='Whether to return the original idl/msg/etc. source file(s) in the response')
+
+ def main(self, *, args):
+ rclpy.init()
+ node = rclpy.create_node(NODE_NAME_PREFIX + '_requester_%s_%s' %
+ ('type_description_interfaces', 'GetTypeDescription'))
+
+ service_name = args.node_name + '/get_type_description'
+ cli = node.create_client(GetTypeDescription, service_name)
+
+ if not cli.service_is_ready():
+ print(f'waiting for service {service_name} to become available...')
+ cli.wait_for_service()
+
+ request = GetTypeDescription.Request(type_name=args.type_name,
+ type_hash=args.type_hash,
+ include_type_sources=args.include_type_sources)
+ future = cli.call_async(request)
+ rclpy.spin_until_future_complete(node, future)
+ if future.result() is not None:
+ print(message_to_yaml(future.result()))
+ else:
+ raise RuntimeError('Exception while calling service: %r' % future.exception())
+
+ node.destroy_node()
+ rclpy.shutdown()
diff --git a/ros2node/setup.py b/ros2node/setup.py
index dc5248b2f..9723e411e 100644
--- a/ros2node/setup.py
+++ b/ros2node/setup.py
@@ -42,6 +42,7 @@
'ros2node.verb': [
'info = ros2node.verb.info:InfoVerb',
'list = ros2node.verb.list:ListVerb',
+ 'get_type_description = ros2node.verb.get_type_description:GetTypeDescriptionVerb',
],
}
)
diff --git a/ros2node/test/test_cli.py b/ros2node/test/test_cli.py
index 97f02ff4f..dafe790f5 100644
--- a/ros2node/test/test_cli.py
+++ b/ros2node/test/test_cli.py
@@ -229,3 +229,44 @@ def test_info_hidden_node_hidden_flag(self):
]),
text=node_command.output, strict=False
), 'Output does not match:\n' + node_command.output
+
+ @launch_testing.markers.retry_on_failure(times=5, delay=1)
+ def test_get_type_description_invalid_hash(self):
+ arguments = ['get_type_description', '/complex_node', 'some_type', 'some_hash']
+ with self.launch_node_command(arguments=arguments) as node_command:
+ assert node_command.wait_for_shutdown(timeout=10)
+ assert node_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=itertools.chain([
+ 'successful: false',
+ re.compile(r'failure_reason: .*'),
+ 'type_description:',
+ ' type_description:',
+ ' type_name: ''',
+ ' fields: []',
+ ' referenced_type_descriptions: []',
+ 'type_sources: []',
+ 'extra_information: []',
+ ]),
+ text=node_command.output, strict=False
+ ), 'Output does not match:\n' + node_command.output
+
+ @launch_testing.markers.retry_on_failure(times=5, delay=1)
+ def test_get_type_description_valid_hash(self):
+ arguments = [
+ 'get_type_description',
+ '/complex_node',
+ 'test_msgs.msg.Arrays',
+ # Type hash below is from test_msgs/share/test_msgs/msg/Arrays.json
+ 'RIHS01_8a321b80dae8c44dfd1ce558eaf1b1cd5ebbe5950863ed675836535511a7f91a'
+ ]
+ with self.launch_node_command(arguments=arguments) as node_command:
+ assert node_command.wait_for_shutdown(timeout=10)
+ assert node_command.exit_code == launch_testing.asserts.EXIT_OK
+ assert launch_testing.tools.expect_output(
+ expected_lines=itertools.chain([
+ 'successful: true',
+ "failure_reason: ''",
+ ]),
+ text=node_command.output, strict=False
+ ), 'Output does not match:\n' + node_command.output