Skip to content

Commit c4b1d68

Browse files
committed
Add timeouts to async python buffer interfaces
Signed-off-by: Florian Vahl <[email protected]>
1 parent 2c15d17 commit c4b1d68

File tree

2 files changed

+164
-5
lines changed

2 files changed

+164
-5
lines changed

tf2_ros_py/test/test_buffer.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
from geometry_msgs.msg import TransformStamped
3030
import pytest
3131
import rclpy
32+
import rclpy.executors
33+
import rclpy.task
34+
import tf2_ros as tf2
3235
from tf2_ros.buffer import Buffer
3336

3437

@@ -130,6 +133,91 @@ def test_await_transform_full_delayed(self):
130133
assert transform == excinfo.value.value
131134
coro.close()
132135

136+
137+
def await_transform_timeout_template(self, transform_coroutine):
138+
# wait for timeout in an async call
139+
140+
# We need a node environment to have an event loop with timers running.
141+
context = rclpy.context.Context()
142+
rclpy.init(context=context)
143+
executor = rclpy.executors.SingleThreadedExecutor(context=context)
144+
node = rclpy.create_node('test_buffer', context=context)
145+
buffer = Buffer(node=node)
146+
current_time = node.get_clock().now()
147+
stop_fut = rclpy.task.Future()
148+
149+
async def async_environment():
150+
try:
151+
# This allows us to test both the await_transform and await_transform_full
152+
await transform_coroutine(buffer, 'foo', 'bar', current_time)
153+
154+
# Indicate that we had a success
155+
stop_fut.set_result(True)
156+
except tf2.LookupException as e:
157+
# Indicate that we had a timeout
158+
stop_fut.set_result(False)
159+
160+
# This is a workaround to run the wait in the event loop
161+
gc = node.create_guard_condition(async_environment)
162+
163+
# Trigger the guard condition to start the async call
164+
gc.trigger()
165+
166+
# Runs the event loop until the future is done or times out (not the timeout that we want to test)
167+
rclpy.spin_until_future_complete(node, stop_fut, executor, timeout_sec=0.2)
168+
169+
# Check if we actually timed out
170+
assert stop_fut.done() and not stop_fut.result()
171+
172+
# Add a transform to the buffer to ensure that the buffer is still functional
173+
transform = self.build_transform('foo', 'bar', current_time)
174+
buffer.set_transform(transform, 'unittest')
175+
176+
# Refresh the future to check if we can still use the buffer
177+
stop_fut = rclpy.task.Future()
178+
179+
# Trigger the guard condition again to ensure that the buffer can still be used
180+
gc.trigger()
181+
182+
# Run the event loop again
183+
rclpy.spin_until_future_complete(node, stop_fut, executor, timeout_sec=0.2)
184+
185+
# Check if we can still get the transform after the timeout
186+
assert stop_fut.done() and stop_fut.result()
187+
188+
189+
def test_await_transform_timeout(self):
190+
# wait for timeout in an async call
191+
192+
def transform_coroutine(buffer, target, source, rclpy_time):
193+
return buffer.wait_for_transform_async(target, source, rclpy_time, timeout=rclpy.duration.Duration(seconds=0.1))
194+
195+
self.await_transform_timeout_template(transform_coroutine)
196+
197+
def test_await_transform_full_timeout(self):
198+
# wait for timeout in an async call
199+
200+
def transform_coroutine(buffer, target, source, rclpy_time):
201+
return buffer.wait_for_transform_full_async(target, rclpy_time, source, rclpy_time, target, timeout=rclpy.duration.Duration(seconds=0.1))
202+
203+
self.await_transform_timeout_template(transform_coroutine)
204+
205+
def test_async_lookup_transform_timeout(self):
206+
# wait for timeout in an async call
207+
208+
def transform_coroutine(buffer, target, source, rclpy_time):
209+
return buffer.lookup_transform_async(target, source, rclpy_time, timeout=rclpy.duration.Duration(seconds=0.1))
210+
211+
self.await_transform_timeout_template(transform_coroutine)
212+
213+
def test_async_lookup_transform_full_timeout(self):
214+
# wait for timeout in an async call
215+
216+
def transform_coroutine(buffer, target, source, rclpy_time):
217+
return buffer.lookup_transform_full_async(target, rclpy_time, source, rclpy_time, target, timeout=rclpy.duration.Duration(seconds=0.1))
218+
219+
self.await_transform_timeout_template(transform_coroutine)
220+
133221
def test_buffer_non_default_cache(self):
134222
buffer = Buffer(cache_time=rclpy.duration.Duration(seconds=10.0))
135223
clock = rclpy.clock.Clock()

tf2_ros_py/tf2_ros/buffer.py

Lines changed: 76 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737

3838
from geometry_msgs.msg import TransformStamped
3939
import rclpy
40+
from rclpy.callback_groups import MutuallyExclusiveCallbackGroup
4041
from rclpy.clock import JumpThreshold, TimeJump
4142
from rclpy.duration import Duration
4243
from rclpy.node import Node
@@ -89,6 +90,8 @@ def __init__(
8990
else:
9091
self.clock = rclpy.clock.Clock()
9192

93+
self.node = node
94+
9295
# create a jump callback to clear the buffer if use_sim_true is true and there is a
9396
# jump in time
9497
threshold = JumpThreshold(min_forward=None,
@@ -154,14 +157,17 @@ async def lookup_transform_async(
154157
self,
155158
target_frame: str,
156159
source_frame: str,
157-
time: Time
160+
time: Time,
161+
timeout: Duration | None = None
158162
) -> TransformStamped:
159163
"""
160164
Get the transform from the source frame to the target frame asyncronously.
161165
162166
:param target_frame: Name of the frame to transform into.
163167
:param source_frame: Name of the input frame.
164168
:param time: The time at which to get the transform (0 will get the latest).
169+
:param timeout: Time to wait for the target frame to become available. Needs a Node instance
170+
to be set on the Buffer to work.
165171
:return: The transform between the frames.
166172
"""
167173
return await self.wait_for_transform_async(target_frame, source_frame, time, timeout)
@@ -197,7 +203,8 @@ async def lookup_transform_full_async(
197203
target_time: Time,
198204
source_frame: str,
199205
source_time: Time,
200-
fixed_frame: str
206+
fixed_frame: str,
207+
timeout: Duration | None = None
201208
) -> TransformStamped:
202209
"""
203210
Get transform from source frame to target frame using the advanced API asyncronously.
@@ -207,6 +214,8 @@ async def lookup_transform_full_async(
207214
:param source_frame: Name of the input frame.
208215
:param source_time: The time at which source_frame will be evaluated (0 gets the latest).
209216
:param fixed_frame: Name of the frame to consider constant in time.
217+
:param timeout: Time to wait for the target frame to become available. Needs a Node instance
218+
to be set on the Buffer to work.
210219
:return: The transform between the frames.
211220
"""
212221
return await self.wait_for_transform_full_async(
@@ -295,22 +304,53 @@ def wait_for_transform_async(
295304
self,
296305
target_frame: str,
297306
source_frame: str,
298-
time: Time
307+
time: Time,
308+
timeout: Duration | None = None
299309
) -> Future:
300310
"""
301311
Wait for a transform from the source frame to the target frame to become possible.
302312
303313
:param target_frame: Name of the frame to transform into.
304314
:param source_frame: Name of the input frame.
305315
:param time: The time at which to get the transform (0 will get the latest).
306-
:return: A future that becomes true when the transform is available.
316+
:param timeout: Time to wait for the target frame to become available. Needs a Node instance
317+
to be set on the Buffer to work.
318+
:return: A future that contains the transform when it becomes available.
307319
"""
308320
fut = rclpy.task.Future()
309321
if self.can_transform_core(target_frame, source_frame, time)[0]:
310322
# Short cut, the transform is available
311323
fut.set_result(self.lookup_transform(target_frame, source_frame, time))
312324
return fut
313325

326+
timeout_timer = None
327+
328+
def _on_timeout():
329+
# Deactivate the callback to avoid memory leaks
330+
# Cancel the timer after it has triggered once
331+
timeout_timer.cancel()
332+
# This is needed, as this function otherwise leaks resources.
333+
# timer.destroy() is not sufficient!
334+
self.node.destroy_timer(timeout_timer)
335+
336+
# The transform could be available now, so we check if the future is still pending
337+
if not fut.done():
338+
fut.set_exception(
339+
tf2.LookupException(
340+
f"Failed to find transform from '{source_frame}' to '{target_frame}'"
341+
)
342+
)
343+
344+
# Check if a timeout is specified
345+
if timeout is not None:
346+
if self.node is None:
347+
raise RuntimeError("Async timeouts require a Node instance to be set.")
348+
349+
timeout_timer = self.node.create_timer(
350+
timeout.nanoseconds / 1e9, _on_timeout, clock=self.node.get_clock(),
351+
callback_group=MutuallyExclusiveCallbackGroup()
352+
)
353+
314354
def _on_new_data():
315355
try:
316356
if self.can_transform_core(target_frame, source_frame, time)[0]:
@@ -329,7 +369,8 @@ def wait_for_transform_full_async(
329369
target_time: Time,
330370
source_frame: str,
331371
source_time: Time,
332-
fixed_frame: str
372+
fixed_frame: str,
373+
timeout: Duration | None = None
333374
) -> Future:
334375
"""
335376
Wait for a transform from the source frame to the target frame to become possible.
@@ -339,6 +380,8 @@ def wait_for_transform_full_async(
339380
:param source_frame: Name of the input frame.
340381
:param source_time: The time at which source_frame will be evaluated (0 gets the latest).
341382
:param fixed_frame: Name of the frame to consider constant in time.
383+
:param timeout: Time to wait for the target frame to become available. Needs a Node instance
384+
to be set on the Buffer to work.
342385
:return: A future that becomes true when the transform is available.
343386
"""
344387
fut = rclpy.task.Future()
@@ -350,6 +393,34 @@ def wait_for_transform_full_async(
350393
target_frame, target_time, source_frame, source_time, fixed_frame))
351394
return fut
352395

396+
timeout_timer = None
397+
398+
def _on_timeout():
399+
# Deactivate the callback to avoid memory leaks
400+
# Cancel the timer after it has triggered once
401+
timeout_timer.cancel()
402+
# This is needed, as this function otherwise leaks resources.
403+
# timer.destroy() is not sufficient!
404+
self.node.destroy_timer(timeout_timer)
405+
406+
# The transform could be available now, so we check if the future is still pending
407+
if not fut.done():
408+
fut.set_exception(
409+
tf2.LookupException(
410+
f"Failed to find transform from '{source_frame}' to '{target_frame}'"
411+
)
412+
)
413+
414+
# Check if a timeout is specified
415+
if timeout is not None:
416+
if self.node is None:
417+
raise RuntimeError("Async timeouts require a Node instance to be set.")
418+
419+
timeout_timer = self.node.create_timer(
420+
timeout.nanoseconds / 1e9, _on_timeout, clock=self.node.get_clock(),
421+
callback_group=MutuallyExclusiveCallbackGroup()
422+
)
423+
353424
def _on_new_data():
354425
try:
355426
if self.can_transform_full_core(

0 commit comments

Comments
 (0)