Skip to content

Commit 997421c

Browse files
committed
Fix timeout so shutdown is invoked only once
This fixes lf-lang/lingua-franca#1955 and makes the overall implementation more robust.
1 parent 521f1d8 commit 997421c

File tree

5 files changed

+28
-16
lines changed

5 files changed

+28
-16
lines changed

include/reactor-cpp/environment.hh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ private:
5858
Tag start_tag_{};
5959

6060
const Duration timeout_{};
61+
Tag timeout_tag_{};
6162

6263
Graph<BasePort*, ConnectionProperties> graph_{};
6364
Graph<BasePort*, ConnectionProperties> optimized_graph_{};
@@ -118,6 +119,7 @@ public:
118119
[[nodiscard]] auto logical_time() const noexcept -> const LogicalTime& { return scheduler_.logical_time(); }
119120
[[nodiscard]] auto start_tag() const noexcept -> const Tag& { return start_tag_; }
120121
[[nodiscard]] auto timeout() const noexcept -> const Duration& { return timeout_; }
122+
[[nodiscard]] auto timeout_tag() const noexcept -> const Tag& { return timeout_tag_; }
121123

122124
static auto physical_time() noexcept -> TimePoint { return get_physical_time(); }
123125

include/reactor-cpp/logical_time.hh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public:
4343
[[nodiscard]] auto delay(Duration offset = Duration::zero()) const noexcept -> Tag;
4444
[[nodiscard]] auto subtract(Duration offset = Duration::zero()) const noexcept -> Tag;
4545
[[nodiscard]] auto decrement() const noexcept -> Tag;
46+
47+
[[nodiscard]] static auto max() noexcept -> Tag { return {TimePoint::max(), std::numeric_limits<mstep_t>::max()}; }
4648
};
4749

4850
// define all the comparison operators

lib/action.cc

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,11 @@ void Timer::cleanup() noexcept {
7171
ShutdownTrigger::ShutdownTrigger(const std::string& name, Reactor* container)
7272
: Timer(name, container, Duration::zero(), container->environment()->timeout()) {}
7373

74-
void ShutdownTrigger::setup() noexcept {
75-
BaseAction::setup();
76-
environment()->sync_shutdown();
77-
}
74+
void ShutdownTrigger::setup() noexcept { BaseAction::setup(); }
7875

7976
void ShutdownTrigger::shutdown() {
80-
if (!is_present()) {
81-
Tag tag = Tag::from_logical_time(environment()->logical_time()).delay();
82-
environment()->scheduler()->schedule_sync(this, tag);
83-
}
77+
Tag tag = Tag::from_logical_time(environment()->logical_time()).delay();
78+
environment()->scheduler()->schedule_sync(this, tag);
8479
}
8580

8681
auto Action<void>::schedule_at(const Tag& tag) -> bool {

lib/environment.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,6 +330,12 @@ auto Environment::startup(const TimePoint& start_time) -> std::thread {
330330
phase_ = Phase::Startup;
331331

332332
this->start_tag_ = Tag::from_physical_time(start_time);
333+
if (this->timeout_ == Duration::max()) {
334+
this->timeout_tag_ = Tag::max();
335+
} else {
336+
this->timeout_tag_ = this->start_tag_.delay(this->timeout_);
337+
}
338+
333339
// start up initialize all reactors
334340
for (auto* reactor : top_level_reactors_) {
335341
reactor->startup();

lib/scheduler.cc

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,17 +91,17 @@ void Scheduler::schedule() noexcept {
9191
bool found_ready_reactions = schedule_ready_reactions();
9292

9393
while (!found_ready_reactions) {
94-
log_.debug() << "call next()";
95-
next();
96-
reaction_queue_pos_ = 0;
97-
98-
found_ready_reactions = schedule_ready_reactions();
99-
10094
if (!continue_execution_ && !found_ready_reactions) {
10195
// let all workers know that they should terminate
10296
terminate_all_workers();
10397
break;
10498
}
99+
100+
log_.debug() << "call next()";
101+
next();
102+
reaction_queue_pos_ = 0;
103+
104+
found_ready_reactions = schedule_ready_reactions();
105105
}
106106
}
107107

@@ -354,8 +354,8 @@ void Scheduler::next() { // NOLINT
354354
log_.debug() << "Shutting down the scheduler";
355355
Tag t_next = Tag::from_logical_time(logical_time_).delay();
356356
if (!event_queue_.empty() && t_next == event_queue_.next_tag()) {
357-
log_.debug() << "Schedule the last round of reactions including all "
358-
"termination reactions";
357+
log_.debug() << "Trigger the last round of reactions including all "
358+
"shutdown reactions";
359359
triggered_actions_ = event_queue_.extract_next_event();
360360
advance_logical_time_to(t_next);
361361
} else {
@@ -364,6 +364,13 @@ void Scheduler::next() { // NOLINT
364364
} else {
365365
auto t_next = event_queue_.next_tag();
366366

367+
if (t_next == environment_->timeout_tag()) {
368+
continue_execution_ = false;
369+
log_.debug() << "Shutting down the scheduler due to timeout";
370+
log_.debug() << "Trigger the last round of reactions including all "
371+
"shutdwon reactions";
372+
}
373+
367374
// synchronize with physical time if not in fast forward mode
368375
if (!environment_->fast_fwd_execution()) {
369376
log_.debug() << "acquire tag " << t_next << " from physical time barrier";

0 commit comments

Comments
 (0)