Skip to content

Commit 7cd313b

Browse files
ctruedenclaude
andcommitted
Add automatic proxy support for non-serializable objects
This commit implements transparent proxying of non-JSON-serializable objects returned from worker tasks, along with enhanced proxy functionality for better ergonomics. Key changes: - Non-serializable task outputs are automatically exported as worker_object references and wrapped in ProxyObject instances - ProxyObject (renamed from ProxyHandler) now supports: - Immediate attribute access via __getattr__ (returns values or nested proxies) - Calling proxied callables via __call__ - Natural chaining: proxy.obj.method(args) - Auto-generated proxy variables use _appose_auto_N naming pattern Implementation details: - Python: message.py encoder detects non-serializable objects and creates worker_object references with auto-export - Groovy: Messages.java includes catch-all converter for same behavior - Service-side: proxify_worker_objects() recursively converts worker_object dicts to ProxyObject instances in task outputs Tests added: - test_auto_proxy: Tests automatic proxying with datetime, custom classes, and nested objects - test_callable_proxy: Tests proxying of callable objects (lambdas and callable classes) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent e0de14d commit 7cd313b

File tree

5 files changed

+224
-37
lines changed

5 files changed

+224
-37
lines changed

src/appose/python_worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ def __init__(self):
177177

178178
# Flag this process as a worker, not a service.
179179
message._worker_mode = True
180+
# Store reference to this worker for auto-export functionality.
181+
message._worker_instance = self
180182

181183
def run(self) -> None:
182184
"""

src/appose/service.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from .syntax import ScriptSyntax, get as syntax_from_name
2222
from .util import process
23-
from .util.message import Args, decode, encode
23+
from .util.message import Args, decode, encode, proxify_worker_objects
2424

2525

2626
class TaskException(Exception):
@@ -685,7 +685,9 @@ def _handle(self, response: Args) -> None:
685685
self.status = TaskStatus.COMPLETE
686686
outputs = response.get("outputs")
687687
if outputs is not None:
688-
self.outputs.update(outputs)
688+
# Convert any worker_object references to ProxyObject instances.
689+
proxified_outputs = proxify_worker_objects(outputs, self.service)
690+
self.outputs.update(proxified_outputs)
689691
elif response_type == ResponseType.CANCELATION:
690692
self.service._tasks.pop(self.uuid, None)
691693
self.status = TaskStatus.CANCELED

src/appose/util/message.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@
1919
# Set to True by python_worker.Worker.__init__().
2020
_worker_mode = False
2121

22+
# Counter for auto-generated proxy variable names.
23+
_proxy_counter = 0
24+
25+
# Reference to the worker instance, needed for auto-exporting.
26+
_worker_instance = None
27+
2228

2329
def encode(data: Args) -> str:
2430
return json.dumps(data, cls=_ApposeJSONEncoder, separators=(",", ":"))
@@ -43,6 +49,23 @@ def default(self, obj):
4349
"shape": obj.shape,
4450
"shm": obj.shm,
4551
}
52+
53+
# If in worker mode and object is not JSON-serializable,
54+
# auto-export it and return a worker_object reference.
55+
if _worker_mode:
56+
global _proxy_counter
57+
var_name = f"_appose_auto_{_proxy_counter}"
58+
_proxy_counter += 1
59+
60+
# Export the object so it persists for future tasks.
61+
if _worker_instance is not None:
62+
_worker_instance.exports[var_name] = obj
63+
64+
return {
65+
"appose_type": "worker_object",
66+
"var_name": var_name,
67+
}
68+
4669
return super().default(obj)
4770

4871

@@ -53,5 +76,41 @@ def _appose_object_hook(obj: dict):
5376
return SharedMemory(name=(obj["name"]), rsize=(obj["rsize"]))
5477
elif atype == "ndarray":
5578
return NDArray(obj["dtype"], obj["shape"], obj["shm"])
79+
elif atype == "worker_object":
80+
# Keep worker_object dicts as-is for now.
81+
# They will be converted to proxies by proxify_worker_objects().
82+
return obj
5683
else:
5784
return obj
85+
86+
87+
def proxify_worker_objects(data: Any, service: Any) -> Any:
88+
"""
89+
Recursively convert worker_object dicts to ProxyObject instances.
90+
91+
This is called on task outputs after JSON deserialization to convert
92+
any worker_object references into actual proxy objects.
93+
94+
Args:
95+
data: The data structure (potentially) containing worker_object dicts.
96+
service: The Service instance to use for creating proxies.
97+
98+
Returns:
99+
The data with worker_object dicts replaced by ProxyObject instances.
100+
"""
101+
if isinstance(data, dict):
102+
if data.get("appose_type") == "worker_object":
103+
# Convert this worker_object dict to a ProxyObject.
104+
from .proxy import create
105+
106+
var_name = data["var_name"]
107+
return create(service, var_name, queue=None)
108+
else:
109+
# Recursively process dict values.
110+
return {k: proxify_worker_objects(v, service) for k, v in data.items()}
111+
elif isinstance(data, list):
112+
# Recursively process list elements.
113+
return [proxify_worker_objects(item, service) for item in data]
114+
else:
115+
# Primitive value, return as-is.
116+
return data

src/appose/util/proxy.py

Lines changed: 55 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,35 @@
33
# SPDX-License-Identifier: BSD-2-Clause
44

55
"""
6-
Utility functions for creating local proxy objects that provide strongly typed
7-
access to remote objects living in Appose worker processes.
6+
Utility functions for creating local proxy objects that provide access to
7+
remote objects living in Appose worker processes.
88
9-
A proxy object forwards method calls to a corresponding object in the worker
10-
process by generating and executing scripts via Tasks. This provides a more
11-
natural, object-oriented API compared to manually constructing script strings
12-
for each method invocation.
9+
A proxy object forwards attribute accesses and method calls to a corresponding
10+
object in the worker process by generating and executing scripts via Tasks.
11+
This provides a natural, object-oriented API compared to manually constructing
12+
script strings for each operation.
1313
14-
Type safety is honor-system based: The interface you provide must match the
15-
actual methods and signatures of the remote object. If there's a mismatch,
16-
you'll get runtime errors from the worker process.
14+
Proxy objects support:
15+
- Attribute access: proxy.field returns the field value (or another proxy)
16+
- Method calls: proxy.method(args) invokes the method remotely
17+
- Chaining: proxy.obj.method(args) works naturally
18+
- Callables: proxy() invokes the proxied object if it's callable
1719
1820
Usage pattern: First, create and export the remote object via a task,
1921
then create a proxy to interact with it:
2022
2123
service = env.python()
2224
service.task("task.export(my_obj=MyClass())").wait_for()
23-
proxy = service.proxy("my_obj", MyInterface)
25+
proxy = service.proxy("my_obj")
2426
result = proxy.some_method(42) # Executes remotely
2527
28+
Automatic proxying: When a task returns a non-JSON-serializable object, it's
29+
automatically exported and returned as a proxy object:
30+
31+
counter = service.task("import collections; collections.Counter('abbc')").wait_for().result()
32+
# counter is now a ProxyObject wrapping the remote Counter
33+
total = counter.total() # Access the total method and call it
34+
2635
Important: Variables must be explicitly exported using task.export(varName=value)
2736
in a previous task before they can be proxied. Exported variables persist across
2837
tasks within the same service.
@@ -77,34 +86,45 @@ def create(service: Service, var: str, queue: str | None = None) -> Any:
7786
RuntimeError: If a proxied method call fails in the worker process.
7887
"""
7988

80-
class ProxyHandler:
89+
class ProxyObject:
8190
def __init__(self, service: Service, var: str, queue: str | None):
8291
self._service = service
8392
self._var = var
8493
self._queue = queue
8594

8695
def __getattr__(self, name: str):
87-
def method(*args):
88-
# Construct map of input arguments.
89-
inputs = {}
90-
arg_names = []
91-
for i, arg in enumerate(args):
92-
arg_name = f"arg{i}"
93-
inputs[arg_name] = arg
94-
arg_names.append(arg_name)
95-
96-
# Use the service's ScriptSyntax to generate the method invocation script.
97-
# This allows support for different languages with varying syntax.
98-
syntax.validate(self._service)
99-
script = self._service._syntax.invoke_method(self._var, name, arg_names)
100-
101-
try:
102-
task = self._service.task(script, inputs, self._queue)
103-
task.wait_for()
104-
return task.result()
105-
except Exception as e:
106-
raise RuntimeError(str(e)) from e
107-
108-
return method
109-
110-
return ProxyHandler(service, var, queue) # type: ignore
96+
# Immediately evaluate the attribute access on the worker.
97+
attr_expr = f"{self._var}.{name}"
98+
99+
try:
100+
task = self._service.task(attr_expr, queue=self._queue)
101+
task.wait_for()
102+
result = task.result()
103+
# If result is a worker_object, it will already be a ProxyObject
104+
# thanks to proxify_worker_objects() in Task._handle().
105+
return result
106+
except Exception as e:
107+
raise RuntimeError(str(e)) from e
108+
109+
def __call__(self, *args):
110+
# Invoke the proxied object as a callable.
111+
# Construct map of input arguments.
112+
inputs = {}
113+
arg_names = []
114+
for i, arg in enumerate(args):
115+
arg_name = f"arg{i}"
116+
inputs[arg_name] = arg
117+
arg_names.append(arg_name)
118+
119+
# Use the service's ScriptSyntax to generate the call script.
120+
syntax.validate(self._service)
121+
script = self._service._syntax.call(self._var, arg_names)
122+
123+
try:
124+
task = self._service.task(script, inputs, self._queue)
125+
task.wait_for()
126+
return task.result()
127+
except Exception as e:
128+
raise RuntimeError(str(e)) from e
129+
130+
return ProxyObject(service, var, queue) # type: ignore

tests/test_syntax.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,3 +173,107 @@ def walk(self, rate):
173173
fish.dive(100) == "Swam down 100 deep"
174174
or fish.dive(100) == "Swam down 100.0 deep"
175175
)
176+
177+
178+
def test_auto_proxy():
179+
"""Test automatic proxying of non-serializable task outputs."""
180+
env = appose.system()
181+
with env.python() as service:
182+
maybe_debug(service)
183+
184+
# Return a non-serializable object from a task - should auto-proxy
185+
# datetime is not JSON-serializable
186+
dt = service.task("import datetime\ndatetime.datetime(2024, 1, 15, 10, 30, 45)").wait_for().result()
187+
188+
# dt should be a proxy object now
189+
assert dt is not None
190+
191+
# Access attributes on the proxied datetime
192+
assert dt.year == 2024
193+
assert dt.month == 1
194+
assert dt.day == 15
195+
196+
# Call a method
197+
iso_str = dt.isoformat()
198+
assert "2024-01-15T10:30:45" == iso_str
199+
200+
# Access a field that returns a primitive type
201+
# Counter doesn't have simple fields, so let's create a custom class
202+
custom = service.task("""
203+
class CustomClass:
204+
def __init__(self):
205+
self.value = 42
206+
self.name = "test"
207+
208+
def get_double(self):
209+
return self.value * 2
210+
211+
CustomClass()
212+
""").wait_for().result()
213+
214+
# Access primitive fields
215+
assert custom.value == 42
216+
assert custom.name == "test"
217+
218+
# Call a method
219+
assert custom.get_double() == 84
220+
221+
# Test nested object access
222+
nested = service.task("""
223+
class Inner:
224+
def __init__(self):
225+
self.data = "inner_data"
226+
227+
def process(self, x):
228+
return f"processed: {x}"
229+
230+
class Outer:
231+
def __init__(self):
232+
self.inner = Inner()
233+
self.label = "outer"
234+
235+
Outer()
236+
""").wait_for().result()
237+
238+
# Access nested object
239+
assert nested.label == "outer"
240+
inner = nested.inner
241+
assert inner.data == "inner_data"
242+
assert inner.process("test") == "processed: test"
243+
244+
# Or chain it all together
245+
result = nested.inner.process("chained")
246+
assert result == "processed: chained"
247+
248+
249+
def test_callable_proxy():
250+
"""Test that proxied callable objects work correctly."""
251+
env = appose.system()
252+
with env.python() as service:
253+
maybe_debug(service)
254+
255+
# Create a lambda function
256+
func = service.task("lambda x, y: x + y").wait_for().result()
257+
258+
# Call it
259+
result = func(10, 32)
260+
assert result == 42
261+
262+
# Create a custom callable class
263+
callable_obj = service.task("""
264+
class Adder:
265+
def __init__(self, offset):
266+
self.offset = offset
267+
268+
def __call__(self, x):
269+
return x + self.offset
270+
271+
Adder(100)
272+
""").wait_for().result()
273+
274+
# Call the callable object
275+
result = callable_obj(23)
276+
assert result == 123
277+
278+
# Access a field on the callable object
279+
assert callable_obj.offset == 100

0 commit comments

Comments
 (0)