Skip to content

Commit 74accea

Browse files
committed
dimos core built
1 parent 03ca7d6 commit 74accea

File tree

7 files changed

+752
-0
lines changed

7 files changed

+752
-0
lines changed

dimos/core/__init__.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import pytest
2+
from dask.distributed import Client, LocalCluster
3+
4+
import dimos.core.colors as colors
5+
from dimos.core.core import In, Out, RemoteOut, rpc
6+
from dimos.core.module_dask import Module
7+
from dimos.core.transport import LCMTransport, ZenohTransport, pLCMTransport
8+
9+
10+
def patchdask(dask_client: Client):
11+
def deploy(actor_class, *args, **kwargs):
12+
actor = dask_client.submit(
13+
actor_class,
14+
*args,
15+
**kwargs,
16+
actor=True,
17+
).result()
18+
19+
actor.set_ref(actor).result()
20+
print(f"\033[32msubsystem deployed: [{actor}]\033[0m")
21+
return actor
22+
23+
dask_client.deploy = deploy
24+
return dask_client
25+
26+
27+
@pytest.fixture
28+
def dimos():
29+
process_count = 3 # we chill
30+
cluster = LocalCluster(n_workers=process_count, threads_per_worker=3)
31+
client = Client(cluster)
32+
yield patchdask(client)
33+
client.close()
34+
cluster.close()
35+
36+
37+
def start(n):
38+
cluster = LocalCluster(n_workers=n, threads_per_worker=3)
39+
client = Client(cluster)
40+
return patchdask(client)
41+
42+
43+
def stop(client: Client):
44+
client.close()
45+
client.cluster.close()

dimos/core/colors.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Copyright 2025 Dimensional Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
def green(text: str) -> str:
17+
"""Return the given text in green color."""
18+
return f"\033[92m{text}\033[0m"
19+
20+
21+
def blue(text: str) -> str:
22+
"""Return the given text in blue color."""
23+
return f"\033[94m{text}\033[0m"
24+
25+
26+
def red(text: str) -> str:
27+
"""Return the given text in red color."""
28+
return f"\033[91m{text}\033[0m"
29+
30+
31+
def yellow(text: str) -> str:
32+
"""Return the given text in yellow color."""
33+
return f"\033[93m{text}\033[0m"
34+
35+
36+
def cyan(text: str) -> str:
37+
"""Return the given text in cyan color."""
38+
return f"\033[96m{text}\033[0m"
39+
40+
41+
def orange(text: str) -> str:
42+
"""Return the given text in orange color."""
43+
return f"\033[38;5;208m{text}\033[0m"

dimos/core/core.py

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
#!/usr/bin/env python3
2+
# Copyright 2025 Dimensional Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from __future__ import annotations
17+
18+
import enum
19+
import inspect
20+
import traceback
21+
from typing import (
22+
Any,
23+
Callable,
24+
Dict,
25+
Generic,
26+
List,
27+
Optional,
28+
Protocol,
29+
TypeVar,
30+
get_args,
31+
get_origin,
32+
get_type_hints,
33+
)
34+
35+
from dask.distributed import Actor
36+
37+
import dimos.core.colors as colors
38+
from dimos.core.o3dpickle import register_picklers
39+
40+
register_picklers()
41+
T = TypeVar("T")
42+
43+
44+
class Transport(Protocol[T]):
45+
# used by local Output
46+
def broadcast(self, selfstream: Out[T], value: T): ...
47+
48+
# used by local Input
49+
def subscribe(self, selfstream: In[T], callback: Callable[[T], any]) -> None: ...
50+
51+
52+
class DaskTransport(Transport[T]):
53+
subscribers: List[Callable[[T], None]]
54+
_started: bool = False
55+
56+
def __init__(self):
57+
self.subscribers = []
58+
59+
def __str__(self) -> str:
60+
return colors.yellow("DaskTransport")
61+
62+
def __reduce__(self):
63+
return (DaskTransport, ())
64+
65+
def broadcast(self, selfstream: RemoteIn[T], msg: T) -> None:
66+
for subscriber in self.subscribers:
67+
# there is some sort of a bug here with losing worker loop
68+
# print(subscriber.owner, subscriber.owner._worker, subscriber.owner._client)
69+
# subscriber.owner._try_bind_worker_client()
70+
# print(subscriber.owner, subscriber.owner._worker, subscriber.owner._client)
71+
72+
subscriber.owner.dask_receive_msg(subscriber.name, msg).result()
73+
74+
def dask_receive_msg(self, msg) -> None:
75+
for subscriber in self.subscribers:
76+
try:
77+
subscriber(msg)
78+
except Exception as e:
79+
print(
80+
colors.red("Error in DaskTransport subscriber callback:"),
81+
e,
82+
traceback.format_exc(),
83+
)
84+
85+
# for outputs
86+
def dask_register_subscriber(self, remoteInput: RemoteIn[T]) -> None:
87+
self.subscribers.append(remoteInput)
88+
89+
# for inputs
90+
def subscribe(self, selfstream: In[T], callback: Callable[[T], None]) -> None:
91+
if not self._started:
92+
selfstream.connection.owner.dask_register_subscriber(
93+
selfstream.connection.name, selfstream
94+
).result()
95+
self._started = True
96+
self.subscribers.append(callback)
97+
98+
99+
class State(enum.Enum):
100+
UNBOUND = "unbound" # descriptor defined but not bound
101+
READY = "ready" # bound to owner but not yet connected
102+
CONNECTED = "connected" # input bound to an output
103+
FLOWING = "flowing" # runtime: data observed
104+
105+
106+
class Stream(Generic[T]):
107+
_transport: Optional[Transport]
108+
109+
def __init__(
110+
self,
111+
type: type[T],
112+
name: str,
113+
owner: Optional[Any] = None,
114+
transport: Optional[Transport] = None,
115+
):
116+
self.name = name
117+
self.owner = owner
118+
self.type = type
119+
if transport:
120+
self._transport = transport
121+
if not hasattr(self, "_transport"):
122+
self._transport = None
123+
124+
@property
125+
def type_name(self) -> str:
126+
return getattr(self.type, "__name__", repr(self.type))
127+
128+
def _color_fn(self) -> Callable[[str], str]:
129+
if self.state == State.UNBOUND:
130+
return colors.orange
131+
if self.state == State.READY:
132+
return colors.blue
133+
if self.state == State.CONNECTED:
134+
return colors.green
135+
return lambda s: s
136+
137+
def __str__(self) -> str: # noqa: D401
138+
return (
139+
self.__class__.__name__
140+
+ " "
141+
+ self._color_fn()(f"{self.name}[{self.type_name}]")
142+
+ " @ "
143+
+ (
144+
colors.orange(self.owner)
145+
if isinstance(self.owner, Actor)
146+
else colors.green(self.owner)
147+
)
148+
+ ("" if not self._transport else " via " + str(self._transport))
149+
)
150+
151+
152+
class Out(Stream[T]):
153+
_transport: Transport
154+
155+
def __init__(self, *argv, **kwargs):
156+
super().__init__(*argv, **kwargs)
157+
if not hasattr(self, "_transport") or self._transport is None:
158+
self._transport = DaskTransport()
159+
160+
@property
161+
def transport(self) -> Transport[T]:
162+
return self._transport
163+
164+
@property
165+
def state(self) -> State: # noqa: D401
166+
return State.UNBOUND if self.owner is None else State.READY
167+
168+
def __reduce__(self): # noqa: D401
169+
if self.owner is None or not hasattr(self.owner, "ref"):
170+
raise ValueError("Cannot serialise Out without an owner ref")
171+
return (
172+
RemoteOut,
173+
(
174+
self.type,
175+
self.name,
176+
self.owner.ref,
177+
self._transport,
178+
),
179+
)
180+
181+
def publish(self, msg):
182+
self._transport.broadcast(self, msg)
183+
184+
185+
class RemoteStream(Stream[T]):
186+
@property
187+
def state(self) -> State: # noqa: D401
188+
return State.UNBOUND if self.owner is None else State.READY
189+
190+
@property
191+
def transport(self) -> Transport[T]:
192+
return self._transport
193+
194+
@transport.setter
195+
def transport(self, value: Transport[T]) -> None:
196+
self.owner.set_transport(self.name, value).result()
197+
self._transport = value
198+
199+
200+
class RemoteOut(RemoteStream[T]):
201+
def connect(self, other: RemoteIn[T]):
202+
return other.connect(self)
203+
204+
205+
class In(Stream[T]):
206+
connection: Optional[RemoteOut[T]] = None
207+
208+
def __str__(self):
209+
return super().__str__() + ("" if not self.connection else f" <- {self.connection}")
210+
211+
def __reduce__(self): # noqa: D401
212+
if self.owner is None or not hasattr(self.owner, "ref"):
213+
raise ValueError("Cannot serialise Out without an owner ref")
214+
return (RemoteIn, (self.type, self.name, self.owner.ref, self._transport))
215+
216+
@property
217+
def transport(self) -> Transport[T]:
218+
return self.connection.transport
219+
220+
@property
221+
def state(self) -> State: # noqa: D401
222+
return State.UNBOUND if self.owner is None else State.READY
223+
224+
def subscribe(self, cb):
225+
# print("SUBBING", self, self.connection._transport)
226+
self.connection._transport.subscribe(self, cb)
227+
228+
229+
class RemoteIn(RemoteStream[T]):
230+
def connect(self, other: RemoteOut[T]) -> None:
231+
return self.owner.connect_stream(self.name, other).result()
232+
233+
234+
def rpc(fn: Callable[..., Any]) -> Callable[..., Any]:
235+
fn.__rpc__ = True # type: ignore[attr-defined]
236+
return fn
237+
238+
239+
daskTransport = DaskTransport() # singleton instance for use in Out/RemoteOut

0 commit comments

Comments
 (0)