Skip to content

Commit 43b0540

Browse files
committed
custom o3d pointcloud picklers
1 parent 15beed6 commit 43b0540

File tree

5 files changed

+112
-29
lines changed

5 files changed

+112
-29
lines changed

dimos/multiprocess/actors2/base.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,17 @@
1414

1515
from __future__ import annotations
1616

17+
import copyreg
1718
import enum
1819
import inspect
1920
from typing import Any, Callable, Generic, TypeVar, get_args, get_origin, get_type_hints
2021

2122
from distributed.actor import Actor
2223

2324
import dimos.multiprocess.actors2.colors as colors
25+
from dimos.multiprocess.actors2.o3dpickle import register_picklers
26+
27+
register_picklers()
2428

2529
T = TypeVar("T")
2630

@@ -149,13 +153,12 @@ def __reduce__(self) -> Any:
149153
return (RemoteOut, (self.type, self.name, self.owner.ref if self.owner else None))
150154

151155
def publish(self, value: T):
152-
print("PUB REQ", self.owner, value)
153156
for sub in self.subscribers:
154-
print("PUBLISHING", value, "to", sub)
155157
sub.owner.receive_msg(sub.name, value)
156158

157159
def subscribe(self, remote_input):
158160
print(self, "adding remote input to subscribers", remote_input)
161+
remote_input.owner._try_bind_worker_client()
159162
self.subscribers.append(remote_input)
160163
print(self.subscribers)
161164

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Copyright 2025 Dimensional Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import copyreg
16+
17+
import numpy as np
18+
import open3d as o3d
19+
20+
21+
def reduce_external(obj):
22+
# Convert Vector3dVector to numpy array for pickling
23+
points_array = np.asarray(obj.points)
24+
return (reconstruct_pointcloud, (points_array,))
25+
26+
27+
def reconstruct_pointcloud(points_array):
28+
# Create new PointCloud and assign the points
29+
pc = o3d.geometry.PointCloud()
30+
pc.points = o3d.utility.Vector3dVector(points_array)
31+
return pc
32+
33+
34+
def register_picklers():
35+
# Register for the actual PointCloud class that gets instantiated
36+
# We need to create a dummy PointCloud to get its actual class
37+
_dummy_pc = o3d.geometry.PointCloud()
38+
copyreg.pickle(_dummy_pc.__class__, reduce_external)

dimos/multiprocess/actors2/test_base.py

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from dimos.multiprocess.actors2.base import In, Module, Out, RemoteOut, module, rpc
1919
from dimos.multiprocess.actors2.base_dask import dimos
20+
from dimos.robot.unitree_webrtc.type.lidar import LidarMessage
2021
from dimos.robot.unitree_webrtc.type.map import Map
2122
from dimos.robot.unitree_webrtc.type.odometry import Odometry
2223
from dimos.types.path import Path
@@ -27,33 +28,42 @@
2728
@module
2829
class RobotClient(Module):
2930
odometry: Out[Odometry]
31+
lidar: Out[LidarMessage]
3032

3133
def __init__(self):
3234
self.odometry = Out(Odometry, "odometry", self)
35+
3336
self._stop_event = Event()
3437
self._thread = None
3538

36-
@rpc
3739
def start(self):
3840
self._thread = Thread(target=self.odomloop)
3941
self._thread.start()
4042

4143
def odomloop(self):
4244
odomdata = SensorReplay("raw_odometry_rotate_walk", autocast=Odometry.from_msg)
45+
lidardata = SensorReplay("office_lidar", autocast=LidarMessage.from_msg)
46+
47+
lidariter = lidardata.iterate()
4348
self._stop_event.clear()
4449
while not self._stop_event.is_set():
4550
for odom in odomdata.iterate():
4651
if self._stop_event.is_set():
52+
print("Stopping odometry stream")
4753
return
54+
# print(odom)
4855
odom.pubtime = time.perf_counter()
4956
self.odometry.publish(odom)
57+
58+
lidarmsg = next(lidariter)
59+
lidarmsg.pubtime = time.perf_counter()
60+
self.lidar.publish(lidarmsg)
5061
time.sleep(0.1)
5162

52-
@rpc
5363
def stop(self):
5464
self._stop_event.set()
5565
if self._thread and self._thread.is_alive():
56-
self._thread.join(timeout=1.0)
66+
self._thread.join(timeout=1.0) # Wait up to 1 second for clean shutdown
5767

5868

5969
@module
@@ -66,18 +76,23 @@ def navigate_to(self, target: Vector) -> bool: ...
6676
def __init__(
6777
self,
6878
target_position: In[Vector],
69-
map_stream: In[Map],
79+
lidar: In[LidarMessage],
7080
odometry: In[Odometry],
7181
):
7282
self.target_position = target_position
73-
self.map_stream = map_stream
83+
self.lidar = lidar
7484
self.odometry = odometry
7585
self.target_path = Out(Path, "target_path")
7686

7787
@rpc
7888
def start(self):
7989
print("navigation odom stream is, subscribing", self.odometry)
80-
self.odometry.subscribe(print)
90+
self.odometry.subscribe(
91+
lambda msg: print("RCV:", (time.perf_counter() - msg.pubtime) * 1000, msg)
92+
)
93+
self.lidar.subscribe(
94+
lambda msg: print("RCV:", (time.perf_counter() - msg.pubtime) * 1000, msg)
95+
)
8196

8297

8398
def test_introspection():
@@ -97,46 +112,31 @@ def test_instance_introspection():
97112
robot = RobotClient()
98113
print(robot)
99114

100-
map_stream = Out[Map](Map, "map")
101115
target_stream = Out[Vector](Vector, "map")
102116
print("\n")
103-
print("map stream", map_stream)
117+
print("lidar stream", robot.lidar)
104118
print("target stream", target_stream)
105119
print("odom stream", robot.odometry)
106120

107-
nav = Navigation(target_position=target_stream, map_stream=map_stream, odometry=robot.odometry)
121+
nav = Navigation(target_position=target_stream, lidar=robot.lidar, odometry=robot.odometry)
108122
"""Test introspection of the Navigation module."""
109123
assert hasattr(nav, "inputs")
110124
assert hasattr(nav, "rpcs")
111125
print("\n\n\n" + nav.io(), "\n\n")
112126

113127

114-
# nav.start()
115-
116-
117128
def test_deployment(dimos):
118129
robot = dimos.deploy(RobotClient)
119-
120-
map_stream = RemoteOut[Map](Map, "map")
121130
target_stream = RemoteOut[Vector](Vector, "map")
122-
odom_stream = robot.odometry
131+
123132
print("\n")
124-
print("map stream", map_stream)
133+
print("lidar stream", robot.lidar)
125134
print("target stream", target_stream)
126-
print("odom stream", odom_stream)
127-
128-
# # print(type(odom_stream.owner))
129-
# # print(type(robot))
130-
131-
# # out = Out(Odometry, "odometry", robot)
135+
print("odom stream", robot.odometry)
132136

133137
nav = dimos.deploy(
134-
Navigation, target_position=target_stream, map_stream=map_stream, odometry=odom_stream
138+
Navigation, target_position=target_stream, lidar=robot.lidar, odometry=robot.odometry
135139
)
136-
137-
# # """Test introspection of the Navigation module."""
138-
# # assert hasattr(nav, "inputs")
139-
# # assert hasattr(nav, "rpcs")
140140
print("\n\n\n" + robot.io().result(), "\n")
141141
print(nav.io().result(), "\n\n")
142142

dimos/multiprocess/actors2/test_meta.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from dimos.multiprocess.actors2.base import dimos
2222
from dimos.multiprocess.actors2.meta import ActorReference, In, Out, module, rpc
23+
from dimos.robot.unitree_webrtc.type.lidar import Lidar
2324
from dimos.robot.unitree_webrtc.type.map import Map
2425
from dimos.robot.unitree_webrtc.type.odometry import Odometry
2526
from dimos.types.path import Path
@@ -55,6 +56,7 @@ def receive_message(self, in_name, message):
5556
@module
5657
class RobotClient(Module):
5758
odometry: Out[Odometry]
59+
lidar: Out[Lidar]
5860

5961
def __init__(self):
6062
self.odometry = Out(Odometry, "odometry", self)
@@ -68,6 +70,9 @@ def start(self):
6870

6971
def odomloop(self):
7072
odomdata = SensorReplay("raw_odometry_rotate_walk", autocast=Odometry.from_msg)
73+
lidardata = SensorReplay("office_lidar", autocast=Lidar.from_msg)
74+
75+
lidariter = lidardata.iterate()
7176
self._stop_event.clear()
7277
while not self._stop_event.is_set():
7378
for odom in odomdata.iterate():
@@ -77,6 +82,7 @@ def odomloop(self):
7782
# print(odom)
7883
odom.pubtime = time.perf_counter()
7984
self.odometry.publish(odom)
85+
self.lidar.publish(next(lidariter))
8086
time.sleep(0.1)
8187

8288
def stop(self):
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Copyright 2025 Dimensional Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import pickle
16+
17+
from dimos.multiprocess.actors2.o3dpickle import register_picklers
18+
from dimos.robot.unitree_webrtc.type.lidar import LidarMessage
19+
from dimos.utils.testing import SensorReplay
20+
21+
register_picklers()
22+
23+
24+
def test_enode_decode():
25+
lidardata = SensorReplay("office_lidar", autocast=LidarMessage.from_msg)
26+
lidarmsg = next(lidardata.iterate())
27+
28+
binarypc = pickle.dumps(lidarmsg.pointcloud)
29+
30+
# Test pickling and unpickling
31+
binary = pickle.dumps(lidarmsg)
32+
lidarmsg2 = pickle.loads(binary)
33+
34+
# Verify the decoded message has the same properties
35+
assert isinstance(lidarmsg2, LidarMessage)
36+
assert len(lidarmsg2.pointcloud.points) == len(lidarmsg.pointcloud.points)

0 commit comments

Comments
 (0)