Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions tf2_ros_py/test/test_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
from geometry_msgs.msg import TransformStamped
import pytest
import rclpy
import rclpy.executors
import rclpy.task
import tf2_ros as tf2
from tf2_ros.buffer import Buffer


Expand Down Expand Up @@ -130,6 +133,114 @@ def test_await_transform_full_delayed(self):
assert transform == excinfo.value.value
coro.close()

def await_transform_timeout_template(self, transform_coroutine):
# wait for timeout in an async call

# We need a node environment to have an event loop with timers running.
context = rclpy.context.Context()
rclpy.init(context=context)
executor = rclpy.executors.SingleThreadedExecutor(context=context)
node = rclpy.create_node('test_buffer', context=context)
buffer = Buffer(node=node)
current_time = node.get_clock().now()
stop_fut = rclpy.task.Future()

async def async_environment():
try:
# This allows us to test both the await_transform and await_transform_full
await transform_coroutine(buffer, 'foo', 'bar', current_time)

# Indicate that we had a success
stop_fut.set_result(True)
except tf2.LookupException:
# Indicate that we had a timeout
stop_fut.set_result(False)

# This is a workaround to run the wait in the event loop
gc = node.create_guard_condition(async_environment)

# Trigger the guard condition to start the async call
gc.trigger()

# Runs the event loop until the future is done
# or times out (not the timeout that we want to test)
rclpy.spin_until_future_complete(node, stop_fut, executor, timeout_sec=0.2)

# Check if we actually timed out
assert stop_fut.done() and not stop_fut.result()

# Add a transform to the buffer to ensure that the buffer is still functional
transform = self.build_transform('foo', 'bar', current_time)
buffer.set_transform(transform, 'unittest')

# Refresh the future to check if we can still use the buffer
stop_fut = rclpy.task.Future()

# Trigger the guard condition again to ensure that the buffer can still be used
gc.trigger()

# Run the event loop again
rclpy.spin_until_future_complete(node, stop_fut, executor, timeout_sec=0.2)

# Check if we can still get the transform after the timeout
assert stop_fut.done() and stop_fut.result()

def test_await_transform_timeout(self):
# wait for timeout in an async call

def transform_coroutine(buffer, target, source, rclpy_time):
return buffer.wait_for_transform_async(
target,
source,
rclpy_time,
timeout=rclpy.duration.Duration(seconds=0.1)
)

self.await_transform_timeout_template(transform_coroutine)

def test_await_transform_full_timeout(self):
# wait for timeout in an async call

def transform_coroutine(buffer, target, source, rclpy_time):
return buffer.wait_for_transform_full_async(
target,
rclpy_time,
source,
rclpy_time,
target,
timeout=rclpy.duration.Duration(seconds=0.1)
)

self.await_transform_timeout_template(transform_coroutine)

def test_async_lookup_transform_timeout(self):
# wait for timeout in an async call

def transform_coroutine(buffer, target, source, rclpy_time):
return buffer.lookup_transform_async(
target,
source,
rclpy_time,
timeout=rclpy.duration.Duration(seconds=0.1)
)

self.await_transform_timeout_template(transform_coroutine)

def test_async_lookup_transform_full_timeout(self):
# wait for timeout in an async call

def transform_coroutine(buffer, target, source, rclpy_time):
return buffer.lookup_transform_full_async(
target,
rclpy_time,
source,
rclpy_time,
target,
timeout=rclpy.duration.Duration(seconds=0.1)
)

self.await_transform_timeout_template(transform_coroutine)

def test_buffer_non_default_cache(self):
buffer = Buffer(cache_time=rclpy.duration.Duration(seconds=10.0))
clock = rclpy.clock.Clock()
Expand Down
90 changes: 79 additions & 11 deletions tf2_ros_py/tf2_ros/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

from geometry_msgs.msg import TransformStamped
import rclpy
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
from rclpy.clock import JumpThreshold, TimeJump
from rclpy.duration import Duration
from rclpy.node import Node
Expand Down Expand Up @@ -89,6 +90,8 @@ def __init__(
else:
self.clock = rclpy.clock.Clock()

self.node = node

# create a jump callback to clear the buffer if use_sim_true is true and there is a
# jump in time
threshold = JumpThreshold(min_forward=None,
Expand Down Expand Up @@ -154,18 +157,20 @@ async def lookup_transform_async(
self,
target_frame: str,
source_frame: str,
time: Time
time: Time,
timeout: Duration | None = None
) -> TransformStamped:
"""
Get the transform from the source frame to the target frame asyncronously.

:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
:param time: The time at which to get the transform (0 will get the latest).
:param timeout: Time to wait for the target frame to become available.
Needs a Node instance to be set on the Buffer to work.
:return: The transform between the frames.
"""
await self.wait_for_transform_async(target_frame, source_frame, time)
return self.lookup_transform_core(target_frame, source_frame, time)
return await self.wait_for_transform_async(target_frame, source_frame, time, timeout)

def lookup_transform_full(
self,
Expand Down Expand Up @@ -198,7 +203,8 @@ async def lookup_transform_full_async(
target_time: Time,
source_frame: str,
source_time: Time,
fixed_frame: str
fixed_frame: str,
timeout: Duration | None = None
) -> TransformStamped:
"""
Get transform from source frame to target frame using the advanced API asyncronously.
Expand All @@ -208,12 +214,12 @@ async def lookup_transform_full_async(
:param source_frame: Name of the input frame.
:param source_time: The time at which source_frame will be evaluated (0 gets the latest).
:param fixed_frame: Name of the frame to consider constant in time.
:param timeout: Time to wait for the target frame to become available.
Needs a Node instance to be set on the Buffer to work.
:return: The transform between the frames.
"""
await self.wait_for_transform_full_async(
target_frame, target_time, source_frame, source_time, fixed_frame)
return self.lookup_transform_full_core(
target_frame, target_time, source_frame, source_time, fixed_frame)
return await self.wait_for_transform_full_async(
target_frame, target_time, source_frame, source_time, fixed_frame, timeout)

def can_transform(
self,
Expand Down Expand Up @@ -298,22 +304,53 @@ def wait_for_transform_async(
self,
target_frame: str,
source_frame: str,
time: Time
time: Time,
timeout: Duration | None = None
) -> Future:
"""
Wait for a transform from the source frame to the target frame to become possible.

:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
:param time: The time at which to get the transform (0 will get the latest).
:return: A future that becomes true when the transform is available.
:param timeout: Time to wait for the target frame to become available.
Needs a Node instance to be set on the Buffer to work.
:return: A future that contains the transform when it becomes available.
"""
fut = rclpy.task.Future()
if self.can_transform_core(target_frame, source_frame, time)[0]:
# Short cut, the transform is available
fut.set_result(self.lookup_transform(target_frame, source_frame, time))
return fut

timeout_timer = None

def _on_timeout():
# Deactivate the callback to avoid memory leaks
# Cancel the timer after it has triggered once
timeout_timer.cancel()
# This is needed, as this function otherwise leaks resources.
# timer.destroy() is not sufficient!
self.node.destroy_timer(timeout_timer)

# The transform could be available now, so we check if the future is still pending
if not fut.done():
fut.set_exception(
tf2.LookupException(
f"Failed to find transform from '{source_frame}' to '{target_frame}'"
)
)

# Check if a timeout is specified
if timeout is not None:
if self.node is None:
raise RuntimeError('Async timeouts require a Node instance to be set.')

timeout_timer = self.node.create_timer(
timeout.nanoseconds / 1e9, _on_timeout, clock=self.node.get_clock(),
callback_group=MutuallyExclusiveCallbackGroup()
)

def _on_new_data():
try:
if self.can_transform_core(target_frame, source_frame, time)[0]:
Expand All @@ -332,7 +369,8 @@ def wait_for_transform_full_async(
target_time: Time,
source_frame: str,
source_time: Time,
fixed_frame: str
fixed_frame: str,
timeout: Duration | None = None
) -> Future:
"""
Wait for a transform from the source frame to the target frame to become possible.
Expand All @@ -342,6 +380,8 @@ def wait_for_transform_full_async(
:param source_frame: Name of the input frame.
:param source_time: The time at which source_frame will be evaluated (0 gets the latest).
:param fixed_frame: Name of the frame to consider constant in time.
:param timeout: Time to wait for the target frame to become available.
Needs a Node instance to be set on the Buffer to work.
:return: A future that becomes true when the transform is available.
"""
fut = rclpy.task.Future()
Expand All @@ -353,6 +393,34 @@ def wait_for_transform_full_async(
target_frame, target_time, source_frame, source_time, fixed_frame))
return fut

timeout_timer = None

def _on_timeout():
# Deactivate the callback to avoid memory leaks
# Cancel the timer after it has triggered once
timeout_timer.cancel()
# This is needed, as this function otherwise leaks resources.
# timer.destroy() is not sufficient!
self.node.destroy_timer(timeout_timer)

# The transform could be available now, so we check if the future is still pending
if not fut.done():
fut.set_exception(
tf2.LookupException(
f"Failed to find transform from '{source_frame}' to '{target_frame}'"
)
)

# Check if a timeout is specified
if timeout is not None:
if self.node is None:
raise RuntimeError('Async timeouts require a Node instance to be set.')

timeout_timer = self.node.create_timer(
timeout.nanoseconds / 1e9, _on_timeout, clock=self.node.get_clock(),
callback_group=MutuallyExclusiveCallbackGroup()
)

def _on_new_data():
try:
if self.can_transform_full_core(
Expand Down