Skip to content

Commit c24c7b5

Browse files
Support 'ros2 action echo' (#978)
* Support 'ros2 action echo' Signed-off-by: Barry Xu <[email protected]> * Address review comments Signed-off-by: Barry Xu <[email protected]> --------- Signed-off-by: Barry Xu <[email protected]>
1 parent 9a0c044 commit c24c7b5

File tree

5 files changed

+745
-0
lines changed

5 files changed

+745
-0
lines changed

ros2action/ros2action/api/__init__.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
from rclpy.expand_topic_name import expand_topic_name
16+
from rclpy.node import Node
1617
from rclpy.validate_full_topic_name import validate_full_topic_name
1718
from ros2cli.node.strategy import NodeStrategy
1819
from rosidl_runtime_py import get_action_interfaces
@@ -66,6 +67,43 @@ def get_action_names(*, node):
6667
return [n for (n, t) in action_names_and_types]
6768

6869

70+
def get_action_class(node: Node, action_name: str):
71+
"""
72+
Load action type module for the given action.
73+
74+
The action should be running for this function to find the action type.
75+
:param node: The node object of rclpy Node class.
76+
:param action_name: The fully-qualified name of the action.
77+
:return: the action class or None
78+
"""
79+
action_names_and_types = get_action_names_and_types(node=node)
80+
81+
matched_names_and_types = list(filter(lambda x: x[0] == action_name, action_names_and_types))
82+
if len(matched_names_and_types) < 1:
83+
raise RuntimeError(f"Cannot find type for '{action_name}'")
84+
if len(matched_names_and_types) > 1:
85+
raise RuntimeError(f"Unexpectedly saw more than one entry for action '{action_name}'")
86+
87+
# Now check whether there are multiple types associated with this action, which is unsupported
88+
action_name_and_types = matched_names_and_types[0]
89+
90+
types = action_name_and_types[1]
91+
if len(types) < 1:
92+
raise RuntimeError(f"No types associated with '{action_name}'")
93+
if len(types) > 1:
94+
raise RuntimeError(f"More than one type associated with action '{action_name}'")
95+
96+
action_type = types[0]
97+
98+
if action_type is None:
99+
return None
100+
101+
try:
102+
return get_action(action_type)
103+
except (AttributeError, ModuleNotFoundError, ValueError):
104+
raise RuntimeError(f"The action type '{action_type}' is invalid")
105+
106+
69107
def action_name_completer(prefix, parsed_args, **kwargs):
70108
"""Callable returning a list of action names."""
71109
with NodeStrategy(parsed_args) as node:

ros2action/ros2action/verb/echo.py

Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
# Copyright 2025 Sony Group Corporation.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from collections import OrderedDict
16+
from enum import Enum
17+
import queue
18+
import sys
19+
import threading
20+
21+
import rclpy
22+
from rclpy.qos import qos_profile_action_status_default
23+
from rclpy.qos import qos_profile_services_default
24+
from rclpy.qos import QoSProfile
25+
from rclpy.subscription import Subscription
26+
from rclpy.type_support import EventMessage
27+
from rclpy.type_support import MsgT
28+
29+
from ros2action.api import action_name_completer
30+
from ros2action.api import action_type_completer
31+
from ros2action.api import get_action_class
32+
from ros2action.verb import VerbExtension
33+
34+
from ros2cli.helpers import unsigned_int
35+
from ros2cli.node.strategy import NodeStrategy
36+
37+
from rosidl_runtime_py import message_to_csv
38+
from rosidl_runtime_py import message_to_ordereddict
39+
from rosidl_runtime_py.utilities import get_action
40+
41+
from service_msgs.msg import ServiceEventInfo
42+
43+
import yaml
44+
45+
DEFAULT_TRUNCATE_LENGTH = 128
46+
47+
48+
class ActionInterfaces(Enum):
49+
GOAL_SERVICE = 'GOAL_SERVICE'
50+
CANCEL_SERVICE = 'CANCEL_SERVICE'
51+
RESULT_SERVICE = 'RESULT_SERVICE'
52+
FEEDBACK_TOPIC = 'FEEDBACK_TOPIC'
53+
STATUS_TOPIC = 'STATUS_TOPIC'
54+
55+
56+
class EchoVerb(VerbExtension):
57+
"""Echo a action."""
58+
59+
# Custom representer for getting clean YAML output that preserves the order in an OrderedDict.
60+
# Inspired by: http://stackoverflow.com/a/16782282/7169408
61+
def __represent_ordereddict(self, dumper, data):
62+
items = []
63+
for k, v in data.items():
64+
items.append((dumper.represent_data(k), dumper.represent_data(v)))
65+
return yaml.nodes.MappingNode(u'tag:yaml.org,2002:map', items)
66+
67+
def __init__(self):
68+
self._event_number_to_name = {}
69+
for k, v in ServiceEventInfo._Metaclass_ServiceEventInfo__constants.items():
70+
self._event_number_to_name[v] = k
71+
72+
yaml.add_representer(OrderedDict, self.__represent_ordereddict)
73+
74+
def add_arguments(self, parser, cli_name):
75+
arg = parser.add_argument(
76+
'action_name',
77+
help="Name of the ROS action to echo (e.g. '/fibonacci')")
78+
arg.completer = action_name_completer
79+
arg = parser.add_argument(
80+
'action_type', nargs='?',
81+
help="Type of the ROS action (e.g. 'example_interfaces/action/Fibonacci')")
82+
arg.completer = action_type_completer
83+
parser.add_argument(
84+
'--interfaces', '-i', type=str, metavar='interfaces_name_list',
85+
help='Specify a action interfaces list are output. "|" is used as a delimiter for '
86+
'multiple action interfaces. Action interfaces include "GOAL_SERVICE", '
87+
'"CANCEL_SERVICE", "RESULT_SERVICE", "FEEDBACK_TOPIC" and "STATUS_TOPIC". '
88+
'If this option is not set, output messages from all interfaces of the action.')
89+
parser.add_argument(
90+
'--csv', action='store_true', default=False,
91+
help=(
92+
'Output all recursive fields separated by commas (e.g. for plotting).'
93+
))
94+
parser.add_argument(
95+
'--full-length', '-f', action='store_true',
96+
help='Output all elements for arrays, bytes, and string with a '
97+
"length > '--truncate-length', by default they are truncated "
98+
"after '--truncate-length' elements with '...''")
99+
parser.add_argument(
100+
'--truncate-length', '-l', type=unsigned_int, default=DEFAULT_TRUNCATE_LENGTH,
101+
help='The length to truncate arrays, bytes, and string to '
102+
'(default: %d)' % DEFAULT_TRUNCATE_LENGTH)
103+
parser.add_argument(
104+
'--no-arr', action='store_true', help="Don't print array fields of messages")
105+
parser.add_argument(
106+
'--no-str', action='store_true', help="Don't print string fields of messages")
107+
parser.add_argument(
108+
'--flow-style', action='store_true',
109+
help='Print collections in the block style (not available with csv format)')
110+
111+
def main(self, *, args):
112+
output_interface_list = set()
113+
if args.interfaces is not None:
114+
if args.interfaces: # Not empty string
115+
# Fill in the user-specified action interface.
116+
interfaces = args.interfaces.split('|')
117+
for interface in interfaces:
118+
if interface.strip().upper() in ActionInterfaces:
119+
output_interface_list.add(interface.strip().upper())
120+
else:
121+
return f'"{interface.strip()}" is incorrect interface name.'
122+
else:
123+
return 'Input action interface name is empty.'
124+
else:
125+
# By default, output all the action interfaces.
126+
output_interface_list.update([interface.value for interface in ActionInterfaces])
127+
128+
if args.action_type is None:
129+
with NodeStrategy(args) as node:
130+
try:
131+
action_module = get_action_class(
132+
node, args.action_name)
133+
except (AttributeError, ModuleNotFoundError, ValueError):
134+
raise RuntimeError(f"The action name '{args.action_name}' is invalid")
135+
else:
136+
try:
137+
action_module = get_action(args.action_type)
138+
except (AttributeError, ModuleNotFoundError, ValueError):
139+
raise RuntimeError(f"The service type '{args.action_type}' is invalid")
140+
141+
if action_module is None:
142+
raise RuntimeError(f"Could not load the action type for '{args.action_type}'")
143+
144+
self.csv = args.csv
145+
self.truncate_length = args.truncate_length if not args.full_length else None
146+
self.flow_style = args.flow_style
147+
self.no_arr = args.no_arr
148+
self.no_str = args.no_str
149+
150+
send_goal_event_topic = args.action_name + '/_action/send_goal/_service_event'
151+
send_goal_event_msg_type = action_module.Impl.SendGoalService.Event
152+
153+
cancel_goal_event_topic = args.action_name + '/_action/cancel_goal/_service_event'
154+
cancel_goal_event_msg_type = action_module.Impl.CancelGoalService.Event
155+
156+
get_result_event_topic = args.action_name + '/_action/get_result/_service_event'
157+
get_result_event_msg_type = action_module.Impl.GetResultService.Event
158+
159+
feedback_topic = args.action_name + '/_action/feedback'
160+
feedback_topic_type = action_module.Impl.FeedbackMessage
161+
162+
status_topic = args.action_name + '/_action/status'
163+
status_topic_type = action_module.Impl.GoalStatusMessage
164+
165+
# Queue for messages from above topic. The queue size is set to 100.
166+
# If the queue is full, the message will be dropped.
167+
self.output_msg_queue = queue.Queue(100)
168+
169+
run_thread = True
170+
# Create a thread to output message from output_queue
171+
172+
def message_handler():
173+
while run_thread:
174+
try:
175+
message = self.output_msg_queue.get(block=True, timeout=0.5)
176+
self.output_msg_queue.task_done()
177+
except queue.Empty:
178+
continue
179+
print(message)
180+
output_msg_thread = threading.Thread(target=message_handler)
181+
output_msg_thread.start()
182+
183+
with NodeStrategy(args) as node:
184+
send_goal_event_sub = None
185+
# TODO: The QoS for the service event publisher, feedback publisher and status
186+
# publisher can be specified by the user, so new parameters need to be added to allow
187+
# specifying QoS of subscription to replace the current fixed QoS.
188+
if ActionInterfaces.GOAL_SERVICE.value in output_interface_list:
189+
send_goal_event_sub: Subscription[EventMessage] = node.create_subscription(
190+
send_goal_event_msg_type,
191+
send_goal_event_topic,
192+
self._send_goal_subscriber_callback,
193+
qos_profile_services_default)
194+
195+
cancel_goal_event_sub = None
196+
if ActionInterfaces.CANCEL_SERVICE.value in output_interface_list:
197+
cancel_goal_event_sub: Subscription[EventMessage] = node.create_subscription(
198+
cancel_goal_event_msg_type,
199+
cancel_goal_event_topic,
200+
self._cancel_goal_subscriber_callback,
201+
qos_profile_services_default)
202+
203+
get_result_event_sub = None
204+
if ActionInterfaces.RESULT_SERVICE.value in output_interface_list:
205+
get_result_event_sub: Subscription[EventMessage] = node.create_subscription(
206+
get_result_event_msg_type,
207+
get_result_event_topic,
208+
self._get_result_subscriber_callback,
209+
qos_profile_services_default)
210+
211+
feedback_sub = None
212+
if ActionInterfaces.FEEDBACK_TOPIC.value in output_interface_list:
213+
feedback_sub: Subscription[MsgT] = node.create_subscription(
214+
feedback_topic_type,
215+
feedback_topic,
216+
self._feedback_subscriber_callback,
217+
QoSProfile(depth=10)) # QoS setting refers to action client implementation
218+
219+
status_sub = None
220+
if ActionInterfaces.STATUS_TOPIC.value in output_interface_list:
221+
status_sub: Subscription[MsgT] = node.create_subscription(
222+
status_topic_type,
223+
status_topic,
224+
self._status_subscriber_callback,
225+
qos_profile_action_status_default)
226+
227+
executor: rclpy.Executor = rclpy.get_global_executor()
228+
try:
229+
executor.add_node(node)
230+
while executor.context.ok():
231+
executor.spin_once()
232+
except KeyboardInterrupt:
233+
pass
234+
finally:
235+
executor.remove_node(node)
236+
237+
if send_goal_event_sub:
238+
send_goal_event_sub.destroy()
239+
if cancel_goal_event_sub:
240+
cancel_goal_event_sub.destroy()
241+
if get_result_event_sub:
242+
get_result_event_sub.destroy()
243+
if feedback_sub:
244+
feedback_sub.destroy()
245+
if status_sub:
246+
status_sub.destroy()
247+
248+
run_thread = False
249+
if output_msg_thread.is_alive():
250+
output_msg_thread.join(1)
251+
252+
def _send_goal_subscriber_callback(self, msg):
253+
self._base_subscriber_callback(msg, ActionInterfaces.GOAL_SERVICE.value)
254+
255+
def _cancel_goal_subscriber_callback(self, msg):
256+
self._base_subscriber_callback(msg, ActionInterfaces.CANCEL_SERVICE.value)
257+
258+
def _get_result_subscriber_callback(self, msg):
259+
self._base_subscriber_callback(msg, ActionInterfaces.RESULT_SERVICE.value)
260+
261+
def _feedback_subscriber_callback(self, msg):
262+
self._base_subscriber_callback(msg, ActionInterfaces.FEEDBACK_TOPIC.value)
263+
264+
def _status_subscriber_callback(self, msg):
265+
self._base_subscriber_callback(msg, ActionInterfaces.STATUS_TOPIC.value)
266+
267+
def _base_subscriber_callback(self, msg, interface: str):
268+
to_print = 'interface: ' + interface + '\n'
269+
if self.csv:
270+
to_print += message_to_csv(msg, truncate_length=self.truncate_length,
271+
no_arr=self.no_arr, no_str=self.no_str)
272+
else:
273+
# The "easy" way to print out a representation here is to call message_to_yaml().
274+
# However, the message contains numbers for the event type, but we want to show
275+
# meaningful names to the user. So we call message_to_ordereddict() instead,
276+
# and replace the numbers with meaningful names before dumping to YAML.
277+
msgdict = message_to_ordereddict(msg, truncate_length=self.truncate_length,
278+
no_arr=self.no_arr, no_str=self.no_str)
279+
280+
if 'info' in msgdict:
281+
info = msgdict['info']
282+
if 'event_type' in info:
283+
info['event_type'] = self._event_number_to_name[info['event_type']]
284+
285+
to_print += yaml.dump(msgdict, allow_unicode=True, width=sys.maxsize,
286+
default_flow_style=self.flow_style)
287+
288+
to_print += '---'
289+
try:
290+
self.output_msg_queue.put(to_print, timeout=0.5)
291+
except queue.Full:
292+
print('Output message is full! Please increase the queue size of output message by '
293+
'"--queue_size"')

ros2action/setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
'send_goal = ros2action.verb.send_goal:SendGoalVerb',
4646
'type = ros2action.verb.type:TypeVerb',
4747
'find = ros2action.verb.find:FindVerb',
48+
'echo = ros2action.verb.echo:EchoVerb',
4849
],
4950
}
5051
)

0 commit comments

Comments
 (0)