Skip to content

Commit 8fbb63c

Browse files
committed
LCM/Zenoh transport stub
1 parent daabc2d commit 8fbb63c

File tree

3 files changed

+32
-8
lines changed

3 files changed

+32
-8
lines changed

dimos/multiprocess/actors3/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import pytest
22
from dask.distributed import Client, LocalCluster
33

4-
from dimos.multiprocess.actors3.base import In, LCMTransport, Out, RemoteOut, rpc
4+
from dimos.multiprocess.actors3.base import In, LCMTransport, Out, RemoteOut, ZenohTransport, rpc
55
from dimos.multiprocess.actors3.module_dask import Module
66

77

dimos/multiprocess/actors3/base.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def connect(self, selfstream: RemoteIn[T], otherstream: RemoteOut[T]) -> None:
6060
def broadcast(self, selfstream: Out[T], value: T): ...
6161

6262

63-
class LCMTransport(Transport[T]):
63+
class PubSubTransport(Transport[T]):
6464
topic: str
6565
type: type
6666

@@ -69,9 +69,17 @@ def __init__(self, topic: str, type: type):
6969
self.type = type
7070

7171
def __str__(self) -> str:
72-
return colors.green("LCM(") + colors.blue(self.topic) + colors.green(")")
72+
return (
73+
colors.green(f"{self.__class__.__name__}(")
74+
+ colors.blue(self.topic)
75+
+ colors.green(")")
76+
)
7377

74-
def broadcast(self, selfstream: Out[T], value: T): ...
78+
79+
class LCMTransport(PubSubTransport[T]): ...
80+
81+
82+
class ZenohTransport(PubSubTransport[T]): ...
7583

7684

7785
class State(enum.Enum):
@@ -176,8 +184,6 @@ class In(Stream[T]):
176184
def __str__(self):
177185
return super().__str__() + ("" if not self.connection else f" <- {self.connection}")
178186

179-
def subscribe(self, cb): ...
180-
181187
def __reduce__(self): # noqa: D401
182188
if self.owner is None or not hasattr(self.owner, "ref"):
183189
raise ValueError("Cannot serialise Out without an owner ref")
@@ -187,10 +193,18 @@ def __reduce__(self): # noqa: D401
187193
def state(self) -> State: # noqa: D401
188194
return State.UNBOUND if self.owner is None else State.READY
189195

196+
# actual message passing implementation
197+
def connect_remote(self):
198+
self._transport.connect(self.connection)
199+
200+
def disconnect_remote(self):
201+
self._transport.disconnect()
202+
203+
def subscribe(self, cb): ...
204+
190205

191206
class RemoteIn(RemoteStream[T]):
192207
def connect(self, other: RemoteOut[T]) -> None:
193-
print("CONENCT REQU", other)
194208
return self.owner.connect_stream(self.name, other).result()
195209

196210

dimos/multiprocess/actors3/test_base.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,16 @@
1515
import time
1616
from threading import Event, Thread
1717

18-
from dimos.multiprocess.actors3 import In, LCMTransport, Module, Out, RemoteOut, dimos, rpc
18+
from dimos.multiprocess.actors3 import (
19+
In,
20+
LCMTransport,
21+
Module,
22+
Out,
23+
RemoteOut,
24+
ZenohTransport,
25+
dimos,
26+
rpc,
27+
)
1928
from dimos.robot.unitree_webrtc.type.lidar import LidarMessage
2029
from dimos.robot.unitree_webrtc.type.odometry import Odometry
2130
from dimos.types.vector import Vector
@@ -108,6 +117,7 @@ def test_deployment(dimos):
108117
target_stream = RemoteOut[Vector](Vector, "target")
109118

110119
robot.lidar.transport = LCMTransport("/lidar", LidarMessage)
120+
robot.odometry.transport = ZenohTransport("/odom", LidarMessage)
111121

112122
print("\n")
113123
print("lidar stream", robot.lidar)

0 commit comments

Comments
 (0)