From 02e6f51809568da5b79f06461a60621d5ec445dc Mon Sep 17 00:00:00 2001 From: juntuanlu Date: Mon, 4 Aug 2025 16:09:52 +0800 Subject: [PATCH 1/2] iox-#1292 Fix bug when system time rollback --- .../concurrent/condition_variable.hpp | 291 ++++++++++++++++++ .../internal/concurrent/periodic_task.hpp | 8 +- .../internal/concurrent/periodic_task.inl | 34 +- .../iceoryx_hoofs/platform/semaphore.hpp | 19 ++ .../iceoryx_hoofs/platform/semaphore.hpp | 19 ++ .../source/posix_wrapper/semaphore.cpp | 5 + 6 files changed, 366 insertions(+), 10 deletions(-) create mode 100644 iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/condition_variable.hpp diff --git a/iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/condition_variable.hpp b/iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/condition_variable.hpp new file mode 100644 index 0000000000..6b3fc3fc31 --- /dev/null +++ b/iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/condition_variable.hpp @@ -0,0 +1,291 @@ +// add by juntuan.lu + +#pragma once + +#include +#include + +#ifdef __unix__ + +#include + +#include +#include + +namespace stdext { + +class ConditionVariable final { + public: + using native_handle_type = pthread_cond_t*; + + /** + * @brief Deleted copy constructor to ensure the class is non-copyable. + */ + ConditionVariable(const ConditionVariable&) noexcept = delete; + + /** + * @brief Deleted copy assignment operator to ensure the class is non-assignable. + */ + ConditionVariable& operator=(const ConditionVariable&) noexcept = delete; + + /** + * @brief Constructs a new condition variable object. + */ + ConditionVariable() noexcept; + + /** + * @brief Destroys the condition variable object. + */ + ~ConditionVariable() noexcept; + + /** + * @brief Unblocks one of the threads that are waiting on the condition variable. + */ + void notify_one() noexcept; + + /** + * @brief Unblocks all of the threads that are waiting on the condition variable. + */ + void notify_all() noexcept; + + /** + * @brief Blocks the calling thread, waiting for the condition variable to be notified. + * + * @param lock The unique lock associated with the condition variable. + */ + void wait(std::unique_lock& lock) noexcept; + + /** + * @brief Blocks the calling thread, waiting for the condition variable to be notified and a predicate to be true. + * + * @param lock The unique lock associated with the condition variable. + * @param p The predicate to be evaluated. + */ + template + void wait(std::unique_lock& lock, PredicateT p) noexcept; + + /** + * @brief Blocks the calling thread, waiting for the condition variable to be notified or a specific time point to be + * reached. + * + * @param lock The unique lock associated with the condition variable. + * @param atime The time point to wait until. + * @return `std::cv_status::no_timeout` if the condition variable was notified, `std::cv_status::timeout` otherwise. + */ + template + std::cv_status wait_until(std::unique_lock& lock, + const std::chrono::time_point& atime) noexcept; + + /** + * @brief Blocks the calling thread, waiting for the condition variable to be notified or a specific time point to be + * reached. + * + * @param lock The unique lock associated with the condition variable. + * @param atime The time point to wait until. + * @return `std::cv_status::no_timeout` if the condition variable was notified, `std::cv_status::timeout` otherwise. + */ + template + std::cv_status wait_until(std::unique_lock& lock, + const std::chrono::time_point& atime) noexcept; + + /** + * @brief Blocks the calling thread, waiting for the condition variable to be notified or a specific time point to be + * reached. + * + * @param lock The unique lock associated with the condition variable. + * @param atime The time point to wait until. + * @return `std::cv_status::no_timeout` if the condition variable was notified, `std::cv_status::timeout` otherwise. + */ + template + std::cv_status wait_until(std::unique_lock& lock, + const std::chrono::time_point& atime) noexcept; + + /** + * @brief Blocks the calling thread, waiting for the condition variable to be notified or a specific time point to be + * reached, and a predicate to be true. + * + * @param lock The unique lock associated with the condition variable. + * @param atime The time point to wait until. + * @param p The predicate to be evaluated. + * @return `true` if the predicate was satisfied before the time point was reached, `false` otherwise. + */ + template + bool wait_until(std::unique_lock& lock, const std::chrono::time_point& atime, + PredicateT p) noexcept; + + /** + * @brief Blocks the calling thread, waiting for the condition variable to be notified or a specific duration to + * elapse. + * + * @param lock The unique lock associated with the condition variable. + * @param rtime The duration to wait for. + * @return `std::cv_status::no_timeout` if the condition variable was notified, `std::cv_status::timeout` otherwise. + */ + template + std::cv_status wait_for(std::unique_lock& lock, + const std::chrono::duration& rtime) noexcept; + + /** + * @brief Blocks the calling thread, waiting for the condition variable to be notified or a specific duration to + * elapse, and a predicate to be true. + * + * @param lock The unique lock associated with the condition variable. + * @param rtime The duration to wait for. + * @param p The predicate to be evaluated. + * @return `true` if the predicate was satisfied before the time duration was reached, `false` otherwise. + */ + template + bool wait_for(std::unique_lock& lock, const std::chrono::duration& rtime, + PredicateT p) noexcept; + + /** + * @brief Retrieves the native handle of the condition variable. + * + * @return The native handle of the condition variable. + */ + [[nodiscard]] native_handle_type native_handle() noexcept; + + private: + template + static constexpr ToDurT ceil(const std::chrono::duration& d) noexcept; + + template + static constexpr TpT ceil_impl(const TpT& t, const UpT& u) noexcept; + + template + std::cv_status wait_until_impl(std::unique_lock& lock, + const std::chrono::time_point& atime) noexcept; + + pthread_cond_t cond_{}; +}; + +//////////////////////////////////////////////////////////////// +/// Details +//////////////////////////////////////////////////////////////// + +inline ConditionVariable::ConditionVariable() noexcept { + pthread_condattr_t attr; + + pthread_condattr_init(&attr); + pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); + pthread_cond_init(&cond_, &attr); + pthread_condattr_destroy(&attr); +} + +inline ConditionVariable::~ConditionVariable() noexcept { pthread_cond_destroy(&cond_); } + +inline void ConditionVariable::notify_one() noexcept { pthread_cond_signal(&cond_); } + +inline void ConditionVariable::notify_all() noexcept { pthread_cond_broadcast(&cond_); } + +inline void ConditionVariable::wait(std::unique_lock& lock) noexcept { + pthread_cond_wait(&cond_, lock.mutex()->native_handle()); +} + +template +inline void ConditionVariable::wait(std::unique_lock& lock, PredicateT p) noexcept { + while (!p()) { + wait(lock); + } +} + +template +inline std::cv_status ConditionVariable::wait_until( + std::unique_lock& lock, + const std::chrono::time_point& atime) noexcept { + return wait_until_impl(lock, atime); +} + +template +inline std::cv_status ConditionVariable::wait_until( + std::unique_lock& lock, + const std::chrono::time_point& atime) noexcept { + return wait_until(lock, atime); +} + +template +inline std::cv_status ConditionVariable::wait_until(std::unique_lock& lock, + const std::chrono::time_point& atime) noexcept { + const typename ClockT::time_point c_entry = ClockT::now(); + const std::chrono::steady_clock::time_point s_entry = std::chrono::steady_clock::now(); + const auto delta = atime - c_entry; + const auto s_atime = s_entry + ceil(delta); + + if (wait_until_impl(lock, s_atime) == std::cv_status::no_timeout) { + return std::cv_status::no_timeout; + } + + if (ClockT::now() < atime) { + return std::cv_status::no_timeout; + } + + return std::cv_status::timeout; +} + +template +inline bool ConditionVariable::wait_until(std::unique_lock& lock, + const std::chrono::time_point& atime, + PredicateT p) noexcept { + while (!p()) { + if (wait_until(lock, atime) == std::cv_status::timeout) { + return p(); + } + } + + return true; +} + +template +inline std::cv_status ConditionVariable::wait_for(std::unique_lock& lock, + const std::chrono::duration& rtime) noexcept { + return wait_until(lock, std::chrono::steady_clock::now() + ceil(rtime)); +} + +template +inline bool ConditionVariable::wait_for(std::unique_lock& lock, + const std::chrono::duration& rtime, PredicateT p) noexcept { + return wait_until(lock, std::chrono::steady_clock::now() + ceil(rtime), + std::move(p)); +} + +inline ConditionVariable::native_handle_type ConditionVariable::native_handle() noexcept { return &cond_; } + +template +inline constexpr ToDurT ConditionVariable::ceil(const std::chrono::duration& d) noexcept { // NOLINT + return ceil_impl(std::chrono::duration_cast(d), d); +} + +template +inline constexpr TpT ConditionVariable::ceil_impl(const TpT& t, const UpT& u) noexcept { // NOLINT + return (t < u) ? (t + TpT{1}) : t; +} + +template +inline std::cv_status ConditionVariable::wait_until_impl( + std::unique_lock& lock, + const std::chrono::time_point& atime) noexcept { + auto s = std::chrono::time_point_cast(atime); + auto ns = std::chrono::duration_cast(atime - s); + + struct timespec ts = {static_cast(s.time_since_epoch().count()), // NOLINT + static_cast(ns.count())}; // NOLINT + + pthread_cond_timedwait(&cond_, lock.mutex()->native_handle(), &ts); + + return (std::chrono::steady_clock::now() < atime ? std::cv_status::no_timeout : std::cv_status::timeout); +} + +using condition_variable = ConditionVariable; // NOLINT + +} // namespace stdext + +#else + +namespace stdext { + +using ConditionVariable = std::condition_variable; // NOLINT +using condition_variable = ConditionVariable; // NOLINT + +} // namespace stdext + +#endif diff --git a/iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/periodic_task.hpp b/iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/periodic_task.hpp index e63c137295..139b3eb24b 100644 --- a/iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/periodic_task.hpp +++ b/iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/periodic_task.hpp @@ -23,9 +23,12 @@ #include "iceoryx_hoofs/posix_wrapper/thread.hpp" #include - +#include +#include #include +#include "./condition_variable.hpp" + namespace iox { namespace concurrent @@ -124,6 +127,9 @@ class PeriodicTask /// @todo use a refactored posix::Timer object once available posix::Semaphore m_stop{posix::Semaphore::create(posix::CreateUnnamedSingleProcessSemaphore, 0U).value()}; std::thread m_taskExecutor; + std::atomic_bool m_running_flag{false}; + std::mutex m_mtx; + stdext::condition_variable m_cv; }; } // namespace concurrent diff --git a/iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/periodic_task.inl b/iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/periodic_task.inl index ac885b55a4..2f008373c2 100644 --- a/iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/periodic_task.inl +++ b/iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/periodic_task.inl @@ -52,6 +52,7 @@ template inline void PeriodicTask::start(const units::Duration interval) noexcept { stop(); + m_running_flag.store(true); m_interval = interval; m_taskExecutor = std::thread(&PeriodicTask::run, this); posix::setThreadName(m_taskExecutor.native_handle(), m_taskName); @@ -60,6 +61,12 @@ inline void PeriodicTask::start(const units::Duration interval) noexcept template inline void PeriodicTask::stop() noexcept { + if(m_running_flag.load()) { + std::unique_lock lock(m_mtx); + m_running_flag.store(false); + m_cv.notify_one(); + } + if (m_taskExecutor.joinable()) { cxx::Expects(!m_stop.post().has_error()); @@ -76,17 +83,26 @@ inline bool PeriodicTask::isActive() const noexcept template inline void PeriodicTask::run() noexcept { - posix::SemaphoreWaitState waitState = posix::SemaphoreWaitState::NO_TIMEOUT; - do - { - IOX_DISCARD_RESULT(m_callable()); + // posix::SemaphoreWaitState waitState = posix::SemaphoreWaitState::NO_TIMEOUT; + // do + // { + // IOX_DISCARD_RESULT(m_callable()); - /// @todo use a refactored posix::Timer::wait method returning TIMER_TICK and TIMER_STOPPED once available - auto waitResult = m_stop.timedWait(m_interval); - cxx::Expects(!waitResult.has_error()); + // /// @todo use a refactored posix::Timer::wait method returning TIMER_TICK and TIMER_STOPPED once available + // auto waitResult = m_stop.timedWait(m_interval); + // cxx::Expects(!waitResult.has_error()); - waitState = waitResult.value(); - } while (waitState == posix::SemaphoreWaitState::TIMEOUT); + // waitState = waitResult.value(); + // } while (waitState == posix::SemaphoreWaitState::TIMEOUT); + + std::unique_lock lock(m_mtx); + for(;;) { + IOX_DISCARD_RESULT(m_callable()); + bool ret = m_cv.wait_for(lock, std::chrono::milliseconds(m_interval.toMilliseconds()), [this]() -> bool { return !m_running_flag.load(); }); + if (ret) { + break; + } + } } } // namespace concurrent diff --git a/iceoryx_hoofs/platform/linux/include/iceoryx_hoofs/platform/semaphore.hpp b/iceoryx_hoofs/platform/linux/include/iceoryx_hoofs/platform/semaphore.hpp index 7b5d22be9a..2f8c198d03 100644 --- a/iceoryx_hoofs/platform/linux/include/iceoryx_hoofs/platform/semaphore.hpp +++ b/iceoryx_hoofs/platform/linux/include/iceoryx_hoofs/platform/semaphore.hpp @@ -18,6 +18,21 @@ #define IOX_HOOFS_LINUX_PLATFORM_SEMAPHORE_HPP #include +#include + +#ifndef _XOPEN_SOURCE +#define _XOPEN_SOURCE 600 +#endif + +#if defined(__USE_XOPEN2K) +#ifndef IOX_HAS_SEM_CLOCKWAIT +#define IOX_HAS_SEM_CLOCKWAIT +#endif +#endif + +#ifndef IOX_HAS_SEM_CLOCKWAIT +#warning "Not support sem_clockwait!" +#endif using iox_sem_t = sem_t; @@ -43,7 +58,11 @@ inline int iox_sem_trywait(iox_sem_t* sem) inline int iox_sem_timedwait(iox_sem_t* sem, const struct timespec* abs_timeout) { +#ifdef IOX_HAS_SEM_CLOCKWAIT + return sem_clockwait(sem, CLOCK_MONOTONIC, abs_timeout); +#else return sem_timedwait(sem, abs_timeout); +#endif } inline int iox_sem_close(iox_sem_t* sem) diff --git a/iceoryx_hoofs/platform/qnx/include/iceoryx_hoofs/platform/semaphore.hpp b/iceoryx_hoofs/platform/qnx/include/iceoryx_hoofs/platform/semaphore.hpp index 2fbdabb72b..6ca9ee0388 100644 --- a/iceoryx_hoofs/platform/qnx/include/iceoryx_hoofs/platform/semaphore.hpp +++ b/iceoryx_hoofs/platform/qnx/include/iceoryx_hoofs/platform/semaphore.hpp @@ -18,6 +18,21 @@ #define IOX_HOOFS_QNX_PLATFORM_SEMAPHORE_HPP #include +#include + +#ifndef _XOPEN_SOURCE +#define _XOPEN_SOURCE 600 +#endif + +#if defined(__USE_XOPEN2K) +#ifndef IOX_HAS_SEM_CLOCKWAIT +#define IOX_HAS_SEM_CLOCKWAIT +#endif +#endif + +#ifndef IOX_HAS_SEM_CLOCKWAIT +#warning "Not support sem_clockwait!" +#endif using iox_sem_t = sem_t; @@ -43,7 +58,11 @@ inline int iox_sem_trywait(iox_sem_t* sem) inline int iox_sem_timedwait(iox_sem_t* sem, const struct timespec* abs_timeout) { +#ifdef IOX_HAS_SEM_CLOCKWAIT + return sem_clockwait(sem, CLOCK_MONOTONIC, abs_timeout); +#else return sem_timedwait(sem, abs_timeout); +#endif } inline int iox_sem_close(iox_sem_t* sem) diff --git a/iceoryx_hoofs/source/posix_wrapper/semaphore.cpp b/iceoryx_hoofs/source/posix_wrapper/semaphore.cpp index 71bfa5aa14..02d2d7333e 100644 --- a/iceoryx_hoofs/source/posix_wrapper/semaphore.cpp +++ b/iceoryx_hoofs/source/posix_wrapper/semaphore.cpp @@ -110,7 +110,12 @@ cxx::expected Semaphore::post() noexcept cxx::expected Semaphore::timedWait(const units::Duration abs_timeout) noexcept { +#ifdef IOX_HAS_SEM_CLOCKWAIT + const struct timespec timeout = abs_timeout.timespec(units::TimeSpecReference::Monotonic); +#else const struct timespec timeout = abs_timeout.timespec(units::TimeSpecReference::Epoch); +#endif + auto call = posixCall(iox_sem_timedwait)(getHandle(), &timeout).failureReturnValue(-1).ignoreErrnos(ETIMEDOUT).evaluate(); From 5f7d501f5e8aaa1b2e9fceca79210801b6c5db6b Mon Sep 17 00:00:00 2001 From: thun Date: Mon, 4 Aug 2025 16:27:06 +0800 Subject: [PATCH 2/2] Update condition_variable.hpp --- .../concurrent/condition_variable.hpp | 100 +++--------------- 1 file changed, 14 insertions(+), 86 deletions(-) diff --git a/iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/condition_variable.hpp b/iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/condition_variable.hpp index 6b3fc3fc31..018c3e80a3 100644 --- a/iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/condition_variable.hpp +++ b/iceoryx_hoofs/include/iceoryx_hoofs/internal/concurrent/condition_variable.hpp @@ -1,4 +1,16 @@ -// add by juntuan.lu +/* + * ConditionVariable implementation to fix monotonic_clock. + * + * There is a problem with std::condition_variable in lower versions of gcc, + * which actually uses std::chrono::system_clock instead of std::chrono::steady_clock. + * Bug 41861 (DR887) - [DR887][C++0x] does not use monotonic_clock + * + * Since new versions of gcc cannot be used, stdext::condition_variable can only be + * manually implemented instead of C++11's std::condition_variable. + * + * This problem is mainly solved by using the POSIX function pthread_condattr_setclock to + * set the attribute of the condition variable to CLOCK_MONOTONIC. + */ #pragma once @@ -18,132 +30,48 @@ class ConditionVariable final { public: using native_handle_type = pthread_cond_t*; - /** - * @brief Deleted copy constructor to ensure the class is non-copyable. - */ ConditionVariable(const ConditionVariable&) noexcept = delete; - /** - * @brief Deleted copy assignment operator to ensure the class is non-assignable. - */ ConditionVariable& operator=(const ConditionVariable&) noexcept = delete; - /** - * @brief Constructs a new condition variable object. - */ ConditionVariable() noexcept; - /** - * @brief Destroys the condition variable object. - */ ~ConditionVariable() noexcept; - /** - * @brief Unblocks one of the threads that are waiting on the condition variable. - */ void notify_one() noexcept; - /** - * @brief Unblocks all of the threads that are waiting on the condition variable. - */ void notify_all() noexcept; - /** - * @brief Blocks the calling thread, waiting for the condition variable to be notified. - * - * @param lock The unique lock associated with the condition variable. - */ void wait(std::unique_lock& lock) noexcept; - /** - * @brief Blocks the calling thread, waiting for the condition variable to be notified and a predicate to be true. - * - * @param lock The unique lock associated with the condition variable. - * @param p The predicate to be evaluated. - */ template void wait(std::unique_lock& lock, PredicateT p) noexcept; - /** - * @brief Blocks the calling thread, waiting for the condition variable to be notified or a specific time point to be - * reached. - * - * @param lock The unique lock associated with the condition variable. - * @param atime The time point to wait until. - * @return `std::cv_status::no_timeout` if the condition variable was notified, `std::cv_status::timeout` otherwise. - */ template std::cv_status wait_until(std::unique_lock& lock, const std::chrono::time_point& atime) noexcept; - /** - * @brief Blocks the calling thread, waiting for the condition variable to be notified or a specific time point to be - * reached. - * - * @param lock The unique lock associated with the condition variable. - * @param atime The time point to wait until. - * @return `std::cv_status::no_timeout` if the condition variable was notified, `std::cv_status::timeout` otherwise. - */ template std::cv_status wait_until(std::unique_lock& lock, const std::chrono::time_point& atime) noexcept; - /** - * @brief Blocks the calling thread, waiting for the condition variable to be notified or a specific time point to be - * reached. - * - * @param lock The unique lock associated with the condition variable. - * @param atime The time point to wait until. - * @return `std::cv_status::no_timeout` if the condition variable was notified, `std::cv_status::timeout` otherwise. - */ template std::cv_status wait_until(std::unique_lock& lock, const std::chrono::time_point& atime) noexcept; - /** - * @brief Blocks the calling thread, waiting for the condition variable to be notified or a specific time point to be - * reached, and a predicate to be true. - * - * @param lock The unique lock associated with the condition variable. - * @param atime The time point to wait until. - * @param p The predicate to be evaluated. - * @return `true` if the predicate was satisfied before the time point was reached, `false` otherwise. - */ template bool wait_until(std::unique_lock& lock, const std::chrono::time_point& atime, PredicateT p) noexcept; - /** - * @brief Blocks the calling thread, waiting for the condition variable to be notified or a specific duration to - * elapse. - * - * @param lock The unique lock associated with the condition variable. - * @param rtime The duration to wait for. - * @return `std::cv_status::no_timeout` if the condition variable was notified, `std::cv_status::timeout` otherwise. - */ template std::cv_status wait_for(std::unique_lock& lock, const std::chrono::duration& rtime) noexcept; - /** - * @brief Blocks the calling thread, waiting for the condition variable to be notified or a specific duration to - * elapse, and a predicate to be true. - * - * @param lock The unique lock associated with the condition variable. - * @param rtime The duration to wait for. - * @param p The predicate to be evaluated. - * @return `true` if the predicate was satisfied before the time duration was reached, `false` otherwise. - */ template bool wait_for(std::unique_lock& lock, const std::chrono::duration& rtime, PredicateT p) noexcept; - /** - * @brief Retrieves the native handle of the condition variable. - * - * @return The native handle of the condition variable. - */ - [[nodiscard]] native_handle_type native_handle() noexcept; + native_handle_type native_handle() noexcept; private: template