Skip to content

Commit f06f239

Browse files
committed
Bring Service classes more in sync with Java impl
* Add missing Service class methods:
  - kill, wait_for, is_alive, invalid_lines, error_lines
* Make crash reporting report stdout and stderr lines:
  - As per apposed/appose-java@177bb42.
* Migrate corresponding unit tests.
* Add missing type hints.
1 parent 9990eb0 commit f06f239

File tree

2 files changed

+270
-22
lines changed

2 files changed

+270
-22
lines changed

src/appose/service.py

Lines changed: 105 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from __future__ import annotations
1010

11+
import os
1112
import subprocess
1213
import tempfile
1314
import threading
@@ -54,6 +55,8 @@ def __init__(
5455
self._tasks: dict[str, "Task"] = {}
5556
self._service_id: int = Service._service_count
5657
Service._service_count += 1
58+
self._invalid_lines: list[str] = []
59+
self._error_lines: list[str] = []
5760
self._process: subprocess.Popen | None = None
5861
self._stdout_thread: threading.Thread | None = None
5962
self._stderr_thread: threading.Thread | None = None
@@ -298,7 +301,60 @@ def close(self) -> None:
298301
raise RuntimeError("Service has not been started")
299302
self._process.stdin.close()
300303

304+
def kill(self) -> None:
    """
    Force the service's worker process to begin shutting down. Any tasks still
    pending completion will be interrupted, reporting TaskStatus.CRASHED.

    To shut down the service more gently, allowing any pending tasks to run to
    completion, use close() instead.

    To wait until the service's worker process has completely shut down
    and all output has been reported, call wait_for() afterward.

    Raises:
        RuntimeError: If the service has not been started.
    """
    if self._process is None:
        # Same failure mode as close(): report a clear error instead of an
        # AttributeError on None when there is no worker process yet.
        raise RuntimeError("Service has not been started")
    self._process.kill()
316+
317+
def wait_for(self) -> int:
    """
    Wait for the service's worker process to terminate.

    Blocks until the process has exited and the stdout, stderr, and monitor
    threads have finished.

    Returns:
        Exit value of the worker process.

    Raises:
        RuntimeError: If the service has not been started.
    """
    if self._process is None:
        # Same failure mode as close(): there is no worker to wait for.
        raise RuntimeError("Service has not been started")
    self._process.wait()

    # Wait for worker output processing threads to finish up.
    # The thread attributes are None until the threads are created,
    # so skip any that do not exist.
    for thread in (
        self._stdout_thread,
        self._stderr_thread,
        self._monitor_thread,
    ):
        if thread is not None:
            thread.join()

    return self._process.returncode
332+
333+
def is_alive(self) -> bool:
    """
    Report whether the service's worker process is running right now.

    Returns:
        True while a worker process exists and has not exited; False if the
        service has not yet started, or its worker has already shut down
        or crashed.
    """
    worker = self._process
    if worker is None:
        # Never started: nothing can be running.
        return False
    # poll() yields None for as long as the process has not terminated.
    return worker.poll() is None
342+
343+
def invalid_lines(self) -> list[str]:
    """
    Unparseable lines emitted by the worker process on its stdout stream,
    collected over the lifetime of the service.
    Can be useful for analyzing why a worker process has crashed.

    Returns:
        A snapshot copy of the lines collected so far. Mutating the
        returned list does not affect the service's internal record.
    """
    # Copy rather than expose the internal list: the stdout reader thread
    # appends to it and the crash report is built from it.
    return list(self._invalid_lines)
350+
351+
def error_lines(self) -> list[str]:
    """
    Lines emitted by the worker process on its stderr stream,
    collected over the lifetime of the service.
    Can be useful for analyzing why a worker process has crashed.

    Returns:
        A snapshot copy of the lines collected so far. Mutating the
        returned list does not affect the service's internal record.
    """
    # Copy rather than expose the internal list: the stderr reader thread
    # appends to it and the crash report is built from it.
    return list(self._error_lines)
302358

303359
def _stdout_loop(self) -> None:
304360
"""
@@ -310,7 +366,7 @@ def _stdout_loop(self) -> None:
310366
try:
311367
line = None if stdout is None else stdout.readline()
312368
except Exception:
313-
# Something went wrong reading the line. Panic!
369+
# Something went wrong reading the stdout line. Panic!
314370
self._debug_service(format_exc())
315371
break
316372

@@ -336,22 +392,26 @@ def _stdout_loop(self) -> None:
336392
# Something went wrong decoding the line of JSON.
337393
# Skip it and keep going, but log it first.
338394
self._debug_service(f"<INVALID> {line}")
395+
self._invalid_lines.append(line.rstrip("\n\r"))
339396

340397
def _stderr_loop(self) -> None:
341398
"""
342399
Input loop processing lines from the worker's stderr stream.
343400
"""
344-
# noinspection PyBroadException
345-
try:
346-
while True:
347-
stderr = self._process.stderr
401+
while True:
402+
stderr = self._process.stderr
403+
# noinspection PyBroadException
404+
try:
348405
line = None if stderr is None else stderr.readline()
349-
if not line: # readline returns empty string upon EOF
350-
self._debug_service("<worker stderr closed>")
351-
return
352-
self._debug_worker(line)
353-
except Exception:
354-
self._debug_service(format_exc())
406+
except Exception:
407+
# Something went wrong reading the stderr line. Panic!
408+
self._debug_service(format_exc())
409+
break
410+
if not line: # readline returns empty string upon EOF
411+
self._debug_service("<worker stderr closed>")
412+
break
413+
self._debug_worker(line)
414+
self._error_lines.append(line.rstrip("\n\r"))
355415

356416
def _monitor_loop(self) -> None:
357417
# Wait until the worker process terminates.
@@ -364,15 +424,33 @@ def _monitor_loop(self) -> None:
364424
f"<worker process terminated with exit code {exit_code}>"
365425
)
366426
task_count = len(self._tasks)
367-
if task_count > 0:
368-
self._debug_service(
369-
f"<worker process terminated with {task_count} pending tasks>"
370-
)
427+
if task_count == 0:
428+
# No hanging tasks to clean up.
429+
return
430+
431+
self._debug_service(
432+
"<worker process terminated with "
433+
+ f"{task_count} pending task{'' if task_count == 1 else 's'}>"
434+
)
371435

372436
# Notify any remaining tasks about the process crash.
437+
nl = os.linesep
438+
error_parts = [f"Worker crashed with exit code {exit_code}."]
439+
error_parts.append("")
440+
error_parts.append("[stdout]")
441+
if len(self._invalid_lines) == 0:
442+
error_parts.append("<none>")
443+
else:
444+
error_parts.extend(self._invalid_lines)
445+
error_parts.append("")
446+
error_parts.append("[stderr]")
447+
if len(self._error_lines) == 0:
448+
error_parts.append("<none>")
449+
else:
450+
error_parts.extend(self._error_lines)
451+
error = nl.join(error_parts) + nl
373452
for task in self._tasks.values():
374-
task._crash()
375-
453+
task._crash(error)
376454
self._tasks.clear()
377455

378456
def _debug_service(self, message: str) -> None:
@@ -405,12 +483,17 @@ class TaskStatus(Enum):
405483
FAILED = "FAILED"
406484
CRASHED = "CRASHED"
407485

408-
def is_finished(self):
486+
def is_finished(self) -> bool:
    """
    True iff status is COMPLETE, CANCELED, FAILED, or CRASHED.
    """
    if self == TaskStatus.COMPLETE:
        return True
    # Every remaining finished state is an error state.
    return self.is_error()
491+
492+
def is_error(self) -> bool:
493+
"""
494+
True iff status is CANCELED, FAILED, or CRASHED.
495+
"""
412496
return self in (
413-
TaskStatus.COMPLETE,
414497
TaskStatus.CANCELED,
415498
TaskStatus.FAILED,
416499
TaskStatus.CRASHED,
@@ -434,7 +517,7 @@ class ResponseType(Enum):
434517
True iff response type is COMPLETE, CANCELED, FAILED, or CRASHED.
435518
"""
436519

437-
def is_terminal(self):
520+
def is_terminal(self) -> bool:
438521
return self in (
439522
ResponseType.COMPLETION,
440523
ResponseType.CANCELATION,
@@ -629,9 +712,10 @@ def _handle(self, response: Args) -> None:
629712
with self.cv:
630713
self.cv.notify_all()
631714

632-
def _crash(self):
715+
def _crash(self, error: str):
633716
event = TaskEvent(self, ResponseType.CRASH)
634717
self.status = TaskStatus.CRASHED
718+
self.error = error
635719
for listener in self.listeners:
636720
listener(event)
637721
with self.cv:

tests/test_service.py

Lines changed: 165 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44

55

66
import appose
7-
from appose.service import TaskStatus
7+
from appose.service import ResponseType, TaskException, TaskStatus
88
from tests.test_base import execute_and_assert, maybe_debug
99
from pathlib import Path
10+
import time
11+
import os
12+
import re
1013

1114
collatz_groovy = """
1215
// Computes the stopping time of a given value
@@ -190,3 +193,164 @@ def test_init_numpy():
190193
actual_val = result[i]
191194
assert isinstance(actual_val, (int, float))
192195
assert abs(actual_val - expected_val) < 1e-10
196+
197+
198+
def test_task_failure_python():
    """A script that references an undefined name must raise TaskException."""
    with appose.system().python() as service:
        maybe_debug(service)
        bad_script = "whee\n"
        try:
            service.task(bad_script).wait_for()
        except TaskException as exc:
            # The exception text should include the NameError message.
            assert "NameError: name 'whee' is not defined" in str(exc)
        else:
            raise AssertionError("Expected TaskException for failed script")
209+
210+
211+
def test_startup_crash():
    """
    Start a worker that dies on startup and check that its stderr output
    is recorded by the service.
    """
    env = appose.system()
    python_exes = ["python", "python3", "python.exe"]
    service = env.service(python_exes, "-c", "import nonexistentpackage").start()
    # Wait up to 500ms for the crash.
    for _ in range(100):
        if not service.is_alive():
            break
        time.sleep(0.005)
    assert not service.is_alive()
    # The process being dead does not mean its stderr has been fully read yet.
    # wait_for() joins the output threads, so error_lines() is complete
    # before it is inspected below.
    service.wait_for()
    # Check that the crash happened and was recorded correctly.
    error_lines = service.error_lines()
    assert error_lines is not None
    assert len(error_lines) > 0
    error = error_lines[-1]
    assert error == "ModuleNotFoundError: No module named 'nonexistentpackage'"
227+
228+
229+
def test_python_sys_exit():
    """A task that calls sys.exit must surface as a TaskException."""
    with appose.system().python() as service:
        maybe_debug(service)

        # Calling sys.exit inside a task is nasty: within a dedicated
        # threading.Thread, Python does not exit the worker process;
        # only the thread dies. So besides the Python code itself, this
        # also checks that Appose's python_worker copes with that situation.
        exit_script = "import sys\nsys.exit(123)"
        try:
            service.task(exit_script).wait_for()
        except TaskException as exc:
            # The failure should be either "thread death" or a "SystemExit" message.
            message = str(exc)
            assert "thread death" in message or "SystemExit: 123" in message
        else:
            raise AssertionError("Expected TaskException for sys.exit")
245+
246+
247+
def test_crash_with_active_task():
    """
    Kill the worker while a task is running, then verify that the task is
    flagged CRASHED and that its error text reports the stdout and stderr
    lines collected by the service.
    """
    env = appose.system()
    with env.python() as service:
        maybe_debug(service)
        # Create a "long-running" task.
        script = (
            "import sys\n"
            "sys.stderr.write('one\\n')\n"
            "sys.stderr.flush()\n"
            "print('two')\n"
            "sys.stdout.flush()\n"
            "sys.stderr.write('three\\n')\n"
            "sys.stderr.flush()\n"
            "task.update('halfway')\n"
            "print('four')\n"
            "sys.stdout.flush()\n"
            "sys.stderr.write('five\\n')\n"
            "sys.stderr.flush()\n"
            "print('six')\n"
            "sys.stdout.flush()\n"
            "sys.stderr.write('seven\\n')\n"
            # Flush like every other write, so 'seven' cannot be lost in a
            # buffer when the process is killed.
            "sys.stderr.flush()\n"
            "task.update('crash-me')\n"
            "import time; time.sleep(999)\n"
        )
        ready = [False]

        def on_crash_me(event):
            if event.message == "crash-me":
                ready[0] = True

        task = service.task(script)
        task.listen(on_crash_me)

        # Record any crash reported in the task notifications.
        reported_error = [None]

        def on_crash(event):
            if event.response_type == ResponseType.CRASH:
                reported_error[0] = task.error

        task.listen(on_crash)

        # Launch the task.
        task.start()

        # Simulate a crash after the script has emitted its output.
        # Bound the wait so a task that never reports "crash-me" fails the
        # test instead of hanging it forever.
        deadline = time.monotonic() + 30
        while not ready[0]:
            assert time.monotonic() < deadline, (
                "Timed out waiting for the task to report 'crash-me'"
            )
            time.sleep(0.005)
        service.kill()

        # Wait for the service to fully shut down after the crash.
        exit_code = service.wait_for()
        assert exit_code != 0

        # Is the task flagged as crashed?
        assert TaskStatus.CRASHED == task.status

        # Was the crash error successfully and consistently recorded?
        assert reported_error[0] is not None
        nl = os.linesep
        assert service.invalid_lines() == ["two", "four", "six"]
        assert service.error_lines() == ["one", "three", "five", "seven"]
        expected = (
            f"Worker crashed with exit code ###.{nl}"
            f"{nl}"
            f"[stdout]{nl}"
            f"two{nl}"
            f"four{nl}"
            f"six{nl}"
            f"{nl}"
            f"[stderr]{nl}"
            f"one{nl}"
            f"three{nl}"
            f"five{nl}"
            f"seven{nl}"
        )
        # The exit code is platform dependent, so mask it before comparing.
        generalized_error = re.sub(r"exit code -?[0-9]+", "exit code ###", task.error)
        assert expected == generalized_error
325+
326+
327+
def test_task_result():
    """Tests Task.result() convenience method."""
    with appose.system().python() as service:
        maybe_debug(service)

        # Run a task that produces a result and wait for it to finish.
        finished = service.task("'success'").wait_for()
        assert TaskStatus.COMPLETE == finished.status

        # The convenience method should hand back the produced value.
        value = finished.result()
        assert value == "success"

        # It must agree with the "result" entry of the task's outputs.
        assert finished.outputs.get("result") == value
343+
344+
345+
def test_task_result_null():
    """Tests Task.result() returns None when no result is set."""
    with appose.system().python() as service:
        maybe_debug(service)

        # Run a task whose script does not set a result.
        finished = service.task("print('no result')").wait_for()
        assert TaskStatus.COMPLETE == finished.status

        # With no result set, result() should return None.
        assert finished.result() is None

0 commit comments

Comments
 (0)