Skip to content

Commit da11106

Browse files
committed
Simple tools to interact with processes, needs more work
Signed-off-by: Ivan Santiago Paunovic <[email protected]>
1 parent 6ba7e44 commit da11106

File tree

6 files changed

+290
-47
lines changed

6 files changed

+290
-47
lines changed

launch/launch/actions/execute_process.py

Lines changed: 84 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ def __init__(
110110
]] = None,
111111
respawn: bool = False,
112112
respawn_delay: Optional[float] = None,
113+
cached_output: bool = False,
113114
**kwargs
114115
) -> None:
115116
"""
@@ -208,6 +209,8 @@ def __init__(
208209
:param: respawn if 'True', relaunch the process that abnormally died.
209210
Defaults to 'False'.
210211
:param: respawn_delay a delay time to relaunch the died process if respawn is 'True'.
212+
:param: cached_output if `True`, both stdout and stderr will be cached.
213+
Use get_stdout() and get_stderr() to read the buffered output.
211214
"""
212215
super().__init__(**kwargs)
213216
self.__cmd = [normalize_to_list_of_substitutions(x) for x in cmd]
@@ -254,6 +257,7 @@ def __init__(
254257
self.__sigkill_timer = None # type: Optional[TimerAction]
255258
self.__stdout_buffer = io.StringIO()
256259
self.__stderr_buffer = io.StringIO()
260+
self.__cached_output = cached_output
257261

258262
self.__executed = False
259263

@@ -505,59 +509,32 @@ def __on_process_stdin(
505509
cast(ProcessStdin, event)
506510
return None
507511

508-
def __on_process_stdout(
509-
self, event: ProcessIO
512+
def __on_process_output(
513+
self, event: ProcessIO, buffer, logger
510514
) -> Optional[SomeActionsType]:
511515
to_write = event.text.decode(errors='replace')
512-
if self.__stdout_buffer.closed:
513-
# __stdout_buffer was probably closed by __flush_buffers on shutdown. Output without
516+
if buffer.closed:
517+
# buffer was probably closed by __flush_buffers on shutdown. Output without
514518
# buffering.
515-
self.__stdout_logger.info(
516-
self.__output_format.format(line=to_write, this=self)
517-
)
518-
else:
519-
self.__stdout_buffer.write(to_write)
520-
self.__stdout_buffer.seek(0)
521-
last_line = None
522-
for line in self.__stdout_buffer:
523-
if line.endswith(os.linesep):
524-
self.__stdout_logger.info(
525-
self.__output_format.format(line=line[:-len(os.linesep)], this=self)
526-
)
527-
else:
528-
last_line = line
529-
break
530-
self.__stdout_buffer.seek(0)
531-
self.__stdout_buffer.truncate(0)
532-
if last_line is not None:
533-
self.__stdout_buffer.write(last_line)
534-
535-
def __on_process_stderr(
536-
self, event: ProcessIO
537-
) -> Optional[SomeActionsType]:
538-
to_write = event.text.decode(errors='replace')
539-
if self.__stderr_buffer.closed:
540-
# __stderr buffer was probably closed by __flush_buffers on shutdown. Output without
541-
# buffering.
542-
self.__stderr_logger.info(
519+
buffer.info(
543520
self.__output_format.format(line=to_write, this=self)
544521
)
545522
else:
546-
self.__stderr_buffer.write(to_write)
547-
self.__stderr_buffer.seek(0)
523+
buffer.write(to_write)
524+
buffer.seek(0)
548525
last_line = None
549-
for line in self.__stderr_buffer:
526+
for line in buffer:
550527
if line.endswith(os.linesep):
551-
self.__stderr_logger.info(
528+
logger.info(
552529
self.__output_format.format(line=line[:-len(os.linesep)], this=self)
553530
)
554531
else:
555532
last_line = line
556533
break
557-
self.__stderr_buffer.seek(0)
558-
self.__stderr_buffer.truncate(0)
534+
buffer.seek(0)
535+
buffer.truncate(0)
559536
if last_line is not None:
560-
self.__stderr_buffer.write(last_line)
537+
buffer.write(last_line)
561538

562539
def __flush_buffers(self, event, context):
563540
line = self.__stdout_buffer.getvalue()
@@ -583,6 +560,35 @@ def __flush_buffers(self, event, context):
583560
self.__stderr_buffer.seek(0)
584561
self.__stderr_buffer.truncate(0)
585562

563+
def __on_process_output_cached(
564+
self, event: ProcessIO, buffer, logger
565+
) -> Optional[SomeActionsType]:
566+
to_write = event.text.decode(errors='replace')
567+
last_cursor = buffer.tell()
568+
self.__stdout_buffer.seek(0, 2) # go to end of buffer
569+
buffer.write(to_write)
570+
buffer.seek(last_cursor)
571+
new_cursor = last_cursor
572+
for line in buffer:
573+
if not line.endswith(os.linesep):
574+
break
575+
new_cursor = buffer.tell()
576+
logger.info(
577+
self.__output_format.format(line=line[:-len(os.linesep)], this=self)
578+
)
579+
buffer.seek(new_cursor)
580+
581+
def __flush_cached_buffers(self, event, context):
582+
for line in self.__stdout_buffer:
583+
self.__stdout_buffer.info(
584+
self.__output_format.format(line=line, this=self)
585+
)
586+
587+
for line in self.__stderr_buffer:
588+
self.__stderr_logger.info(
589+
self.__output_format.format(line=line, this=self)
590+
)
591+
586592
def __on_shutdown(self, event: Event, context: LaunchContext) -> Optional[SomeActionsType]:
587593
due_to_sigint = cast(Shutdown, event).due_to_sigint
588594
return self._shutdown_process(
@@ -817,6 +823,13 @@ def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEnti
817823
# If shutdown starts before execution can start, don't start execution.
818824
return None
819825

826+
if self.__cached_output:
827+
on_output_method = self.__on_process_output_cached
828+
flush_buffers_method = self.__flush_cached_buffers
829+
else:
830+
on_output_method = self.__on_process_output
831+
flush_buffers_method = self.__flush_buffers
832+
820833
event_handlers = [
821834
EventHandler(
822835
matcher=lambda event: is_a_subclass(event, ShutdownProcess),
@@ -829,8 +842,10 @@ def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEnti
829842
OnProcessIO(
830843
target_action=self,
831844
on_stdin=self.__on_process_stdin,
832-
on_stdout=self.__on_process_stdout,
833-
on_stderr=self.__on_process_stderr
845+
on_stdout=lambda event: on_output_method(
846+
event, self.__stdout_buffer, self.__stdout_logger),
847+
on_stderr=lambda event: on_output_method(
848+
event, self.__stderr_buffer, self.__stderr_logger),
834849
),
835850
OnShutdown(
836851
on_shutdown=self.__on_shutdown,
@@ -841,7 +856,7 @@ def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEnti
841856
),
842857
OnProcessExit(
843858
target_action=self,
844-
on_exit=self.__flush_buffers,
859+
on_exit=flush_buffers_method,
845860
),
846861
]
847862
for event_handler in event_handlers:
@@ -899,3 +914,30 @@ def shell(self):
899914
def prefix(self):
900915
"""Getter for prefix."""
901916
return self.__prefix
917+
918+
def get_stdout(self):
919+
"""
920+
Get cached stdout.
921+
922+
:raises RuntimeError: if cached_output is false.
923+
"""
924+
if not self.__cached_output:
925+
raise RuntimeError(f"cached output must be true to be able to get stdout, proc '{self.__name}'")
926+
return self.__stdout_buffer.getvalue()
927+
928+
def get_stderr(self):
929+
"""
930+
Get cached stdout.
931+
932+
:raises RuntimeError: if cached_output is false.
933+
"""
934+
if not self.__cached_output:
935+
raise RuntimeError(f"cached output must be true to be able to get stderr, proc '{self.__name}'")
936+
return self.__stderr_buffer.getvalue()
937+
938+
@property
939+
def return_code(self):
940+
"""Gets the process return code, None if it hasn't finished."""
941+
if self._subprocess_transport is None:
942+
return None
943+
return self._subprocess_transport.get_returncode()

launch/launch/launch_context.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,17 @@ def would_handle_event(self, event: Event) -> bool:
174174
"""Check whether an event would be handled or not."""
175175
return any(handler.matches(event) for handler in self._event_handlers)
176176

177-
def register_event_handler(self, event_handler: BaseEventHandler) -> None:
178-
"""Register a event handler."""
179-
self._event_handlers.appendleft(event_handler)
177+
def register_event_handler(self, event_handler: BaseEventHandler, append = False) -> None:
178+
"""
179+
Register a event handler.
180+
181+
:param append: if 'true', the new event handler will be executed after the previously
182+
registered ones. If not, it will prepend the old handlers.
183+
"""
184+
if append:
185+
self._event_handlers.append(event_handler)
186+
else:
187+
self._event_handlers.appendleft(event_handler)
180188

181189
def unregister_event_handler(self, event_handler: BaseEventHandler) -> None:
182190
"""Unregister an event handler."""

launch_testing/launch_testing/pytest/fixture.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,19 @@ def launch_service(event_loop):
5151
return launch_service
5252

5353

54+
def get_launch_context_fixture(*, scope='function', overridable=True):
55+
"""Return a launch service fixture."""
56+
57+
@pytest.fixture(scope=scope)
58+
def launch_context(launch_service):
59+
"""Create an instance of the launch service for each test case."""
60+
return launch_service.context
61+
if overridable:
62+
launch_context._launch_testing_overridable_fixture = True
63+
launch_context._launch_testing_fixture_scope = scope
64+
return launch_context
65+
66+
5467
def get_event_loop_fixture(*, scope='function', overridable=True):
5568
"""Return an event loop fixture."""
5669

@@ -98,7 +111,8 @@ def fixture(
98111
mod_locals = vars(mod)
99112
for name, getter in (
100113
('launch_service', get_launch_service_fixture),
101-
('event_loop', get_event_loop_fixture)
114+
('event_loop', get_event_loop_fixture),
115+
('launch_context', get_launch_context_fixture),
102116
):
103117
if name in mod_locals:
104118
obj = mod_locals[name]

launch_testing/launch_testing/pytest/plugin.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
from .fixture import finalize_launch_service
2929
from .fixture import get_launch_service_fixture
30+
from .fixture import get_launch_context_fixture
3031

3132
"""
3233
launch_testing native pytest based implementation.
@@ -524,5 +525,8 @@ def inner(**kwargs):
524525
return inner
525526

526527

527-
"""Launch service fixture."""
528528
launch_service = get_launch_service_fixture(overridable=False)
529+
"""Launch service fixture."""
530+
531+
launch_context = get_launch_context_fixture(overridable=False)
532+
"""Launch context fixture."""
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
# Copyright 2021 Open Source Robotics Foundation, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import asyncio
16+
import contextlib
17+
import threading
18+
import time
19+
20+
import launch
21+
from launch import event_handlers
22+
23+
24+
@contextlib.contextmanager
25+
def register_event_handler(context, event_handler):
26+
# Code to acquire resource, e.g.:
27+
try:
28+
yield context.register_event_handler(event_handler, append=True)
29+
finally:
30+
context.unregister_event_handler(event_handler)
31+
32+
def _get_on_process_start(execute_process_action, pyevent):
33+
event_handlers.OnProcessStart(
34+
target_action=execute_process_action, on_start=lambda _1, _2: pyevent.set())
35+
36+
async def _wait_for_event(
37+
launch_context, execute_process_action, get_launch_event_handler, timeout=None
38+
):
39+
pyevent = asyncio.Event()
40+
event_handler = get_launch_event_handler(execute_process_action, pyevent)
41+
with register_event_handler(launch_context, event_handler):
42+
await asyncio.wait_for(pyevent.wait(), timeout)
43+
44+
async def _wait_for_event_with_condition(
45+
launch_context, execute_process_action, get_launch_event_handler, condition, timeout=None
46+
):
47+
pyevent = asyncio.Event()
48+
event_handler = get_launch_event_handler(execute_process_action, pyevent)
49+
cond_value = condition()
50+
with register_event_handler(launch_context, event_handler):
51+
start = time.time()
52+
now = start
53+
while not cond_value and (timeout is None or now < start + timeout):
54+
await asyncio.wait_for(pyevent.wait(), start - now + timeout)
55+
pyevent.clear()
56+
cond_value = condition()
57+
now = time.time()
58+
return cond_value
59+
60+
def _wait_for_event_sync(
61+
launch_context, execute_process_action, get_launch_event_handler, timeout=None
62+
):
63+
pyevent = threading.Event()
64+
event_handler = get_launch_event_handler(execute_process_action, pyevent)
65+
with register_event_handler(launch_context, event_handler):
66+
pyevent.wait(timeout)
67+
68+
def _wait_for_event_with_condition_sync(
69+
launch_context, execute_process_action, get_launch_event_handler, condition, timeout=None
70+
):
71+
pyevent = threading.Event()
72+
event_handler = get_launch_event_handler(execute_process_action, pyevent)
73+
cond_value = condition()
74+
with register_event_handler(launch_context, event_handler):
75+
start = time.time()
76+
now = start
77+
while not cond_value and (timeout is None or now < start + timeout):
78+
pyevent.wait(start - now + timeout)
79+
pyevent.clear()
80+
cond_value = condition()
81+
now = time.time()
82+
return cond_value
83+
84+
def _get_stdout_event_handler(action, pyevent):
85+
return event_handlers.OnProcessIO(
86+
target_action=action, on_stdout=lambda _1: pyevent.set())
87+
88+
async def wait_for_output(
89+
launch_context, execute_process_action, validate_output, timeout=None
90+
):
91+
def condition():
92+
try:
93+
return validate_output(execute_process_action.get_stdout())
94+
except AssertionError:
95+
return False
96+
success = await _wait_for_event_with_condition(
97+
launch_context, execute_process_action, _get_stdout_event_handler, condition, timeout)
98+
if not success:
99+
# Validate the output again, this time not catching assertion errors.
100+
# This allows the user to use asserts directly, errors will be nicely rendeded by pytest.
101+
return validate_output(execute_process_action.get_stdout())
102+
103+
104+
def wait_for_output_sync(
105+
launch_context, execute_process_action, validate_output, timeout=None
106+
):
107+
def condition():
108+
try:
109+
return validate_output(execute_process_action.get_stdout())
110+
except AssertionError:
111+
return False
112+
success = _wait_for_event_with_condition_sync(
113+
launch_context, execute_process_action, _get_stdout_event_handler, condition, timeout)
114+
if not success:
115+
# Validate the output again, this time not catching assertion errors.
116+
# This allows the user to use asserts directly, errors will be nicely rendeded by pytest.
117+
return validate_output(execute_process_action.get_stdout()) in (None, True)

0 commit comments

Comments
 (0)