Skip to content

Commit cc259c0

Browse files
author
flw5469
committed
update: add statetracker wait(), implement semaphore using mutex and cond var in platform without sem_clockwait() to prevent semaphore:timed_wait() to hang when changing sysem time, add test cases for state tracker and semaphore
1 parent 04bd158 commit cc259c0

File tree

10 files changed

+524
-19
lines changed

10 files changed

+524
-19
lines changed

SConstruct

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -782,10 +782,15 @@ else:
782782
'target_posix_pc',
783783
])
784784

785-
if meta.platform in ['linux', 'unix', 'android']:
786-
env.Append(ROC_TARGETS=[
787-
'target_posix_ext',
788-
])
785+
if meta.platform in ['linux', 'android', 'unix']:
786+
if 'ROC_HAVE_SEM_CLOCKWAIT' in env['CPPDEFINES']:
787+
env.Append(ROC_TARGETS=[
788+
'target_posix_sem',
789+
])
790+
else:
791+
env.Append(ROC_TARGETS=[
792+
'target_nosem',
793+
])
789794

790795
if meta.platform in ['linux', 'unix', 'macos', 'windows', 'android']:
791796
env.Append(ROC_TARGETS=[
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright (c) 2020 Roc Streaming authors
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
#include "roc_core/semaphore.h"
10+
#include "roc_core/cpu_instructions.h"
11+
#include "roc_core/errno_to_str.h"
12+
#include "roc_core/log.h"
13+
#include "roc_core/panic.h"
14+
15+
#include <errno.h>
16+
#include <time.h>
17+
18+
namespace roc {
19+
namespace core {
20+
21+
Semaphore::Semaphore(unsigned counter)
22+
: mutex_()
23+
, counter_(counter)
24+
, guard_(0) {
25+
if (int err = pthread_cond_init(&cond_, NULL)) {
26+
roc_panic("sem: pthread_cond_init(): %s", errno_to_str(err).c_str());
27+
}
28+
}
29+
30+
Semaphore::~Semaphore() {
31+
while (guard_) {
32+
cpu_relax();
33+
}
34+
35+
int err = 0;
36+
if ((err = pthread_cond_destroy(&cond_))) {
37+
roc_panic("sem: pthread_cond_destroy(): %s", errno_to_str(err).c_str());
38+
}
39+
}
40+
41+
bool Semaphore::timed_wait(nanoseconds_t deadline) {
42+
if (deadline < 0) {
43+
roc_panic("semaphore: unexpected negative deadline");
44+
}
45+
46+
roc_log(roc::LogDebug, "origin time is %" PRId64 "\n", deadline);
47+
roc_log(roc::LogDebug, "time is %" PRId64 "\n", deadline);
48+
roc_log(roc::LogDebug, "now time is %" PRId64 "\n",
49+
core::timestamp(core::ClockMonotonic));
50+
51+
struct timespec ts;
52+
ts.tv_sec = time_t(deadline / Second);
53+
ts.tv_nsec = long(deadline % Second);
54+
55+
int err = 0;
56+
mutex_.lock();
57+
while (err == 0 && counter_ == 0) {
58+
err = pthread_cond_clockwait(&cond_, &mutex_.mutex_, CLOCK_MONOTONIC, &ts);
59+
printf("finish waiting without sem");
60+
// roc_log(roc::LogDebug, "finish waiting without sem");
61+
62+
if (err != 0 && err != ETIMEDOUT) {
63+
roc_panic("semaphore: pthread_cond_timedwait(): %s",
64+
errno_to_str(err).c_str());
65+
}
66+
}
67+
68+
if (err == 0) {
69+
counter_--;
70+
}
71+
mutex_.unlock();
72+
73+
// return false when err == ETIMEDOUT
74+
return (err == 0);
75+
}
76+
77+
void Semaphore::wait() {
78+
int err = 0;
79+
mutex_.lock();
80+
while (err == 0 && counter_ == 0) {
81+
if ((err = pthread_cond_wait(&cond_, &mutex_.mutex_))) {
82+
roc_panic("semaphore: pthread_cond_wait(): %s", errno_to_str(err).c_str());
83+
}
84+
}
85+
counter_--;
86+
mutex_.unlock();
87+
}
88+
89+
void Semaphore::post() {
90+
++guard_;
91+
mutex_.lock();
92+
counter_++;
93+
if (int err = pthread_cond_broadcast(&cond_)) {
94+
roc_panic("cond: pthread_cond_broadcast(): %s", errno_to_str(err).c_str());
95+
}
96+
mutex_.unlock();
97+
--guard_;
98+
}
99+
100+
} // namespace core
101+
} // namespace roc
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright (c) 2020 Roc Streaming authors
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
//! @file roc_core/target_nosem/roc_core/semaphore.h
10+
//! @brief Semaphore.
11+
12+
#ifndef ROC_CORE_SEMAPHORE_H_
13+
#define ROC_CORE_SEMAPHORE_H_
14+
15+
#include "roc_core/atomic.h"
16+
#include "roc_core/attributes.h"
17+
#include "roc_core/mutex.h"
18+
#include "roc_core/noncopyable.h"
19+
#include "roc_core/time.h"
20+
#include <pthread.h>
21+
22+
namespace roc {
23+
namespace core {
24+
25+
//! Semaphore.
26+
class Semaphore : public NonCopyable<> {
27+
public:
28+
//! Initialize semaphore with given counter.
29+
explicit Semaphore(unsigned counter = 0);
30+
31+
~Semaphore();
32+
33+
//! Wait until the counter becomes non-zero, decrement it, and return true.
34+
//! If deadline expires before the counter becomes non-zero, returns false.
35+
//! Deadline should be in the same time domain as core::timestamp().
36+
ROC_NODISCARD bool timed_wait(nanoseconds_t deadline);
37+
38+
//! Wait until the counter becomes non-zero, decrement it, and return.
39+
void wait();
40+
41+
//! Increment counter and wake up blocked waits.
42+
//! This method is implemented using mutex for platforms where
43+
void post();
44+
45+
private:
46+
pthread_cond_t cond_;
47+
core::Mutex mutex_;
48+
Atomic<unsigned> counter_;
49+
Atomic<int> guard_;
50+
};
51+
52+
} // namespace core
53+
} // namespace roc
54+
55+
#endif // ROC_CORE_SEMAPHORE_H_

src/internal_modules/roc_core/target_posix/roc_core/mutex.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ class Mutex : public NonCopyable<> {
7070

7171
private:
7272
friend class Cond;
73+
friend class Semaphore;
7374

7475
mutable pthread_mutex_t mutex_;
7576
mutable AtomicInt<int32_t> guard_;

src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp renamed to src/internal_modules/roc_core/target_posix_sem/roc_core/semaphore.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "roc_core/semaphore.h"
1010
#include "roc_core/cpu_instructions.h"
1111
#include "roc_core/errno_to_str.h"
12+
#include "roc_core/log.h"
1213
#include "roc_core/panic.h"
1314

1415
#include <errno.h>
@@ -34,16 +35,17 @@ Semaphore::~Semaphore() {
3435
}
3536

3637
bool Semaphore::timed_wait(nanoseconds_t deadline) {
38+
printf("Enter function of timed_wait");
3739
if (deadline < 0) {
3840
roc_panic("semaphore: unexpected negative deadline");
3941
}
4042

41-
for (;;) {
42-
timespec ts;
43-
ts.tv_sec = long(deadline / Second);
44-
ts.tv_nsec = long(deadline % Second);
43+
timespec ts;
44+
ts.tv_sec = long(deadline / Second);
45+
ts.tv_nsec = long(deadline % Second);
4546

46-
if (sem_timedwait(&sem_, &ts) == 0) {
47+
for (;;) {
48+
if (sem_clockwait(&sem_, CLOCK_MONOTONIC, &ts) == 0) {
4749
return true;
4850
}
4951

src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.h renamed to src/internal_modules/roc_core/target_posix_sem/roc_core/semaphore.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
77
*/
88

9-
//! @file roc_core/target_posix_ext/roc_core/semaphore.h
9+
//! @file roc_core/target_posix_sem/roc_core/semaphore.h
1010
//! @brief Semaphore.
1111

1212
#ifndef ROC_CORE_SEMAPHORE_H_

src/internal_modules/roc_pipeline/state_tracker.cpp

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ StateTracker::StateTracker()
1919
, active_sessions_(0)
2020
, pending_packets_(0)
2121
, sem_is_occupied_(0)
22-
, waiting_mask_(0)
2322
, mutex_()
2423
, waiting_con_(mutex_) {
2524
}
@@ -36,24 +35,24 @@ StateTracker::StateTracker()
3635
// Questions:
3736
// - When should the function return true vs false
3837
bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadline) {
39-
38+
// If no state is specified in state_mask, return immediately
39+
if (state_mask == 0) {
40+
return true;
41+
}
42+
4043
mutex_.lock();
4144
for (;;) {
42-
// If no state is specified in state_mask, return immediately
43-
if (state_mask == 0) {
44-
return true;
45-
}
46-
4745
if (static_cast<unsigned>(get_state()) & state_mask) {
46+
mutex_.unlock();
4847
return true;
4948
}
5049

5150
if (deadline >= 0 && deadline <= core::timestamp(core::ClockMonotonic)) {
51+
mutex_.unlock();
5252
return false;
5353
}
5454

5555
if (sem_is_occupied_.compare_exchange(0, 1)) {
56-
5756
if (deadline >= 0) {
5857
mutex_.unlock();
5958
(void)sem_.timed_wait(deadline);

src/internal_modules/roc_pipeline/state_tracker.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ class StateTracker : public core::NonCopyable<> {
7373
core::Atomic<int> active_sessions_;
7474
core::Atomic<int> pending_packets_;
7575
core::Atomic<int> sem_is_occupied_;
76-
core::Atomic<unsigned> waiting_mask_;
7776
core::Mutex mutex_;
7877
core::Cond waiting_con_;
7978

0 commit comments

Comments
 (0)