Skip to content

Added support for timeouts to lightweight semaphore #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 92 additions & 5 deletions common/sema.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@

#include <atomic>
#include <cassert>
#include <cstdint>


#if defined(_WIN32)
//---------------------------------------------------------
// Semaphore (Windows)
//---------------------------------------------------------

#define NOMINMAX
#include <windows.h>
#undef min
#undef max

class Semaphore
{
Expand All @@ -44,6 +44,16 @@ class Semaphore
WaitForSingleObject(m_hSema, INFINITE);
}

bool try_wait()
{
return WaitForSingleObject(m_hSema, 0) != RC_WAIT_TIMEOUT;
}

bool timed_wait(std::uint64_t usecs)
{
return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) != RC_WAIT_TIMEOUT;
}

void signal(int count = 1)
{
ReleaseSemaphore(m_hSema, count, NULL);
Expand Down Expand Up @@ -84,6 +94,23 @@ class Semaphore
semaphore_wait(m_sema);
}

bool try_wait()
{
return timed_wait(0);
}

bool timed_wait(std::uint64_t timeout_usecs)
{
mach_timespec_t ts;
ts.tv_sec = timeout_usecs / 1000000;
ts.tv_nsec = (timeout_usecs % 1000000) * 1000;

// added in OSX 10.10: https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
kern_return_t rc = semaphore_timedwait(m_sema, ts);

return rc != KERN_OPERATION_TIMED_OUT;
}

void signal()
{
semaphore_signal(m_sema);
Expand All @@ -105,6 +132,7 @@ class Semaphore
//---------------------------------------------------------

#include <semaphore.h>
#include <cerrno>

class Semaphore
{
Expand Down Expand Up @@ -137,6 +165,42 @@ class Semaphore
while (rc == -1 && errno == EINTR);
}

bool try_wait()
{
int rc;
do
{
rc = sem_trywait(&m_sema);
}
while (rc == -1 && errno == EINTR);
return !(rc == -1 && errno == EAGAIN);
}

bool timed_wait(std::uint64_t usecs)
{
struct timespec ts;
const int usecs_in_1_sec = 1000000;
const int nsecs_in_1_sec = 1000000000;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += usecs / usecs_in_1_sec;
ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000;
// sem_timedwait bombs if you have more than 1e9 in tv_nsec
// so we have to clean things up before passing it in
if (ts.tv_nsec > nsecs_in_1_sec)
{
ts.tv_nsec -= nsecs_in_1_sec;
++ts.tv_sec;
}

int rc;
do
{
rc = sem_timedwait(&m_sema, &ts);
}
while (rc == -1 && errno == EINTR);
return !(rc == -1 && errno == ETIMEDOUT);
}

void signal()
{
sem_post(&m_sema);
Expand Down Expand Up @@ -168,7 +232,7 @@ class LightweightSemaphore
std::atomic<int> m_count;
Semaphore m_sema;

void waitWithPartialSpinning()
bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1)
{
int oldCount;
// Is there a better way to set the initial spin count?
Expand All @@ -179,13 +243,31 @@ class LightweightSemaphore
{
oldCount = m_count.load(std::memory_order_relaxed);
if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire))
return;
return true;
std::atomic_signal_fence(std::memory_order_acquire); // Prevent the compiler from collapsing the loop.
}
oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
if (oldCount <= 0)
if (oldCount > 0)
return true;
if (timeout_usecs < 0)
{
m_sema.wait();
return true;
}
if (m_sema.timed_wait((std::uint64_t)timeout_usecs))
return true;
// At this point, we've timed out waiting for the semaphore, but the
// count is still decremented indicating we may still be waiting on
// it. So we have to re-adjust the count, but only if the semaphore
// wasn't signaled enough times for us too since then. If it was, we
// need to release the semaphore too.
while (true)
{
oldCount = m_count.load(std::memory_order_acquire);
if (oldCount >= 0 && m_sema.try_wait())
return true;
if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed))
return false;
}
}

Expand All @@ -207,6 +289,11 @@ class LightweightSemaphore
waitWithPartialSpinning();
}

bool wait(std::int64_t timeout_usecs)
{
return tryWait() || waitWithPartialSpinning(timeout_usecs);
}

void signal(int count = 1)
{
int oldCount = m_count.fetch_add(count, std::memory_order_release);
Expand Down