Skip to content

Commit cac3d75

Browse files
committed
fixing startup in non-federated
1 parent 565403b commit cac3d75

File tree

6 files changed

+51
-37
lines changed

6 files changed

+51
-37
lines changed

include/reactor-uc/environment.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,4 +150,7 @@ struct Environment {
150150
void Environment_ctor(Environment *self, Reactor *main, Scheduler *scheduler, bool fast_mode);
151151
void Environment_free(Environment *self);
152152

153+
void Environment_schedule_startups(const Environment *self, tag_t start_tag);
154+
void Environment_schedule_timers(Environment *self, const Reactor *reactor, tag_t start_tag);
155+
153156
#endif

include/reactor-uc/startup_coordinator.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "reactor-uc/error.h"
55
#include "reactor-uc/tag.h"
66
#include "reactor-uc/event.h"
7+
#include "reactor-uc/reactor.h"
78
#include "proto/message.pb.h"
89

910
typedef struct StartupCoordinator StartupCoordinator;
@@ -53,4 +54,7 @@ void StartupCoordinator_ctor(StartupCoordinator *self, Environment *env, Neighbo
5354
size_t payload_size, void *payload_buf, bool *payload_used_buf,
5455
size_t payload_buf_capacity);
5556

57+
void StartupCoordinator_schedule_startups(const StartupCoordinator *self, const tag_t start_tag);
58+
void StartupCoordinator_schedule_timers(StartupCoordinator *self, const Reactor *reactor, const tag_t start_tag);
59+
5660
#endif // REACTOR_UC_STARTUP_COORDINATOR_H

src/environment.c

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,32 @@
44
#else
55
#include "./environments/unfederated_environment.c"
66
#endif
7+
8+
#include "reactor-uc/timer.h"
9+
10+
void Environment_schedule_startups(const Environment *self, const tag_t start_tag) {
11+
if (self->startup) {
12+
LF_DEBUG(FED, "Scheduling Startup Reactions at" PRINTF_TAG, start_tag);
13+
Event event = EVENT_INIT(start_tag, &self->startup->super, NULL);
14+
LF_INFO(FED, "Self: %p Scheduler: %p", self, self->scheduler);
15+
lf_ret_t ret = self->scheduler->schedule_at(self->scheduler, &event);
16+
validate(ret == LF_OK);
17+
}
18+
}
19+
20+
void Environment_schedule_timers(Environment *self, const Reactor *reactor, const tag_t start_tag) {
21+
lf_ret_t ret;
22+
for (size_t i = 0; i < reactor->triggers_size; i++) {
23+
Trigger *trigger = reactor->triggers[i];
24+
if (trigger->type == TRIG_TIMER) {
25+
Timer *timer = (Timer *)trigger;
26+
tag_t tag = {.time = start_tag.time + timer->offset, .microstep = start_tag.microstep};
27+
Event event = EVENT_INIT(tag, trigger, NULL);
28+
ret = self->scheduler->schedule_at(self->scheduler, &event);
29+
validate(ret == LF_OK);
30+
}
31+
}
32+
for (size_t i = 0; i < reactor->children_size; i++) {
33+
Environment_schedule_timers(self, reactor->children[i], start_tag);
34+
}
35+
}

src/environments/unfederated_environment.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "reactor-uc/reactor.h"
44
#include "reactor-uc/scheduler.h"
55
#include "reactor-uc/queues.h"
6+
#include "reactor-uc/startup_coordinator.h"
67
#include <assert.h>
78
#include <inttypes.h>
89

@@ -18,6 +19,11 @@ static void Environment_assemble(Environment *self) {
1819
static void Environment_start(Environment *self) {
1920
instant_t start_time = self->get_physical_time(self);
2021
LF_INFO(ENV, "Starting program at " PRINTF_TIME " nsec", start_time);
22+
tag_t start_tag = {.time = start_time, .microstep = 0};
23+
self->scheduler->prepare_timestep(self->scheduler, NEVER_TAG);
24+
Environment_schedule_startups(self, start_tag);
25+
Environment_schedule_timers(self, self->main, start_tag);
26+
self->scheduler->prepare_timestep(self->scheduler, start_tag);
2127
self->scheduler->set_and_schedule_start_tag(self->scheduler, start_time);
2228
self->scheduler->run(self->scheduler);
2329
}

src/schedulers/dynamic/scheduler.c

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ void Scheduler_prepare_timestep(Scheduler *untyped_self, tag_t tag) {
105105
self->current_tag = tag;
106106
MUTEX_UNLOCK(self->mutex);
107107

108-
LF_DEBUG(SCHED, "Preparing timestep for tag " PRINTF_TAG " object %p", self->current_tag, self);
109108
self->reaction_queue->reset(self->reaction_queue);
110109
}
111110

@@ -354,9 +353,7 @@ void Scheduler_run(Scheduler *untyped_self) {
354353
lf_ret_t Scheduler_schedule_at(Scheduler *super, Event *event) {
355354
DynamicScheduler *self = (DynamicScheduler *)super;
356355
lf_ret_t ret;
357-
LF_DEBUG(SCHED, "Scheduler_schedule_at %p", self);
358-
LF_DEBUG(SCHED, "scheduling at intended: " PRINTF_TAG, event->intended_tag);
359-
LF_DEBUG(SCHED, "scheduling current tag: " PRINTF_TAG, self->current_tag);
356+
360357
// This can be called from the async context and the channel context. It reads stop_tag, current_tag, start_time
361358
// and more and we lock the scheduler mutex before doing anything.
362359
MUTEX_LOCK(self->mutex);
@@ -369,7 +366,7 @@ lf_ret_t Scheduler_schedule_at(Scheduler *super, Event *event) {
369366
goto unlock_and_return;
370367
}
371368

372-
// Check if we are tring to schedule into the past
369+
// Check if we are trying to schedule into the past
373370
if (lf_tag_compare(event->super.tag, self->current_tag) <= 0) {
374371
LF_WARN(SCHED, "Trying to schedule event at tag " PRINTF_TAG " which is before current tag " PRINTF_TAG,
375372
event->super.tag, self->current_tag);

src/startup_coordinator.c

Lines changed: 7 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -57,33 +57,6 @@ static lf_ret_t StartupCoordinator_connect_to_neighbors_blocking(StartupCoordina
5757
return LF_OK;
5858
}
5959

60-
void StartupCoordinator_schedule_startups(const StartupCoordinator *self, const tag_t start_tag) {
61-
if (self->env->startup) {
62-
LF_DEBUG(FED, "Scheduling Startup Reactions at" PRINTF_TAG, start_tag);
63-
Event event = EVENT_INIT(start_tag, &self->env->startup->super, NULL);
64-
LF_INFO(FED, "Self: %p Scheduler: %p", self, self->env->scheduler);
65-
lf_ret_t ret = self->env->scheduler->schedule_at(self->env->scheduler, &event);
66-
validate(ret == LF_OK);
67-
}
68-
}
69-
70-
void StartupCoordinator_schedule_timers(StartupCoordinator *self, const Reactor *reactor, const tag_t start_tag) {
71-
lf_ret_t ret;
72-
for (size_t i = 0; i < reactor->triggers_size; i++) {
73-
Trigger *trigger = reactor->triggers[i];
74-
if (trigger->type == TRIG_TIMER) {
75-
Timer *timer = (Timer *)trigger;
76-
tag_t tag = {.time = start_tag.time + timer->offset, .microstep = start_tag.microstep};
77-
Event event = EVENT_INIT(tag, trigger, NULL);
78-
ret = self->env->scheduler->schedule_at(self->env->scheduler, &event);
79-
validate(ret == LF_OK);
80-
}
81-
}
82-
for (size_t i = 0; i < reactor->children_size; i++) {
83-
StartupCoordinator_schedule_timers(self, reactor->children[i], start_tag);
84-
}
85-
}
86-
8760
void StartupCoordinator_schedule_timers_joining(StartupCoordinator *self, Reactor *reactor,
8861
interval_t federation_start_time, interval_t join_time) {
8962
lf_ret_t ret;
@@ -368,8 +341,8 @@ static void StartupCoordinator_handle_start_time_proposal(StartupCoordinator *se
368341
self->state = StartupCoordinationState_RUNNING;
369342
self->env->scheduler->set_and_schedule_start_tag(self->env->scheduler, self->start_time_proposal);
370343
tag_t start_tag = {.time = self->start_time_proposal, .microstep = 0};
371-
StartupCoordinator_schedule_startups(self, start_tag);
372-
StartupCoordinator_schedule_timers(self, self->env->main, start_tag);
344+
Environment_schedule_startups(self->env, start_tag);
345+
Environment_schedule_timers(self->env, self->env->main, start_tag);
373346
} else {
374347
self->start_time_proposal_step++;
375348
send_start_time_proposal(self, self->start_time_proposal, self->start_time_proposal_step);
@@ -465,16 +438,16 @@ static void StartupCoordinator_handle_start_time_response(StartupCoordinator *se
465438
tag_t start_tag = {.time = joining_time, .microstep = 0};
466439
LF_INFO(FED, "Policy: IMMEDIATELY Scheduling join_time: " PRINTF_TIME, joining_time);
467440
self->env->scheduler->prepare_timestep(self->env->scheduler, NEVER_TAG);
468-
StartupCoordinator_schedule_startups(self, start_tag);
469-
StartupCoordinator_schedule_timers(self, self->env->main, start_tag);
441+
Environment_schedule_startups(self->env, start_tag);
442+
Environment_schedule_timers(self->env, self->env->main, start_tag);
470443
self->env->scheduler->prepare_timestep(self->env->scheduler, start_tag);
471444
self->env->scheduler->set_and_schedule_start_tag(self->env->scheduler, joining_time);
472445
} else if (self->joining_policy == JOIN_ALIGNED_WITH_SHORT_TIMER) {
473446
joining_time = max_logical_time + MSEC(50);
474447
tag_t start_tag = {.time = joining_time, .microstep = 0};
475448
LF_INFO(FED, "Policy: Timer Aligned Scheduling join_time: " PRINTF_TIME, joining_time);
476449
self->env->scheduler->prepare_timestep(self->env->scheduler, NEVER_TAG);
477-
StartupCoordinator_schedule_startups(self, start_tag);
450+
Environment_schedule_startups(self->env, start_tag);
478451
StartupCoordinator_schedule_timers_joining(self, self->env->main, start_time, joining_time);
479452
self->env->scheduler->prepare_timestep(self->env->scheduler, start_tag);
480453
self->env->scheduler->set_and_schedule_start_tag(self->env->scheduler, joining_time);
@@ -542,6 +515,8 @@ void StartupCoordinator_ctor(StartupCoordinator *self, Environment *env, Neighbo
542515
self->start_time_proposal = NEVER;
543516
self->joining_policy = joining_policy;
544517
for (size_t i = 0; i < self->num_neighbours; i++) {
518+
self->neighbor_state[i].core_federate = true;
519+
self->neighbor_state[i].current_logical_time = 0;
545520
self->neighbor_state[i].handshake_response_received = false;
546521
self->neighbor_state[i].handshake_request_received = false;
547522
self->neighbor_state[i].handshake_response_sent = false;

0 commit comments

Comments
 (0)