Skip to content
Open
13 changes: 9 additions & 4 deletions SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -782,10 +782,15 @@ else:
'target_posix_pc',
])

if meta.platform in ['linux', 'unix', 'android']:
env.Append(ROC_TARGETS=[
'target_posix_ext',
])
if meta.platform in ['linux', 'android', 'unix']:
if 'ROC_HAVE_SEM_CLOCKWAIT' in env['CPPDEFINES']:
env.Append(ROC_TARGETS=[
'target_posix_sem',
])
else:
env.Append(ROC_TARGETS=[
'target_nosem',
])

if meta.platform in ['linux', 'unix', 'macos', 'windows', 'android']:
env.Append(ROC_TARGETS=[
Expand Down
149 changes: 149 additions & 0 deletions src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright (c) 2020 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

#include "roc_core/semaphore.h"
#include "roc_core/cpu_instructions.h"
#include "roc_core/errno_to_str.h"
#include "roc_core/log.h"
#include "roc_core/panic.h"

#include <errno.h>
#include <time.h>

namespace roc {
namespace core {

Semaphore::Semaphore(unsigned counter)
: mutex_()
, counter_(counter)
, guard_(0) {
pthread_condattr_t attr;

if (int err = pthread_condattr_init(&attr)) {
roc_panic("semaphore: pthread_condattr_init(): %s", errno_to_str(err).c_str());
}

#if defined(CLOCK_MONOTONIC) && !defined(__APPLE__) && !defined(__MACH__)
if (int err = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC)) {
roc_panic("semaphore: pthread_condattr_setclock(): %s",
errno_to_str(err).c_str());
}
#endif

if (int err = pthread_cond_init(&cond_, &attr)) {
roc_panic("semaphore: pthread_cond_init(): %s", errno_to_str(err).c_str());
}

if (int err = pthread_condattr_destroy(&attr)) {
roc_panic("semaphore: pthread_condattr_destroy(): %s", errno_to_str(err).c_str());
}
}

Semaphore::~Semaphore() {
/* Ensure that signal() and broadcast() are not using condvar.
*/
while (guard_) {
cpu_relax();
}

int err;

#if defined(__APPLE__) && defined(__MACH__)
if ((err = pthread_mutex_lock(&mutex_.mutex_))) {
roc_panic("mutex: pthread_mutex_lock(): %s", errno_to_str(err).c_str());
}

struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 1;

err = pthread_cond_timedwait_relative_np(&cond_, &mutex_.mutex_, &ts);
if (err != 0 && err != ETIMEDOUT) {
roc_panic("mutex: pthread_cond_timedwait_relative_np(): %s",
errno_to_str(err).c_str());
}

if ((err = pthread_mutex_unlock(&mutex_.mutex_))) {
roc_panic("mutex: pthread_mutex_unlock(): %s", errno_to_str(err).c_str());
}
#endif

if ((err = pthread_cond_destroy(&cond_))) {
roc_panic("sem: pthread_cond_destroy(): %s", errno_to_str(err).c_str());
}
}

bool Semaphore::timed_wait(nanoseconds_t deadline) {
if (deadline < 0) {
roc_panic("semaphore: unexpected negative deadline");
}

struct timespec ts;
int err = 0;

mutex_.lock();
while (err == 0 && counter_ == 0) {
#if defined(__APPLE__) && defined(__MACH__)
// On macOS, convert absolute deadline to relative timeout
nanoseconds_t now = timestamp(ClockMonotonic);
if (deadline <= now) {
err = ETIMEDOUT;
break;
}
nanoseconds_t timeout = deadline - now;

ts.tv_sec = time_t(timeout / Second);
ts.tv_nsec = long(timeout % Second);

err = pthread_cond_timedwait_relative_np(&cond_, &mutex_.mutex_, &ts);
#else
ts.tv_sec = time_t(deadline / Second);
ts.tv_nsec = long(deadline % Second);

err = pthread_cond_timedwait(&cond_, &mutex_.mutex_, &ts);
#endif
}

bool acquired = (counter_ > 0);
if (acquired) {
counter_--;
}
mutex_.unlock();

if (err != 0 && err != ETIMEDOUT) {
roc_panic("semaphore: pthread_cond_timedwait(): %s", errno_to_str(err).c_str());
}

return acquired;
}

void Semaphore::wait() {
int err = 0;
mutex_.lock();
while (err == 0 && counter_ == 0) {
if ((err = pthread_cond_wait(&cond_, &mutex_.mutex_))) {
roc_panic("semaphore: pthread_cond_wait(): %s", errno_to_str(err).c_str());
}
}
counter_--;
mutex_.unlock();
}

void Semaphore::post() {
++guard_;
mutex_.lock();
counter_++;
if (int err = pthread_cond_broadcast(&cond_)) {
roc_panic("cond: pthread_cond_broadcast(): %s", errno_to_str(err).c_str());
}
mutex_.unlock();
--guard_;
}

} // namespace core
} // namespace roc
55 changes: 55 additions & 0 deletions src/internal_modules/roc_core/target_nosem/roc_core/semaphore.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2020 Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

//! @file roc_core/target_nosem/roc_core/semaphore.h
//! @brief Semaphore.

#ifndef ROC_CORE_SEMAPHORE_H_
#define ROC_CORE_SEMAPHORE_H_

#include "roc_core/atomic_int.h"
#include "roc_core/attributes.h"
#include "roc_core/mutex.h"
#include "roc_core/noncopyable.h"
#include "roc_core/time.h"
#include <pthread.h>

namespace roc {
namespace core {

//! Semaphore.
class Semaphore : public NonCopyable<> {
public:
//! Initialize semaphore with given counter.
explicit Semaphore(unsigned counter = 0);

~Semaphore();

//! Wait until the counter becomes non-zero, decrement it, and return true.
//! If deadline expires before the counter becomes non-zero, returns false.
//! Deadline should be in the same time domain as core::timestamp().
ROC_NODISCARD bool timed_wait(nanoseconds_t deadline);

//! Wait until the counter becomes non-zero, decrement it, and return.
void wait();

//! Increment counter and wake up blocked waits.
//! This method is implemented using mutex for platforms where
void post();

private:
pthread_cond_t cond_;
core::Mutex mutex_;
AtomicInt<unsigned> counter_;
AtomicInt<int> guard_;
};

} // namespace core
} // namespace roc

#endif // ROC_CORE_SEMAPHORE_H_
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Mutex : public NonCopyable<> {

private:
friend class Cond;
friend class Semaphore;

mutable pthread_mutex_t mutex_;
mutable AtomicInt<int32_t> guard_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "roc_core/semaphore.h"
#include "roc_core/cpu_instructions.h"
#include "roc_core/errno_to_str.h"
#include "roc_core/log.h"
#include "roc_core/panic.h"

#include <errno.h>
Expand All @@ -34,16 +35,17 @@ Semaphore::~Semaphore() {
}

bool Semaphore::timed_wait(nanoseconds_t deadline) {
printf("Enter function of timed_wait");
if (deadline < 0) {
roc_panic("semaphore: unexpected negative deadline");
}

for (;;) {
timespec ts;
ts.tv_sec = long(deadline / Second);
ts.tv_nsec = long(deadline % Second);
timespec ts;
ts.tv_sec = long(deadline / Second);
ts.tv_nsec = long(deadline % Second);

if (sem_timedwait(&sem_, &ts) == 0) {
for (;;) {
if (sem_clockwait(&sem_, CLOCK_MONOTONIC, &ts) == 0) {
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

//! @file roc_core/target_posix_ext/roc_core/semaphore.h
//! @file roc_core/target_posix_sem/roc_core/semaphore.h
//! @brief Semaphore.

#ifndef ROC_CORE_SEMAPHORE_H_
Expand Down
Loading
Loading