diff --git a/src/agnocast_sample_application/CMakeLists.txt b/src/agnocast_sample_application/CMakeLists.txt index 1602e3fad..887916d7f 100644 --- a/src/agnocast_sample_application/CMakeLists.txt +++ b/src/agnocast_sample_application/CMakeLists.txt @@ -90,6 +90,12 @@ target_include_directories(no_rclcpp_client PRIVATE ${agnocastlib_INCLUDE_DIRS} ) +add_executable(sim_time_timer src/sim_time_timer.cpp) +ament_target_dependencies(sim_time_timer rclcpp agnocastlib) +target_include_directories(sim_time_timer PRIVATE + ${agnocastlib_INCLUDE_DIRS} +) + add_library(listener_component SHARED src/minimal_subscriber.cpp) ament_target_dependencies(listener_component rclcpp rclcpp_components agnocastlib agnocast_sample_interfaces) target_include_directories(listener_component PRIVATE ${agnocastlib_INCLUDE_DIRS}) @@ -151,6 +157,9 @@ install(TARGETS no_rclcpp_server install(TARGETS no_rclcpp_client DESTINATION lib/${PROJECT_NAME}) +install(TARGETS sim_time_timer + DESTINATION lib/${PROJECT_NAME}) + install(DIRECTORY launch DESTINATION share/${PROJECT_NAME}/ ) diff --git a/src/agnocast_sample_application/src/sim_time_timer.cpp b/src/agnocast_sample_application/src/sim_time_timer.cpp new file mode 100644 index 000000000..3dd13dee7 --- /dev/null +++ b/src/agnocast_sample_application/src/sim_time_timer.cpp @@ -0,0 +1,73 @@ +/** + * @brief Sample application to demonstrate create_timer with use_sim_time support + * + * This sample creates a timer using agnocast::Node::create_timer() which supports + * ROS_TIME (simulation time). When use_sim_time:=true, the timer will use the + * /clock topic time instead of wall clock time. + * + * Usage: + * # Terminal 1: Run with use_sim_time enabled + * ros2 run agnocast_sample_application sim_time_timer --ros-args -p use_sim_time:=true + * + * # Terminal 2: Publish clock messages + * ros2 topic pub /clock rosgraph_msgs/msg/Clock "{clock: {sec: 0, nanosec: 0}}" --once + * ros2 topic pub /clock rosgraph_msgs/msg/Clock "{clock: {sec: 1, nanosec: 0}}" --once + * ros2 topic pub /clock rosgraph_msgs/msg/Clock "{clock: {sec: 2, nanosec: 0}}" --once + * + * # Or run without use_sim_time (uses wall clock) + * ros2 run agnocast_sample_application sim_time_timer + */ + +#include "agnocast/agnocast.hpp" + +#include + +using namespace std::chrono_literals; + +class SimTimeTimerNode : public agnocast::Node +{ +public: + SimTimeTimerNode() : agnocast::Node("sim_time_timer_node") + { + // Log whether we're using sim time + const bool use_sim_time = this->get_parameter("use_sim_time").as_bool(); + RCLCPP_INFO( + this->get_logger(), "Starting timer node (use_sim_time: %s)", + use_sim_time ? "true" : "false"); + + // Create timer using create_timer() which supports ROS_TIME + // This timer will respect use_sim_time parameter + timer_ = this->create_timer(500ms, std::bind(&SimTimeTimerNode::timer_callback, this)); + + RCLCPP_INFO( + this->get_logger(), "Timer created with period 500ms, clock type: %s", + timer_->is_steady() ? "STEADY_TIME" : "ROS_TIME"); + } + +private: + void timer_callback() + { + const auto now = this->now(); + const int64_t sec = now.seconds(); + const int64_t nsec = now.nanoseconds() % 1000000000; + + RCLCPP_INFO( + this->get_logger(), "Timer callback! Current time: %ld.%09ld (count: %d)", sec, nsec, + callback_count_++); + } + + agnocast::TimerBase::SharedPtr timer_; + int callback_count_ = 0; +}; + +int main(int argc, char * argv[]) +{ + agnocast::init(argc, argv); + + agnocast::AgnocastOnlySingleThreadedExecutor executor; + auto node = std::make_shared(); + executor.add_node(node); + executor.spin(); + + return 0; +} diff --git a/src/agnocastlib/include/agnocast/agnocast_epoll.hpp b/src/agnocastlib/include/agnocast/agnocast_epoll.hpp index 9f22135b9..b2390e26e 100644 --- a/src/agnocastlib/include/agnocast/agnocast_epoll.hpp +++ b/src/agnocastlib/include/agnocast/agnocast_epoll.hpp @@ -6,6 +6,7 @@ #include #include +#include #include namespace agnocast @@ -16,7 +17,8 @@ struct AgnocastExecutable; extern std::atomic need_epoll_updates; constexpr uint32_t TIMER_EVENT_FLAG = 0x80000000; -constexpr uint32_t SHUTDOWN_EVENT_FLAG = 0x40000000; +constexpr uint32_t CLOCK_EVENT_FLAG = 0x40000000; // For clock_eventfd events (ROS_TIME timers) +constexpr uint32_t SHUTDOWN_EVENT_FLAG = 0x20000000; // For shutdown events (AgnocastOnlyExecutor) /// @return true if shutdown event detected, false otherwise bool wait_and_handle_epoll_event( @@ -81,14 +83,37 @@ void prepare_epoll_impl( continue; } - struct epoll_event ev = {}; - ev.events = EPOLLIN; - ev.data.u32 = timer_id | TIMER_EVENT_FLAG; + // Register timerfd (wall clock based firing) + { + std::shared_lock fd_lock(timer_info.fd_mutex); + if (timer_info.timer_fd >= 0) { + struct epoll_event ev = {}; + ev.events = EPOLLIN; + ev.data.u32 = timer_id | TIMER_EVENT_FLAG; + + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_info.timer_fd, &ev) == -1) { + if (errno != EEXIST) { // EEXIST means already registered, which is fine + RCLCPP_ERROR(logger, "epoll_ctl failed for timer: %s", strerror(errno)); + close(agnocast_fd); + exit(EXIT_FAILURE); + } + } + } + } - if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_info.timer_fd, &ev) == -1) { - RCLCPP_ERROR(logger, "epoll_ctl failed for timer: %s", strerror(errno)); - close(agnocast_fd); - exit(EXIT_FAILURE); + // Register clock_eventfd for ROS_TIME timers (simulation time support) + if (timer_info.clock_eventfd >= 0) { + struct epoll_event clock_ev = {}; + clock_ev.events = EPOLLIN; + clock_ev.data.u32 = timer_id | CLOCK_EVENT_FLAG; + + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, timer_info.clock_eventfd, &clock_ev) == -1) { + if (errno != EEXIST) { // EEXIST means already registered, which is fine + RCLCPP_ERROR(logger, "epoll_ctl failed for clock_eventfd: %s", strerror(errno)); + close(agnocast_fd); + exit(EXIT_FAILURE); + } + } } timer_info.need_epoll_update = false; diff --git a/src/agnocastlib/include/agnocast/agnocast_timer_info.hpp b/src/agnocastlib/include/agnocast/agnocast_timer_info.hpp index 3abc292cb..d47657e88 100644 --- a/src/agnocastlib/include/agnocast/agnocast_timer_info.hpp +++ b/src/agnocastlib/include/agnocast/agnocast_timer_info.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include namespace agnocast @@ -19,25 +20,38 @@ struct TimerInfo { ~TimerInfo(); + // Mutex to protect timer_fd and clock_eventfd access. + // Use shared_lock for read operations (read(), epoll_ctl()). + // Use unique_lock for write operations (close(), create_timer_fd()). + std::shared_mutex fd_mutex; + + uint32_t timer_id = 0; int timer_fd = -1; + int clock_eventfd = -1; // eventfd to wake epoll on clock updates (ROS_TIME only) std::weak_ptr timer; rclcpp::CallbackGroup::SharedPtr callback_group; std::atomic last_call_time_ns; std::atomic next_call_time_ns; + std::atomic time_credit{ + 0}; // Credit for time elapsed before ROS time is activated/deactivated std::chrono::nanoseconds period; bool need_epoll_update = true; rclcpp::Clock::SharedPtr clock; + rclcpp::JumpHandler::SharedPtr jump_handler; }; extern std::mutex id2_timer_info_mtx; extern std::unordered_map> id2_timer_info; extern std::atomic next_timer_id; -int create_timer_fd(uint32_t timer_id, std::chrono::nanoseconds period); +int create_timer_fd( + uint32_t timer_id, std::chrono::nanoseconds period, rcl_clock_type_t clock_type); void handle_timer_event(TimerInfo & timer_info); +void handle_clock_event(TimerInfo & timer_info); + uint32_t allocate_timer_id(); void register_timer_info( diff --git a/src/agnocastlib/src/agnocast_epoll.cpp b/src/agnocastlib/src/agnocast_epoll.cpp index 2442cffbe..1ab7377f6 100644 --- a/src/agnocastlib/src/agnocast_epoll.cpp +++ b/src/agnocastlib/src/agnocast_epoll.cpp @@ -40,7 +40,7 @@ bool wait_and_handle_epoll_event( } if ((event_id & TIMER_EVENT_FLAG) != 0U) { - // Timer event + // Timer event (timerfd fired) const uint32_t timer_id = event_id & ~TIMER_EVENT_FLAG; rclcpp::CallbackGroup::SharedPtr callback_group; @@ -64,6 +64,35 @@ bool wait_and_handle_epoll_event( const std::shared_ptr> callable = std::make_shared>([timer_info]() { handle_timer_event(*timer_info); }); + { + std::lock_guard ready_lock{ready_agnocast_executables_mutex}; + ready_agnocast_executables.emplace_back(AgnocastExecutable{callable, callback_group}); + } + } else if ((event_id & CLOCK_EVENT_FLAG) != 0U) { + // Clock event (ROS_TIME clock updated via time jump callback) + const uint32_t timer_id = event_id & ~CLOCK_EVENT_FLAG; + rclcpp::CallbackGroup::SharedPtr callback_group; + + std::shared_ptr timer_info; + { + std::lock_guard lock(id2_timer_info_mtx); + const auto it = id2_timer_info.find(timer_id); + if (it == id2_timer_info.end()) { + RCLCPP_ERROR(logger, "Agnocast internal implementation error: timer info cannot be found"); + close(agnocast_fd); + exit(EXIT_FAILURE); + } + timer_info = it->second; + if (!timer_info->timer.lock()) { + return; // Timer object has been destroyed + } + callback_group = timer_info->callback_group; + } + + // Create a callable that handles the clock event + const std::shared_ptr> callable = + std::make_shared>([timer_info]() { handle_clock_event(*timer_info); }); + { std::lock_guard ready_lock{ready_agnocast_executables_mutex}; ready_agnocast_executables.emplace_back(AgnocastExecutable{callable, callback_group}); diff --git a/src/agnocastlib/src/agnocast_timer_info.cpp b/src/agnocastlib/src/agnocast_timer_info.cpp index a12efcfe3..37dbd2796 100644 --- a/src/agnocastlib/src/agnocast_timer_info.cpp +++ b/src/agnocastlib/src/agnocast_timer_info.cpp @@ -3,6 +3,7 @@ #include "agnocast/agnocast_epoll.hpp" #include "agnocast/agnocast_utils.hpp" +#include #include #include @@ -17,16 +18,119 @@ std::mutex id2_timer_info_mtx; std::unordered_map> id2_timer_info; std::atomic next_timer_id{0}; +void handle_pre_time_jump(TimerInfo & timer_info) +{ + const int64_t now_ns = timer_info.clock->now().nanoseconds(); + if (now_ns == 0) { + // No time credit if clock is uninitialized + return; + } + // Source of time may be changing, save elapsed duration pre jump + // so the timer only waits the remainder in the new epoch (if clock_change occurs) + const int64_t next_call_ns = timer_info.next_call_time_ns.load(std::memory_order_relaxed); + timer_info.time_credit.store(next_call_ns - now_ns, std::memory_order_relaxed); +} + +void handle_post_time_jump(TimerInfo & timer_info, const rcl_time_jump_t & jump) +{ + const int64_t now_ns = timer_info.clock->now().nanoseconds(); + const int64_t last_call_ns = timer_info.last_call_time_ns.load(std::memory_order_relaxed); + const int64_t next_call_ns = timer_info.next_call_time_ns.load(std::memory_order_relaxed); + const int64_t period_ns = timer_info.period.count(); + + if (jump.clock_change == RCL_ROS_TIME_ACTIVATED) { + // ROS time activated: close timerfd (simulation time will use clock_eventfd) + { + std::unique_lock lock(timer_info.fd_mutex); + if (timer_info.timer_fd >= 0) { + close(timer_info.timer_fd); + timer_info.timer_fd = -1; + } + } + + if (now_ns == 0) { + return; + } + const int64_t time_credit = timer_info.time_credit.exchange(0, std::memory_order_relaxed); + if (time_credit != 0) { + timer_info.next_call_time_ns.store( + now_ns - time_credit + period_ns, std::memory_order_relaxed); + timer_info.last_call_time_ns.store(now_ns - time_credit, std::memory_order_relaxed); + } + } else if (jump.clock_change == RCL_ROS_TIME_DEACTIVATED) { + // ROS time deactivated: recreate timerfd (back to system time) + { + std::unique_lock lock(timer_info.fd_mutex); + if (timer_info.timer_fd < 0) { + timer_info.timer_fd = create_timer_fd(timer_info.timer_id, timer_info.period, RCL_ROS_TIME); + timer_info.need_epoll_update = true; + need_epoll_updates.store(true); + } + } + + if (now_ns == 0) { + return; + } + const int64_t time_credit = timer_info.time_credit.exchange(0, std::memory_order_relaxed); + if (time_credit != 0) { + timer_info.next_call_time_ns.store( + now_ns - time_credit + period_ns, std::memory_order_relaxed); + timer_info.last_call_time_ns.store(now_ns - time_credit, std::memory_order_relaxed); + } + } else if (next_call_ns <= now_ns) { + // Post forward jump and timer is ready - wake up epoll + timer_info.time_credit.store(0, std::memory_order_relaxed); // Clear unused time_credit + if (timer_info.clock_eventfd >= 0) { + const uint64_t val = 1; + [[maybe_unused]] auto _ = write(timer_info.clock_eventfd, &val, sizeof(val)); + } + } else if (now_ns < last_call_ns) { + // Post backwards time jump that went further back than 1 period + // Next callback should happen after 1 period + timer_info.time_credit.store(0, std::memory_order_relaxed); // Clear unused time_credit + timer_info.next_call_time_ns.store(now_ns + period_ns, std::memory_order_relaxed); + timer_info.last_call_time_ns.store(now_ns, std::memory_order_relaxed); + } else { + // Other cases (small forward jump where timer is not ready) + timer_info.time_credit.store(0, std::memory_order_relaxed); // Clear unused time_credit + } +} + +void setup_time_jump_callback(TimerInfo & timer_info, const rclcpp::Clock::SharedPtr & clock) +{ + if (clock->get_clock_type() != RCL_ROS_TIME) { + return; + } + + rcl_jump_threshold_t threshold; + threshold.on_clock_change = true; + threshold.min_forward.nanoseconds = 1; + threshold.min_backward.nanoseconds = -1; + + timer_info.jump_handler = clock->create_jump_callback( + [&timer_info]() { handle_pre_time_jump(timer_info); }, + [&timer_info](const rcl_time_jump_t & jump) { handle_post_time_jump(timer_info, jump); }, + threshold); +} + TimerInfo::~TimerInfo() { if (timer_fd >= 0) { close(timer_fd); } + if (clock_eventfd >= 0) { + close(clock_eventfd); + } } -int create_timer_fd(uint32_t timer_id, std::chrono::nanoseconds period) +int create_timer_fd(uint32_t timer_id, std::chrono::nanoseconds period, rcl_clock_type_t clock_type) { - int timer_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); + // Use CLOCK_MONOTONIC for STEADY_TIME, CLOCK_REALTIME for others (SYSTEM_TIME, ROS_TIME) + // This matches rclcpp's behavior where: + // - RCL_STEADY_TIME uses monotonic clock + // - RCL_SYSTEM_TIME and RCL_ROS_TIME use system clock + const int clockid = (clock_type == RCL_STEADY_TIME) ? CLOCK_MONOTONIC : CLOCK_REALTIME; + int timer_fd = timerfd_create(clockid, TFD_NONBLOCK | TFD_CLOEXEC); if (timer_fd == -1) { throw std::runtime_error( "timerfd_create failed for timer_id=" + std::to_string(timer_id) + ": " + @@ -70,17 +174,11 @@ void register_timer_info( uint32_t timer_id, const std::shared_ptr & timer, std::chrono::nanoseconds period, const rclcpp::CallbackGroup::SharedPtr & callback_group, const rclcpp::Clock::SharedPtr & clock) { - if (clock->get_clock_type() != RCL_STEADY_TIME) { - throw std::runtime_error( - "Only RCL_STEADY_TIME is currently supported. " - "RCL_SYSTEM_TIME and RCL_ROS_TIME are not yet implemented."); - } - - const int timer_fd = create_timer_fd(timer_id, period); + const bool is_ros_time = (clock->get_clock_type() == RCL_ROS_TIME); const int64_t now_ns = clock->now().nanoseconds(); auto timer_info = std::make_shared(); - timer_info->timer_fd = timer_fd; + timer_info->timer_id = timer_id; timer_info->timer = timer; timer_info->last_call_time_ns.store(now_ns, std::memory_order_relaxed); timer_info->next_call_time_ns.store(now_ns + period.count(), std::memory_order_relaxed); @@ -89,6 +187,27 @@ void register_timer_info( timer_info->need_epoll_update = true; timer_info->clock = clock; + if (is_ros_time) { + // ROS_TIME timers use clock_eventfd for simulation time support + timer_info->clock_eventfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (timer_info->clock_eventfd == -1) { + throw std::runtime_error( + "eventfd creation failed for timer_id=" + std::to_string(timer_id) + ": " + + std::strerror(errno)); + } + + // Only create timerfd if ros_time is not active (system time mode) + // If ros_time is already active, timer will be driven by clock_eventfd + if (!clock->ros_time_is_active()) { + timer_info->timer_fd = create_timer_fd(timer_id, period, clock->get_clock_type()); + } + } else { + // Non-ROS_TIME timers always use timerfd + timer_info->timer_fd = create_timer_fd(timer_id, period, clock->get_clock_type()); + } + + setup_time_jump_callback(*timer_info, clock); + { std::lock_guard lock(id2_timer_info_mtx); id2_timer_info[timer_id] = std::move(timer_info); @@ -97,13 +216,23 @@ void register_timer_info( need_epoll_updates.store(true); } +bool check_and_execute_timer(TimerInfo & timer_info); + void handle_timer_event(TimerInfo & timer_info) { // TODO(Koichi98): Add canceled check here // Read the number of expirations to clear the event uint64_t expirations = 0; - const ssize_t ret = read(timer_info.timer_fd, &expirations, sizeof(expirations)); + ssize_t ret = -1; + + { + std::shared_lock lock(timer_info.fd_mutex); + if (timer_info.timer_fd < 0) { + return; // Timer fd was closed (ROS time activated) + } + ret = read(timer_info.timer_fd, &expirations, sizeof(expirations)); + } if (ret == -1) { if (errno != EAGAIN && errno != EWOULDBLOCK) { @@ -113,36 +242,68 @@ void handle_timer_event(TimerInfo & timer_info) } if (expirations > 0) { - auto timer = timer_info.timer.lock(); - if (!timer) { - return; // Timer object has been destroyed + check_and_execute_timer(timer_info); + } +} + +void handle_clock_event(TimerInfo & timer_info) +{ + // Read eventfd to clear the event + uint64_t val = 0; + const ssize_t ret = read(timer_info.clock_eventfd, &val, sizeof(val)); + + if (ret == -1) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + RCLCPP_WARN(logger, "Failed to read clock eventfd: %s", strerror(errno)); + return; } + } - const int64_t now_ns = timer_info.clock->now().nanoseconds(); + if (val > 0) { + check_and_execute_timer(timer_info); + } +} - timer_info.last_call_time_ns.store(now_ns, std::memory_order_relaxed); +// Check if timer is ready and execute callback if so +// Returns true if callback was executed +bool check_and_execute_timer(TimerInfo & timer_info) +{ + auto timer = timer_info.timer.lock(); + if (!timer) { + return false; // Timer object has been destroyed + } - const int64_t period_ns = timer_info.period.count(); - int64_t next_call_time_ns = - timer_info.next_call_time_ns.load(std::memory_order_relaxed) + period_ns; - - // in case the timer has missed at least one cycle - if (next_call_time_ns < now_ns) { - if (period_ns == 0) { - // a timer with a period of zero is considered always ready - next_call_time_ns = now_ns; - } else { - // move the next call time forward by as many periods as necessary - const int64_t now_ahead = now_ns - next_call_time_ns; - // rounding up without overflow - const int64_t periods_ahead = 1 + (now_ahead - 1) / period_ns; - next_call_time_ns += periods_ahead * period_ns; - } - } - timer_info.next_call_time_ns.store(next_call_time_ns, std::memory_order_relaxed); + const int64_t now_ns = timer_info.clock->now().nanoseconds(); + const int64_t next_call_ns = timer_info.next_call_time_ns.load(std::memory_order_relaxed); + const int64_t period_ns = timer_info.period.count(); + + // Check if timer is ready (for simulation time support) + if (now_ns < next_call_ns) { + return false; // Not ready yet + } - timer->execute_callback(); + // Update timing + timer_info.last_call_time_ns.store(now_ns, std::memory_order_relaxed); + + int64_t next_call_time_ns = next_call_ns + period_ns; + + // in case the timer has missed at least one cycle + if (next_call_time_ns < now_ns) { + if (period_ns == 0) { + // a timer with a period of zero is considered always ready + next_call_time_ns = now_ns; + } else { + // move the next call time forward by as many periods as necessary + const int64_t now_ahead = now_ns - next_call_time_ns; + // rounding up without overflow + const int64_t periods_ahead = 1 + (now_ahead - 1) / period_ns; + next_call_time_ns += periods_ahead * period_ns; + } } + timer_info.next_call_time_ns.store(next_call_time_ns, std::memory_order_relaxed); + + timer->execute_callback(); + return true; } void unregister_timer_info(uint32_t timer_id)