Skip to content

Commit 3adfb5e

Browse files
ctruedenclaude
andcommitted
Migrate remaining improvements from appose-java
* Implement TaskException exception class * Implement 4 Service syntax methods (get_var, put_var, call, proxy) * Add Task.result() convenience method * Add Service.syntax() accessor * Update Environment to wire syntax into services * Create 3 comprehensive test files (15 tests total) Co-authored-by: Claude <[email protected]>
1 parent 5a53c1e commit 3adfb5e

File tree

7 files changed

+696
-72
lines changed

7 files changed

+696
-72
lines changed

src/appose/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,7 @@ def task_listener(event):
218218

219219
from .builder import SimpleBuilder
220220
from .environment import Environment
221+
from .service import TaskException # noqa: F401
221222
from .shm import NDArray, SharedMemory # noqa: F401
222223

223224
__version__ = "0.7.3.dev0"

src/appose/environment.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import os
3737
from pathlib import Path
3838

39+
from .syntax import GroovySyntax, PythonSyntax, ScriptSyntax
3940
from .util.filepath import find_exe
4041
from .service import Service
4142

@@ -127,6 +128,7 @@ def python(self) -> Service:
127128
python_exes,
128129
"-c",
129130
"import appose.python_worker; appose.python_worker.main()",
131+
syntax=PythonSyntax(),
130132
)
131133

132134
def groovy(
@@ -155,14 +157,18 @@ def groovy(
155157
:raises IOError: If something goes wrong starting the worker process.
156158
"""
157159
return self.java(
158-
"org.apposed.appose.GroovyWorker", class_path=class_path, jvm_args=jvm_args
160+
"org.apposed.appose.GroovyWorker",
161+
class_path=class_path,
162+
jvm_args=jvm_args,
163+
syntax=GroovySyntax(),
159164
)
160165

161166
def java(
162167
self,
163168
main_class: str,
164169
class_path: list[str] | None = None,
165170
jvm_args: list[str] | None = None,
171+
syntax: ScriptSyntax | None = None,
166172
) -> Service:
167173
# Collect classpath elements into a set, to avoid duplicate entries.
168174
cp: dict[str] = {} # NB: Use dict instead of set to maintain insertion order.
@@ -192,9 +198,11 @@ def java(
192198
"jre/bin/java",
193199
"jre/bin/java.exe",
194200
]
195-
return self.service(java_exes, *args)
201+
return self.service(java_exes, *args, syntax=syntax)
196202

197-
def service(self, exes: list[str], *args) -> Service:
203+
def service(
204+
self, exes: list[str], *args, syntax: ScriptSyntax | None = None
205+
) -> Service:
198206
"""
199207
Create a service with the given command line arguments.
200208
@@ -253,4 +261,4 @@ def service(self, exes: list[str], *args) -> Service:
253261
all_args.append(exe_path)
254262
all_args.extend(args)
255263

256-
return Service(self.base(), all_args)
264+
return Service(self.base(), all_args, syntax)

src/appose/service.py

Lines changed: 183 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,33 @@
4141
from typing import Any, Callable
4242
from uuid import uuid4
4343

44+
from .syntax import ScriptSyntax
4445
from .util.types import Args, decode, encode
4546

4647

48+
class TaskException(Exception):
49+
"""
50+
Exception raised when a Task fails to complete successfully.
51+
52+
This exception is raised by Task.wait_for() when the task finishes
53+
in a non-successful state (FAILED, CANCELED, or CRASHED).
54+
"""
55+
56+
def __init__(self, message: str, task: "Task") -> None:
57+
super().__init__(message)
58+
self.task = task
59+
60+
@property
61+
def status(self) -> "TaskStatus":
62+
"""Returns the status of the failed task."""
63+
return self.task.status
64+
65+
@property
66+
def task_error(self) -> str | None:
67+
"""Returns the error message from the task, if available."""
68+
return self.task.error
69+
70+
4771
class Service:
4872
"""
4973
An Appose *service* provides access to a linked Appose *worker* running
@@ -54,7 +78,9 @@ class Service:
5478

5579
_service_count: int = 0
5680

57-
def __init__(self, cwd: str | Path, args: list[str]) -> None:
81+
def __init__(
82+
self, cwd: str | Path, args: list[str], syntax: ScriptSyntax | None = None
83+
) -> None:
5884
self._cwd: Path = Path(cwd)
5985
self._args: list[str] = args[:]
6086
self._tasks: dict[str, "Task"] = {}
@@ -65,6 +91,16 @@ def __init__(self, cwd: str | Path, args: list[str]) -> None:
6591
self._stderr_thread: threading.Thread | None = None
6692
self._monitor_thread: threading.Thread | None = None
6793
self._debug_callback: Callable[[Any], Any] | None = None
94+
self._syntax: ScriptSyntax | None = syntax
95+
96+
def syntax(self) -> ScriptSyntax | None:
97+
"""
98+
Returns the script syntax associated with this service.
99+
100+
Returns:
101+
The script syntax, or None if not configured.
102+
"""
103+
return self._syntax
68104

69105
def debug(self, debug_callback: Callable[[Any], Any]) -> None:
70106
"""
@@ -126,6 +162,110 @@ def task(
126162
self.start()
127163
return Task(self, script, inputs, queue)
128164

165+
def get_var(self, name: str) -> Any:
166+
"""
167+
Retrieves a variable's value from the worker process's global scope.
168+
169+
The variable must have been previously exported using task.export()
170+
to be accessible across tasks.
171+
172+
Args:
173+
name: The name of the variable to retrieve.
174+
175+
Returns:
176+
The value of the variable.
177+
178+
Raises:
179+
TaskException: If the variable retrieval fails.
180+
ValueError: If no script syntax has been configured for this service.
181+
"""
182+
if self._syntax is None:
183+
raise ValueError("No script syntax configured for this service")
184+
script = self._syntax.get_var(name)
185+
task = self.task(script).wait_for()
186+
return task.outputs.get("result")
187+
188+
def put_var(self, name: str, value: Any) -> None:
189+
"""
190+
Sets a variable in the worker process's global scope and exports it
191+
for future use across tasks.
192+
193+
Args:
194+
name: The name of the variable to set in the worker process.
195+
value: The value to assign to the variable.
196+
197+
Raises:
198+
TaskException: If the variable assignment fails.
199+
ValueError: If no script syntax has been configured for this service.
200+
"""
201+
if self._syntax is None:
202+
raise ValueError("No script syntax configured for this service")
203+
inputs = {"_value": value}
204+
script = self._syntax.put_var(name, "_value")
205+
self.task(script, inputs).wait_for()
206+
207+
def call(self, function: str, *args: Any) -> Any:
208+
"""
209+
Calls a function in the worker process with the given arguments and
210+
returns the result.
211+
212+
The function must be accessible in the worker's global scope (either
213+
built-in or previously defined/imported).
214+
215+
Args:
216+
function: The name of the function to call in the worker process.
217+
*args: The arguments to pass to the function.
218+
219+
Returns:
220+
The result of the function call.
221+
222+
Raises:
223+
TaskException: If the function call fails.
224+
ValueError: If no script syntax has been configured for this service.
225+
"""
226+
if self._syntax is None:
227+
raise ValueError("No script syntax configured for this service")
228+
inputs = {}
229+
var_names = []
230+
for i, arg in enumerate(args):
231+
var_name = f"arg{i}"
232+
inputs[var_name] = arg
233+
var_names.append(var_name)
234+
script = self._syntax.call(function, var_names)
235+
task = self.task(script, inputs).wait_for()
236+
return task.outputs.get("result")
237+
238+
def proxy(self, var: str, api: type, queue: str | None = None) -> Any:
239+
"""
240+
Creates a proxy object providing strongly typed access to a remote
241+
object in this service's worker process.
242+
243+
Method calls on the proxy are transparently forwarded to the remote
244+
object via Tasks.
245+
246+
Important: The variable must be explicitly exported using
247+
task.export(varName=value) in a previous task. Only exported variables
248+
are accessible across tasks within the same service.
249+
250+
Args:
251+
var: The name of the exported variable in the worker process
252+
referencing the remote object.
253+
api: The interface/protocol class that the proxy should implement.
254+
queue: Optional queue identifier for task execution. Pass "main" to
255+
ensure execution on the worker's main thread.
256+
257+
Returns:
258+
A proxy object that forwards method calls to the remote object.
259+
260+
Raises:
261+
ValueError: If no script syntax has been configured for this service.
262+
"""
263+
if self._syntax is None:
264+
raise ValueError("No script syntax configured for this service")
265+
from .util.proxy import create
266+
267+
return create(self, var, queue)
268+
129269
def close(self) -> None:
130270
"""
131271
Close the worker process's input stream, in order to shut it down.
@@ -349,16 +489,56 @@ def listen(self, listener: Callable[["TaskEvent"], None]) -> None:
349489

350490
self.listeners.append(listener)
351491

352-
def wait_for(self) -> None:
492+
def wait_for(self) -> "Task":
493+
"""
494+
Wait for this task to complete.
495+
496+
Returns:
497+
This task (for method chaining).
498+
499+
Raises:
500+
TaskException: If the task fails, is canceled, or crashes.
501+
"""
353502
with self.cv:
354503
if self.status == TaskStatus.INITIAL:
355504
self.start()
356505

357506
if self.status not in (TaskStatus.QUEUED, TaskStatus.RUNNING):
358-
return
507+
# Task already finished - check if we need to raise
508+
if self.status != TaskStatus.COMPLETE:
509+
self._raise_if_failed()
510+
return self
359511

360512
self.cv.wait()
361513

514+
# After waiting, check if task failed
515+
self._raise_if_failed()
516+
return self
517+
518+
def result(self) -> Any:
519+
"""
520+
Returns the result of this task.
521+
522+
This is a convenience method that returns outputs["result"].
523+
For tasks that return a single value (e.g., from an expression),
524+
that value is stored in outputs["result"].
525+
526+
Returns:
527+
The task's result value.
528+
"""
529+
return self.outputs.get("result")
530+
531+
def _raise_if_failed(self) -> None:
532+
"""Raise TaskException if this task is in a failed state."""
533+
if self.status == TaskStatus.FAILED:
534+
error_msg = self.error if self.error else "Unknown error"
535+
raise TaskException(f"Task failed: {error_msg}", self)
536+
elif self.status == TaskStatus.CANCELED:
537+
raise TaskException("Task was canceled", self)
538+
elif self.status == TaskStatus.CRASHED:
539+
error_msg = self.error if self.error else "Worker process crashed"
540+
raise TaskException(f"Task crashed: {error_msg}", self)
541+
362542
def cancel(self) -> None:
363543
"""
364544
Send a task cancelation request to the worker process.

tests/test_service.py

Lines changed: 2 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@
2727
# #L%
2828
###
2929

30-
import os
3130

3231
import appose
33-
from appose.service import ResponseType, Service, TaskStatus
32+
from appose.service import TaskStatus
33+
from tests.test_base import execute_and_assert, maybe_debug
3434

3535
collatz_groovy = """
3636
// Computes the stopping time of a given value
@@ -155,66 +155,3 @@ def test_main_thread_queue_python():
155155
assert TaskStatus.COMPLETE == task.status
156156
thread = task.outputs.get("thread")
157157
assert thread != "MainThread"
158-
159-
160-
def execute_and_assert(service: Service, script: str):
161-
task = service.task(script)
162-
163-
# Record the state of the task for each event that occurs.
164-
165-
class TaskState:
166-
def __init__(self, event):
167-
self.response_type = event.response_type
168-
self.message = event.message
169-
self.current = event.current
170-
self.maximum = event.maximum
171-
self.status = event.task.status
172-
self.error = event.task.error
173-
174-
events = []
175-
task.listen(lambda event: events.append(TaskState(event)))
176-
177-
# Wait for task to finish.
178-
task.wait_for()
179-
180-
# Validate the execution result.
181-
assert TaskStatus.COMPLETE == task.status
182-
result = task.outputs["result"]
183-
assert 91 == result
184-
185-
# Validate the events received.
186-
187-
assert 93 == len(events)
188-
189-
launch = events[0]
190-
assert ResponseType.LAUNCH == launch.response_type
191-
assert TaskStatus.RUNNING == launch.status
192-
assert launch.message is None
193-
assert launch.current is None
194-
assert launch.maximum is None
195-
assert launch.error is None
196-
197-
v = 9999
198-
for i in range(91):
199-
v = v // 2 if v % 2 == 0 else 3 * v + 1
200-
update = events[i + 1]
201-
assert ResponseType.UPDATE == update.response_type
202-
assert TaskStatus.RUNNING == update.status
203-
assert f"[{i}] -> {v}" == update.message
204-
assert i == update.current
205-
assert update.maximum is None
206-
assert update.error is None
207-
208-
completion = events[92]
209-
assert ResponseType.COMPLETION == completion.response_type
210-
assert TaskStatus.COMPLETE == completion.status
211-
assert completion.message is None # no message from non-UPDATE response
212-
assert completion.current is None # no current from non-UPDATE response
213-
assert completion.maximum is None # no maximum from non-UPDATE response
214-
assert completion.error is None
215-
216-
217-
def maybe_debug(service):
218-
debug = os.getenv("DEBUG")
219-
if debug:
220-
service.debug(print)

0 commit comments

Comments
 (0)