1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15+ import io
1516import os
1617import time
1718import unittest
1819
1920from launch import LaunchDescription
2021from launch import LaunchService
21- from launch.actions import EmitEvent
22- from launch.actions import ExecuteProcess
23- from launch.actions import RegisterEventHandler
24- from launch.event_handlers import OnProcessIO
25- from launch.events import Shutdown
22+ import launch.actions
23+ import launch.event_handlers
24+ import launch.events
2625
2726
2827# TODO(clalancette): we can't currently use launch_testing since there is
@@ -37,6 +36,10 @@ class TestExecutablesDemo(unittest.TestCase):
3736 self._ls.include_launch_description(self.generate_launch_description())
3837 self._saw_cam2image_output = False
3938 self._saw_showimage_output = False
39+ # Windows can emit partial lines, so buffer the data and only check
40+ # check when we have a complete line
41+ self._cam2image_buffer = io.StringIO()
42+ self._showimage_buffer = io.StringIO()
4043
4144 def generate_launch_description(self):
4245 launch_description = LaunchDescription()
@@ -58,7 +61,7 @@ class TestExecutablesDemo(unittest.TestCase):
5861
5962 command = [showimage_executable]
6063 command.extend(subscriber_executable_args)
61- showimage_process = ExecuteProcess(
64+ showimage_process = launch.actions. ExecuteProcess(
6265 cmd=command,
6366 name=showimage_name,
6467 env=env,
@@ -73,7 +76,7 @@ class TestExecutablesDemo(unittest.TestCase):
7376
7477 command = [cam2image_executable]
7578 command.extend(publisher_executable_args)
76- cam2image_process = ExecuteProcess(
79+ cam2image_process = launch.actions. ExecuteProcess(
7780 cmd=command,
7881 name=cam2image_name,
7982 env=env,
@@ -82,31 +85,53 @@ class TestExecutablesDemo(unittest.TestCase):
8285 launch_description.add_action(cam2image_process)
8386
8487 launch_description.add_action(
85- RegisterEventHandler(
86- OnProcessIO(
88+ launch.actions. RegisterEventHandler(
89+ launch.event_handlers. OnProcessIO(
8790 on_stdout=self.append,
8891 on_stderr=self.append,
8992 )
9093 )
9194 )
9295
96+ launch_description.add_action(
97+ launch.actions.TimerAction(
98+ period=10.0,
99+ actions=[launch.actions.Shutdown(reason='Timer expired')])
100+ )
101+
93102 return launch_description
94103
104+ def append_and_check(self, process_io, process_name, text_to_check):
105+ if process_name in process_io.process_name:
106+ buffer = getattr(self, '_' + process_name + '_buffer')
107+ buffer.write(process_io.text.decode(errors='replace'))
108+ buffer.seek(0)
109+ last_line = None
110+ for line in buffer:
111+ # Note that this does not use os.linesep; apparently rclpy
112+ # node.get_logger().info doesn't use the OS line separator
113+ if line.endswith('\n'):
114+ # We have a complete line, see if it has what we want
115+ if text_to_check in line[:-len(os.linesep)]:
116+ setattr(self, '_saw_' + process_name + '_output', True)
117+ break
118+ else:
119+ last_line = line
120+ break
121+ buffer.seek(0)
122+ buffer.truncate(0)
123+ if last_line is not None:
124+ buffer.write(last_line)
125+
95126 def append(self, process_io):
96- if 'cam2image' in process_io.process_name:
97- if 'Publishing image #' in process_io.text.decode():
98- self._saw_cam2image_output = True
99- elif 'showimage' in process_io.process_name:
100- if 'Received image #' in process_io.text.decode():
101- self._saw_showimage_output = True
127+ self.append_and_check(process_io, 'cam2image', 'Publishing image #')
128+ self.append_and_check(process_io, 'showimage', 'Received image #')
102129
103130 if self._saw_cam2image_output and self._saw_showimage_output:
104131 # We've seen all required arguments from the test, quit
105- return EmitEvent(event=Shutdown(reason='finished', due_to_sigint=False))
106-
107- if time.time() - self._start_time > 10.0:
108- # We've waited more than 10 seconds with no output; raise an error
109- return EmitEvent(event=Shutdown(reason='timeout', due_to_sigint=False))
132+ return launch.actions.EmitEvent(
133+ event=launch.events.Shutdown(reason='finished', due_to_sigint=False)
134+ )
110135
111136 def test_reliable_qos(self):
112137 self._ls.run()
0 commit comments