Skip to content

Commit b607f1f

Browse files
authored
Merge pull request #49 from lf-lang/enclave-fix
Fix race condition in time barriers
2 parents a13eda6 + 565835a commit b607f1f

File tree

13 files changed

+131
-86
lines changed

13 files changed

+131
-86
lines changed

include/reactor-cpp/action.hh

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,11 @@ protected:
4747
* Returns false if the wait was interrupted and true otherwise. True
4848
* indicates that the tag is safe to process.
4949
*/
50-
virtual auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
50+
virtual auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock,
5151
const std::function<bool(void)>& abort_waiting) -> bool {
5252
reactor_assert(!logical_);
53-
return PhysicalTimeBarrier::acquire_tag(tag, lock, cv, abort_waiting);
53+
reactor_assert(lock.owns_lock());
54+
return PhysicalTimeBarrier::acquire_tag(tag, lock, environment()->scheduler(), abort_waiting);
5455
}
5556

5657
BaseAction(const std::string& name, Reactor* container, bool logical, Duration min_delay)

include/reactor-cpp/assert.hh

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ constexpr bool runtime_assertion = false;
2121
constexpr bool runtime_assertion = true;
2222
#endif
2323

24-
#include "environment.hh"
24+
#include "fwd.hh"
2525

2626
#include <cassert>
2727
#include <sstream>
@@ -37,7 +37,6 @@ constexpr bool runtime_assertion = true;
3737
#define reactor_assert(x) assert(x)
3838

3939
namespace reactor {
40-
using EnvPhase = Environment::Phase;
4140

4241
class ValidationError : public std::runtime_error {
4342
private:
@@ -74,31 +73,8 @@ template <typename E> constexpr auto extract_value(E enum_value) -> typename std
7473
return static_cast<typename std::underlying_type<E>::type>(enum_value);
7574
}
7675

77-
inline void assert_phase([[maybe_unused]] const ReactorElement* ptr, [[maybe_unused]] EnvPhase phase) {
78-
if constexpr (runtime_assertion) { // NOLINT
79-
if (ptr->environment()->phase() != phase) {
80-
auto enum_value_to_name = [](EnvPhase phase) -> std::string {
81-
const std::map<EnvPhase, std::string> conversation_map = {
82-
// NOLINT
83-
{EnvPhase::Construction, "Construction"}, {EnvPhase::Assembly, "Assembly"},
84-
{EnvPhase::Startup, "Startup"}, {EnvPhase::Execution, "Execution"},
85-
{EnvPhase::Shutdown, "Shutdown"}, {EnvPhase::Deconstruction, "Deconstruction"}};
86-
// in C++20 use .contains()
87-
if (conversation_map.find(phase) != std::end(conversation_map)) {
88-
return conversation_map.at(phase);
89-
}
90-
return "Unknown Phase: Value: " + std::to_string(extract_value(phase));
91-
};
92-
#ifdef __linux__
93-
print_debug_backtrace();
94-
#endif
76+
void assert_phase([[maybe_unused]] const ReactorElement* ptr, [[maybe_unused]] Phase phase);
9577

96-
// C++20 std::format
97-
throw ValidationError("Expected Phase: " + enum_value_to_name(phase) +
98-
" Current Phase: " + enum_value_to_name(ptr->environment()->phase()));
99-
}
100-
}
101-
}
10278
} // namespace reactor
10379

10480
#endif // REACTOR_CPP_ASSERT_HH

include/reactor-cpp/connection.hh

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,14 @@ protected:
110110

111111
EnclaveConnection(const std::string& name, Environment* enclave, const Duration& delay)
112112
: BaseDelayedConnection<T>(name, enclave, false, delay)
113-
, log_{this->fqn()} {}
113+
, log_{this->fqn()}
114+
, logical_time_barrier_(enclave->scheduler()) {}
114115

115116
public:
116117
EnclaveConnection(const std::string& name, Environment* enclave)
117118
: BaseDelayedConnection<T>(name, enclave, false, Duration::zero())
118-
, log_{this->fqn()} {}
119+
, log_{this->fqn()}
120+
, logical_time_barrier_(enclave->scheduler()) {}
119121

120122
inline auto upstream_set_callback() noexcept -> PortCallback override {
121123
return [this](const BasePort& port) {
@@ -136,8 +138,9 @@ public:
136138
};
137139
}
138140

139-
inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
141+
inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock,
140142
const std::function<bool(void)>& abort_waiting) -> bool override {
143+
reactor_assert(lock.owns_lock());
141144
log_.debug() << "downstream tries to acquire tag " << tag;
142145

143146
if (this->upstream_port() == nullptr) {
@@ -164,7 +167,7 @@ public:
164167
}
165168

166169
// Wait until we receive a release_tag message from upstream
167-
return logical_time_barrier_.acquire_tag(tag, lock, cv, abort_waiting);
170+
return logical_time_barrier_.acquire_tag(tag, lock, abort_waiting);
168171
}
169172

170173
void bind_upstream_port(Port<T>* port) override {
@@ -201,15 +204,15 @@ public:
201204
};
202205
}
203206

204-
inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
207+
inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock,
205208
const std::function<bool(void)>& abort_waiting) -> bool override {
206209
// Since this is a delayed connection, we can go back in time and need to
207210
// acquire the latest upstream tag that can create an event at the given
208211
// tag. We also need to consider that given a delay d and a tag g=(t, n),
209212
// for any value of n, g + d = (t, 0). Hence, we need to quire a tag with
210213
// the highest possible microstep value.
211214
auto upstream_tag = tag.subtract(this->min_delay());
212-
return EnclaveConnection<T>::acquire_tag(upstream_tag, lock, cv, abort_waiting);
215+
return EnclaveConnection<T>::acquire_tag(upstream_tag, lock, abort_waiting);
213216
}
214217
};
215218

@@ -230,10 +233,10 @@ public:
230233
};
231234
}
232235

233-
inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
236+
inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock,
234237
const std::function<bool(void)>& abort_waiting) -> bool override {
235238
this->log_.debug() << "downstream tries to acquire tag " << tag;
236-
return PhysicalTimeBarrier::acquire_tag(tag, lock, cv, abort_waiting);
239+
return PhysicalTimeBarrier::acquire_tag(tag, lock, this->environment()->scheduler(), abort_waiting);
237240
}
238241

239242
void bind_upstream_port(Port<T>* port) override { Connection<T>::bind_upstream_port(port); }

include/reactor-cpp/environment.hh

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,9 @@ constexpr unsigned int default_max_reaction_index = 0;
2626
constexpr bool default_run_forever = false;
2727
constexpr bool default_fast_fwd_execution = false;
2828

29-
class Environment {
30-
public:
31-
enum class Phase { Construction = 0, Assembly = 1, Startup = 2, Execution = 3, Shutdown = 4, Deconstruction = 5 };
29+
enum class Phase { Construction = 0, Assembly = 1, Startup = 2, Execution = 3, Shutdown = 4, Deconstruction = 5 };
3230

31+
class Environment {
3332
private:
3433
using Dependency = std::pair<Reaction*, Reaction*>;
3534

include/reactor-cpp/fwd.hh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ namespace reactor {
1616
class BaseAction;
1717
class BasePort;
1818
class Environment;
19+
enum class Phase;
1920
class Reaction;
2021
class Reactor;
22+
class ReactorElement;
2123
class Scheduler;
2224
class Tag;
2325

include/reactor-cpp/scheduler.hh

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
#include <thread>
2020
#include <vector>
2121

22+
#include "assert.hh"
2223
#include "fwd.hh"
2324
#include "logical_time.hh"
2425
#include "reactor-cpp/logging.hh"
2526
#include "reactor-cpp/time.hh"
2627
#include "safe_vector.hh"
2728
#include "semaphore.hh"
29+
#include "time.hh"
2830

2931
namespace reactor {
3032

@@ -176,9 +178,19 @@ public:
176178
auto schedule_async_at(BaseAction* action, const Tag& tag) -> bool;
177179
auto schedule_empty_async_at(const Tag& tag) -> bool;
178180

181+
auto inline lock() noexcept -> std::unique_lock<std::mutex> {
182+
return std::unique_lock<std::mutex>(scheduling_mutex_);
183+
}
179184
void inline notify() noexcept { cv_schedule_.notify_one(); }
180-
181-
auto inline lock() noexcept -> auto { return std::unique_lock<std::mutex>(scheduling_mutex_); }
185+
void inline wait(std::unique_lock<std::mutex>& lock, const std::function<bool(void)>& predicate) noexcept {
186+
reactor_assert(lock.owns_lock());
187+
cv_schedule_.wait(lock, predicate);
188+
};
189+
auto inline wait_until(std::unique_lock<std::mutex>& lock, TimePoint time_point,
190+
const std::function<bool(void)>& predicate) noexcept -> bool {
191+
reactor_assert(lock.owns_lock());
192+
return cv_schedule_.wait_until(lock, time_point, predicate);
193+
};
182194

183195
void set_port(BasePort* port);
184196

include/reactor-cpp/time_barrier.hh

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,9 @@
1111

1212
#include "fwd.hh"
1313
#include "logical_time.hh"
14+
#include "scheduler.hh"
1415
#include "time.hh"
15-
#include <atomic>
16-
#include <condition_variable>
1716
#include <functional>
18-
#include <mutex>
1917

2018
namespace reactor {
2119

@@ -38,36 +36,55 @@ public:
3836
return tag.time_point() < physical_time;
3937
}
4038

41-
static inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
42-
const std::function<bool(void)>& abort_waiting) {
39+
static inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, Scheduler* scheduler,
40+
const std::function<bool(void)>& abort_waiting) -> bool {
4341
if (try_acquire_tag(tag)) {
4442
return true;
4543
}
46-
return !cv.wait_until(lock, tag.time_point(), abort_waiting);
44+
return !scheduler->wait_until(lock, tag.time_point(), abort_waiting);
4745
}
4846
};
4947

5048
class LogicalTimeBarrier {
51-
std::mutex mutex_;
49+
private:
5250
LogicalTime released_time_;
51+
Scheduler* scheduler_;
5352

53+
/*
54+
* Note that we use the main scheduler lock to protect our `released_time_`
55+
* variable. We cannot use a local mutex here that only protects our local
56+
* state. This is, because we also need to use a lock to wait on a condition
57+
* variable. This lock must be the same lock as the one that is used when
58+
* updating the wait condition. (See "An atomic predicate" in
59+
* https://www.modernescpp.com/index.php/c-core-guidelines-be-aware-of-the-traps-of-condition-variables)
60+
*
61+
* Here we have two conditions in which we want to stop the wait:
62+
* 1. When the tag that we are waiting for was released.
63+
* 2. When the given function `abort_waiting` returns true. In a typical usage
64+
* scenario, this would be the case when a new event was scheduled.
65+
*
66+
* We need to make sure that both conditions are updated while holding the
67+
* lock that we use for waiting on the condition variable. Our only option for
68+
* this is to use the global scheduler lock.
69+
*/
5470
public:
71+
LogicalTimeBarrier(Scheduler* scheduler)
72+
: scheduler_(scheduler) {}
73+
5574
inline void release_tag(const LogicalTime& tag) {
56-
std::lock_guard lock(mutex_);
75+
auto lock = scheduler_->lock();
5776
released_time_.advance_to(tag);
5877
}
5978

60-
inline auto try_acquire_tag(const Tag& tag) {
61-
std::lock_guard lock(mutex_);
62-
return tag <= released_time_;
63-
}
79+
// The caller must hold a lock on the scheduler mutex
80+
inline auto try_acquire_tag(const Tag& tag) { return tag <= released_time_; }
6481

65-
inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock, std::condition_variable& cv,
82+
inline auto acquire_tag(const Tag& tag, std::unique_lock<std::mutex>& lock,
6683
const std::function<bool(void)>& abort_waiting) -> bool {
6784
if (try_acquire_tag(tag)) {
6885
return true;
6986
}
70-
cv.wait(lock, [this, &tag, &abort_waiting]() { return try_acquire_tag(tag) || abort_waiting(); });
87+
scheduler_->wait(lock, [this, &tag, &abort_waiting]() { return try_acquire_tag(tag) || abort_waiting(); });
7188
return !abort_waiting();
7289
}
7390
};

lib/action.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ namespace reactor {
1818
void BaseAction::register_trigger(Reaction* reaction) {
1919
reactor_assert(reaction != nullptr);
2020
reactor_assert(this->environment() == reaction->environment());
21-
assert_phase(this, Environment::Phase::Assembly);
21+
assert_phase(this, Phase::Assembly);
2222
validate(this->container() == reaction->container(),
2323
"Action triggers must belong to the same reactor as the triggered "
2424
"reaction");
@@ -29,7 +29,7 @@ void BaseAction::register_trigger(Reaction* reaction) {
2929
void BaseAction::register_scheduler(Reaction* reaction) {
3030
reactor_assert(reaction != nullptr);
3131
reactor_assert(this->environment() == reaction->environment());
32-
assert_phase(this, Environment::Phase::Assembly);
32+
assert_phase(this, Phase::Assembly);
3333
// the reaction must belong to the same reactor as this action
3434
validate(this->container() == reaction->container(), "Scheduable actions must belong to the same reactor as the "
3535
"triggered reaction");

lib/assert.cc

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
*/
88

99
#include "reactor-cpp/assert.hh"
10+
#include "reactor-cpp/environment.hh"
1011

1112
namespace reactor {
1213

@@ -16,4 +17,30 @@ auto ValidationError::build_message(const std::string_view msg) noexcept -> std:
1617
return string_stream.str();
1718
}
1819

20+
void assert_phase([[maybe_unused]] const ReactorElement* ptr, [[maybe_unused]] Phase phase) {
21+
if constexpr (runtime_assertion) { // NOLINT
22+
if (ptr->environment()->phase() != phase) {
23+
auto enum_value_to_name = [](Phase phase) -> std::string {
24+
const std::map<Phase, std::string> conversation_map = {
25+
// NOLINT
26+
{Phase::Construction, "Construction"}, {Phase::Assembly, "Assembly"},
27+
{Phase::Startup, "Startup"}, {Phase::Execution, "Execution"},
28+
{Phase::Shutdown, "Shutdown"}, {Phase::Deconstruction, "Deconstruction"}};
29+
// in C++20 use .contains()
30+
if (conversation_map.find(phase) != std::end(conversation_map)) {
31+
return conversation_map.at(phase);
32+
}
33+
return "Unknown Phase: Value: " + std::to_string(extract_value(phase));
34+
};
35+
#ifdef __linux__
36+
print_debug_backtrace();
37+
#endif
38+
39+
// C++20 std::format
40+
throw ValidationError("Expected Phase: " + enum_value_to_name(phase) +
41+
" Current Phase: " + enum_value_to_name(ptr->environment()->phase()));
42+
}
43+
}
44+
}
45+
1946
} // namespace reactor

lib/port.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ void BasePort::base_bind_to(BasePort* port) {
2020
reactor_assert(this->environment() == port->environment());
2121
validate(!port->has_inward_binding(), "Ports may only be connected once");
2222
validate(!port->has_anti_dependencies(), "Ports with anti dependencies may not be connected to other ports");
23-
assert_phase(this, Environment::Phase::Assembly);
23+
assert_phase(this, Phase::Assembly);
2424
if (this->is_input() && port->is_input()) {
2525
validate(this->container() == port->container()->container(),
2626
"An input port A may only be bound to another input port B if B is contained by a reactor that in turn is "
@@ -52,7 +52,7 @@ void BasePort::register_dependency(Reaction* reaction, bool is_trigger) noexcept
5252
reactor_assert(reaction != nullptr);
5353
reactor_assert(this->environment() == reaction->environment());
5454
validate(!this->has_outward_bindings(), "Dependencies may no be declared on ports with an outward binding!");
55-
assert_phase(this, Environment::Phase::Assembly);
55+
assert_phase(this, Phase::Assembly);
5656

5757
if (this->is_input()) {
5858
validate(this->container() == reaction->container(), "Dependent input ports must belong to the same reactor as the "
@@ -74,7 +74,7 @@ void BasePort::register_antidependency(Reaction* reaction) noexcept {
7474
reactor_assert(reaction != nullptr);
7575
reactor_assert(this->environment() == reaction->environment());
7676
validate(!this->has_inward_binding(), "Antidependencies may no be declared on ports with an inward binding!");
77-
assert_phase(this, Environment::Phase::Assembly);
77+
assert_phase(this, Phase::Assembly);
7878

7979
if (this->is_output()) {
8080
validate(this->container() == reaction->container(),

0 commit comments

Comments
 (0)