Skip to content

Commit 04bd158

Browse files
author
flw5469
committed
feat: implemented semaphore and condition variable based statetracker
1 parent 4d4abbb commit 04bd158

File tree

2 files changed

+109
-10
lines changed

2 files changed

+109
-10
lines changed

src/internal_modules/roc_pipeline/state_tracker.cpp

Lines changed: 93 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,74 @@
77
*/
88

99
#include "roc_pipeline/state_tracker.h"
10+
#include "roc_core/log.h"
1011
#include "roc_core/panic.h"
1112

1213
namespace roc {
1314
namespace pipeline {
1415

1516
StateTracker::StateTracker()
16-
: halt_state_(-1)
17+
: sem_(0)
18+
, halt_state_(-1)
1719
, active_sessions_(0)
18-
, pending_packets_(0) {
20+
, pending_packets_(0)
21+
, sem_is_occupied_(0)
22+
, waiting_mask_(0)
23+
, mutex_()
24+
, waiting_con_(mutex_) {
25+
}
26+
27+
// StateTracker::~StateTracker() {
28+
// mutex_.unlock();
29+
// }
30+
31+
// This method should block until the state becomes any of the states specified by the
32+
// mask, or deadline expires. E.g. if mask is ACTIVE | PAUSED, it should block until state
33+
// becomes either ACTIVE or PAUSED. (Currently only two states are used, but later more
34+
// states will be needed). Deadline should be an absolute timestamp.
35+
36+
// Questions:
37+
// - When should the function return true vs false
38+
bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadline) {
39+
40+
mutex_.lock();
41+
for (;;) {
42+
// If no state is specified in state_mask, return immediately
43+
if (state_mask == 0) {
44+
return true;
45+
}
46+
47+
if (static_cast<unsigned>(get_state()) & state_mask) {
48+
return true;
49+
}
50+
51+
if (deadline >= 0 && deadline <= core::timestamp(core::ClockMonotonic)) {
52+
return false;
53+
}
54+
55+
if (sem_is_occupied_.compare_exchange(0, 1)) {
56+
57+
if (deadline >= 0) {
58+
mutex_.unlock();
59+
(void)sem_.timed_wait(deadline);
60+
61+
} else {
62+
mutex_.unlock();
63+
sem_.wait();
64+
}
65+
66+
mutex_.lock();
67+
sem_is_occupied_ = 0;
68+
waiting_con_.broadcast();
69+
70+
} else {
71+
if (deadline >= 0) {
72+
(void)waiting_con_.timed_wait(deadline);
73+
} else {
74+
waiting_con_.wait();
75+
}
76+
}
77+
}
1978
}
2079

2180
sndio::DeviceState StateTracker::get_state() const {
@@ -65,22 +124,50 @@ size_t StateTracker::num_sessions() const {
65124
}
66125

67126
void StateTracker::register_session() {
68-
active_sessions_++;
127+
if (active_sessions_++ == 0) {
128+
signal_state_change();
129+
}
69130
}
70131

71132
void StateTracker::unregister_session() {
72-
if (--active_sessions_ < 0) {
133+
int prev_sessions = active_sessions_--;
134+
if (prev_sessions == 0) {
73135
roc_panic("state tracker: unpaired register/unregister session");
136+
} else if (prev_sessions == 1 && pending_packets_ == 0) {
137+
signal_state_change();
74138
}
139+
140+
// if (--active_sessions_ < 0) {
141+
// roc_panic("state tracker: unpaired register/unregister session");
142+
// }
75143
}
76144

77145
void StateTracker::register_packet() {
78-
pending_packets_++;
146+
if (pending_packets_++ == 0 && active_sessions_ == 0) {
147+
signal_state_change();
148+
}
79149
}
80150

81151
void StateTracker::unregister_packet() {
82-
if (--pending_packets_ < 0) {
152+
int prev_packets = pending_packets_--;
153+
if (prev_packets == 0) {
83154
roc_panic("state tracker: unpaired register/unregister packet");
155+
} else if (prev_packets == 1 && active_sessions_ == 0) {
156+
signal_state_change();
157+
}
158+
159+
// if (--pending_packets_ < 0) {
160+
// roc_panic("state tracker: unpaired register/unregister packet");
161+
// }
162+
}
163+
164+
void StateTracker::signal_state_change() {
165+
// if (waiting_mask_ != 0 && (static_cast<unsigned>(get_state()) & waiting_mask_)) {
166+
// sem_.post();
167+
// }
168+
if (sem_is_occupied_) {
169+
roc_log(LogDebug, "signaling");
170+
sem_.post();
84171
}
85172
}
86173

src/internal_modules/roc_pipeline/state_tracker.h

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
#ifndef ROC_PIPELINE_STATE_TRACKER_H_
1313
#define ROC_PIPELINE_STATE_TRACKER_H_
1414

15-
#include "roc_core/atomic_int.h"
15+
#include "roc_core/atomic.h"
1616
#include "roc_core/noncopyable.h"
17+
#include "roc_core/semaphore.h"
1718
#include "roc_core/stddefs.h"
19+
#include "roc_core/time.h"
1820
#include "roc_sndio/device_defs.h"
1921

2022
namespace roc {
@@ -32,6 +34,9 @@ class StateTracker : public core::NonCopyable<> {
3234
//! Initialize all counters to zero.
3335
StateTracker();
3436

37+
//! Block until state becomes any of the ones specified by state_mask.
38+
bool wait_state(unsigned state_mask, core::nanoseconds_t deadline);
39+
3540
//! Compute current state.
3641
sndio::DeviceState get_state() const;
3742

@@ -63,9 +68,16 @@ class StateTracker : public core::NonCopyable<> {
6368
void unregister_packet();
6469

6570
private:
66-
core::AtomicInt<int32_t> halt_state_;
67-
core::AtomicInt<int32_t> active_sessions_;
68-
core::AtomicInt<int32_t> pending_packets_;
71+
core::Semaphore sem_;
72+
core::Atomic<int> halt_state_;
73+
core::Atomic<int> active_sessions_;
74+
core::Atomic<int> pending_packets_;
75+
core::Atomic<int> sem_is_occupied_;
76+
core::Atomic<unsigned> waiting_mask_;
77+
core::Mutex mutex_;
78+
core::Cond waiting_con_;
79+
80+
void signal_state_change();
6981
};
7082

7183
} // namespace pipeline

0 commit comments

Comments
 (0)