Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions rclpy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ pybind11_add_module(_rclpy_pybind11
src/rclpy/destroyable.cpp
src/rclpy/duration.cpp
src/rclpy/clock_event.cpp
src/rclpy/events_executor/delayed_event_thread.cpp
src/rclpy/events_executor/events_executor.cpp
src/rclpy/events_executor/events_queue.cpp
src/rclpy/events_executor/rcl_support.cpp
src/rclpy/events_executor/timers_manager.cpp
src/rclpy/exceptions.cpp
src/rclpy/graph.cpp
src/rclpy/guard_condition.cpp
Expand Down Expand Up @@ -166,6 +171,7 @@ if(BUILD_TESTING)
test/test_create_node.py
test/test_create_while_spinning.py
test/test_destruction.py
test/test_events_executor.py
test/test_executor.py
test/test_expand_topic_name.py
test/test_guard_condition.py
Expand Down
15 changes: 15 additions & 0 deletions rclpy/rclpy/experimental/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright 2024-2025 Brad Martin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .events_executor import EventsExecutor # noqa: F401
43 changes: 43 additions & 0 deletions rclpy/rclpy/experimental/events_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Copyright 2024-2025 Brad Martin
# Copyright 2024 Merlin Labs, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import faulthandler
import typing

import rclpy.executors
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy


# Try to look like we inherit from the rclpy Executor for type checking purposes without
# getting any of the code from the base class.
def EventsExecutor(*, context: typing.Optional[rclpy.Context] = None) -> rclpy.executors.Executor:
if context is None:
context = rclpy.get_default_context()

# For debugging purposes, if anything goes wrong in C++ make sure we also get a
# Python backtrace dumped with the crash.
faulthandler.enable()

ex = typing.cast(rclpy.executors.Executor, _rclpy.EventsExecutor(context))

# rclpy.Executor does this too. Note, the context itself is smart enough to check
# for bound methods, and check whether the instances they're bound to still exist at
# callback time, so we don't have to worry about tearing down this stale callback at
# destruction time.
# TODO(bmartin427) This should really be done inside of the EventsExecutor
# implementation itself, but I'm unable to figure out a pybind11 incantation that
# allows me to pass this bound method call from C++.
context.on_shutdown(ex.wake)

return ex
14 changes: 14 additions & 0 deletions rclpy/rclpy/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,20 @@ def add_done_callback(self, callback):
if invoke:
callback(self)

def remove_done_callback(self, callback) -> bool:
"""
Remove a previously-added done callback.

Returns true if the given callback was found and removed. Always fails if the Future was
already complete.
"""
with self._lock:
try:
self._callbacks.remove(callback)
except ValueError:
return False
return True


class Task(Future):
"""
Expand Down
3 changes: 3 additions & 0 deletions rclpy/src/rclpy/_rclpy_pybind11.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "destroyable.hpp"
#include "duration.hpp"
#include "clock_event.hpp"
#include "events_executor/events_executor.hpp"
#include "exceptions.hpp"
#include "graph.hpp"
#include "guard_condition.hpp"
Expand Down Expand Up @@ -236,4 +237,6 @@ PYBIND11_MODULE(_rclpy_pybind11, m) {
rclpy::define_signal_handler_api(m);
rclpy::define_clock_event(m);
rclpy::define_lifecycle_api(m);

rclpy::events_executor::define_events_executor(m);
}
72 changes: 72 additions & 0 deletions rclpy/src/rclpy/events_executor/delayed_event_thread.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2025 Brad Martin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "events_executor/delayed_event_thread.hpp"

#include <utility>

namespace rclpy
{
namespace events_executor
{

DelayedEventThread::DelayedEventThread(EventsQueue * events_queue)
: events_queue_(events_queue), thread_([this]() {RunThread();})
{
}

DelayedEventThread::~DelayedEventThread()
{
{
std::unique_lock<std::mutex> lock(mutex_);
done_ = true;
}
cv_.notify_one();
thread_.join();
}

void DelayedEventThread::EnqueueAt(
std::chrono::steady_clock::time_point when, std::function<void()> handler)
{
{
std::unique_lock<std::mutex> lock(mutex_);
when_ = when;
handler_ = handler;
}
cv_.notify_one();
}

void DelayedEventThread::RunThread()
{
std::unique_lock<std::mutex> lock(mutex_);
while (!done_) {
if (handler_) {
// Make sure we don't dispatch a stale wait if it changes while we're waiting.
const auto latched_when = when_;
if (
(std::cv_status::timeout == cv_.wait_until(lock, latched_when)) && handler_ &&
(when_ <= latched_when))
{
auto handler = std::move(handler_);
handler_ = {};
events_queue_->Enqueue(std::move(handler));
}
} else {
// Wait indefinitely until we get signaled that there's something worth looking at.
cv_.wait(lock);
}
}
}

} // namespace events_executor
} // namespace rclpy
63 changes: 63 additions & 0 deletions rclpy/src/rclpy/events_executor/delayed_event_thread.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2025 Brad Martin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLPY__EVENTS_EXECUTOR__DELAYED_EVENT_THREAD_HPP_
#define RCLPY__EVENTS_EXECUTOR__DELAYED_EVENT_THREAD_HPP_

#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

#include "events_executor/events_queue.hpp"

namespace rclpy
{
namespace events_executor
{

/// This object manages posting an event handler to an EventsQueue after a specified delay. The
/// current delay may be changed or canceled at any time. This is done by way of a self-contained
/// child thread to perform the wait.
class DelayedEventThread
{
public:
/// The pointer is aliased and must live for the lifetime of this object.
explicit DelayedEventThread(EventsQueue *);
~DelayedEventThread();

/// Schedules an event handler to be enqueued at the specified time point. Replaces any previous
/// wait and handler, which will never be dispatched if it has not been already.
void EnqueueAt(std::chrono::steady_clock::time_point when, std::function<void()> handler);

/// Cancels any previously-scheduled handler.
void Cancel() {EnqueueAt({}, {});}

private:
void RunThread();

EventsQueue * const events_queue_;
std::mutex mutex_;
bool done_{};
std::condition_variable cv_;
std::chrono::steady_clock::time_point when_;
std::function<void()> handler_;
std::thread thread_;
};

} // namespace events_executor
} // namespace rclpy

#endif // RCLPY__EVENTS_EXECUTOR__DELAYED_EVENT_THREAD_HPP_
Loading