Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* ConditionVariable implementation to fix monotonic_clock.
*
* There is a problem with std::condition_variable in lower versions of gcc,
* which actually uses std::chrono::system_clock instead of std::chrono::steady_clock.
* Bug 41861 (DR887) - [DR887][C++0x] <condition_variable> does not use monotonic_clock
*
* Since new versions of gcc cannot be used, stdext::condition_variable can only be
* manually implemented instead of C++11's std::condition_variable.
*
* This problem is mainly solved by using the POSIX function pthread_condattr_setclock to
* set the attribute of the condition variable to CLOCK_MONOTONIC.
*/

#pragma once

#include <condition_variable>
#include <utility>

#ifdef __unix__

#include <pthread.h>

#include <chrono>
#include <mutex>

namespace stdext {

class ConditionVariable final {
public:
using native_handle_type = pthread_cond_t*;

ConditionVariable(const ConditionVariable&) noexcept = delete;

ConditionVariable& operator=(const ConditionVariable&) noexcept = delete;

ConditionVariable() noexcept;

~ConditionVariable() noexcept;

void notify_one() noexcept;

void notify_all() noexcept;

void wait(std::unique_lock<std::mutex>& lock) noexcept;

template <typename PredicateT>
void wait(std::unique_lock<std::mutex>& lock, PredicateT p) noexcept;

template <typename DurationT>
std::cv_status wait_until(std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<std::chrono::steady_clock, DurationT>& atime) noexcept;

template <typename DurationT>
std::cv_status wait_until(std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<std::chrono::system_clock, DurationT>& atime) noexcept;

template <typename ClockT, typename DurationT>
std::cv_status wait_until(std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<ClockT, DurationT>& atime) noexcept;

template <typename ClockT, typename DurationT, typename PredicateT>
bool wait_until(std::unique_lock<std::mutex>& lock, const std::chrono::time_point<ClockT, DurationT>& atime,
PredicateT p) noexcept;

template <typename RepT, typename PeriodT>
std::cv_status wait_for(std::unique_lock<std::mutex>& lock,
const std::chrono::duration<RepT, PeriodT>& rtime) noexcept;

template <typename RepT, typename PeriodT, typename PredicateT>
bool wait_for(std::unique_lock<std::mutex>& lock, const std::chrono::duration<RepT, PeriodT>& rtime,
PredicateT p) noexcept;

native_handle_type native_handle() noexcept;

private:
template <typename ToDurT, typename RepT, typename PeriodT>
static constexpr ToDurT ceil(const std::chrono::duration<RepT, PeriodT>& d) noexcept;

template <typename TpT, typename UpT>
static constexpr TpT ceil_impl(const TpT& t, const UpT& u) noexcept;

template <typename DurationT>
std::cv_status wait_until_impl(std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<std::chrono::steady_clock, DurationT>& atime) noexcept;

pthread_cond_t cond_{};
};

////////////////////////////////////////////////////////////////
/// Details
////////////////////////////////////////////////////////////////

inline ConditionVariable::ConditionVariable() noexcept {
pthread_condattr_t attr;

pthread_condattr_init(&attr);
pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
pthread_cond_init(&cond_, &attr);
pthread_condattr_destroy(&attr);
}

inline ConditionVariable::~ConditionVariable() noexcept { pthread_cond_destroy(&cond_); }

inline void ConditionVariable::notify_one() noexcept { pthread_cond_signal(&cond_); }

inline void ConditionVariable::notify_all() noexcept { pthread_cond_broadcast(&cond_); }

inline void ConditionVariable::wait(std::unique_lock<std::mutex>& lock) noexcept {
pthread_cond_wait(&cond_, lock.mutex()->native_handle());
}

template <typename PredicateT>
inline void ConditionVariable::wait(std::unique_lock<std::mutex>& lock, PredicateT p) noexcept {
while (!p()) {
wait(lock);
}
}

template <typename DurationT>
inline std::cv_status ConditionVariable::wait_until(
std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<std::chrono::steady_clock, DurationT>& atime) noexcept {
return wait_until_impl(lock, atime);
}

template <typename DurationT>
inline std::cv_status ConditionVariable::wait_until(
std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<std::chrono::system_clock, DurationT>& atime) noexcept {
return wait_until<std::chrono::system_clock, DurationT>(lock, atime);
}

template <typename ClockT, typename DurationT>
inline std::cv_status ConditionVariable::wait_until(std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<ClockT, DurationT>& atime) noexcept {
const typename ClockT::time_point c_entry = ClockT::now();
const std::chrono::steady_clock::time_point s_entry = std::chrono::steady_clock::now();
const auto delta = atime - c_entry;
const auto s_atime = s_entry + ceil<std::chrono::steady_clock::duration>(delta);

if (wait_until_impl(lock, s_atime) == std::cv_status::no_timeout) {
return std::cv_status::no_timeout;
}

if (ClockT::now() < atime) {
return std::cv_status::no_timeout;
}

return std::cv_status::timeout;
}

template <typename ClockT, typename DurationT, typename PredicateT>
inline bool ConditionVariable::wait_until(std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<ClockT, DurationT>& atime,
PredicateT p) noexcept {
while (!p()) {
if (wait_until(lock, atime) == std::cv_status::timeout) {
return p();
}
}

return true;
}

template <typename RepT, typename PeriodT>
inline std::cv_status ConditionVariable::wait_for(std::unique_lock<std::mutex>& lock,
const std::chrono::duration<RepT, PeriodT>& rtime) noexcept {
return wait_until(lock, std::chrono::steady_clock::now() + ceil<std::chrono::steady_clock::duration>(rtime));
}

template <typename RepT, typename PeriodT, typename PredicateT>
inline bool ConditionVariable::wait_for(std::unique_lock<std::mutex>& lock,
const std::chrono::duration<RepT, PeriodT>& rtime, PredicateT p) noexcept {
return wait_until(lock, std::chrono::steady_clock::now() + ceil<std::chrono::steady_clock::duration>(rtime),
std::move(p));
}

inline ConditionVariable::native_handle_type ConditionVariable::native_handle() noexcept { return &cond_; }

template <typename ToDurT, typename RepT, typename PeriodT>
inline constexpr ToDurT ConditionVariable::ceil(const std::chrono::duration<RepT, PeriodT>& d) noexcept { // NOLINT
return ceil_impl(std::chrono::duration_cast<ToDurT>(d), d);
}

template <typename TpT, typename UpT>
inline constexpr TpT ConditionVariable::ceil_impl(const TpT& t, const UpT& u) noexcept { // NOLINT
return (t < u) ? (t + TpT{1}) : t;
}

template <typename DurationT>
inline std::cv_status ConditionVariable::wait_until_impl(
std::unique_lock<std::mutex>& lock,
const std::chrono::time_point<std::chrono::steady_clock, DurationT>& atime) noexcept {
auto s = std::chrono::time_point_cast<std::chrono::seconds>(atime);
auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(atime - s);

struct timespec ts = {static_cast<std::time_t>(s.time_since_epoch().count()), // NOLINT
static_cast<long>(ns.count())}; // NOLINT

pthread_cond_timedwait(&cond_, lock.mutex()->native_handle(), &ts);

return (std::chrono::steady_clock::now() < atime ? std::cv_status::no_timeout : std::cv_status::timeout);
}

using condition_variable = ConditionVariable; // NOLINT

} // namespace stdext

#else

namespace stdext {

using ConditionVariable = std::condition_variable; // NOLINT
using condition_variable = ConditionVariable; // NOLINT

} // namespace stdext

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
#include "iceoryx_hoofs/posix_wrapper/thread.hpp"

#include <thread>

#include <atomic>
#include <mutex>
#include <iostream>

#include "./condition_variable.hpp"

namespace iox
{
namespace concurrent
Expand Down Expand Up @@ -124,6 +127,9 @@ class PeriodicTask
/// @todo use a refactored posix::Timer object once available
posix::Semaphore m_stop{posix::Semaphore::create(posix::CreateUnnamedSingleProcessSemaphore, 0U).value()};
std::thread m_taskExecutor;
std::atomic_bool m_running_flag{false};
std::mutex m_mtx;
stdext::condition_variable m_cv;
};

} // namespace concurrent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ template <typename T>
inline void PeriodicTask<T>::start(const units::Duration interval) noexcept
{
stop();
m_running_flag.store(true);
m_interval = interval;
m_taskExecutor = std::thread(&PeriodicTask::run, this);
posix::setThreadName(m_taskExecutor.native_handle(), m_taskName);
Expand All @@ -60,6 +61,12 @@ inline void PeriodicTask<T>::start(const units::Duration interval) noexcept
template <typename T>
inline void PeriodicTask<T>::stop() noexcept
{
if(m_running_flag.load()) {
std::unique_lock<std::mutex> lock(m_mtx);
m_running_flag.store(false);
m_cv.notify_one();
}

if (m_taskExecutor.joinable())
{
cxx::Expects(!m_stop.post().has_error());
Expand All @@ -76,17 +83,26 @@ inline bool PeriodicTask<T>::isActive() const noexcept
template <typename T>
inline void PeriodicTask<T>::run() noexcept
{
posix::SemaphoreWaitState waitState = posix::SemaphoreWaitState::NO_TIMEOUT;
do
{
IOX_DISCARD_RESULT(m_callable());
// posix::SemaphoreWaitState waitState = posix::SemaphoreWaitState::NO_TIMEOUT;
// do
// {
// IOX_DISCARD_RESULT(m_callable());

/// @todo use a refactored posix::Timer::wait method returning TIMER_TICK and TIMER_STOPPED once available
auto waitResult = m_stop.timedWait(m_interval);
cxx::Expects(!waitResult.has_error());
// /// @todo use a refactored posix::Timer::wait method returning TIMER_TICK and TIMER_STOPPED once available
// auto waitResult = m_stop.timedWait(m_interval);
// cxx::Expects(!waitResult.has_error());

waitState = waitResult.value();
} while (waitState == posix::SemaphoreWaitState::TIMEOUT);
// waitState = waitResult.value();
// } while (waitState == posix::SemaphoreWaitState::TIMEOUT);

std::unique_lock<std::mutex> lock(m_mtx);
for(;;) {
IOX_DISCARD_RESULT(m_callable());
bool ret = m_cv.wait_for(lock, std::chrono::milliseconds(m_interval.toMilliseconds()), [this]() -> bool { return !m_running_flag.load(); });
if (ret) {
break;
}
}
}

} // namespace concurrent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@
#define IOX_HOOFS_LINUX_PLATFORM_SEMAPHORE_HPP

#include <semaphore.h>
#include <time.h>

#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600
#endif

#if defined(__USE_XOPEN2K)
#ifndef IOX_HAS_SEM_CLOCKWAIT
#define IOX_HAS_SEM_CLOCKWAIT
#endif
#endif

#ifndef IOX_HAS_SEM_CLOCKWAIT
#warning "Not support sem_clockwait!"
#endif

using iox_sem_t = sem_t;

Expand All @@ -43,7 +58,11 @@ inline int iox_sem_trywait(iox_sem_t* sem)

inline int iox_sem_timedwait(iox_sem_t* sem, const struct timespec* abs_timeout)
{
#ifdef IOX_HAS_SEM_CLOCKWAIT
return sem_clockwait(sem, CLOCK_MONOTONIC, abs_timeout);
#else
return sem_timedwait(sem, abs_timeout);
#endif
}

inline int iox_sem_close(iox_sem_t* sem)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@
#define IOX_HOOFS_QNX_PLATFORM_SEMAPHORE_HPP

#include <semaphore.h>
#include <time.h>

#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600
#endif

#if defined(__USE_XOPEN2K)
#ifndef IOX_HAS_SEM_CLOCKWAIT
#define IOX_HAS_SEM_CLOCKWAIT
#endif
#endif

#ifndef IOX_HAS_SEM_CLOCKWAIT
#warning "Not support sem_clockwait!"
#endif

using iox_sem_t = sem_t;

Expand All @@ -43,7 +58,11 @@ inline int iox_sem_trywait(iox_sem_t* sem)

inline int iox_sem_timedwait(iox_sem_t* sem, const struct timespec* abs_timeout)
{
#ifdef IOX_HAS_SEM_CLOCKWAIT
return sem_clockwait(sem, CLOCK_MONOTONIC, abs_timeout);
#else
return sem_timedwait(sem, abs_timeout);
#endif
}

inline int iox_sem_close(iox_sem_t* sem)
Expand Down
5 changes: 5 additions & 0 deletions iceoryx_hoofs/source/posix_wrapper/semaphore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,12 @@ cxx::expected<SemaphoreError> Semaphore::post() noexcept

cxx::expected<SemaphoreWaitState, SemaphoreError> Semaphore::timedWait(const units::Duration abs_timeout) noexcept
{
#ifdef IOX_HAS_SEM_CLOCKWAIT
const struct timespec timeout = abs_timeout.timespec(units::TimeSpecReference::Monotonic);
#else
const struct timespec timeout = abs_timeout.timespec(units::TimeSpecReference::Epoch);
#endif

auto call =
posixCall(iox_sem_timedwait)(getHandle(), &timeout).failureReturnValue(-1).ignoreErrnos(ETIMEDOUT).evaluate();

Expand Down