diff --git a/ros2node/ros2node/api/__init__.py b/ros2node/ros2node/api/__init__.py index 39e452f08..40d88afc6 100644 --- a/ros2node/ros2node/api/__init__.py +++ b/ros2node/ros2node/api/__init__.py @@ -16,6 +16,9 @@ from typing import Any from typing import List +from rcl_interfaces.msg import LoggerLevel +from rcl_interfaces.srv import SetLoggerLevels +import rclpy from rclpy.node import HIDDEN_NODE_PREFIX from ros2cli.helpers import wait_for from ros2cli.node.strategy import NodeStrategy @@ -28,6 +31,14 @@ NodeName = namedtuple('NodeName', ('name', 'namespace', 'full_name')) TopicInfo = namedtuple('Topic', ('name', 'types')) +LEVEL_STR_TO_ENUM = { + 'DEBUG': LoggerLevel.LOG_LEVEL_DEBUG, + 'INFO': LoggerLevel.LOG_LEVEL_INFO, + 'WARN': LoggerLevel.LOG_LEVEL_WARN, + 'ERROR': LoggerLevel.LOG_LEVEL_ERROR, + 'FATAL': LoggerLevel.LOG_LEVEL_FATAL, +} + def _is_hidden_name(name): # note, we're assuming the hidden node prefix is the same for other hidden names @@ -145,6 +156,47 @@ def get_action_client_info(*, node, remote_node_name, include_hidden=False): for n, t in names_and_types if include_hidden or not _is_hidden_name(n)] +def call_log_level_set(node, node_name, level): + """ + Set the log level of the specified node using the SetLoggerLevels ROS service. + + :param node: The rclpy node to use as the client + :param node_name: The full name of the target node + :param level: The log level as a string (e.g., 'DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL') + """ + # Prepare the service name for the target node + service_name = f'{node_name}/set_logger_levels' + client = node.create_client(SetLoggerLevels, service_name) + + if not client.service_is_ready(): + raise RuntimeError(f'Service not available. Are the logging services enabled?') + + # Prepare the request + level_value = LEVEL_STR_TO_ENUM.get(level.upper(), None) + if level_value is None: + raise ValueError( + f'Invalid log level "{level}". Valid levels: {", ".join(LEVEL_STR_TO_ENUM.keys())}') + + request = SetLoggerLevels.Request() + logger_level = LoggerLevel() + logger_level.name = node_name + logger_level.level = level_value + request.levels = [logger_level] + + # Call the service + future = client.call_async(request) + rclpy.spin_until_future_complete(node, future) + if future.result() is not None: + res = future.result() + if not all(r.successful for r in res.results): + raise RuntimeError( + f'Failed to set log level for node "{node_name}": {res.results}') + else: + print(f'Successfully set log level of "{node_name}" to {level}.') + else: + raise RuntimeError(f'Failed to set log level: {future.exception()}') + + class NodeNameCompleter: """Callable returning a list of node names.""" diff --git a/ros2node/ros2node/verb/log.py b/ros2node/ros2node/verb/log.py new file mode 100644 index 000000000..071a82b0b --- /dev/null +++ b/ros2node/ros2node/verb/log.py @@ -0,0 +1,36 @@ +# Copyright (c) 2025 Natesh Narain +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ros2cli.node.direct import DirectNode +from ros2cli.node.strategy import add_arguments +from ros2node.api import call_log_level_set, LEVEL_STR_TO_ENUM +from ros2node.verb import VerbExtension + + +class LogVerb(VerbExtension): + """Set the log level of a node.""" + + def add_arguments(self, parser, cli_name): + add_arguments(parser) + + parser.add_argument('node_name', help='The name of the node') + parser.add_argument('-l', '--logger-name', help='Logger name if different from node name') + parser.add_argument('level', choices=LEVEL_STR_TO_ENUM.keys(), help='Log level') + + def main(self, *, args): + with DirectNode(args) as node: + if args.logger_name is None: + args.logger_name = args.node_name.lstrip('/') + # Call the service to set the log level + call_log_level_set(node, args.logger_name, args.level) diff --git a/ros2node/setup.py b/ros2node/setup.py index 998a14b50..5ac7e6426 100644 --- a/ros2node/setup.py +++ b/ros2node/setup.py @@ -43,6 +43,7 @@ 'ros2node.verb': [ 'info = ros2node.verb.info:InfoVerb', 'list = ros2node.verb.list:ListVerb', + 'log = ros2node.verb.log:LogVerb', ], } ) diff --git a/ros2node/test/test_cli.py b/ros2node/test/test_cli.py index 7777db1b1..c9b1fde10 100644 --- a/ros2node/test/test_cli.py +++ b/ros2node/test/test_cli.py @@ -230,3 +230,10 @@ def test_info_hidden_node_hidden_flag(self): ]), text=node_command.output, strict=False ), 'Output does not match:\n' + node_command.output + + @launch_testing.markers.retry_on_failure(times=5, delay=1) + def test_change_log_level(self): + with self.launch_node_command(arguments=['log', '/complex_node', 'DEBUG']) as node_command: + assert node_command.wait_for_shutdown(timeout=10) + assert node_command.exit_code == launch_testing.asserts.EXIT_OK + assert node_command.output == 'Successfully set log level of "/complex_node" to DEBUG.\n'