Skip to content

Commit 55d9fd2

Browse files
committed
working version
1 parent f6a922e commit 55d9fd2

File tree

6 files changed

+79
-40
lines changed

6 files changed

+79
-40
lines changed

external/proto/message.pb.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* Automatically generated nanopb constant definitions */
2-
/* Generated by nanopb-0.4.9 */
2+
/* Generated by nanopb-0.4.9.1 */
33

44
#include "./message.pb.h"
55
#if PB_PROTO_HEADER_VERSION != 40

external/proto/message.pb.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* Automatically generated nanopb header */
2-
/* Generated by nanopb-0.4.9 */
2+
/* Generated by nanopb-0.4.9.1 */
33

44
#ifndef PB_EXTERNAL_PROTO_MESSAGE_PB_H_INCLUDED
55
#define PB_EXTERNAL_PROTO_MESSAGE_PB_H_INCLUDED
@@ -58,7 +58,7 @@ typedef struct _StartTimeRequest {
5858

5959
/* The response to a StartTimeRequest, telling the transient federate the start time of the federation. */
6060
typedef struct _StartTimeResponse {
61-
int64_t current_logical_time;
61+
int64_t elapsed_logical_time;
6262
int64_t federation_start_time;
6363
} StartTimeResponse;
6464

@@ -218,7 +218,7 @@ extern "C" {
218218
#define StartupHandshakeResponse_state_tag 1
219219
#define StartTimeProposal_time_tag 1
220220
#define StartTimeProposal_step_tag 2
221-
#define StartTimeResponse_current_logical_time_tag 1
221+
#define StartTimeResponse_elapsed_logical_time_tag 1
222222
#define StartTimeResponse_federation_start_time_tag 2
223223
#define JoiningTimeAnnouncement_joining_time_tag 1
224224
#define StartupCoordination_startup_handshake_request_tag 1
@@ -282,7 +282,7 @@ X(a, STATIC, REQUIRED, UINT32, step, 2)
282282
#define StartTimeRequest_DEFAULT NULL
283283

284284
#define StartTimeResponse_FIELDLIST(X, a) \
285-
X(a, STATIC, REQUIRED, INT64, current_logical_time, 1) \
285+
X(a, STATIC, REQUIRED, INT64, elapsed_logical_time, 1) \
286286
X(a, STATIC, REQUIRED, INT64, federation_start_time, 2)
287287
#define StartTimeResponse_CALLBACK NULL
288288
#define StartTimeResponse_DEFAULT NULL

external/proto/message.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ message StartTimeRequest{ }
4242

4343
// The response to a StartTimeRequest, telling the transient federate the start time of the federation.
4444
message StartTimeResponse {
45-
required int64 current_logical_time = 1;
45+
required int64 elapsed_logical_time = 1;
4646
required int64 federation_start_time = 2;
4747
}
4848

include/reactor-uc/startup_coordinator.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ enum JoiningPolicy { JOIN_IMMEDIATELY = 0, JOIN_ALIGNED_WITH_SHORT_TIMER = 1, JO
1515

1616
/** Represents the state of a neighbor. */
1717
typedef struct {
18+
bool core_federate; // Whether this federate can be not available
1819
bool handshake_response_received; // Whether a handshake response has been received from this neighbor.
1920
bool handshake_request_received; // Whether a handshake response has been sent to this neighbor.
2021
bool handshake_response_sent; // Whether a handshake response has been sent to this neighbor.

src/schedulers/dynamic/scheduler.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,11 +100,12 @@ void Scheduler_register_for_cleanup(Scheduler *untyped_self, Trigger *trigger) {
100100
void Scheduler_prepare_timestep(Scheduler *untyped_self, tag_t tag) {
101101
DynamicScheduler *self = (DynamicScheduler *)untyped_self;
102102

103-
LF_DEBUG(SCHED, "Preparing timestep for tag " PRINTF_TAG, tag);
104103
// Before setting `current_tag` we must lock because it is read from async and channel context.
105104
MUTEX_LOCK(self->mutex);
106105
self->current_tag = tag;
107106
MUTEX_UNLOCK(self->mutex);
107+
108+
LF_DEBUG(SCHED, "Preparing timestep for tag " PRINTF_TAG " object %p", self->current_tag, self);
108109
self->reaction_queue->reset(self->reaction_queue);
109110
}
110111

@@ -355,6 +356,9 @@ void Scheduler_run(Scheduler *untyped_self) {
355356
lf_ret_t Scheduler_schedule_at(Scheduler *super, Event *event) {
356357
DynamicScheduler *self = (DynamicScheduler *)super;
357358
lf_ret_t ret;
359+
LF_DEBUG(SCHED, "Scheduler_schedule_at %p", self);
360+
LF_DEBUG(SCHED, "scheduling at intended: " PRINTF_TAG, event->intended_tag);
361+
LF_DEBUG(SCHED, "scheduling current tag: " PRINTF_TAG, self->current_tag);
358362
// This can be called from the async context and the channel context. It reads stop_tag, current_tag, start_time
359363
// and more and we lock the scheduler mutex before doing anything.
360364
MUTEX_LOCK(self->mutex);

src/startup_coordinator.c

Lines changed: 67 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,9 @@ static lf_ret_t StartupCoordinator_connect_to_neighbors_blocking(StartupCoordina
5959

6060
void StartupCoordinator_schedule_startups(const StartupCoordinator *self, const tag_t start_tag) {
6161
if (self->env->startup) {
62+
LF_DEBUG(FED, "Scheduling Startup Reactions at" PRINTF_TAG, start_tag);
6263
Event event = EVENT_INIT(start_tag, &self->env->startup->super, NULL);
64+
LF_INFO(FED, "Self: %p Scheduler: %p", self, self->env->scheduler);
6365
lf_ret_t ret = self->env->scheduler->schedule_at(self->env->scheduler, &event);
6466
validate(ret == LF_OK);
6567
}
@@ -385,6 +387,10 @@ static void StartupCoordinator_handle_start_time_request(StartupCoordinator *sel
385387
do {
386388
ret = chan->send_blocking(chan, &self->msg);
387389
} while (ret != LF_OK);
390+
391+
// We now schedule a system event here, because otherwise we will never detect no other federates responding
392+
StartupCoordinator_schedule_system_self_event(self, self->env->get_physical_time(self->env) + MSEC(250),
393+
StartupCoordination_start_time_response_tag);
388394
}
389395

390396
} else {
@@ -394,13 +400,13 @@ static void StartupCoordinator_handle_start_time_request(StartupCoordinator *sel
394400
NetworkChannel *chan = env->net_bundles[payload->neighbor_index]->net_channel;
395401
msg->which_message = FederateMessage_startup_coordination_tag;
396402
msg->message.startup_coordination.which_message = StartupCoordination_start_time_response_tag;
397-
msg->message.startup_coordination.message.start_time_response.current_logical_time =
403+
msg->message.startup_coordination.message.start_time_response.elapsed_logical_time =
398404
self->env->get_elapsed_logical_time(self->env);
399405
msg->message.startup_coordination.message.start_time_response.federation_start_time = self->start_time_proposal;
400406
chan->send_blocking(chan, msg);
401-
LF_INFO(FED, "SENDING TIME start_tag: " PRINTF_TIME " current_time: " PRINTF_TIME,
407+
LF_INFO(FED, "SENDING TIME start_tag: " PRINTF_TIME " elapsed_time: " PRINTF_TIME,
402408
msg->message.startup_coordination.message.start_time_response.federation_start_time,
403-
msg->message.startup_coordination.message.start_time_response.current_logical_time);
409+
msg->message.startup_coordination.message.start_time_response.elapsed_logical_time);
404410
break;
405411
}
406412
default:;
@@ -411,43 +417,71 @@ static void StartupCoordinator_handle_start_time_request(StartupCoordinator *sel
411417

412418

413419
static void StartupCoordinator_handle_start_time_response(StartupCoordinator *self, StartupEvent *payload) {
420+
if (self->start_time_proposal > 0) {
421+
return;
422+
}
423+
414424
if (payload->neighbor_index == NEIGHBOR_INDEX_SELF) {
415-
// SHOULD only come from other federates
416-
} else {
417-
self->neighbor_state[payload->neighbor_index].start_time_proposals_received++;
425+
bool got_no_response = true;
426+
for (size_t i = 0; i < self->num_neighbours; i++) {
427+
got_no_response = got_no_response && (self->neighbor_state[i].current_logical_time == 0);
428+
}
418429

419-
if (self->start_time_proposal > 0) {
430+
if (got_no_response) {
431+
LF_ERR(FED, "No other federate responded to the start time request! Shutting Down!");
432+
self->env->request_shutdown(self->env);
433+
}
434+
}
435+
436+
const instant_t start_time = payload->msg.message.start_time_response.federation_start_time;
437+
const interval_t elapsed_logical = payload->msg.message.start_time_response.elapsed_logical_time;
438+
const instant_t current_logical_time = start_time + elapsed_logical;
439+
validate(start_time < current_logical_time);
440+
441+
self->neighbor_state[payload->neighbor_index].current_logical_time = current_logical_time;
442+
self->neighbor_state[payload->neighbor_index].start_time_proposals_received++;
443+
444+
// checking if we got a response from all core federates
445+
for (size_t i = 0; i < self->num_neighbours; i++) {
446+
if (self->neighbor_state[i].core_federate && self->neighbor_state[i].current_logical_time == 0) {
420447
return;
421448
}
449+
}
422450

423-
const instant_t start_time = payload->msg.message.start_time_response.federation_start_time;
424-
const instant_t current_logical = payload->msg.message.start_time_response.current_logical_time;
425-
self->start_time_proposal = start_time;
426-
self->state = StartupCoordinationState_RUNNING;
427-
428-
instant_t joining_time = 0;
429-
430-
if (self->joining_policy == JOIN_IMMEDIATELY) {
431-
joining_time = current_logical + MSEC(50);
432-
tag_t start_tag = {.time = joining_time, .microstep = 0};
433-
self->env->scheduler->prepare_timestep(self->env->scheduler, start_tag);
434-
self->env->scheduler->set_and_schedule_start_tag(self->env->scheduler, joining_time);
435-
LF_INFO(FED, "Policy: IMMEDIATELY Scheduling start_tag: " PRINTF_TIME " join_time: " PRINTF_TIME, start_time, joining_time);
436-
StartupCoordinator_schedule_startups(self, start_tag);
437-
StartupCoordinator_schedule_timers(self, self->env->main, start_tag);
438-
} else if (self->joining_policy == JOIN_ALIGNED_WITH_SHORT_TIMER) {
439-
joining_time = current_logical + MSEC(50);
440-
tag_t start_tag = {.time = joining_time, .microstep = 0};
441-
self->env->scheduler->prepare_timestep(self->env->scheduler, start_tag);
442-
start_tag.time = joining_time + 1;
443-
self->env->scheduler->set_and_schedule_start_tag(self->env->scheduler, joining_time + 1);
444-
LF_INFO(FED, "Policy: Timer Aligned Scheduling start_tag: " PRINTF_TIME "join_time: " PRINTF_TIME, start_time, joining_time);
445-
StartupCoordinator_schedule_startups(self, start_tag);
446-
StartupCoordinator_schedule_timers_joining(self, self->env->main, start_time, joining_time + 1);
447-
} else {
448-
validate(false);
451+
// calculating the maximum logical time from all neighbors that responded
452+
interval_t max_logical_time = 0;
453+
for (size_t i = 0; i < self->num_neighbours; i++) {
454+
if (self->neighbor_state[i].current_logical_time > max_logical_time) {
455+
max_logical_time = self->neighbor_state[i].current_logical_time;
449456
}
450457
}
458+
459+
self->start_time_proposal = start_time;
460+
self->state = StartupCoordinationState_RUNNING;
461+
462+
instant_t joining_time = 0;
463+
464+
if (self->joining_policy == JOIN_IMMEDIATELY) {
465+
joining_time = max_logical_time + MSEC(50);
466+
tag_t start_tag = {.time = joining_time, .microstep = 0};
467+
LF_INFO(FED, "Policy: IMMEDIATELY Scheduling join_time: " PRINTF_TIME, joining_time);
468+
self->env->scheduler->prepare_timestep(self->env->scheduler, NEVER_TAG);
469+
StartupCoordinator_schedule_startups(self, start_tag);
470+
StartupCoordinator_schedule_timers(self, self->env->main, start_tag);
471+
self->env->scheduler->prepare_timestep(self->env->scheduler, start_tag);
472+
self->env->scheduler->set_and_schedule_start_tag(self->env->scheduler, joining_time);
473+
} else if (self->joining_policy == JOIN_ALIGNED_WITH_SHORT_TIMER) {
474+
joining_time = max_logical_time + MSEC(50);
475+
tag_t start_tag = {.time = joining_time, .microstep = 0};
476+
LF_INFO(FED, "Policy: Timer Aligned Scheduling join_time: " PRINTF_TIME, joining_time);
477+
self->env->scheduler->prepare_timestep(self->env->scheduler, NEVER_TAG);
478+
StartupCoordinator_schedule_startups(self, start_tag);
479+
StartupCoordinator_schedule_timers_joining(self, self->env->main, start_time, joining_time);
480+
self->env->scheduler->prepare_timestep(self->env->scheduler, start_tag);
481+
self->env->scheduler->set_and_schedule_start_tag(self->env->scheduler, joining_time);
482+
} else {
483+
validate(false);
484+
}
451485
}
452486

453487
/** Invoked by scheduler when handling any system event destined for StartupCoordinator. */

0 commit comments

Comments
 (0)