Skip to content

Commit f4dc5b2

Browse files
authored
fix: Transform obtained from GetROS2TransformTool can be old (#651)
1 parent dcd9348 commit f4dc5b2

File tree

5 files changed

+208
-8
lines changed

5 files changed

+208
-8
lines changed

examples/rosbot-xl-demo.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def initialize_agent() -> Runnable[ReActAgentState, ReActAgentState]:
5050
"examples/embodiments/rosbotxl_embodiment.json"
5151
)
5252

53-
connector = ROS2Connector(executor_type="multi_threaded")
53+
connector = ROS2Connector(executor_type="multi_threaded", use_sim_time=True)
5454
tools: List[BaseTool] = [
5555
GetROS2TransformConfiguredTool(
5656
connector=connector,

src/rai_core/rai/communication/ros2/connectors/base.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from rclpy.duration import Duration
2626
from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor
2727
from rclpy.node import Node
28+
from rclpy.parameter import Parameter
2829
from rclpy.qos import QoSProfile
2930
from tf2_ros import Buffer, LookupException, TransformListener, TransformStamped
3031

@@ -55,6 +56,8 @@ class ROS2BaseConnector(ROS2ActionMixin, ROS2ServiceMixin, BaseConnector[T]):
5556
Whether to destroy subscribers after receiving a message, by default False.
5657
executor_type : Literal["single_threaded", "multi_threaded"], optional
5758
Type of executor to use for processing ROS2 callbacks, by default "multi_threaded".
59+
use_sim_time : bool, optional
60+
Whether to use simulation time or system time, by default False.
5861
5962
Methods
6063
-------
@@ -100,6 +103,7 @@ def __init__(
100103
node_name: str = f"rai_ros2_connector_{str(uuid.uuid4())[-12:]}",
101104
destroy_subscribers: bool = False,
102105
executor_type: Literal["single_threaded", "multi_threaded"] = "multi_threaded",
106+
use_sim_time: bool = False,
103107
):
104108
"""Initialize the ROS2BaseConnector.
105109
@@ -127,6 +131,10 @@ def __init__(
127131
)
128132
self._executor_type = executor_type
129133
self._node = Node(node_name)
134+
if use_sim_time:
135+
self._node.set_parameters(
136+
[Parameter("use_sim_time", Parameter.Type.BOOL, True)]
137+
)
130138
self._topic_api = ROS2TopicAPI(self._node, destroy_subscribers)
131139
self._service_api = ROS2ServiceAPI(self._node)
132140
self._actions_api = ROS2ActionAPI(self._node)

src/rai_core/rai/tools/ros2/generic/topics.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import json
1616
from typing import Any, Dict, List, Literal, Tuple, Type
1717

18+
import rclpy.time
1819
import rosidl_runtime_py.set_message
1920
import rosidl_runtime_py.utilities
2021
from cv_bridge import CvBridge
@@ -243,10 +244,34 @@ class GetROS2TransformTool(BaseROS2Tool):
243244
description: str = "Get the transform between two frames"
244245
args_schema: Type[GetROS2TransformToolInput] = GetROS2TransformToolInput
245246

247+
# Staleness threshold in seconds (10 missed cycles at 10Hz = 1 second)
248+
STALE_TRANSFORM_THRESHOLD_SEC: float = 1.0
249+
246250
def _run(self, target_frame: str, source_frame: str, timeout_sec: float) -> str:
247251
transform = self.connector.get_transform(
248252
target_frame=target_frame,
249253
source_frame=source_frame,
250254
timeout_sec=timeout_sec,
251255
)
252-
return stringify_dict(ros2_message_to_dict(transform))
256+
257+
transform_time = rclpy.time.Time.from_msg(transform.header.stamp)
258+
current_time = self.connector._node.get_clock().now()
259+
260+
age_seconds = (current_time - transform_time).nanoseconds / 1e9
261+
262+
result = stringify_dict(ros2_message_to_dict(transform))
263+
264+
# Add clear warning for stale data at the beginning of response
265+
if abs(age_seconds) > self.STALE_TRANSFORM_THRESHOLD_SEC:
266+
if age_seconds == float("inf"):
267+
result = (
268+
"WARNING: Robot position data has invalid timestamp (potentially stale). The robot may have moved since then.\n\n"
269+
+ result
270+
)
271+
else:
272+
result = (
273+
f"WARNING: Robot position data is {age_seconds:.1f} seconds old (potentially stale). The robot may have moved since then.\n\n"
274+
+ result
275+
)
276+
277+
return result

tests/communication/ros2/helpers.py

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from rclpy.callback_groups import CallbackGroup, ReentrantCallbackGroup
2929
from rclpy.executors import MultiThreadedExecutor
3030
from rclpy.node import Node
31+
from rosgraph_msgs.msg import Clock
3132
from sensor_msgs.msg import Image
3233
from std_msgs.msg import String
3334
from std_srvs.srv import SetBool
@@ -232,8 +233,16 @@ def publish_message(self) -> None:
232233

233234

234235
class TransformPublisher(Node):
235-
def __init__(self, topic: str):
236+
def __init__(self, topic: str, use_sim_time: bool = False):
236237
super().__init__("test_transform_publisher")
238+
if use_sim_time:
239+
self.set_parameters(
240+
[
241+
rclpy.parameter.Parameter(
242+
"use_sim_time", rclpy.parameter.Parameter.Type.BOOL, True
243+
)
244+
]
245+
)
237246
self.tf_broadcaster = TransformBroadcaster(self)
238247
self.timer = self.create_timer(0.1, self.publish_transform)
239248
self.frame_id = "base_link"
@@ -254,6 +263,22 @@ def publish_transform(self) -> None:
254263
self.tf_broadcaster.sendTransform(msg)
255264

256265

266+
class ClockPublisher(Node):
267+
def __init__(self):
268+
super().__init__("test_clock_publisher")
269+
self.publisher = self.create_publisher(Clock, "/clock", 10)
270+
self.timer = self.create_timer(0.1, self.publish_clock)
271+
self.start_time = time.time()
272+
273+
def publish_clock(self) -> None:
274+
msg = Clock()
275+
# Publish simulation time that advances faster than real time
276+
sim_time = (time.time() - self.start_time) * 2.0 # 2x speed
277+
msg.clock.sec = int(sim_time)
278+
msg.clock.nanosec = int((sim_time % 1.0) * 1e9)
279+
self.publisher.publish(msg)
280+
281+
257282
def multi_threaded_spinner(
258283
nodes: List[Node],
259284
) -> Tuple[List[MultiThreadedExecutor], List[threading.Thread]]:
@@ -274,15 +299,37 @@ def multi_threaded_spinner(
274299
def shutdown_executors_and_threads(
275300
executors: List[MultiThreadedExecutor], threads: List[threading.Thread]
276301
) -> None:
277-
# First shutdown executors
302+
# Cancel any ongoing tasks/guard conditions
278303
for executor in executors:
279-
executor.shutdown()
280-
# Small delay to allow executors to finish pending operations
281-
time.sleep(0.5)
282-
# Then join threads with a timeout
304+
try:
305+
if hasattr(executor, "_futures"):
306+
for future in executor._futures:
307+
if not future.done():
308+
future.cancel()
309+
except Exception as e:
310+
print(f"Error canceling ongoing tasks: {e}")
311+
312+
# Join threads with a timeout
283313
for thread in threads:
284314
thread.join(timeout=2.0)
285315

316+
# Clean up any remaining nodes
317+
for executor in executors:
318+
try:
319+
for node in executor.get_nodes():
320+
node.destroy_node()
321+
except Exception as e:
322+
print(f"Error destroying node: {e}")
323+
324+
time.sleep(0.5)
325+
326+
# Shutdown executors
327+
for executor in executors:
328+
try:
329+
executor.shutdown()
330+
except Exception as e:
331+
print(f"Error shutting down executor: {e}")
332+
286333

287334
@pytest.fixture(scope="function")
288335
def ros_setup() -> Generator[None, None, None]:

tests/tools/ros2/test_topic_tools.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
)
3838

3939
from tests.communication.ros2.helpers import (
40+
ClockPublisher,
4041
ImagePublisher,
4142
MessagePublisher,
4243
MessageSubscriber,
@@ -225,3 +226,122 @@ def test_get_transform_tool(ros_setup: None, request: pytest.FixtureRequest) ->
225226
assert "rotation" in response
226227
finally:
227228
shutdown_executors_and_threads(executors, threads)
229+
230+
231+
def test_get_transform_tool_with_stale_tf_data(
232+
ros_setup: None, request: pytest.FixtureRequest
233+
) -> None:
234+
"""Test that GetROS2TransformTool returns stale transform data with warning."""
235+
topic_name = f"{request.node.originalname}_topic" # type: ignore
236+
connector = ROS2Connector()
237+
publisher = TransformPublisher(topic=topic_name)
238+
executors, threads = multi_threaded_spinner([publisher])
239+
tool = GetROS2TransformTool(connector=connector)
240+
241+
# Let the publisher run for a bit to establish transforms
242+
time.sleep(1.0)
243+
response = tool._run(
244+
source_frame="map",
245+
target_frame="base_link",
246+
timeout_sec=5.0,
247+
) # type: ignore
248+
assert "translation" in response
249+
assert "rotation" in response
250+
251+
# stop the /tf publisher
252+
publisher.timer.cancel()
253+
254+
# wait for some time to pass then get the transform again
255+
time.sleep(2.0)
256+
257+
try:
258+
response = tool._run(
259+
source_frame="map",
260+
target_frame="base_link",
261+
timeout_sec=5.0,
262+
) # type: ignore
263+
264+
response_lower = response.lower()
265+
staleness_indicators = [
266+
"stale",
267+
"old",
268+
"outdated",
269+
"warning",
270+
"invalid",
271+
]
272+
staleness_warning_present = any(
273+
indicator in response_lower for indicator in staleness_indicators
274+
)
275+
276+
# Stale data should be warned about
277+
if not staleness_warning_present:
278+
pytest.fail(
279+
f"Response: {response}. "
280+
"Expected behavior: Either include staleness warning in response or raise appropriate error."
281+
)
282+
283+
finally:
284+
# Clean shutdown
285+
shutdown_executors_and_threads(executors, threads)
286+
287+
288+
def test_get_transform_tool_with_stale_tf_data_sim_time(
289+
ros_setup: None, request: pytest.FixtureRequest
290+
) -> None:
291+
"""Test that GetROS2TransformTool returns stale transform data with warning when using sim time."""
292+
topic_name = f"{request.node.originalname}_topic" # type: ignore
293+
connector = ROS2Connector(use_sim_time=True)
294+
295+
# Create clock publisher to provide simulation time
296+
clock_publisher = ClockPublisher()
297+
publisher = TransformPublisher(topic=topic_name, use_sim_time=True)
298+
executors, threads = multi_threaded_spinner([clock_publisher, publisher])
299+
tool = GetROS2TransformTool(connector=connector)
300+
301+
# Let the publishers run for a bit to establish transforms and clock
302+
time.sleep(1.0)
303+
response = tool._run(
304+
source_frame="map",
305+
target_frame="base_link",
306+
timeout_sec=5.0,
307+
) # type: ignore
308+
assert "translation" in response
309+
assert "rotation" in response
310+
311+
# stop the /tf publisher
312+
publisher.timer.cancel()
313+
314+
# wait for some time to pass then get the transform again
315+
time.sleep(2.0)
316+
317+
try:
318+
response = tool._run(
319+
source_frame="map",
320+
target_frame="base_link",
321+
timeout_sec=5.0,
322+
) # type: ignore
323+
324+
response_lower = response.lower()
325+
staleness_indicators = [
326+
"stale",
327+
"old",
328+
"outdated",
329+
"warning",
330+
"invalid",
331+
]
332+
staleness_warning_present = any(
333+
indicator in response_lower for indicator in staleness_indicators
334+
)
335+
336+
# Stale data should be warned about
337+
if not staleness_warning_present:
338+
pytest.fail(
339+
f"Response: {response}. "
340+
"Expected behavior: Either include staleness warning in response or raise appropriate error."
341+
)
342+
343+
finally:
344+
clock_publisher.timer.cancel()
345+
# wait for the timer to be canceled
346+
time.sleep(0.1)
347+
shutdown_executors_and_threads(executors, threads)

0 commit comments

Comments
 (0)