Skip to content

Commit 630aa42

Browse files
committed
Add AbstractExecutor and BaseExecutor
Signed-off-by: Nadav Elkabets <[email protected]>
1 parent a7c8631 commit 630aa42

File tree

1 file changed

+229
-81
lines changed

1 file changed

+229
-81
lines changed

rclpy/rclpy/executors.py

Lines changed: 229 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -174,15 +174,236 @@ def timeout(self, timeout: float) -> None:
174174
self._timeout = timeout
175175

176176

177-
class Executor(ContextManager['Executor']):
177+
class AbstractExecutor:
178+
"""Abstract Executor API."""
179+
180+
@property
181+
def context(self) -> Context:
182+
"""Get the context associated with the executor."""
183+
raise NotImplementedError()
184+
185+
def wake(self) -> None:
186+
"""
187+
Wake the executor because something changed.
188+
189+
This is used to tell the executor when entities are created or destroyed.
190+
"""
191+
raise NotImplementedError()
192+
193+
def shutdown(self, timeout_sec: Optional[float] = None) -> bool:
194+
"""
195+
Stop executing callbacks and wait for their completion.
196+
197+
:param timeout_sec: Seconds to wait. Block forever if ``None`` or negative.
198+
Don't wait if 0.
199+
:return: ``True`` if all outstanding callbacks finished executing, or ``False`` if the
200+
timeout expires before all outstanding work is done.
201+
"""
202+
raise NotImplementedError()
203+
204+
def spin(self) -> None:
205+
"""Execute callbacks until shutdown."""
206+
raise NotImplementedError()
207+
208+
def spin_until_future_complete(
209+
self,
210+
future: Future[Any],
211+
timeout_sec: Optional[float] = None
212+
) -> None:
213+
"""Execute callbacks until a given future is done or a timeout occurs."""
214+
raise NotImplementedError()
215+
216+
def spin_once(self, timeout_sec: Optional[float] = None) -> None:
217+
"""
218+
Wait for and execute a single callback.
219+
220+
This method should not be called from multiple threads.
221+
222+
:param timeout_sec: Seconds to wait. Block forever if ``None`` or negative.
223+
Don't wait if 0.
224+
"""
225+
raise NotImplementedError()
226+
227+
def spin_once_until_future_complete(
228+
self,
229+
future: Future[Any],
230+
timeout_sec: Optional[Union[float, TimeoutObject]] = None
231+
) -> None:
232+
"""
233+
Wait for and execute a single callback.
234+
235+
This should behave in the same way as :meth:`spin_once`.
236+
If needed by the implementation, it should awake other threads waiting.
237+
238+
:param future: The executor will wait until this future is done.
239+
:param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative.
240+
Don't wait if 0.
241+
"""
242+
raise NotImplementedError()
243+
244+
@overload
245+
def create_task(
246+
self,
247+
callback: Callable[..., Coroutine[Any, Any, T]],
248+
*args: Any,
249+
**kwargs: Any
250+
) -> Task[T]: ...
251+
252+
@overload
253+
def create_task(
254+
self,
255+
callback: Callable[..., T],
256+
*args: Any,
257+
**kwargs: Any
258+
) -> Task[T]: ...
259+
260+
def create_task(
261+
self,
262+
callback: Callable[..., Any],
263+
*args: Any,
264+
**kwargs: Any
265+
) -> Task[Any]:
266+
"""
267+
Add a callback or coroutine to be executed during :meth:`spin` and return a Future.
268+
269+
Arguments to this function are passed to the callback.
270+
271+
.. warning:: Created task is queued in the executor in FIFO order,
272+
but users should not rely on the task execution order.
273+
274+
:param callback: A callback to be run in the executor.
275+
"""
276+
277+
def create_future(self) -> Future[Any]:
278+
"""Create a Future object attached to the Executor."""
279+
raise NotImplementedError()
280+
281+
def add_node(self, node: 'Node') -> bool:
282+
"""
283+
Add a node whose callbacks should be managed by this executor.
284+
285+
:param node: The node to add to the executor.
286+
:return: ``True`` if the node was added, ``False`` otherwise.
287+
"""
288+
raise NotImplementedError()
289+
290+
def remove_node(self, node: 'Node') -> None:
291+
"""
292+
Stop managing this node's callbacks.
293+
294+
:param node: The node to remove from the executor.
295+
"""
296+
raise NotImplementedError()
297+
298+
def get_nodes(self) -> List['Node']:
299+
"""Return nodes that have been added to this executor."""
300+
raise NotImplementedError()
301+
302+
303+
class BaseExecutor(AbstractExecutor):
304+
"""The base class for an executor."""
305+
306+
def create_future(self) -> Future:
307+
return Future(executor=self)
308+
309+
def _take_subscription(
310+
self,
311+
sub: Subscription[Any]
312+
) -> Optional[Callable[[], Coroutine[None, None, None]]]:
313+
try:
314+
with sub.handle:
315+
msg_info = sub.handle.take_message(sub.msg_type, sub.raw)
316+
if msg_info is None:
317+
return None
318+
319+
if sub._callback_type is Subscription.CallbackType.MessageOnly:
320+
msg_tuple: Union[Tuple[Msg], Tuple[Msg, MessageInfo]] = (msg_info[0], )
321+
else:
322+
msg_tuple = msg_info
323+
324+
async def _execute() -> None:
325+
await await_or_execute(sub.callback, *msg_tuple)
326+
327+
return _execute
328+
except InvalidHandle:
329+
# Subscription is a Destroyable, which means that on __enter__ it can throw an
330+
# InvalidHandle exception if the entity has already been destroyed. Handle that here
331+
# by just returning an empty argument, which means we will skip doing any real work
332+
# in _execute_subscription below
333+
pass
334+
335+
return None
336+
337+
def _take_client(
338+
self,
339+
client: Client[Any, Any]
340+
) -> Optional[Callable[[], Coroutine[None, None, None]]]:
341+
try:
342+
with client.handle:
343+
header_and_response = client.handle.take_response(client.srv_type.Response)
344+
345+
async def _execute() -> None:
346+
header, response = header_and_response
347+
if header is None:
348+
return
349+
try:
350+
sequence = header.request_id.sequence_number
351+
future = client.get_pending_request(sequence)
352+
except KeyError:
353+
# The request was cancelled
354+
pass
355+
else:
356+
if isinstance(future, Future) and not future._executor():
357+
future._set_executor(self)
358+
359+
future.set_result(response)
360+
return _execute
361+
362+
except InvalidHandle:
363+
# Client is a Destroyable, which means that on __enter__ it can throw an
364+
# InvalidHandle exception if the entity has already been destroyed. Handle that here
365+
# by just returning an empty argument, which means we will skip doing any real work
366+
# in _execute_client below
367+
pass
368+
369+
return None
370+
371+
def _take_service(
372+
self,
373+
srv: Service[Any, Any]
374+
) -> Optional[Callable[[], Coroutine[None, None, None]]]:
375+
try:
376+
with srv.handle:
377+
request_and_header = srv.handle.service_take_request(srv.srv_type.Request)
378+
379+
async def _execute() -> None:
380+
(request, header) = request_and_header
381+
if header is None:
382+
return
383+
384+
response = await await_or_execute(srv.callback, request, srv.srv_type.Response())
385+
srv.send_response(response, header)
386+
return _execute
387+
except InvalidHandle:
388+
# Service is a Destroyable, which means that on __enter__ it can throw an
389+
# InvalidHandle exception if the entity has already been destroyed. Handle that here
390+
# by just returning an empty argument, which means we will skip doing any real work
391+
# in _execute_service below
392+
pass
393+
394+
return None
395+
396+
397+
class Executor(ContextManager['Executor'], BaseExecutor):
178398
"""
179-
The base class for an executor.
399+
The base class for a wait-set based executor.
180400
181401
An executor controls the threading model used to process callbacks. Callbacks are units of work
182402
like subscription callbacks, timer callbacks, service calls, and received client responses. An
183403
executor controls which threads callbacks get executed in.
184404
185405
A custom executor must define :meth:`spin_once`.
406+
A custom executor should use :meth:`wait_for_ready_callbacks` to get work.
186407
If the executor has any cleanup then it should also define :meth:`shutdown`.
187408
188409
:param context: The context to be associated with, or ``None`` for the default global context.
@@ -226,30 +447,14 @@ def __init__(self, *, context: Optional[Context] = None) -> None:
226447

227448
@property
228449
def context(self) -> Context:
229-
"""Get the context associated with the executor."""
230450
return self._context
231451

232-
@overload
233-
def create_task(self, callback: Callable[..., Coroutine[Any, Any, T]],
234-
*args: Any, **kwargs: Any
235-
) -> Task[T]: ...
236-
237-
@overload
238-
def create_task(self, callback: Callable[..., T], *args: Any, **kwargs: Any
239-
) -> Task[T]: ...
240-
241-
def create_task(self, callback: Callable[..., Any], *args: Any, **kwargs: Any
242-
) -> Task[Any]:
243-
"""
244-
Add a callback or coroutine to be executed during :meth:`spin` and return a Future.
245-
246-
Arguments to this function are passed to the callback.
247-
248-
.. warning:: Created task is queued in the executor in FIFO order,
249-
but users should not rely on the task execution order.
250-
251-
:param callback: A callback to be run in the executor.
252-
"""
452+
def create_task(
453+
self,
454+
callback: Callable[..., Any],
455+
*args: Any,
456+
**kwargs: Any
457+
) -> Task[Any]:
253458
task = Task(callback, args, kwargs, executor=self)
254459
with self._tasks_lock:
255460
self._tasks.append((task, None, None))
@@ -259,14 +464,6 @@ def create_task(self, callback: Callable[..., Any], *args: Any, **kwargs: Any
259464
return task
260465

261466
def shutdown(self, timeout_sec: Optional[float] = None) -> bool:
262-
"""
263-
Stop executing callbacks and wait for their completion.
264-
265-
:param timeout_sec: Seconds to wait. Block forever if ``None`` or negative.
266-
Don't wait if 0.
267-
:return: ``True`` if all outstanding callbacks finished executing, or ``False`` if the
268-
timeout expires before all outstanding work is done.
269-
"""
270467
with self._shutdown_lock:
271468
if not self._is_shutdown:
272469
self._is_shutdown = True
@@ -298,12 +495,6 @@ def __del__(self) -> None:
298495
self._sigint_gc.destroy()
299496

300497
def add_node(self, node: 'Node') -> bool:
301-
"""
302-
Add a node whose callbacks should be managed by this executor.
303-
304-
:param node: The node to add to the executor.
305-
:return: ``True`` if the node was added, ``False`` otherwise.
306-
"""
307498
with self._nodes_lock:
308499
if node not in self._nodes:
309500
self._nodes.add(node)
@@ -315,11 +506,6 @@ def add_node(self, node: 'Node') -> bool:
315506
return False
316507

317508
def remove_node(self, node: 'Node') -> None:
318-
"""
319-
Stop managing this node's callbacks.
320-
321-
:param node: The node to remove from the executor.
322-
"""
323509
with self._nodes_lock:
324510
try:
325511
self._nodes.remove(node)
@@ -331,21 +517,14 @@ def remove_node(self, node: 'Node') -> None:
331517
self._guard.trigger()
332518

333519
def wake(self) -> None:
334-
"""
335-
Wake the executor because something changed.
336-
337-
This is used to tell the executor when entities are created or destroyed.
338-
"""
339520
if self._guard:
340521
self._guard.trigger()
341522

342523
def get_nodes(self) -> List['Node']:
343-
"""Return nodes that have been added to this executor."""
344524
with self._nodes_lock:
345525
return list(self._nodes)
346526

347527
def spin(self) -> None:
348-
"""Execute callbacks until shutdown."""
349528
while self._context.ok() and not self._is_shutdown:
350529
self.spin_once()
351530

@@ -354,7 +533,6 @@ def spin_until_future_complete(
354533
future: Future[Any],
355534
timeout_sec: Optional[float] = None
356535
) -> None:
357-
"""Execute callbacks until a given future is done or a timeout occurs."""
358536
# Make sure the future wakes this executor when it is done
359537
future.add_done_callback(lambda x: self.wake())
360538

@@ -385,36 +563,6 @@ def spin_until_future_complete(
385563

386564
timeout_left.timeout = end - now
387565

388-
def spin_once(self, timeout_sec: Optional[float] = None) -> None:
389-
"""
390-
Wait for and execute a single callback.
391-
392-
A custom executor should use :meth:`wait_for_ready_callbacks` to get work.
393-
394-
This method should not be called from multiple threads.
395-
396-
:param timeout_sec: Seconds to wait. Block forever if ``None`` or negative.
397-
Don't wait if 0.
398-
"""
399-
raise NotImplementedError()
400-
401-
def spin_once_until_future_complete(
402-
self,
403-
future: Future[Any],
404-
timeout_sec: Optional[Union[float, TimeoutObject]] = None
405-
) -> None:
406-
"""
407-
Wait for and execute a single callback.
408-
409-
This should behave in the same way as :meth:`spin_once`.
410-
If needed by the implementation, it should awake other threads waiting.
411-
412-
:param future: The executor will wait until this future is done.
413-
:param timeout_sec: Maximum seconds to wait. Block forever if ``None`` or negative.
414-
Don't wait if 0.
415-
"""
416-
raise NotImplementedError()
417-
418566
def _spin_once_until_future_complete(
419567
self,
420568
future: Future[Any],

0 commit comments

Comments
 (0)