Skip to content

Commit 68bf827

Browse files
committed
fix race condition in time barriers
1 parent 01be6ec commit 68bf827

File tree

5 files changed

+41
-26
lines changed

5 files changed

+41
-26
lines changed

include/reactor-cpp/action.hh

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,11 @@ protected:
4747
* Returns false if the wait was interrupted and true otherwise. True
4848
* indicates that the tag is safe to process.
4949
*/
50-
virtual auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
50+
virtual auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock,
5151
const std::function<bool(void)>& abort_waiting) -> bool {
5252
reactor_assert(!logical_);
53-
return PhysicalTimeBarrier::acquire_tag(tag, lock, cv, abort_waiting);
53+
reactor_assert(lock.owns_lock());
54+
return PhysicalTimeBarrier::acquire_tag(tag, lock, environment()->scheduler(), abort_waiting);
5455
}
5556

5657
BaseAction(const std::string& name, Reactor* container, bool logical, Duration min_delay)

include/reactor-cpp/connection.hh

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "reactor.hh"
2121
#include "time.hh"
2222
#include "time_barrier.hh"
23+
#include <unistd.h>
2324

2425
namespace reactor {
2526

@@ -138,6 +139,7 @@ public:
138139

139140
inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
140141
const std::function<bool(void)>& abort_waiting) -> bool override {
142+
reactor_assert(lock.owns_lock());
141143
log_.debug() << "downstream tries to acquire tag " << tag;
142144

143145
if (this->upstream_port() == nullptr) {
@@ -164,7 +166,7 @@ public:
164166
}
165167

166168
// Wait until we receive a release_tag message from upstream
167-
return logical_time_barrier_.acquire_tag(tag, lock, cv, abort_waiting);
169+
return logical_time_barrier_.acquire_tag(tag, lock, abort_waiting);
168170
}
169171

170172
void bind_upstream_port(Port<T>* port) override {
@@ -201,15 +203,15 @@ public:
201203
};
202204
}
203205

204-
inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
206+
inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock,
205207
const std::function<bool(void)>& abort_waiting) -> bool override {
206208
// Since this is a delayed connection, we can go back in time and need to
207209
// acquire the latest upstream tag that can create an event at the given
208210
// tag. We also need to consider that given a delay d and a tag g=(t, n),
209211
// for any value of n, g + d = (t, 0). Hence, we need to quire a tag with
210212
// the highest possible microstep value.
211213
auto upstream_tag = tag.subtract(this->min_delay());
212-
return EnclaveConnection<T>::acquire_tag(upstream_tag, lock, cv, abort_waiting);
214+
return EnclaveConnection<T>::acquire_tag(upstream_tag, lock, abort_waiting);
213215
}
214216
};
215217

@@ -230,10 +232,10 @@ public:
230232
};
231233
}
232234

233-
inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
235+
inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock,
234236
const std::function<bool(void)>& abort_waiting) -> bool override {
235237
this->log_.debug() << "downstream tries to acquire tag " << tag;
236-
return PhysicalTimeBarrier::acquire_tag(tag, lock, cv, abort_waiting);
238+
return PhysicalTimeBarrier::acquire_tag(tag, lock, this->environmet()->scheduler(), abort_waiting);
237239
}
238240

239241
void bind_upstream_port(Port<T>* port) override { Connection<T>::bind_upstream_port(port); }

include/reactor-cpp/scheduler.hh

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
#include <thread>
2020
#include <vector>
2121

22+
#include "assert.hh"
2223
#include "fwd.hh"
2324
#include "logical_time.hh"
2425
#include "reactor-cpp/logging.hh"
2526
#include "reactor-cpp/time.hh"
2627
#include "safe_vector.hh"
2728
#include "semaphore.hh"
29+
#include "time.hh"
2830

2931
namespace reactor {
3032

@@ -176,9 +178,19 @@ public:
176178
auto schedule_async_at(BaseAction* action, const Tag& tag) -> bool;
177179
auto schedule_empty_async_at(const Tag& tag) -> bool;
178180

181+
auto inline lock() noexcept -> std::unique_lock<std::mutex> {
182+
return std::unique_lock<std::mutex>(scheduling_mutex_);
183+
}
179184
void inline notify() noexcept { cv_schedule_.notify_one(); }
180-
181-
auto inline lock() noexcept -> auto { return std::unique_lock<std::mutex>(scheduling_mutex_); }
185+
void inline wait(std::unique_lock<std::mutex>& lock, const std::function<bool(void)>& predicate) noexcept {
186+
reactor_assert(lock.owns_lock());
187+
cv_schedule_.wait(lock, predicate);
188+
};
189+
auto inline wait_until(std::unique_lock<std::mutex>& lock, TimePoint time_point,
190+
const std::function<bool(void)>& predicate) noexcept -> bool {
191+
reactor_assert(lock.owns_lock());
192+
return cv_schedule_.wait_until(lock, time_point, predicate);
193+
};
182194

183195
void set_port(BasePort* port);
184196

include/reactor-cpp/time_barrier.hh

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,9 @@
1111

1212
#include "fwd.hh"
1313
#include "logical_time.hh"
14+
#include "scheduler.hh"
1415
#include "time.hh"
15-
#include <atomic>
16-
#include <condition_variable>
1716
#include <functional>
18-
#include <mutex>
1917

2018
namespace reactor {
2119

@@ -38,36 +36,38 @@ public:
3836
return tag.time_point() < physical_time;
3937
}
4038

41-
static inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
42-
const std::function<bool(void)>& abort_waiting) {
39+
static inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, Scheduler* scheduler,
40+
const std::function<bool(void)>& abort_waiting) -> bool {
4341
if (try_acquire_tag(tag)) {
4442
return true;
4543
}
46-
return !cv.wait_until(lock, tag.time_point(), abort_waiting);
44+
return scheduler->wait_until(lock, tag.time_point(), abort_waiting);
4745
}
4846
};
4947

5048
class LogicalTimeBarrier {
51-
std::mutex mutex_;
49+
private:
5250
LogicalTime released_time_;
51+
Scheduler* scheduler_;
5352

5453
public:
54+
LogicalTimeBarrier(Scheduler* scheduler)
55+
: scheduler_(scheduler) {}
56+
5557
inline void release_tag(const LogicalTime& tag) {
56-
std::lock_guard lock(mutex_);
58+
auto lock = scheduler_->lock();
5759
released_time_.advance_to(tag);
5860
}
5961

60-
inline auto try_acquire_tag(const Tag& tag) {
61-
std::lock_guard lock(mutex_);
62-
return tag <= released_time_;
63-
}
62+
// The caller must hold a lock on the scheduler mutex
63+
inline auto try_acquire_tag(const Tag& tag) { return tag <= released_time_; }
6464

65-
inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
65+
inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock,
6666
const std::function<bool(void)>& abort_waiting) -> bool {
6767
if (try_acquire_tag(tag)) {
6868
return true;
6969
}
70-
cv.wait(lock, [this, &tag, &abort_waiting]() { return try_acquire_tag(tag) || abort_waiting(); });
70+
scheduler_->wait(lock, [this, &tag, &abort_waiting]() { return try_acquire_tag(tag) || abort_waiting(); });
7171
return !abort_waiting();
7272
}
7373
};

lib/scheduler.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ void Scheduler::next() { // NOLINT
368368
// synchronize with physical time if not in fast forward mode
369369
if (!environment_->fast_fwd_execution()) {
370370
bool result = PhysicalTimeBarrier::acquire_tag(
371-
t_next, lock, cv_schedule_, [&t_next, this]() { return t_next != event_queue_.next_tag(); });
371+
t_next, lock, this, [&t_next, this]() { return t_next != event_queue_.next_tag(); });
372372
// If acquire tag returns false, then a new event was inserted into the queue and we need to start over
373373
if (!result) {
374374
continue;
@@ -378,8 +378,8 @@ void Scheduler::next() { // NOLINT
378378
// Wait until all input actions mark the tag as safe to process.
379379
bool result{true};
380380
for (auto* action : environment_->input_actions_) {
381-
bool inner_result = action->acquire_tag(t_next, lock, cv_schedule_,
382-
[&t_next, this]() { return t_next != event_queue_.next_tag(); });
381+
bool inner_result =
382+
action->acquire_tag(t_next, lock, [&t_next, this]() { return t_next != event_queue_.next_tag(); });
383383
// If the wait was aborted or if the next tag changed in the meantime,
384384
// we need to break from the loop and continue with the main loop.
385385
if (!inner_result || t_next != event_queue_.next_tag()) {

0 commit comments

Comments
 (0)