diff --git a/tf2_ros_py/test/test_buffer.py b/tf2_ros_py/test/test_buffer.py index add27a066..14cc6364a 100644 --- a/tf2_ros_py/test/test_buffer.py +++ b/tf2_ros_py/test/test_buffer.py @@ -29,6 +29,9 @@ from geometry_msgs.msg import TransformStamped import pytest import rclpy +import rclpy.executors +import rclpy.task +import tf2_ros as tf2 from tf2_ros.buffer import Buffer @@ -130,6 +133,114 @@ def test_await_transform_full_delayed(self): assert transform == excinfo.value.value coro.close() + def await_transform_timeout_template(self, transform_coroutine): + # wait for timeout in an async call + + # We need a node environment to have an event loop with timers running. + context = rclpy.context.Context() + rclpy.init(context=context) + executor = rclpy.executors.SingleThreadedExecutor(context=context) + node = rclpy.create_node('test_buffer', context=context) + buffer = Buffer(node=node) + current_time = node.get_clock().now() + stop_fut = rclpy.task.Future() + + async def async_environment(): + try: + # This allows us to test both the await_transform and await_transform_full + await transform_coroutine(buffer, 'foo', 'bar', current_time) + + # Indicate that we had a success + stop_fut.set_result(True) + except tf2.LookupException: + # Indicate that we had a timeout + stop_fut.set_result(False) + + # This is a workaround to run the wait in the event loop + gc = node.create_guard_condition(async_environment) + + # Trigger the guard condition to start the async call + gc.trigger() + + # Runs the event loop until the future is done + # or times out (not the timeout that we want to test) + rclpy.spin_until_future_complete(node, stop_fut, executor, timeout_sec=0.2) + + # Check if we actually timed out + assert stop_fut.done() and not stop_fut.result() + + # Add a transform to the buffer to ensure that the buffer is still functional + transform = self.build_transform('foo', 'bar', current_time) + buffer.set_transform(transform, 'unittest') + + # Refresh the future to check if we can still use the buffer + stop_fut = rclpy.task.Future() + + # Trigger the guard condition again to ensure that the buffer can still be used + gc.trigger() + + # Run the event loop again + rclpy.spin_until_future_complete(node, stop_fut, executor, timeout_sec=0.2) + + # Check if we can still get the transform after the timeout + assert stop_fut.done() and stop_fut.result() + + def test_await_transform_timeout(self): + # wait for timeout in an async call + + def transform_coroutine(buffer, target, source, rclpy_time): + return buffer.wait_for_transform_async( + target, + source, + rclpy_time, + timeout=rclpy.duration.Duration(seconds=0.1) + ) + + self.await_transform_timeout_template(transform_coroutine) + + def test_await_transform_full_timeout(self): + # wait for timeout in an async call + + def transform_coroutine(buffer, target, source, rclpy_time): + return buffer.wait_for_transform_full_async( + target, + rclpy_time, + source, + rclpy_time, + target, + timeout=rclpy.duration.Duration(seconds=0.1) + ) + + self.await_transform_timeout_template(transform_coroutine) + + def test_async_lookup_transform_timeout(self): + # wait for timeout in an async call + + def transform_coroutine(buffer, target, source, rclpy_time): + return buffer.lookup_transform_async( + target, + source, + rclpy_time, + timeout=rclpy.duration.Duration(seconds=0.1) + ) + + self.await_transform_timeout_template(transform_coroutine) + + def test_async_lookup_transform_full_timeout(self): + # wait for timeout in an async call + + def transform_coroutine(buffer, target, source, rclpy_time): + return buffer.lookup_transform_full_async( + target, + rclpy_time, + source, + rclpy_time, + target, + timeout=rclpy.duration.Duration(seconds=0.1) + ) + + self.await_transform_timeout_template(transform_coroutine) + def test_buffer_non_default_cache(self): buffer = Buffer(cache_time=rclpy.duration.Duration(seconds=10.0)) clock = rclpy.clock.Clock() diff --git a/tf2_ros_py/tf2_ros/buffer.py b/tf2_ros_py/tf2_ros/buffer.py index 52fcf07a6..9c0730d32 100644 --- a/tf2_ros_py/tf2_ros/buffer.py +++ b/tf2_ros_py/tf2_ros/buffer.py @@ -37,6 +37,7 @@ from geometry_msgs.msg import TransformStamped import rclpy +from rclpy.callback_groups import MutuallyExclusiveCallbackGroup from rclpy.clock import JumpThreshold, TimeJump from rclpy.duration import Duration from rclpy.node import Node @@ -89,6 +90,8 @@ def __init__( else: self.clock = rclpy.clock.Clock() + self.node = node + # create a jump callback to clear the buffer if use_sim_true is true and there is a # jump in time threshold = JumpThreshold(min_forward=None, @@ -154,7 +157,8 @@ async def lookup_transform_async( self, target_frame: str, source_frame: str, - time: Time + time: Time, + timeout: Duration | None = None ) -> TransformStamped: """ Get the transform from the source frame to the target frame asyncronously. @@ -162,10 +166,11 @@ async def lookup_transform_async( :param target_frame: Name of the frame to transform into. :param source_frame: Name of the input frame. :param time: The time at which to get the transform (0 will get the latest). + :param timeout: Time to wait for the target frame to become available. + Needs a Node instance to be set on the Buffer to work. :return: The transform between the frames. """ - await self.wait_for_transform_async(target_frame, source_frame, time) - return self.lookup_transform_core(target_frame, source_frame, time) + return await self.wait_for_transform_async(target_frame, source_frame, time, timeout) def lookup_transform_full( self, @@ -198,7 +203,8 @@ async def lookup_transform_full_async( target_time: Time, source_frame: str, source_time: Time, - fixed_frame: str + fixed_frame: str, + timeout: Duration | None = None ) -> TransformStamped: """ Get transform from source frame to target frame using the advanced API asyncronously. @@ -208,12 +214,12 @@ async def lookup_transform_full_async( :param source_frame: Name of the input frame. :param source_time: The time at which source_frame will be evaluated (0 gets the latest). :param fixed_frame: Name of the frame to consider constant in time. + :param timeout: Time to wait for the target frame to become available. + Needs a Node instance to be set on the Buffer to work. :return: The transform between the frames. """ - await self.wait_for_transform_full_async( - target_frame, target_time, source_frame, source_time, fixed_frame) - return self.lookup_transform_full_core( - target_frame, target_time, source_frame, source_time, fixed_frame) + return await self.wait_for_transform_full_async( + target_frame, target_time, source_frame, source_time, fixed_frame, timeout) def can_transform( self, @@ -298,7 +304,8 @@ def wait_for_transform_async( self, target_frame: str, source_frame: str, - time: Time + time: Time, + timeout: Duration | None = None ) -> Future: """ Wait for a transform from the source frame to the target frame to become possible. @@ -306,7 +313,9 @@ def wait_for_transform_async( :param target_frame: Name of the frame to transform into. :param source_frame: Name of the input frame. :param time: The time at which to get the transform (0 will get the latest). - :return: A future that becomes true when the transform is available. + :param timeout: Time to wait for the target frame to become available. + Needs a Node instance to be set on the Buffer to work. + :return: A future that contains the transform when it becomes available. """ fut = rclpy.task.Future() if self.can_transform_core(target_frame, source_frame, time)[0]: @@ -314,6 +323,34 @@ def wait_for_transform_async( fut.set_result(self.lookup_transform(target_frame, source_frame, time)) return fut + timeout_timer = None + + def _on_timeout(): + # Deactivate the callback to avoid memory leaks + # Cancel the timer after it has triggered once + timeout_timer.cancel() + # This is needed, as this function otherwise leaks resources. + # timer.destroy() is not sufficient! + self.node.destroy_timer(timeout_timer) + + # The transform could be available now, so we check if the future is still pending + if not fut.done(): + fut.set_exception( + tf2.LookupException( + f"Failed to find transform from '{source_frame}' to '{target_frame}'" + ) + ) + + # Check if a timeout is specified + if timeout is not None: + if self.node is None: + raise RuntimeError('Async timeouts require a Node instance to be set.') + + timeout_timer = self.node.create_timer( + timeout.nanoseconds / 1e9, _on_timeout, clock=self.node.get_clock(), + callback_group=MutuallyExclusiveCallbackGroup() + ) + def _on_new_data(): try: if self.can_transform_core(target_frame, source_frame, time)[0]: @@ -332,7 +369,8 @@ def wait_for_transform_full_async( target_time: Time, source_frame: str, source_time: Time, - fixed_frame: str + fixed_frame: str, + timeout: Duration | None = None ) -> Future: """ Wait for a transform from the source frame to the target frame to become possible. @@ -342,6 +380,8 @@ def wait_for_transform_full_async( :param source_frame: Name of the input frame. :param source_time: The time at which source_frame will be evaluated (0 gets the latest). :param fixed_frame: Name of the frame to consider constant in time. + :param timeout: Time to wait for the target frame to become available. + Needs a Node instance to be set on the Buffer to work. :return: A future that becomes true when the transform is available. """ fut = rclpy.task.Future() @@ -353,6 +393,34 @@ def wait_for_transform_full_async( target_frame, target_time, source_frame, source_time, fixed_frame)) return fut + timeout_timer = None + + def _on_timeout(): + # Deactivate the callback to avoid memory leaks + # Cancel the timer after it has triggered once + timeout_timer.cancel() + # This is needed, as this function otherwise leaks resources. + # timer.destroy() is not sufficient! + self.node.destroy_timer(timeout_timer) + + # The transform could be available now, so we check if the future is still pending + if not fut.done(): + fut.set_exception( + tf2.LookupException( + f"Failed to find transform from '{source_frame}' to '{target_frame}'" + ) + ) + + # Check if a timeout is specified + if timeout is not None: + if self.node is None: + raise RuntimeError('Async timeouts require a Node instance to be set.') + + timeout_timer = self.node.create_timer( + timeout.nanoseconds / 1e9, _on_timeout, clock=self.node.get_clock(), + callback_group=MutuallyExclusiveCallbackGroup() + ) + def _on_new_data(): try: if self.can_transform_full_core(