Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 66 additions & 3 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import deque
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack
<<<<<<< HEAD
=======
from dataclasses import dataclass
from functools import partial
>>>>>>> 9695271 (Fix issues with resuming async tasks awaiting a future (#1469))
import inspect
import os
from threading import Condition
Expand All @@ -25,6 +31,11 @@
from typing import Callable
from typing import ContextManager
from typing import Coroutine
<<<<<<< HEAD
=======
from typing import Deque
from typing import Dict
>>>>>>> 9695271 (Fix issues with resuming async tasks awaiting a future (#1469))
from typing import Generator
from typing import List
from typing import Optional
Expand Down Expand Up @@ -150,6 +161,12 @@ def timeout(self, timeout):
self._timeout = timeout


@dataclass
class TaskData:
source_node: 'Optional[Node]' = None
source_entity: 'Optional[Entity]' = None


class Executor(ContextManager['Executor']):
"""
The base class for an executor.
Expand Down Expand Up @@ -179,8 +196,15 @@ def __init__(self, *, context: Optional[Context] = None) -> None:
self._context = get_default_context() if context is None else context
self._nodes: Set[Node] = set()
self._nodes_lock = RLock()
<<<<<<< HEAD
# Tasks to be executed (oldest first) 3-tuple Task, Entity, Node
self._tasks: List[Tuple[Task, Optional[WaitableEntityType], Optional[Node]]] = []
=======
# all tasks that are not complete or canceled
self._pending_tasks: Dict[Task, TaskData] = {}
# tasks that are ready to execute
self._ready_tasks: Deque[Task[Any]] = deque()
>>>>>>> 9695271 (Fix issues with resuming async tasks awaiting a future (#1469))
self._tasks_lock = Lock()
# This is triggered when wait_for_ready_callbacks should rebuild the wait list
self._guard = GuardCondition(
Expand Down Expand Up @@ -214,10 +238,27 @@ def create_task(self, callback: Union[Callable, Coroutine], *args, **kwargs) ->
"""
task = Task(callback, args, kwargs, executor=self)
with self._tasks_lock:
<<<<<<< HEAD
self._tasks.append((task, None, None))
self._guard.trigger()
# Task inherits from Future
return task
=======
self._pending_tasks[task] = TaskData()
self._call_task_in_next_spin(task)
return task

def _call_task_in_next_spin(self, task: Task) -> None:
"""
Add a task to the executor to be executed in the next spin.

:param task: A task to be run in the executor.
"""
with self._tasks_lock:
self._ready_tasks.append(task)
if self._guard:
self._guard.trigger()
>>>>>>> 9695271 (Fix issues with resuming async tasks awaiting a future (#1469))

def create_future(self) -> Future:
"""Create a Future object attached to the Executor."""
Expand Down Expand Up @@ -537,7 +578,10 @@ async def handler(entity, gc, is_shutdown, work_tracker):
handler, (entity, self._guard, self._is_shutdown, self._work_tracker),
executor=self)
with self._tasks_lock:
self._tasks.append((task, entity, node))
self._pending_tasks[task] = TaskData(
source_entity=entity,
source_node=node
)
return task

def can_execute(self, entity: WaitableEntityType) -> bool:
Expand Down Expand Up @@ -581,8 +625,8 @@ def _wait_for_ready_callbacks(
nodes_to_use = self.get_nodes()

# Yield tasks in-progress before waiting for new work
tasks = None
with self._tasks_lock:
<<<<<<< HEAD
tasks = list(self._tasks)
if tasks:
for task, entity, node in reversed(tasks):
Expand All @@ -595,7 +639,26 @@ def _wait_for_ready_callbacks(
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].done(), self._tasks))
# Get rid of any tasks that are cancelled
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].cancelled(), self._tasks))

=======
# Get rid of any tasks that are done or cancelled
for task in list(self._pending_tasks.keys()):
if task.done() or task.cancelled():
del self._pending_tasks[task]
>>>>>>> 9695271 (Fix issues with resuming async tasks awaiting a future (#1469))

ready_tasks_count = len(self._ready_tasks)
for _ in range(ready_tasks_count):
task = self._ready_tasks.popleft()
task_data = self._pending_tasks[task]
node = task_data.source_node
if node is None or node in nodes_to_use:
entity = task_data.source_entity
yielded_work = True
yield task, entity, node
else:
# Asked not to execute these tasks, so don't do them yet
with self._tasks_lock:
self._ready_tasks.append(task)
# Gather entities that can be waited on
subscriptions: List[Subscription] = []
guards: List[GuardCondition] = []
Expand Down
60 changes: 58 additions & 2 deletions rclpy/rclpy/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,17 @@ def __del__(self):
'The following exception was never retrieved: ' + str(self._exception),
file=sys.stderr)

<<<<<<< HEAD
def __await__(self):
=======
def __await__(self) -> Generator['Future[T]', None, Optional[T]]:
>>>>>>> 9695271 (Fix issues with resuming async tasks awaiting a future (#1469))
# Yield if the task is not finished
while self._pending():
yield
if self._pending():
# This tells the task to suspend until the future is done
yield self
if self._pending():
raise RuntimeError('Future awaited a second time before it was done')
return self.result()

def _pending(self) -> bool:
Expand Down Expand Up @@ -264,6 +271,7 @@ def __call__(self):
self._executing = True

if inspect.iscoroutine(self._handler):
<<<<<<< HEAD
# Execute a coroutine
try:
self._handler.send(None)
Expand All @@ -274,6 +282,9 @@ def __call__(self):
except Exception as e:
self.set_exception(e)
self._complete_task()
=======
self._execute_coroutine_step(self._handler)
>>>>>>> 9695271 (Fix issues with resuming async tasks awaiting a future (#1469))
else:
# Execute a normal function
try:
Expand All @@ -286,7 +297,52 @@ def __call__(self):
finally:
self._task_lock.release()

<<<<<<< HEAD
def _complete_task(self):
=======
def _execute_coroutine_step(self, coro: Coroutine[Any, Any, T]) -> None:
"""Execute or resume a coroutine task."""
try:
result = coro.send(None)
except StopIteration as e:
# The coroutine finished; store the result
self.set_result(e.value)
self._complete_task()
except Exception as e:
# The coroutine raised; store the exception
self.set_exception(e)
self._complete_task()
else:
# The coroutine yielded; suspend the task until it is resumed
executor = self._executor()
if executor is None:
raise RuntimeError(
'Task tried to reschedule but no executor was set: '
'tasks should only be initialized through executor.create_task()')
elif isinstance(result, Future):
# Schedule the task to resume when the future is done
self._add_resume_callback(result, executor)
elif result is None:
# The coroutine yielded None, schedule the task to resume in the next spin
executor._call_task_in_next_spin(self)
else:
raise TypeError(
f'Expected coroutine to yield a Future or None, got: {type(result)}')

def _add_resume_callback(self, future: Future[T], executor: 'Executor') -> None:
future_executor = future._executor()
if future_executor is None:
# The future is not associated with an executor yet, so associate it with ours
future._set_executor(executor)
elif future_executor is not executor:
raise RuntimeError('A task can only await futures associated with the same executor')

# The future is associated with the same executor, so we can resume the task directly
# in the done callback
future.add_done_callback(lambda _: self.__call__())

def _complete_task(self) -> None:
>>>>>>> 9695271 (Fix issues with resuming async tasks awaiting a future (#1469))
"""Cleanup after task finished."""
self._handler = None
self._args = None
Expand Down
38 changes: 7 additions & 31 deletions rclpy/src/rclpy/events_executor/events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,15 @@ pybind11::object EventsExecutor::create_task(
// manual refcounting on it instead.
py::handle cb_task_handle = task;
cb_task_handle.inc_ref();
events_queue_.Enqueue(std::bind(&EventsExecutor::IterateTask, this, cb_task_handle));
call_task_in_next_spin(task);
return task;
}

void EventsExecutor::call_task_in_next_spin(pybind11::handle task)
{
events_queue_.Enqueue(std::bind(&EventsExecutor::IterateTask, this, task));
}

pybind11::object EventsExecutor::create_future()
{
using py::literals::operator""_a;
Expand Down Expand Up @@ -163,8 +168,6 @@ void EventsExecutor::spin(std::optional<double> timeout_sec, bool stop_after_use
throw std::runtime_error("Attempt to spin an already-spinning Executor");
}
stop_after_user_callback_ = stop_after_user_callback;
// Any blocked tasks may have become unblocked while we weren't looking.
PostOutstandingTasks();
// Release the GIL while we block. Any callbacks on the events queue that want to touch Python
// will need to reacquire it though.
py::gil_scoped_release gil_release;
Expand Down Expand Up @@ -346,8 +349,6 @@ void EventsExecutor::HandleSubscriptionReady(py::handle subscription, size_t num
got_none = true;
}
}

PostOutstandingTasks();
}

void EventsExecutor::HandleAddedTimer(py::handle timer) {timers_manager_.AddTimer(timer);}
Expand All @@ -374,7 +375,6 @@ void EventsExecutor::HandleTimerReady(py::handle timer)
} else if (stop_after_user_callback_) {
events_queue_.Stop();
}
PostOutstandingTasks();
}

void EventsExecutor::HandleAddedClient(py::handle client)
Expand Down Expand Up @@ -445,8 +445,6 @@ void EventsExecutor::HandleClientReady(py::handle client, size_t number_of_event
}
}
}

PostOutstandingTasks();
}

void EventsExecutor::HandleAddedService(py::handle service)
Expand Down Expand Up @@ -509,8 +507,6 @@ void EventsExecutor::HandleServiceReady(py::handle service, size_t number_of_eve
send_response(response, header);
}
}

PostOutstandingTasks();
}

void EventsExecutor::HandleAddedWaitable(py::handle waitable)
Expand Down Expand Up @@ -776,8 +772,6 @@ void EventsExecutor::HandleWaitableReady(
// execute() is an async method, we need a Task to run it
create_task(execute(data));
}

PostOutstandingTasks();
}

void EventsExecutor::IterateTask(py::handle task)
Expand Down Expand Up @@ -810,26 +804,7 @@ void EventsExecutor::IterateTask(py::handle task)
throw;
}
}
} else {
// Task needs more iteration. Store the handle and revisit it later after the next ready
// entity which may unblock it.
// TODO(bmartin427) This matches the behavior of SingleThreadedExecutor and avoids busy
// looping, but I don't love it because if the task is waiting on something other than an rcl
// entity (e.g. an asyncio sleep, or a Future triggered from another thread, or even another
// Task), there can be arbitrarily long latency before some rcl activity causes us to go
// revisit that Task.
blocked_tasks_.push_back(task);
}
}

void EventsExecutor::PostOutstandingTasks()
{
for (auto & task : blocked_tasks_) {
events_queue_.Enqueue(std::bind(&EventsExecutor::IterateTask, this, task));
}
// Clear the entire outstanding tasks list. Any tasks that need further iteration will re-add
// themselves during IterateTask().
blocked_tasks_.clear();
}

void EventsExecutor::HandleCallbackExceptionInNodeEntity(
Expand Down Expand Up @@ -888,6 +863,7 @@ void define_events_executor(py::object module)
.def(py::init<py::object>(), py::arg("context"))
.def_property_readonly("context", &EventsExecutor::get_context)
.def("create_task", &EventsExecutor::create_task, py::arg("callback"))
.def("_call_task_in_next_spin", &EventsExecutor::call_task_in_next_spin, py::arg("task"))
.def("create_future", &EventsExecutor::create_future)
.def("shutdown", &EventsExecutor::shutdown, py::arg("timeout_sec") = py::none())
.def("add_node", &EventsExecutor::add_node, py::arg("node"))
Expand Down
9 changes: 1 addition & 8 deletions rclpy/src/rclpy/events_executor/events_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class EventsExecutor
pybind11::object get_context() const {return rclpy_context_;}
pybind11::object create_task(
pybind11::object callback, pybind11::args args = {}, const pybind11::kwargs & kwargs = {});
void call_task_in_next_spin(pybind11::handle task);
pybind11::object create_future();
bool shutdown(std::optional<double> timeout_sec = {});
bool add_node(pybind11::object node);
Expand Down Expand Up @@ -148,11 +149,6 @@ class EventsExecutor
/// create_task() implementation for details.
void IterateTask(pybind11::handle task);

/// Posts a call to IterateTask() for every outstanding entry in tasks_; should be invoked from
/// other Handle*Ready() methods to check if any asynchronous Tasks have been unblocked by the
/// newly-handled event.
void PostOutstandingTasks();

void HandleCallbackExceptionInNodeEntity(
const pybind11::error_already_set &, pybind11::handle entity,
const std::string & node_entity_attr);
Expand Down Expand Up @@ -188,9 +184,6 @@ class EventsExecutor
pybind11::set services_;
pybind11::set waitables_;

/// Collection of asynchronous Tasks awaiting new events to further iterate.
std::vector<pybind11::handle> blocked_tasks_;

/// Cache for rcl pointers underlying each waitables_ entry, because those are harder to retrieve
/// than the other entity types.
std::unordered_map<pybind11::handle, WaitableSubEntities, PythonHasher> waitable_entities_;
Expand Down
Loading