Skip to content

Commit e94574e

Browse files
committed
Feat: lock in control loop - monitor-style locking
1 parent 3401d11 commit e94574e

File tree

3 files changed

+91
-58
lines changed

3 files changed

+91
-58
lines changed

dimos/teleop/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ Adds Rerun visualization for debugging. Extends ArmTeleopModule (toggle engage).
4646
| `_publish_msg()` | Change output format |
4747
| `_publish_button_state()` | Change button output |
4848

49+
### Rules for subclasses
50+
51+
- **Do not acquire `self._lock` in overrides.** The control loop already holds it.
52+
Access `self._controllers`, `self._current_poses`, `self._is_engaged`, etc. directly.
53+
- **Keep overrides fast** — they run inside the control loop at `control_loop_hz`.
54+
4955
## File Structure
5056

5157
```

dimos/teleop/quest/quest_extensions.py

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,37 +21,54 @@
2121
- VisualizingTeleopModule: Adds Rerun visualization (uses toggle engage)
2222
"""
2323

24+
from dataclasses import dataclass
2425
from typing import Any
2526

2627
from dimos.core import Out, rpc
2728
from dimos.msgs.geometry_msgs import PoseStamped, TwistStamped
28-
from dimos.teleop.quest.quest_teleop_module import Hand, QuestTeleopModule
29+
from dimos.teleop.quest.quest_teleop_module import Hand, QuestTeleopConfig, QuestTeleopModule
2930
from dimos.teleop.utils.teleop_visualization import (
3031
init_rerun_visualization,
3132
visualize_buttons,
3233
visualize_pose,
3334
)
3435

3536

37+
@dataclass
38+
class TwistTeleopConfig(QuestTeleopConfig):
39+
"""Configuration for TwistTeleopModule."""
40+
41+
linear_scale: float = 1.0
42+
angular_scale: float = 1.0
43+
44+
3645
# Example implementation to show how to extend QuestTeleopModule for different teleop behaviors and outputs.
3746
class TwistTeleopModule(QuestTeleopModule):
3847
"""Quest teleop that outputs TwistStamped instead of PoseStamped.
48+
49+
Config:
50+
- linear_scale: Scale factor for linear (position) values. Default 1.0.
51+
- angular_scale: Scale factor for angular (orientation) values. Default 1.0.
52+
3953
Outputs:
4054
- left_twist: TwistStamped (linear + angular velocity)
4155
- right_twist: TwistStamped (linear + angular velocity)
4256
- buttons: QuestButtons (inherited)
4357
"""
4458

59+
default_config = TwistTeleopConfig
60+
4561
left_twist: Out[TwistStamped]
4662
right_twist: Out[TwistStamped]
4763

4864
def _publish_msg(self, hand: Hand, output_msg: PoseStamped) -> None:
49-
"""Convert PoseStamped to TwistStamped and publish."""
65+
"""Convert PoseStamped to TwistStamped, apply scaling, and publish."""
66+
cfg: TwistTeleopConfig = self.config # type: ignore[assignment]
5067
twist = TwistStamped(
5168
ts=output_msg.ts,
5269
frame_id=output_msg.frame_id,
53-
linear=output_msg.position,
54-
angular=output_msg.orientation.to_euler(),
70+
linear=output_msg.position * cfg.linear_scale,
71+
angular=output_msg.orientation.to_euler() * cfg.angular_scale,
5572
)
5673
if hand == Hand.LEFT:
5774
self.left_twist.publish(twist)
@@ -77,19 +94,18 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
7794

7895
def _handle_engage(self) -> None:
7996
"""Toggle per-hand engage on primary button rising edge."""
80-
with self._lock:
81-
for hand in Hand:
82-
controller = self._controllers.get(hand)
83-
if controller is None:
84-
continue
97+
for hand in Hand:
98+
controller = self._controllers.get(hand)
99+
if controller is None:
100+
continue
85101

86-
pressed = controller.primary
87-
if pressed and not self._prev_primary[hand]:
88-
if self._is_engaged[hand]:
89-
self.disengage(hand)
90-
else:
91-
self.engage(hand)
92-
self._prev_primary[hand] = pressed
102+
pressed = controller.primary
103+
if pressed and not self._prev_primary[hand]:
104+
if self._is_engaged[hand]:
105+
self.disengage(hand)
106+
else:
107+
self.engage(hand)
108+
self._prev_primary[hand] = pressed
93109

94110

95111
class VisualizingTeleopModule(ArmTeleopModule):
@@ -115,9 +131,8 @@ def _get_output_pose(self, hand: Hand) -> PoseStamped | None:
115131
output_pose = super()._get_output_pose(hand)
116132

117133
if output_pose is not None:
118-
with self._lock:
119-
current_pose = self._current_poses.get(hand)
120-
controller = self._controllers.get(hand)
134+
current_pose = self._current_poses.get(hand)
135+
controller = self._controllers.get(hand)
121136
if current_pose is not None:
122137
label = "left" if hand == Hand.LEFT else "right"
123138
visualize_pose(current_pose, label)

dimos/teleop/quest/quest_teleop_module.py

Lines changed: 51 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -124,15 +124,22 @@ def start(self) -> None:
124124
"""Start the Quest teleoperation module."""
125125
super().start()
126126

127-
subscriptions = [
128-
(self.vr_left_pose, lambda msg: self._on_pose_cb(Hand.LEFT, msg)),
129-
(self.vr_right_pose, lambda msg: self._on_pose_cb(Hand.RIGHT, msg)),
130-
(self.vr_left_joy, lambda msg: self._on_joy_cb(Hand.LEFT, msg)),
131-
(self.vr_right_joy, lambda msg: self._on_joy_cb(Hand.RIGHT, msg)),
132-
]
133-
for stream, handler in subscriptions:
127+
input_streams = {
128+
"vr_left_pose": (self.vr_left_pose, lambda msg: self._on_pose(Hand.LEFT, msg)),
129+
"vr_right_pose": (self.vr_right_pose, lambda msg: self._on_pose(Hand.RIGHT, msg)),
130+
"vr_left_joy": (self.vr_left_joy, lambda msg: self._on_joy(Hand.LEFT, msg)),
131+
"vr_right_joy": (self.vr_right_joy, lambda msg: self._on_joy(Hand.RIGHT, msg)),
132+
}
133+
connected = []
134+
for name, (stream, handler) in input_streams.items():
134135
if stream and stream.transport: # type: ignore[attr-defined]
135136
self._disposables.add(Disposable(stream.subscribe(handler))) # type: ignore[attr-defined]
137+
connected.append(name)
138+
139+
if connected:
140+
logger.info(f"Subscribed to: {', '.join(connected)}")
141+
else:
142+
logger.warning("No input streams connected — module will not receive controller data")
136143

137144
self._start_control_loop()
138145
logger.info("Quest Teleoperation Module started")
@@ -190,17 +197,17 @@ def get_status(self) -> QuestTeleopStatus:
190197
# Callbacks and Control Loop
191198
# -------------------------------------------------------------------------
192199

193-
def _on_pose_cb(self, hand: Hand, pose_stamped: PoseStamped) -> None:
200+
def _on_pose(self, hand: Hand, pose_stamped: PoseStamped) -> None:
194201
"""Callback for controller pose, converting WebXR to robot frame."""
195202
is_left = hand == Hand.LEFT
196203
robot_pose_stamped = webxr_to_robot(pose_stamped, is_left_controller=is_left)
197204
with self._lock:
198205
self._current_poses[hand] = robot_pose_stamped
199206

200-
def _on_joy_cb(self, hand: Hand, msg: Joy) -> None:
207+
def _on_joy(self, hand: Hand, joy: Joy) -> None:
201208
"""Callback for Joy message, parsing into QuestControllerState."""
202209
is_left = hand == Hand.LEFT
203-
controller = QuestControllerState.from_joy(msg, is_left=is_left)
210+
controller = QuestControllerState.from_joy(joy, is_left=is_left)
204211
with self._lock:
205212
self._controllers[hand] = controller
206213

@@ -227,27 +234,32 @@ def _stop_control_loop(self) -> None:
227234
logger.info("Control loop stopped")
228235

229236
def _control_loop(self) -> None:
230-
"""Main control loop: compute deltas and publish at fixed rate."""
237+
"""Main control loop: compute deltas and publish at fixed rate.
238+
239+
Holds self._lock for the entire iteration so overridable methods
240+
don't need to acquire it themselves. Callbacks and @rpc methods
241+
still lock independently since they run on other threads.
242+
"""
231243
period = 1.0 / self.config.control_loop_hz
232244

233245
while self._control_loop_running:
234246
loop_start = time.perf_counter()
235247
try:
236-
self._handle_engage()
248+
with self._lock:
249+
self._handle_engage()
237250

238-
for hand in Hand:
239-
if not self._should_publish(hand):
240-
continue
241-
output_pose = self._get_output_pose(hand)
242-
if output_pose is not None:
243-
self._publish_msg(hand, output_pose)
251+
for hand in Hand:
252+
if not self._should_publish(hand):
253+
continue
254+
output_pose = self._get_output_pose(hand)
255+
if output_pose is not None:
256+
self._publish_msg(hand, output_pose)
244257

245-
# Always publish buttons regardless of engage state,
246-
# so UI/listeners can react to button presses (e.g., trigger engage).
247-
with self._lock:
258+
# Always publish buttons regardless of engage state,
259+
# so UI/listeners can react to button presses (e.g., trigger engage).
248260
left = self._controllers.get(Hand.LEFT)
249261
right = self._controllers.get(Hand.RIGHT)
250-
self._publish_button_state(left, right)
262+
self._publish_button_state(left, right)
251263
except Exception:
252264
logger.exception("Error in teleop control loop")
253265

@@ -257,7 +269,10 @@ def _control_loop(self) -> None:
257269
time.sleep(sleep_time)
258270

259271
# -------------------------------------------------------------------------
260-
# Overridable Methods (for subclasses)
272+
# Control Loop Internals
273+
#
274+
# Called with self._lock held by the control loop.
275+
# Do NOT acquire self._lock in overrides.
261276
# -------------------------------------------------------------------------
262277

263278
def _handle_engage(self) -> None:
@@ -266,26 +281,24 @@ def _handle_engage(self) -> None:
266281
Override to customize which button/action triggers engage.
267282
Default: Each controller's primary button (X/A) hold engages that hand.
268283
"""
269-
with self._lock:
270-
for hand in Hand:
271-
controller = self._controllers.get(hand)
272-
if controller is None:
273-
continue
274-
if controller.primary:
275-
if not self._is_engaged[hand]:
276-
self.engage(hand)
277-
else:
278-
if self._is_engaged[hand]:
279-
self.disengage(hand)
284+
for hand in Hand:
285+
controller = self._controllers.get(hand)
286+
if controller is None:
287+
continue
288+
if controller.primary:
289+
if not self._is_engaged[hand]:
290+
self.engage(hand)
291+
else:
292+
if self._is_engaged[hand]:
293+
self.disengage(hand)
280294

281295
def _should_publish(self, hand: Hand) -> bool:
282296
"""Check if we should publish commands for a hand.
283297
284298
Override to add custom conditions.
285299
Default: Returns True if the hand is engaged.
286300
"""
287-
with self._lock:
288-
return self._is_engaged[hand]
301+
return self._is_engaged[hand]
289302

290303
def _get_output_pose(self, hand: Hand) -> PoseStamped | None:
291304
"""Get the pose to publish for a controller.
@@ -294,9 +307,8 @@ def _get_output_pose(self, hand: Hand) -> PoseStamped | None:
294307
apply scaling, add filtering).
295308
Default: Computes delta from initial pose.
296309
"""
297-
with self._lock:
298-
current_pose = self._current_poses.get(hand)
299-
initial_pose = self._initial_poses.get(hand)
310+
current_pose = self._current_poses.get(hand)
311+
initial_pose = self._initial_poses.get(hand)
300312

301313
if current_pose is None or initial_pose is None:
302314
return None

0 commit comments

Comments
 (0)