Skip to content

Commit 35450f4

Browse files
Replace spin_until_future_complete with spin_until_complete, add spin_for method
Co-authored-by: Hubert Liberacki <[email protected]> Signed-off-by: Hubert Liberacki <[email protected]> Signed-off-by: Christophe Bedard <[email protected]>
1 parent d2fd8fe commit 35450f4

16 files changed

+311
-128
lines changed

rclpy/rclpy/__init__.py

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,16 +33,20 @@
3333
After a node is created, items of work can be done (e.g. subscription callbacks) by *spinning* on
3434
the node.
3535
The following functions can be used to process work that is waiting to be executed: :func:`spin`,
36-
:func:`spin_once`, and :func:`spin_until_future_complete`.
36+
:func:`spin_once`, and :func:`spin_until_complete`.
3737
3838
When finished with a previously initialized :class:`.Context` (ie. done using
3939
all ROS nodes associated with the context), the :func:`shutdown` function should be called.
4040
This will invalidate all entities derived from the context.
4141
"""
4242

43+
from typing import Callable
4344
from typing import List
4445
from typing import Optional
4546
from typing import TYPE_CHECKING
47+
from typing import Union
48+
49+
import warnings
4650

4751
from rclpy.context import Context
4852
from rclpy.parameter import Parameter
@@ -242,6 +246,52 @@ def spin(node: 'Node', executor: Optional['Executor'] = None) -> None:
242246
executor.remove_node(node)
243247

244248

249+
def spin_for(node: 'Node', executor: 'Executor' = None, duration_sec: float = None) -> None:
250+
"""
251+
Execute block until the context associated with the executor is shutdown or time duration pass.
252+
253+
Callbacks will be executed by the provided executor.
254+
255+
This function blocks.
256+
257+
:param node: A node to add to the executor to check for work.
258+
:param executor: The executor to use, or the global executor if ``None``.
259+
:param timeout_sec: Seconds to wait.
260+
"""
261+
executor = get_global_executor() if executor is None else executor
262+
try:
263+
executor.add_node(node)
264+
executor.spin_for(duration_sec)
265+
finally:
266+
executor.remove_node(node)
267+
268+
269+
def spin_until_complete(
270+
node: 'Node',
271+
condition: Union[Future, Callable[[], bool]],
272+
executor: Optional['Executor'] = None,
273+
timeout_sec: Optional[float] = None,
274+
) -> None:
275+
"""
276+
Execute work until the condition is complete.
277+
278+
Callbacks and other work will be executed by the provided executor until ``condition()`` or
279+
``future.done()`` returns ``True`` or the context associated with the executor is shutdown.
280+
281+
:param node: A node to add to the executor to check for work.
282+
:param condition: The callable or future object to wait on.
283+
:param executor: The executor to use, or the global executor if ``None``.
284+
:param timeout_sec: Seconds to wait. Block until the condition is complete
285+
if ``None`` or negative. Don't wait if 0.
286+
"""
287+
executor = get_global_executor() if executor is None else executor
288+
try:
289+
executor.add_node(node)
290+
executor.spin_until_complete(condition, timeout_sec)
291+
finally:
292+
executor.remove_node(node)
293+
294+
245295
def spin_until_future_complete(
246296
node: 'Node',
247297
future: Future,
@@ -260,9 +310,5 @@ def spin_until_future_complete(
260310
:param timeout_sec: Seconds to wait. Block until the future is complete
261311
if ``None`` or negative. Don't wait if 0.
262312
"""
263-
executor = get_global_executor() if executor is None else executor
264-
try:
265-
executor.add_node(node)
266-
executor.spin_until_future_complete(future, timeout_sec)
267-
finally:
268-
executor.remove_node(node)
313+
warnings.warn('Deprecated in favor of spin_until_complete.')
314+
spin_until_complete(node, future, executor, timeout_sec)

rclpy/rclpy/executors.py

Lines changed: 73 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -307,32 +307,53 @@ def spin(self) -> None:
307307
while self._context.ok() and not self._is_shutdown:
308308
self.spin_once()
309309

310-
def spin_until_future_complete(
310+
def spin_for(self, duration_sec: Optional[float] = None) -> None:
311+
"""Execute callbacks until shutdown, or timeout."""
312+
self.spin_until_complete(lambda: False, duration_sec)
313+
314+
def spin_until_complete(
311315
self,
312-
future: Future,
313-
timeout_sec: Optional[float] = None
316+
condition: Union[Future, Callable[[], bool]],
317+
timeout_sec: Optional[float] = None,
314318
) -> None:
315-
"""Execute callbacks until a given future is done or a timeout occurs."""
316-
# Make sure the future wakes this executor when it is done
317-
future.add_done_callback(lambda x: self.wake())
319+
"""Execute callbacks until a given condition is done or a timeout occurs."""
320+
# Common conditon for safisfying both Callable and Future
321+
finish_condition = None
322+
if (isinstance(condition, Future)):
323+
# Make sure the future wakes this executor when it is done
324+
condition.add_done_callback(lambda x: self.wake())
325+
def finish_condition(): return condition.done()
326+
elif (callable(condition)):
327+
def finish_condition(): return condition()
328+
else:
329+
raise TypeError('Condition has to be of Future or Callable type')
318330

319331
if timeout_sec is None or timeout_sec < 0:
320-
while self._context.ok() and not future.done() and not self._is_shutdown:
321-
self.spin_once_until_future_complete(future, timeout_sec)
332+
while self._context.ok() and not finish_condition() and not self._is_shutdown:
333+
self.spin_once_until_complete(finish_condition, timeout_sec)
322334
else:
323335
start = time.monotonic()
324336
end = start + timeout_sec
325337
timeout_left = TimeoutObject(timeout_sec)
326338

327-
while self._context.ok() and not future.done() and not self._is_shutdown:
328-
self.spin_once_until_future_complete(future, timeout_left)
339+
while self._context.ok() and not finish_condition() and not self._is_shutdown:
340+
self.spin_once_until_complete(finish_condition, timeout_left)
329341
now = time.monotonic()
330342

331343
if now >= end:
332344
return
333345

334346
timeout_left.timeout = end - now
335347

348+
def spin_until_future_complete(
349+
self,
350+
future: Future,
351+
timeout_sec: Optional[float] = None,
352+
) -> None:
353+
"""Execute callbacks until a given future is done or a timeout occurs."""
354+
warnings.warn('Deprecated in favor of spin_until_complete.')
355+
self.spin_until_complete(future, timeout_sec)
356+
336357
def spin_once(self, timeout_sec: Optional[float] = None) -> None:
337358
"""
338359
Wait for and execute a single callback.
@@ -346,6 +367,23 @@ def spin_once(self, timeout_sec: Optional[float] = None) -> None:
346367
"""
347368
raise NotImplementedError()
348369

370+
def spin_once_until_complete(
371+
self,
372+
condition: Union[Future, Callable[[], bool]],
373+
timeout_sec: Optional[Union[float, TimeoutObject]] = None,
374+
) -> None:
375+
"""
376+
Wait for and execute a single callback.
377+
378+
This should behave in the same way as :meth:`spin_once`.
379+
If needed by the implementation, it should awake other threads waiting.
380+
381+
:param condition: The executor will wait until this condition is done.
382+
:param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative.
383+
Don't wait if 0.
384+
"""
385+
raise NotImplementedError()
386+
349387
def spin_once_until_future_complete(
350388
self,
351389
future: Future,
@@ -361,6 +399,7 @@ def spin_once_until_future_complete(
361399
:param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative.
362400
Don't wait if 0.
363401
"""
402+
warnings.warn('Deprecated in favor of spin_once_until_complete.')
364403
raise NotImplementedError()
365404

366405
def _take_timer(self, tmr):
@@ -826,13 +865,23 @@ def _spin_once_impl(
826865
def spin_once(self, timeout_sec: Optional[float] = None) -> None:
827866
self._spin_once_impl(timeout_sec)
828867

868+
def spin_once_until_complete(
869+
self,
870+
condition: Union[Future, Callable[[], bool]],
871+
timeout_sec: Optional[Union[float, TimeoutObject]] = None,
872+
) -> None:
873+
if isinstance(condition, Future):
874+
condition.add_done_callback(lambda x: self.wake())
875+
condition = condition.done
876+
self._spin_once_impl(timeout_sec, condition)
877+
829878
def spin_once_until_future_complete(
830879
self,
831880
future: Future,
832881
timeout_sec: Optional[Union[float, TimeoutObject]] = None
833882
) -> None:
834-
future.add_done_callback(lambda x: self.wake())
835-
self._spin_once_impl(timeout_sec, future.done)
883+
warnings.warn('Deprecated in favor of spin_once_until_complete.')
884+
self.spin_once_until_complete(future.done, timeout_sec)
836885

837886

838887
class MultiThreadedExecutor(Executor):
@@ -898,10 +947,20 @@ def _spin_once_impl(
898947
def spin_once(self, timeout_sec: Optional[float] = None) -> None:
899948
self._spin_once_impl(timeout_sec)
900949

950+
def spin_once_until_complete(
951+
self,
952+
condition: Union[Future, Callable[[], bool]],
953+
timeout_sec: float = None,
954+
) -> None:
955+
if isinstance(condition, Future):
956+
condition.add_done_callback(lambda x: self.wake())
957+
condition = condition.done
958+
self._spin_once_impl(timeout_sec, condition)
959+
901960
def spin_once_until_future_complete(
902961
self,
903962
future: Future,
904963
timeout_sec: Optional[Union[float, TimeoutObject]] = None
905964
) -> None:
906-
future.add_done_callback(lambda x: self.wake())
907-
self._spin_once_impl(timeout_sec, future.done)
965+
warnings.warn('Deprecated in favor of spin_once_until_complete.')
966+
self.spin_once_until_complete(future.done, timeout_sec)

rclpy/rclpy/parameter_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def __init__(
6464
Parameter('int_param', Parameter.Type.INTEGER, 88),
6565
Parameter('string/param', Parameter.Type.STRING, 'hello world').to_parameter_msg(),
6666
])
67-
self.executor.spin_until_future_complete(future)
67+
self.executor.spin_until_complete(future)
6868
results = future.result() # rcl_interfaces.srv.SetParameters.Response
6969
7070
For more on service names, see: `ROS 2 docs`_.

rclpy/test/test_action_client.py

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ def test_send_goal_async(self):
159159
try:
160160
self.assertTrue(ac.wait_for_server(timeout_sec=2.0))
161161
future = ac.send_goal_async(Fibonacci.Goal())
162-
rclpy.spin_until_future_complete(self.node, future, self.executor)
162+
rclpy.spin_until_complete(self.node, future, self.executor)
163163
self.assertTrue(future.done())
164164
goal_handle = future.result()
165165
self.assertTrue(goal_handle.accepted)
@@ -177,7 +177,7 @@ def test_send_goal_async_with_feedback_after_goal(self):
177177
Fibonacci.Goal(),
178178
feedback_callback=self.feedback_callback,
179179
goal_uuid=goal_uuid)
180-
rclpy.spin_until_future_complete(self.node, future, self.executor)
180+
rclpy.spin_until_complete(self.node, future, self.executor)
181181

182182
# Publish feedback after goal has been accepted
183183
self.mock_action_server.publish_feedback(goal_uuid)
@@ -202,7 +202,7 @@ def test_send_goal_async_with_feedback_before_goal(self):
202202
Fibonacci.Goal(),
203203
feedback_callback=self.feedback_callback,
204204
goal_uuid=goal_uuid)
205-
rclpy.spin_until_future_complete(self.node, future, self.executor)
205+
rclpy.spin_until_complete(self.node, future, self.executor)
206206

207207
# Check the feedback was not received
208208
self.assertEqual(self.feedback, None)
@@ -220,12 +220,12 @@ def test_send_goal_async_with_feedback_after_goal_result_requested(self):
220220
Fibonacci.Goal(),
221221
feedback_callback=self.feedback_callback,
222222
goal_uuid=goal_uuid)
223-
rclpy.spin_until_future_complete(self.node, goal_future, self.executor)
223+
rclpy.spin_until_complete(self.node, goal_future, self.executor)
224224
self.assertTrue(goal_future.done())
225225
# Then request result
226226
goal_handle = goal_future.result()
227227
result_future = goal_handle.get_result_async()
228-
rclpy.spin_until_future_complete(self.node, result_future, self.executor)
228+
rclpy.spin_until_complete(self.node, result_future, self.executor)
229229
self.assertTrue(result_future.done())
230230

231231
# Publish feedback after goal result is requested
@@ -246,14 +246,14 @@ def test_send_goal_async_with_feedback_for_another_goal(self):
246246
Fibonacci.Goal(),
247247
feedback_callback=self.feedback_callback,
248248
goal_uuid=first_goal_uuid)
249-
rclpy.spin_until_future_complete(self.node, future, self.executor)
249+
rclpy.spin_until_complete(self.node, future, self.executor)
250250

251251
# Send another goal, but without a feedback callback
252252
second_goal_uuid = UUID(uuid=list(uuid.uuid4().bytes))
253253
future = ac.send_goal_async(
254254
Fibonacci.Goal(),
255255
goal_uuid=second_goal_uuid)
256-
rclpy.spin_until_future_complete(self.node, future, self.executor)
256+
rclpy.spin_until_complete(self.node, future, self.executor)
257257

258258
# Publish feedback for the second goal
259259
self.mock_action_server.publish_feedback(second_goal_uuid)
@@ -277,7 +277,7 @@ def test_send_goal_async_with_feedback_for_not_a_goal(self):
277277
Fibonacci.Goal(),
278278
feedback_callback=self.feedback_callback,
279279
goal_uuid=goal_uuid)
280-
rclpy.spin_until_future_complete(self.node, future, self.executor)
280+
rclpy.spin_until_complete(self.node, future, self.executor)
281281

282282
# Publish feedback for a non-existent goal ID
283283
self.mock_action_server.publish_feedback(UUID(uuid=list(uuid.uuid4().bytes)))
@@ -298,9 +298,9 @@ def test_send_goal_multiple(self):
298298
future_0 = ac.send_goal_async(Fibonacci.Goal())
299299
future_1 = ac.send_goal_async(Fibonacci.Goal())
300300
future_2 = ac.send_goal_async(Fibonacci.Goal())
301-
rclpy.spin_until_future_complete(self.node, future_0, executor)
302-
rclpy.spin_until_future_complete(self.node, future_1, executor)
303-
rclpy.spin_until_future_complete(self.node, future_2, executor)
301+
rclpy.spin_until_complete(self.node, future_0, executor)
302+
rclpy.spin_until_complete(self.node, future_1, executor)
303+
rclpy.spin_until_complete(self.node, future_2, executor)
304304
self.assertTrue(future_0.done())
305305
self.assertTrue(future_1.done())
306306
self.assertTrue(future_2.done())
@@ -326,13 +326,13 @@ def test_send_cancel_async(self):
326326

327327
# Send a goal
328328
goal_future = ac.send_goal_async(Fibonacci.Goal())
329-
rclpy.spin_until_future_complete(self.node, goal_future, self.executor)
329+
rclpy.spin_until_complete(self.node, goal_future, self.executor)
330330
self.assertTrue(goal_future.done())
331331
goal_handle = goal_future.result()
332332

333333
# Cancel the goal
334334
cancel_future = goal_handle.cancel_goal_async()
335-
rclpy.spin_until_future_complete(self.node, cancel_future, self.executor)
335+
rclpy.spin_until_complete(self.node, cancel_future, self.executor)
336336
self.assertTrue(cancel_future.done())
337337
self.assertEqual(
338338
cancel_future.result().goals_canceling[0].goal_id,
@@ -347,13 +347,13 @@ def test_get_result_async(self):
347347

348348
# Send a goal
349349
goal_future = ac.send_goal_async(Fibonacci.Goal())
350-
rclpy.spin_until_future_complete(self.node, goal_future, self.executor)
350+
rclpy.spin_until_complete(self.node, goal_future, self.executor)
351351
self.assertTrue(goal_future.done())
352352
goal_handle = goal_future.result()
353353

354354
# Get the goal result
355355
result_future = goal_handle.get_result_async()
356-
rclpy.spin_until_future_complete(self.node, result_future, self.executor)
356+
rclpy.spin_until_complete(self.node, result_future, self.executor)
357357
self.assertTrue(result_future.done())
358358
finally:
359359
ac.destroy()

0 commit comments

Comments
 (0)