Skip to content

Commit 6c970ea

Browse files
committed
Simple tools to interact with processes, needs more work
Signed-off-by: Ivan Santiago Paunovic <[email protected]>
1 parent 0abe6ac commit 6c970ea

File tree

6 files changed

+290
-47
lines changed

6 files changed

+290
-47
lines changed

launch/launch/actions/execute_process.py

Lines changed: 84 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ def __init__(
196196
]] = None,
197197
respawn: bool = False,
198198
respawn_delay: Optional[float] = None,
199+
cached_output: bool = False,
199200
**kwargs
200201
) -> None:
201202
"""
@@ -294,6 +295,8 @@ def __init__(
294295
:param: respawn if 'True', relaunch the process that abnormally died.
295296
Defaults to 'False'.
296297
:param: respawn_delay a delay time to relaunch the died process if respawn is 'True'.
298+
:param: cached_output if `True`, both stdout and stderr will be cached.
299+
Use get_stdout() and get_stderr() to read the buffered output.
297300
"""
298301
super().__init__(**kwargs)
299302
self.__cmd = [normalize_to_list_of_substitutions(x) for x in cmd]
@@ -340,6 +343,7 @@ def __init__(
340343
self.__sigkill_timer = None # type: Optional[TimerAction]
341344
self.__stdout_buffer = io.StringIO()
342345
self.__stderr_buffer = io.StringIO()
346+
self.__cached_output = cached_output
343347

344348
self.__executed = False
345349

@@ -591,59 +595,32 @@ def __on_process_stdin(
591595
cast(ProcessStdin, event)
592596
return None
593597

594-
def __on_process_stdout(
595-
self, event: ProcessIO
598+
def __on_process_output(
599+
self, event: ProcessIO, buffer, logger
596600
) -> Optional[SomeActionsType]:
597601
to_write = event.text.decode(errors='replace')
598-
if self.__stdout_buffer.closed:
599-
# __stdout_buffer was probably closed by __flush_buffers on shutdown. Output without
602+
if buffer.closed:
603+
# buffer was probably closed by __flush_buffers on shutdown. Output without
600604
# buffering.
601-
self.__stdout_logger.info(
602-
self.__output_format.format(line=to_write, this=self)
603-
)
604-
else:
605-
self.__stdout_buffer.write(to_write)
606-
self.__stdout_buffer.seek(0)
607-
last_line = None
608-
for line in self.__stdout_buffer:
609-
if line.endswith(os.linesep):
610-
self.__stdout_logger.info(
611-
self.__output_format.format(line=line[:-len(os.linesep)], this=self)
612-
)
613-
else:
614-
last_line = line
615-
break
616-
self.__stdout_buffer.seek(0)
617-
self.__stdout_buffer.truncate(0)
618-
if last_line is not None:
619-
self.__stdout_buffer.write(last_line)
620-
621-
def __on_process_stderr(
622-
self, event: ProcessIO
623-
) -> Optional[SomeActionsType]:
624-
to_write = event.text.decode(errors='replace')
625-
if self.__stderr_buffer.closed:
626-
# __stderr buffer was probably closed by __flush_buffers on shutdown. Output without
627-
# buffering.
628-
self.__stderr_logger.info(
605+
buffer.info(
629606
self.__output_format.format(line=to_write, this=self)
630607
)
631608
else:
632-
self.__stderr_buffer.write(to_write)
633-
self.__stderr_buffer.seek(0)
609+
buffer.write(to_write)
610+
buffer.seek(0)
634611
last_line = None
635-
for line in self.__stderr_buffer:
612+
for line in buffer:
636613
if line.endswith(os.linesep):
637-
self.__stderr_logger.info(
614+
logger.info(
638615
self.__output_format.format(line=line[:-len(os.linesep)], this=self)
639616
)
640617
else:
641618
last_line = line
642619
break
643-
self.__stderr_buffer.seek(0)
644-
self.__stderr_buffer.truncate(0)
620+
buffer.seek(0)
621+
buffer.truncate(0)
645622
if last_line is not None:
646-
self.__stderr_buffer.write(last_line)
623+
buffer.write(last_line)
647624

648625
def __flush_buffers(self, event, context):
649626
line = self.__stdout_buffer.getvalue()
@@ -669,6 +646,35 @@ def __flush_buffers(self, event, context):
669646
self.__stderr_buffer.seek(0)
670647
self.__stderr_buffer.truncate(0)
671648

649+
def __on_process_output_cached(
650+
self, event: ProcessIO, buffer, logger
651+
) -> Optional[SomeActionsType]:
652+
to_write = event.text.decode(errors='replace')
653+
last_cursor = buffer.tell()
654+
self.__stdout_buffer.seek(0, 2) # go to end of buffer
655+
buffer.write(to_write)
656+
buffer.seek(last_cursor)
657+
new_cursor = last_cursor
658+
for line in buffer:
659+
if not line.endswith(os.linesep):
660+
break
661+
new_cursor = buffer.tell()
662+
logger.info(
663+
self.__output_format.format(line=line[:-len(os.linesep)], this=self)
664+
)
665+
buffer.seek(new_cursor)
666+
667+
def __flush_cached_buffers(self, event, context):
668+
for line in self.__stdout_buffer:
669+
self.__stdout_buffer.info(
670+
self.__output_format.format(line=line, this=self)
671+
)
672+
673+
for line in self.__stderr_buffer:
674+
self.__stderr_logger.info(
675+
self.__output_format.format(line=line, this=self)
676+
)
677+
672678
def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeActionsType]:
673679
due_to_sigint = cast(Shutdown, event).due_to_sigint
674680
return self._shutdown_process(
@@ -903,6 +909,13 @@ def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEnti
903909
# If shutdown starts before execution can start, don't start execution.
904910
return None
905911

912+
if self.__cached_output:
913+
on_output_method = self.__on_process_output_cached
914+
flush_buffers_method = self.__flush_cached_buffers
915+
else:
916+
on_output_method = self.__on_process_output
917+
flush_buffers_method = self.__flush_buffers
918+
906919
event_handlers = [
907920
EventHandler(
908921
matcher=lambda event: is_a_subclass(event, ShutdownProcess),
@@ -915,8 +928,10 @@ def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEnti
915928
OnProcessIO(
916929
target_action=self,
917930
on_stdin=self.__on_process_stdin,
918-
on_stdout=self.__on_process_stdout,
919-
on_stderr=self.__on_process_stderr
931+
on_stdout=lambda event: on_output_method(
932+
event, self.__stdout_buffer, self.__stdout_logger),
933+
on_stderr=lambda event: on_output_method(
934+
event, self.__stderr_buffer, self.__stderr_logger),
920935
),
921936
OnShutdown(
922937
on_shutdown=self.__on_shutdown,
@@ -927,7 +942,7 @@ def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEnti
927942
),
928943
OnProcessExit(
929944
target_action=self,
930-
on_exit=self.__flush_buffers,
945+
on_exit=flush_buffers_method,
931946
),
932947
]
933948
for event_handler in event_handlers:
@@ -985,3 +1000,30 @@ def shell(self):
9851000
def prefix(self):
9861001
"""Getter for prefix."""
9871002
return self.__prefix
1003+
1004+
def get_stdout(self):
1005+
"""
1006+
Get cached stdout.
1007+
1008+
:raises RuntimeError: if cached_output is false.
1009+
"""
1010+
if not self.__cached_output:
1011+
raise RuntimeError(f"cached output must be true to be able to get stdout, proc '{self.__name}'")
1012+
return self.__stdout_buffer.getvalue()
1013+
1014+
def get_stderr(self):
1015+
"""
1016+
Get cached stdout.
1017+
1018+
:raises RuntimeError: if cached_output is false.
1019+
"""
1020+
if not self.__cached_output:
1021+
raise RuntimeError(f"cached output must be true to be able to get stderr, proc '{self.__name}'")
1022+
return self.__stderr_buffer.getvalue()
1023+
1024+
@property
1025+
def return_code(self):
1026+
"""Gets the process return code, None if it hasn't finished."""
1027+
if self._subprocess_transport is None:
1028+
return None
1029+
return self._subprocess_transport.get_returncode()

launch/launch/launch_context.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,17 @@ def would_handle_event(self, event: Event) -> bool:
174174
"""Check whether an event would be handled or not."""
175175
return any(handler.matches(event) for handler in self._event_handlers)
176176

177-
def register_event_handler(self, event_handler: BaseEventHandler) -> None:
178-
"""Register a event handler."""
179-
self._event_handlers.appendleft(event_handler)
177+
def register_event_handler(self, event_handler: BaseEventHandler, append = False) -> None:
178+
"""
179+
Register a event handler.
180+
181+
:param append: if 'true', the new event handler will be executed after the previously
182+
registered ones. If not, it will prepend the old handlers.
183+
"""
184+
if append:
185+
self._event_handlers.append(event_handler)
186+
else:
187+
self._event_handlers.appendleft(event_handler)
180188

181189
def unregister_event_handler(self, event_handler: BaseEventHandler) -> None:
182190
"""Unregister an event handler."""

launch_testing/launch_testing/pytest/fixture.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,19 @@ def launch_service(event_loop):
5151
return launch_service
5252

5353

54+
def get_launch_context_fixture(*, scope='function', overridable=True):
55+
"""Return a launch service fixture."""
56+
57+
@pytest.fixture(scope=scope)
58+
def launch_context(launch_service):
59+
"""Create an instance of the launch service for each test case."""
60+
return launch_service.context
61+
if overridable:
62+
launch_context._launch_testing_overridable_fixture = True
63+
launch_context._launch_testing_fixture_scope = scope
64+
return launch_context
65+
66+
5467
def get_event_loop_fixture(*, scope='function', overridable=True):
5568
"""Return an event loop fixture."""
5669

@@ -98,7 +111,8 @@ def fixture(
98111
mod_locals = vars(mod)
99112
for name, getter in (
100113
('launch_service', get_launch_service_fixture),
101-
('event_loop', get_event_loop_fixture)
114+
('event_loop', get_event_loop_fixture),
115+
('launch_context', get_launch_context_fixture),
102116
):
103117
if name in mod_locals:
104118
obj = mod_locals[name]

launch_testing/launch_testing/pytest/plugin.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
from .fixture import finalize_launch_service
2929
from .fixture import get_launch_service_fixture
30+
from .fixture import get_launch_context_fixture
3031

3132
"""
3233
launch_testing native pytest based implementation.
@@ -524,5 +525,8 @@ def inner(**kwargs):
524525
return inner
525526

526527

527-
"""Launch service fixture."""
528528
launch_service = get_launch_service_fixture(overridable=False)
529+
"""Launch service fixture."""
530+
531+
launch_context = get_launch_context_fixture(overridable=False)
532+
"""Launch context fixture."""
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
# Copyright 2021 Open Source Robotics Foundation, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
import contextlib
17+
import threading
18+
import time
19+
20+
import launch
21+
from launch import event_handlers
22+
23+
24+
@contextlib.contextmanager
25+
def register_event_handler(context, event_handler):
26+
# Code to acquire resource, e.g.:
27+
try:
28+
yield context.register_event_handler(event_handler, append=True)
29+
finally:
30+
context.unregister_event_handler(event_handler)
31+
32+
def _get_on_process_start(execute_process_action, pyevent):
33+
event_handlers.OnProcessStart(
34+
target_action=execute_process_action, on_start=lambda _1, _2: pyevent.set())
35+
36+
async def _wait_for_event(
37+
launch_context, execute_process_action, get_launch_event_handler, timeout=None
38+
):
39+
pyevent = asyncio.Event()
40+
event_handler = get_launch_event_handler(execute_process_action, pyevent)
41+
with register_event_handler(launch_context, event_handler):
42+
await asyncio.wait_for(pyevent.wait(), timeout)
43+
44+
async def _wait_for_event_with_condition(
45+
launch_context, execute_process_action, get_launch_event_handler, condition, timeout=None
46+
):
47+
pyevent = asyncio.Event()
48+
event_handler = get_launch_event_handler(execute_process_action, pyevent)
49+
cond_value = condition()
50+
with register_event_handler(launch_context, event_handler):
51+
start = time.time()
52+
now = start
53+
while not cond_value and (timeout is None or now < start + timeout):
54+
await asyncio.wait_for(pyevent.wait(), start - now + timeout)
55+
pyevent.clear()
56+
cond_value = condition()
57+
now = time.time()
58+
return cond_value
59+
60+
def _wait_for_event_sync(
61+
launch_context, execute_process_action, get_launch_event_handler, timeout=None
62+
):
63+
pyevent = threading.Event()
64+
event_handler = get_launch_event_handler(execute_process_action, pyevent)
65+
with register_event_handler(launch_context, event_handler):
66+
pyevent.wait(timeout)
67+
68+
def _wait_for_event_with_condition_sync(
69+
launch_context, execute_process_action, get_launch_event_handler, condition, timeout=None
70+
):
71+
pyevent = threading.Event()
72+
event_handler = get_launch_event_handler(execute_process_action, pyevent)
73+
cond_value = condition()
74+
with register_event_handler(launch_context, event_handler):
75+
start = time.time()
76+
now = start
77+
while not cond_value and (timeout is None or now < start + timeout):
78+
pyevent.wait(start - now + timeout)
79+
pyevent.clear()
80+
cond_value = condition()
81+
now = time.time()
82+
return cond_value
83+
84+
def _get_stdout_event_handler(action, pyevent):
85+
return event_handlers.OnProcessIO(
86+
target_action=action, on_stdout=lambda _1: pyevent.set())
87+
88+
async def wait_for_output(
89+
launch_context, execute_process_action, validate_output, timeout=None
90+
):
91+
def condition():
92+
try:
93+
return validate_output(execute_process_action.get_stdout())
94+
except AssertionError:
95+
return False
96+
success = await _wait_for_event_with_condition(
97+
launch_context, execute_process_action, _get_stdout_event_handler, condition, timeout)
98+
if not success:
99+
# Validate the output again, this time not catching assertion errors.
100+
# This allows the user to use asserts directly, errors will be nicely rendeded by pytest.
101+
return validate_output(execute_process_action.get_stdout())
102+
103+
104+
def wait_for_output_sync(
105+
launch_context, execute_process_action, validate_output, timeout=None
106+
):
107+
def condition():
108+
try:
109+
return validate_output(execute_process_action.get_stdout())
110+
except AssertionError:
111+
return False
112+
success = _wait_for_event_with_condition_sync(
113+
launch_context, execute_process_action, _get_stdout_event_handler, condition, timeout)
114+
if not success:
115+
# Validate the output again, this time not catching assertion errors.
116+
# This allows the user to use asserts directly, errors will be nicely rendeded by pytest.
117+
return validate_output(execute_process_action.get_stdout()) in (None, True)

0 commit comments

Comments
 (0)