Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions ros2node/ros2node/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
from typing import Any
from typing import List

from rcl_interfaces.msg import LoggerLevel
from rcl_interfaces.srv import SetLoggerLevels
import rclpy
from rclpy.node import HIDDEN_NODE_PREFIX
from ros2cli.helpers import wait_for
from ros2cli.node.strategy import NodeStrategy
Expand All @@ -28,6 +31,14 @@
NodeName = namedtuple('NodeName', ('name', 'namespace', 'full_name'))
TopicInfo = namedtuple('Topic', ('name', 'types'))

LEVEL_STR_TO_ENUM = {
'DEBUG': LoggerLevel.LOG_LEVEL_DEBUG,
'INFO': LoggerLevel.LOG_LEVEL_INFO,
'WARN': LoggerLevel.LOG_LEVEL_WARN,
'ERROR': LoggerLevel.LOG_LEVEL_ERROR,
'FATAL': LoggerLevel.LOG_LEVEL_FATAL,
}


def _is_hidden_name(name):
# note, we're assuming the hidden node prefix is the same for other hidden names
Expand Down Expand Up @@ -145,6 +156,47 @@ def get_action_client_info(*, node, remote_node_name, include_hidden=False):
for n, t in names_and_types if include_hidden or not _is_hidden_name(n)]


def call_log_level_set(node, node_name, level):
"""
Set the log level of the specified node using the SetLoggerLevels ROS service.

:param node: The rclpy node to use as the client
:param node_name: The full name of the target node
:param level: The log level as a string (e.g., 'DEBUG', 'INFO', 'WARN', 'ERROR', 'FATAL')
"""
# Prepare the service name for the target node
service_name = f'{node_name}/set_logger_levels'
client = node.create_client(SetLoggerLevels, service_name)

if not client.service_is_ready():
raise RuntimeError(f'Service not available. Are the logging services enabled?')

# Prepare the request
level_value = LEVEL_STR_TO_ENUM.get(level.upper(), None)
if level_value is None:
raise ValueError(
f'Invalid log level "{level}". Valid levels: {", ".join(LEVEL_STR_TO_ENUM.keys())}')

request = SetLoggerLevels.Request()
logger_level = LoggerLevel()
logger_level.name = node_name
logger_level.level = level_value
request.levels = [logger_level]

# Call the service
future = client.call_async(request)
rclpy.spin_until_future_complete(node, future)
if future.result() is not None:
res = future.result()
if not all(r.successful for r in res.results):
raise RuntimeError(
f'Failed to set log level for node "{node_name}": {res.results}')
else:
print(f'Successfully set log level of "{node_name}" to {level}.')
else:
raise RuntimeError(f'Failed to set log level: {future.exception()}')


class NodeNameCompleter:
"""Callable returning a list of node names."""

Expand Down
36 changes: 36 additions & 0 deletions ros2node/ros2node/verb/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright (c) 2025 Natesh Narain
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ros2cli.node.direct import DirectNode
from ros2cli.node.strategy import add_arguments
from ros2node.api import call_log_level_set, LEVEL_STR_TO_ENUM
from ros2node.verb import VerbExtension


class LogVerb(VerbExtension):
"""Set the log level of a node."""

def add_arguments(self, parser, cli_name):
add_arguments(parser)

parser.add_argument('node_name', help='The name of the node')
parser.add_argument('-l', '--logger-name', help='Logger name if different from node name')
parser.add_argument('level', choices=LEVEL_STR_TO_ENUM.keys(), help='Log level')

def main(self, *, args):
with DirectNode(args) as node:
if args.logger_name is None:
args.logger_name = args.node_name.lstrip('/')
# Call the service to set the log level
call_log_level_set(node, args.logger_name, args.level)
1 change: 1 addition & 0 deletions ros2node/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
'ros2node.verb': [
'info = ros2node.verb.info:InfoVerb',
'list = ros2node.verb.list:ListVerb',
'log = ros2node.verb.log:LogVerb',
],
}
)
7 changes: 7 additions & 0 deletions ros2node/test/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,10 @@ def test_info_hidden_node_hidden_flag(self):
]),
text=node_command.output, strict=False
), 'Output does not match:\n' + node_command.output

@launch_testing.markers.retry_on_failure(times=5, delay=1)
def test_change_log_level(self):
with self.launch_node_command(arguments=['log', '/complex_node', 'DEBUG']) as node_command:
assert node_command.wait_for_shutdown(timeout=10)
assert node_command.exit_code == launch_testing.asserts.EXIT_OK
assert node_command.output == 'Successfully set log level of "/complex_node" to DEBUG.\n'