Skip to content

Commit df60de1

Browse files
committed
actors 3 sketch
1 parent 52a1cf8 commit df60de1

File tree

4 files changed

+377
-0
lines changed

4 files changed

+377
-0
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import pytest
2+
from dask.distributed import Client, LocalCluster
3+
4+
from dimos.multiprocess.actors3.base import In, Out, RemoteOut, rpc
5+
from dimos.multiprocess.actors3.module_dask import Module
6+
7+
8+
def patchdask(dask_client: Client):
9+
def deploy(actor_class, *args, **kwargs):
10+
actor = dask_client.submit(
11+
actor_class,
12+
*args,
13+
**kwargs,
14+
actor=True,
15+
).result()
16+
17+
actor.set_ref(actor).result()
18+
print(f"\033[32msubsystem deployed: [{actor}]\033[0m")
19+
return actor
20+
21+
dask_client.deploy = deploy
22+
return dask_client
23+
24+
25+
@pytest.fixture
26+
def dimos():
27+
process_count = 3 # we chill
28+
cluster = LocalCluster(n_workers=process_count, threads_per_worker=3)
29+
client = Client(cluster)
30+
yield patchdask(client)
31+
client.close()
32+
cluster.close()

dimos/multiprocess/actors3/base.py

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
#!/usr/bin/env python3
2+
# Copyright 2025 Dimensional Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from __future__ import annotations
17+
18+
import enum
19+
import inspect
20+
from typing import (
21+
Any,
22+
Callable,
23+
Dict,
24+
Generic,
25+
List,
26+
Protocol,
27+
TypeVar,
28+
get_args,
29+
get_origin,
30+
get_type_hints,
31+
)
32+
33+
from dask.distributed import Actor
34+
35+
from dimos.multiprocess.actors2 import colors
36+
from dimos.multiprocess.actors2.o3dpickle import register_picklers
37+
38+
register_picklers()
39+
T = TypeVar("T")
40+
41+
42+
class State(enum.Enum):
43+
UNBOUND = "unbound" # descriptor defined but not bound
44+
READY = "ready" # bound to owner but not yet connected
45+
CONNECTED = "connected" # input bound to an output
46+
FLOWING = "flowing" # runtime: data observed
47+
48+
49+
class Stream(Generic[T]):
50+
transport = None
51+
52+
def __init__(self, type: type[T], name: str, owner: Any | None = None):
53+
self.name = name
54+
self.owner = owner
55+
self.type = type
56+
57+
@property
58+
def type_name(self) -> str:
59+
return getattr(self.type, "__name__", repr(self.type))
60+
61+
def _color_fn(self) -> Callable[[str], str]:
62+
if self.state == State.UNBOUND:
63+
return colors.orange
64+
if self.state == State.READY:
65+
return colors.blue
66+
if self.state == State.CONNECTED:
67+
return colors.green
68+
return lambda s: s
69+
70+
def __str__(self) -> str: # noqa: D401
71+
return (
72+
self.__class__.__name__
73+
+ " "
74+
+ self._color_fn()(f"{self.name}[{self.type_name}]")
75+
+ " @ "
76+
+ colors.orange(self.owner)
77+
if isinstance(self.owner, Actor)
78+
else colors.green(self.owner)
79+
)
80+
81+
82+
class Out(Stream[T]):
83+
@property
84+
def state(self) -> State: # noqa: D401
85+
return State.UNBOUND if self.owner is None else State.READY
86+
87+
def __reduce__(self): # noqa: D401
88+
if self.owner is None or not hasattr(self.owner, "ref"):
89+
raise ValueError("Cannot serialise Out without an owner ref")
90+
return (RemoteOut, (self.type, self.name, self.owner.ref))
91+
92+
def publish(self, msg): ...
93+
94+
95+
class RemoteOut(Stream[T]):
96+
@property
97+
def state(self) -> State: # noqa: D401
98+
return State.UNBOUND if self.owner is None else State.READY
99+
100+
101+
class In(Stream[T]):
102+
connection: RemoteOut[T] | None = None
103+
104+
def subscribe(self, cb): ...
105+
106+
def __reduce__(self): # noqa: D401
107+
if self.owner is None or not hasattr(self.owner, "ref"):
108+
raise ValueError("Cannot serialise Out without an owner ref")
109+
return (RemoteIn, (self.type, self.name, self.owner.ref))
110+
111+
@property
112+
def state(self) -> State: # noqa: D401
113+
return State.UNBOUND if self.owner is None else State.READY
114+
115+
116+
class RemoteIn(Stream[T]):
117+
@property
118+
def state(self) -> State: # noqa: D401
119+
return State.UNBOUND if self.owner is None else State.READY
120+
121+
def connect(self, other: Out[T]) -> None:
122+
print("sub request from", self, "to", other)
123+
124+
125+
def rpc(fn: Callable[..., Any]) -> Callable[..., Any]:
126+
"""Mark *fn* as remotely callable."""
127+
128+
fn.__rpc__ = True # type: ignore[attr-defined]
129+
return fn
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# Copyright 2025 Dimensional Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import inspect
16+
from typing import (
17+
Any,
18+
Callable,
19+
Dict,
20+
Generic,
21+
List,
22+
Protocol,
23+
TypeVar,
24+
get_args,
25+
get_origin,
26+
get_type_hints,
27+
)
28+
29+
from dask.distributed import Actor
30+
31+
from dimos.multiprocess.actors3.base import In, Out, RemoteIn
32+
33+
34+
class Module:
35+
ref: Actor
36+
37+
def __init__(self):
38+
self.ref = None
39+
40+
for name, ann in get_type_hints(self, include_extras=True).items():
41+
origin = get_origin(ann)
42+
if origin is Out:
43+
inner, *_ = get_args(ann) or (Any,)
44+
stream = Out(inner, name, self)
45+
setattr(self, name, stream)
46+
elif origin is In:
47+
inner, *_ = get_args(ann) or (Any,)
48+
stream = In(inner, name, self)
49+
setattr(self, name, stream)
50+
51+
def set_ref(self, ref):
52+
self.ref = ref
53+
54+
def __str__(self):
55+
return f"{self.__class__.__name__}"
56+
57+
@property
58+
def outputs(self) -> dict[str, Out]:
59+
return {
60+
name: s
61+
for name, s in self.__dict__.items()
62+
if isinstance(s, Out) and not name.startswith("_")
63+
}
64+
65+
@property
66+
def inputs(self) -> dict[str, In]:
67+
return {
68+
name: s
69+
for name, s in self.__dict__.items()
70+
if isinstance(s, In) and not name.startswith("_")
71+
}
72+
73+
@property
74+
def rpcs(self) -> List[Callable]:
75+
return [
76+
getattr(self, name)
77+
for name in dir(self)
78+
if callable(getattr(self, name)) and hasattr(getattr(self, name), "__rpc__")
79+
]
80+
81+
def io(self) -> str:
82+
def _box(name: str) -> str:
83+
return [
84+
"┌┴" + "─" * (len(name) + 1) + "┐",
85+
f"│ {name} │",
86+
"└┬" + "─" * (len(name) + 1) + "┘",
87+
]
88+
89+
ret = [
90+
*(f" ├─ {name:<16} {stream}" for name, stream in self.inputs.items()),
91+
*_box(self.__class__.__name__),
92+
*(f" ├─ {name:<16} {stream}" for name, stream in self.outputs.items()),
93+
]
94+
95+
return "\n".join(ret)
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
#!/usr/bin/env python3
2+
3+
# Copyright 2025 Dimensional Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
17+
import time
18+
from threading import Event, Thread
19+
20+
from dimos.multiprocess.actors3 import In, Module, Out, RemoteOut, dimos, rpc
21+
from dimos.robot.unitree_webrtc.type.lidar import LidarMessage
22+
from dimos.robot.unitree_webrtc.type.odometry import Odometry
23+
from dimos.types.vector import Vector
24+
from dimos.utils.testing import SensorReplay
25+
26+
# never delete this line
27+
if dimos:
28+
...
29+
30+
31+
class RobotClient(Module):
32+
odometry: Out[Odometry] = None
33+
lidar: Out[LidarMessage] = None
34+
mov: In[Vector] = None
35+
36+
mov_msg_count = 0
37+
38+
def mov_callback(self, msg):
39+
self.mov_msg_count += 1
40+
print("MOV REQ", msg)
41+
42+
def __init__(self):
43+
super().__init__()
44+
print(self)
45+
print("ODOM GET TEST", getattr(self, "lidar"))
46+
self._stop_event = Event()
47+
self._thread = None
48+
49+
def start(self):
50+
self._thread = Thread(target=self.odomloop)
51+
self._thread.start()
52+
self.mov.subscribe(self.mov_callback)
53+
54+
def odomloop(self):
55+
odomdata = SensorReplay("raw_odometry_rotate_walk", autocast=Odometry.from_msg)
56+
lidardata = SensorReplay("office_lidar", autocast=LidarMessage.from_msg)
57+
58+
lidariter = lidardata.iterate()
59+
self._stop_event.clear()
60+
while not self._stop_event.is_set():
61+
for odom in odomdata.iterate():
62+
if self._stop_event.is_set():
63+
return
64+
# print(odom)
65+
odom.pubtime = time.perf_counter()
66+
self.odometry.publish(odom)
67+
68+
lidarmsg = next(lidariter)
69+
lidarmsg.pubtime = time.perf_counter()
70+
self.lidar.publish(lidarmsg)
71+
time.sleep(0.1)
72+
73+
def stop(self):
74+
self._stop_event.set()
75+
if self._thread and self._thread.is_alive():
76+
self._thread.join(timeout=1.0) # Wait up to 1 second for clean shutdown
77+
78+
79+
class Navigation(Module):
80+
mov: Out[Vector] = None
81+
lidar: In[LidarMessage] = None
82+
target_position: In[Vector] = None
83+
odometry: In[Odometry] = None
84+
85+
odom_msg_count = 0
86+
lidar_msg_count = 0
87+
88+
@rpc
89+
def navigate_to(self, target: Vector) -> bool: ...
90+
91+
def __init__(self):
92+
super().__init__()
93+
94+
@rpc
95+
def start(self):
96+
def _odom(msg):
97+
self.odom_msg_count += 1
98+
print("RCV:", (time.perf_counter() - msg.pubtime) * 1000, msg)
99+
self.mov.publish(msg.pos)
100+
101+
self.odometry.subscribe(_odom)
102+
103+
def _lidar(msg):
104+
self.lidar_msg_count += 1
105+
print("RCV:", (time.perf_counter() - msg.pubtime) * 1000, msg)
106+
107+
self.lidar.subscribe(_lidar)
108+
109+
110+
def test_deployment(dimos):
111+
robot = dimos.deploy(RobotClient)
112+
target_stream = RemoteOut[Vector](Vector, "target")
113+
114+
print("\n")
115+
print("lidar stream", robot.lidar)
116+
print("target stream", target_stream)
117+
print("odom stream", robot.odometry)
118+
119+
nav = dimos.deploy(Navigation)
120+
print(nav.io().result())
121+
nav.lidar.connect(robot.lidar)

0 commit comments

Comments
 (0)