Skip to content

Commit 9b94372

Browse files
Add spin_until_complete and spin_for
Co-authored-by: Hubert Liberacki <[email protected]> Signed-off-by: Hubert Liberacki <[email protected]> Signed-off-by: Christophe Bedard <[email protected]>
1 parent c8d3481 commit 9b94372

File tree

3 files changed

+183
-11
lines changed

3 files changed

+183
-11
lines changed

rclpy/rclpy/__init__.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
This will invalidate all entities derived from the context.
4141
"""
4242

43+
from typing import Callable
4344
from typing import List
4445
from typing import Optional
4546
from typing import TYPE_CHECKING
@@ -242,6 +243,52 @@ def spin(node: 'Node', executor: Optional['Executor'] = None) -> None:
242243
executor.remove_node(node)
243244

244245

246+
def spin_for(node: 'Node', executor: 'Executor' = None, duration_sec: float = None) -> None:
247+
"""
248+
Execute work for some time.
249+
250+
Callbacks will be executed by the provided executor until the context associated with the
251+
executor is shut down or the given time duration passes.
252+
253+
:param node: A node to add to the executor to check for work.
254+
:param executor: The executor to use, or the global executor if ``None``.
255+
:param timeout_sec: Seconds to wait (blocking).
256+
"""
257+
executor = get_global_executor() if executor is None else executor
258+
try:
259+
executor.add_node(node)
260+
executor.spin_for(duration_sec)
261+
finally:
262+
executor.remove_node(node)
263+
264+
265+
def spin_until_complete(
266+
node: 'Node',
267+
condition: Callable[[], bool],
268+
executor: Optional['Executor'] = None,
269+
timeout_sec: Optional[float] = None,
270+
) -> None:
271+
"""
272+
Execute work until the condition is complete.
273+
274+
Callbacks and other work will be executed by the provided executor until ``condition()``
275+
returns ``True`` or the context associated with the executor is shutdown.
276+
277+
:param node: A node to add to the executor to check for work.
278+
:param condition: The callable condition to wait on. If this condition is not related to what
279+
the executor is waiting on and the timeout is infinite, this could block forever.
280+
:param executor: The executor to use, or the global executor if ``None``.
281+
:param timeout_sec: Seconds to wait. Block until the condition is complete
282+
if ``None`` or negative. Don't wait if 0.
283+
"""
284+
executor = get_global_executor() if executor is None else executor
285+
try:
286+
executor.add_node(node)
287+
executor.spin_until_complete(condition, timeout_sec)
288+
finally:
289+
executor.remove_node(node)
290+
291+
245292
def spin_until_future_complete(
246293
node: 'Node',
247294
future: Future,

rclpy/rclpy/executors.py

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -307,32 +307,44 @@ def spin(self) -> None:
307307
while self._context.ok() and not self._is_shutdown:
308308
self.spin_once()
309309

310-
def spin_until_future_complete(
310+
def spin_for(self, duration_sec: Optional[float] = None) -> None:
311+
"""Execute callbacks until shutdown or timeout."""
312+
self.spin_until_complete(lambda: False, duration_sec)
313+
314+
def spin_until_complete(
311315
self,
312-
future: Future,
313-
timeout_sec: Optional[float] = None
316+
condition: Callable[[], bool],
317+
timeout_sec: Optional[float] = None,
314318
) -> None:
315-
"""Execute callbacks until a given future is done or a timeout occurs."""
316-
# Make sure the future wakes this executor when it is done
317-
future.add_done_callback(lambda x: self.wake())
318-
319+
"""Execute callbacks until a given condition is complete or a timeout occurs."""
319320
if timeout_sec is None or timeout_sec < 0:
320-
while self._context.ok() and not future.done() and not self._is_shutdown:
321-
self.spin_once_until_future_complete(future, timeout_sec)
321+
while self._context.ok() and not condition() and not self._is_shutdown:
322+
self.spin_once_until_complete(condition, timeout_sec)
322323
else:
323324
start = time.monotonic()
324325
end = start + timeout_sec
325326
timeout_left = TimeoutObject(timeout_sec)
326327

327-
while self._context.ok() and not future.done() and not self._is_shutdown:
328-
self.spin_once_until_future_complete(future, timeout_left)
328+
while self._context.ok() and not condition() and not self._is_shutdown:
329+
self.spin_once_until_complete(condition, timeout_left)
329330
now = time.monotonic()
330331

331332
if now >= end:
332333
return
333334

334335
timeout_left.timeout = end - now
335336

337+
def spin_until_future_complete(
338+
self,
339+
future: Future,
340+
timeout_sec: Optional[float] = None,
341+
) -> None:
342+
"""Execute callbacks until a given future is done or a timeout occurs."""
343+
future.add_done_callback(lambda x: self.wake())
344+
def condition() -> bool:
345+
return future.done()
346+
self.spin_until_complete(condition, timeout_sec)
347+
336348
def spin_once(self, timeout_sec: Optional[float] = None) -> None:
337349
"""
338350
Wait for and execute a single callback.
@@ -346,6 +358,23 @@ def spin_once(self, timeout_sec: Optional[float] = None) -> None:
346358
"""
347359
raise NotImplementedError()
348360

361+
def spin_once_until_complete(
362+
self,
363+
condition: Callable[[], bool],
364+
timeout_sec: Optional[Union[float, TimeoutObject]] = None,
365+
) -> None:
366+
"""
367+
Wait for and execute a single callback.
368+
369+
This should behave in the same way as :meth:`spin_once`.
370+
If needed by the implementation, it should awake other threads waiting.
371+
372+
:param condition: The callable condition to wait on.
373+
:param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative.
374+
Don't wait if 0.
375+
"""
376+
raise NotImplementedError()
377+
349378
def spin_once_until_future_complete(
350379
self,
351380
future: Future,
@@ -826,6 +855,13 @@ def _spin_once_impl(
826855
def spin_once(self, timeout_sec: Optional[float] = None) -> None:
827856
self._spin_once_impl(timeout_sec)
828857

858+
def spin_once_until_complete(
859+
self,
860+
condition: Callable[[], bool],
861+
timeout_sec: Optional[Union[float, TimeoutObject]] = None,
862+
) -> None:
863+
self._spin_once_impl(timeout_sec, condition)
864+
829865
def spin_once_until_future_complete(
830866
self,
831867
future: Future,
@@ -898,6 +934,13 @@ def _spin_once_impl(
898934
def spin_once(self, timeout_sec: Optional[float] = None) -> None:
899935
self._spin_once_impl(timeout_sec)
900936

937+
def spin_once_until_complete(
938+
self,
939+
condition: Callable[[], bool],
940+
timeout_sec: float = None,
941+
) -> None:
942+
self._spin_once_impl(timeout_sec, condition)
943+
901944
def spin_once_until_future_complete(
902945
self,
903946
future: Future,

rclpy/test/test_executor.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,22 @@ def test_executor_add_node(self):
380380
assert not executor.add_node(self.node)
381381
assert id(executor) == id(self.node.executor)
382382

383+
def test_executor_spin_for(self):
384+
self.assertIsNotNone(self.node.handle)
385+
executor = SingleThreadedExecutor(context=self.context)
386+
executor.add_node(self.node)
387+
388+
def timer_callback():
389+
pass
390+
timer = self.node.create_timer(1.0, timer_callback)
391+
392+
start = time.monotonic()
393+
executor.spin_for(duration_sec=10.0)
394+
end = time.monotonic()
395+
self.assertGreaterEqual(end - start, 10.0)
396+
397+
timer.cancel()
398+
383399
def test_executor_spin_until_future_complete_timeout(self):
384400
self.assertIsNotNone(self.node.handle)
385401
executor = SingleThreadedExecutor(context=self.context)
@@ -402,6 +418,72 @@ def timer_callback():
402418

403419
timer.cancel()
404420

421+
def test_executor_spin_until_complete_condition_done(self):
422+
self.assertIsNotNone(self.node.handle)
423+
executor = SingleThreadedExecutor(context=self.context)
424+
executor.add_node(self.node)
425+
426+
def timer_callback():
427+
pass
428+
timer = self.node.create_timer(0.1, timer_callback)
429+
430+
condition_var = False
431+
432+
def set_condition():
433+
nonlocal condition_var
434+
condition_var = True
435+
436+
def condition():
437+
nonlocal condition_var
438+
return condition_var
439+
440+
# Condition complete timeout_sec > 0
441+
self.assertFalse(condition())
442+
t = threading.Thread(target=lambda: set_condition())
443+
t.start()
444+
executor.spin_until_complete(condition, timeout_sec=1.0)
445+
self.assertTrue(condition())
446+
447+
# timeout_sec = None
448+
condition_var = False
449+
self.assertFalse(condition())
450+
t = threading.Thread(target=lambda: set_condition())
451+
t.start()
452+
executor.spin_until_complete(condition, timeout_sec=None)
453+
self.assertTrue(condition())
454+
455+
# Condition complete timeout < 0
456+
condition_var = False
457+
self.assertFalse(condition())
458+
t = threading.Thread(target=lambda: set_condition())
459+
t.start()
460+
executor.spin_until_complete(condition, timeout_sec=-1)
461+
self.assertTrue(condition())
462+
463+
timer.cancel()
464+
465+
def test_executor_spin_until_complete_do_not_wait(self):
466+
self.assertIsNotNone(self.node.handle)
467+
executor = SingleThreadedExecutor(context=self.context)
468+
executor.add_node(self.node)
469+
470+
def timer_callback():
471+
pass
472+
timer = self.node.create_timer(0.1, timer_callback)
473+
474+
condition_var = False
475+
476+
def condition():
477+
nonlocal condition_var
478+
return condition_var
479+
480+
# Do not wait timeout_sec = 0
481+
self.assertFalse(condition())
482+
executor.spin_until_complete(condition, timeout_sec=0)
483+
self.assertFalse(condition())
484+
485+
timer.cancel()
486+
405487
def test_executor_spin_until_future_complete_future_done(self):
406488
self.assertIsNotNone(self.node.handle)
407489
executor = SingleThreadedExecutor(context=self.context)

0 commit comments

Comments
 (0)