Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ros2component/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
<maintainer email="[email protected]">Michel Hidalgo</maintainer>
<license>Apache License 2.0</license>

<depend>ros2cli</depend>

<exec_depend>ament_index_python</exec_depend>
<exec_depend>composition_interfaces</exec_depend>
<exec_depend>rcl_interfaces</exec_depend>
<exec_depend>rclcpp_components</exec_depend>
<exec_depend>rclpy</exec_depend>
<exec_depend>ros2cli</exec_depend>
<exec_depend>ros2node</exec_depend>
<exec_depend>ros2param</exec_depend>
<exec_depend>ros2pkg</exec_depend>
Expand All @@ -24,6 +25,8 @@
<test_depend>ament_pep257</test_depend>
<test_depend>ament_xmllint</test_depend>
<test_depend>python3-pytest</test_depend>
<test_depend>ros2component_fixtures</test_depend>
<test_depend>ros_testing</test_depend>

<export>
<build_type>ament_python</build_type>
Expand Down
150 changes: 150 additions & 0 deletions ros2component/test/test_cli_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import unittest

from launch import LaunchDescription
from launch.actions import ExecuteProcess
from launch.actions import OpaqueFunction

from launch_ros.actions import Node

import launch_testing
import launch_testing.asserts
import launch_testing.markers
import launch_testing.tools
import launch_testing_ros.tools

import pytest

from rmw_implementation import get_available_rmw_implementations

LIST_THREE_NODES = """\
/ComponentManager
1 /listener
2 /talker
3 /talker
"""


# TODO(BMarchi): Opensplice doesn't get along with running any cli command with ExecuteProcess,
# it just hangs there making `wait_for_timeout` fail. All tests should run fine with opensplice.
@pytest.mark.rostest
@launch_testing.parametrize(
'rmw_implementation',
[v for v in get_available_rmw_implementations() if v != 'rmw_opensplice_cpp'])
def generate_test_description(rmw_implementation, ready_fn):
additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}
component_node = Node(
package='rclcpp_components', node_executable='component_container', output='screen')
listener_command_action = ExecuteProcess(
cmd=['ros2', 'component', 'load', '/ComponentManager',
'ros2component_fixtures', 'ros2component_fixtures::Listener'],
additional_env={
'RMW_IMPLEMENTATION': rmw_implementation,
'PYTHONUNBUFFERED': '1'
},
name='ros2component-cli',
output='screen',
on_exit=[
ExecuteProcess(
cmd=['ros2', 'component', 'load', '/ComponentManager',
'ros2component_fixtures', 'ros2component_fixtures::Talker'],
additional_env={
'RMW_IMPLEMENTATION': rmw_implementation,
'PYTHONUNBUFFERED': '1'
},
name='ros2component-cli',
output='screen',
on_exit=[
ExecuteProcess(
cmd=['ros2', 'component', 'load', '/ComponentManager',
'ros2component_fixtures', 'ros2component_fixtures::Talker'],
additional_env={
'RMW_IMPLEMENTATION': rmw_implementation,
'PYTHONUNBUFFERED': '1'
},
name='ros2component-cli',
output='screen'
)
]
)
]
)
return LaunchDescription([
# Always restart daemon to isolate tests.
ExecuteProcess(
cmd=['ros2', 'daemon', 'stop'],
name='daemon-stop',
on_exit=[
ExecuteProcess(
cmd=['ros2', 'daemon', 'start'],
name='daemon-start',
on_exit=[
component_node,
OpaqueFunction(function=lambda context: ready_fn()),
listener_command_action
],
additional_env=additional_env
)
]
),
]), locals()


class TestROS2ComponentListCLI(unittest.TestCase):

@classmethod
def setUpClass(
cls,
launch_service,
proc_info,
proc_output,
rmw_implementation
):
@contextlib.contextmanager
def launch_node_command(self, arguments):
node_command_action = ExecuteProcess(
cmd=['ros2', 'component', *arguments],
additional_env={
'RMW_IMPLEMENTATION': rmw_implementation,
'PYTHONUNBUFFERED': '1'
},
name='ros2component-cli',
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, node_command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
# ignore ros2cli daemon nodes
filtered_patterns=['.*ros2cli.*'],
filtered_rmw_implementation=rmw_implementation
)
) as node_command:
yield node_command
cls.launch_node_command = launch_node_command
cls.rmw_implementation = rmw_implementation

@launch_testing.markers.retry_on_failure(times=5)
def test_list_verb(self):
with self.launch_node_command(
arguments=['list']) as list_command:
assert list_command.wait_for_shutdown(timeout=5)
assert list_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_text=LIST_THREE_NODES,
text=list_command.output,
strict=True
)
196 changes: 196 additions & 0 deletions ros2component/test/test_cli_load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
# Copyright 2019 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import unittest

from launch import LaunchDescription
from launch.actions import ExecuteProcess
from launch.actions import OpaqueFunction

from launch_ros.actions import Node

import launch_testing
import launch_testing.asserts
import launch_testing.markers
import launch_testing.tools
import launch_testing_ros.tools

import pytest

from rmw_implementation import get_available_rmw_implementations


# TODO(BMarchi): Opensplice doesn't get along with running any cli command with ExecuteProcess,
# it just hangs there making `wait_for_timeout` fail. All tests should run fine with opensplice.
@pytest.mark.rostest
@launch_testing.parametrize(
'rmw_implementation',
[v for v in get_available_rmw_implementations() if v != 'rmw_opensplice_cpp'])
def generate_test_description(rmw_implementation, ready_fn):
additional_env = {'RMW_IMPLEMENTATION': rmw_implementation}
component_node = Node(
package='rclcpp_components', node_executable='component_container', output='screen')
return LaunchDescription([
# Always restart daemon to isolate tests.
ExecuteProcess(
cmd=['ros2', 'daemon', 'stop'],
name='daemon-stop',
on_exit=[
ExecuteProcess(
cmd=['ros2', 'daemon', 'start'],
name='daemon-start',
on_exit=[
component_node,
OpaqueFunction(function=lambda context: ready_fn())
],
additional_env=additional_env
)
]
),
]), locals()


class TestROS2ComponentLoadCLI(unittest.TestCase):

@classmethod
def setUpClass(
cls,
launch_service,
proc_info,
proc_output,
rmw_implementation
):
@contextlib.contextmanager
def launch_node_command(self, arguments):
node_command_action = ExecuteProcess(
cmd=['ros2', 'component', *arguments],
additional_env={
'RMW_IMPLEMENTATION': rmw_implementation,
'PYTHONUNBUFFERED': '1'
},
name='ros2component-cli',
output='screen'
)
with launch_testing.tools.launch_process(
launch_service, node_command_action, proc_info, proc_output,
output_filter=launch_testing_ros.tools.basic_output_filter(
# ignore ros2cli daemon nodes
filtered_patterns=['.*ros2cli.*'],
filtered_rmw_implementation=rmw_implementation
)
) as node_command:
yield node_command
cls.launch_node_command = launch_node_command

@launch_testing.markers.retry_on_failure(times=2)
def test_load_verb(self):
with self.launch_node_command(
arguments=[
'load', '/ComponentManager',
'ros2component_fixtures', 'ros2component_fixtures::Talker']) as talker_node:
assert talker_node.wait_for_shutdown(timeout=20)
assert talker_node.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
"Loaded component 1 into '/ComponentManager' "
"container node as '/talker'"],
text=talker_node.output,
strict=True
)
with self.launch_node_command(
arguments=[
'load', '/ComponentManager',
'ros2component_fixtures',
'ros2component_fixtures::Listener']) as listener_node:
assert listener_node.wait_for_shutdown(timeout=20)
assert listener_node.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
"Loaded component 2 into '/ComponentManager' "
"container node as '/listener'"],
text=listener_node.output,
strict=True
)
with self.launch_node_command(
arguments=[
'unload', '/ComponentManager', '1']) as unload_command:
assert unload_command.wait_for_shutdown(timeout=20)
assert unload_command.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=["Unloaded component 1 from '/ComponentManager' container"],
text=unload_command.output,
strict=True
)
# Test the unique id for loaded nodes.
with self.launch_node_command(
arguments=[
'load', '/ComponentManager',
'ros2component_fixtures', 'ros2component_fixtures::Talker']) as talker_node:
assert talker_node.wait_for_shutdown(timeout=20)
assert talker_node.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
"Loaded component 3 into '/ComponentManager' "
"container node as '/talker'"],
text=talker_node.output,
strict=True
)
# Test we can load the same node more than once.
with self.launch_node_command(
arguments=[
'load', '/ComponentManager',
'ros2component_fixtures', 'ros2component_fixtures::Talker']) as talker_node:
assert talker_node.wait_for_shutdown(timeout=20)
assert talker_node.exit_code == launch_testing.asserts.EXIT_OK
assert launch_testing.tools.expect_output(
expected_lines=[
"Loaded component 4 into '/ComponentManager' "
"container node as '/talker'"],
text=talker_node.output,
strict=True
)

@launch_testing.markers.retry_on_failure(times=2)
def test_load_verb_nonexistent_class(self):
with self.launch_node_command(
arguments=[
'load', '/ComponentManager',
'ros2component_fixtures',
'ros2component_fixtures::NonExistingPlugin']) as command:
assert command.wait_for_shutdown(timeout=20)
assert command.exit_code == 1
assert launch_testing.tools.expect_output(
expected_lines=[
'Failed to load component: '
'Failed to find class with the requested plugin name.'],
text=command.output,
strict=True
)

@launch_testing.markers.retry_on_failure(times=2)
def test_load_verb_nonexistent_plugin(self):
with self.launch_node_command(
arguments=[
'load', '/ComponentManager',
'non_existent_plugin', 'non_existent_plugin::Test']) as command:
assert command.wait_for_shutdown(timeout=20)
assert command.exit_code == 1
assert launch_testing.tools.expect_output(
expected_lines=[
'Failed to load component: '
'Could not find requested resource in ament index'],
text=command.output,
strict=True
)
Loading