Skip to content

Commit 41b4d6d

Browse files
committed
handling joining time announcement
1 parent 5bea031 commit 41b4d6d

File tree

2 files changed

+25
-7
lines changed

2 files changed

+25
-7
lines changed

src/federated.c

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,7 @@ void FederatedInputConnection_ctor(FederatedInputConnection *self, Reactor *pare
139139
self->max_wait = max_wait;
140140
}
141141

142-
// Callback registered with the NetworkChannel. Is called asynchronously when there is a
143-
// a TaggedMessage available.
142+
// Callback registered with the NetworkChannel. Is called asynchronously when there is a TaggedMessage available.
144143
void FederatedConnectionBundle_handle_tagged_msg(FederatedConnectionBundle *self, const FederateMessage *_msg) {
145144
const TaggedMessage *msg = &_msg->message.tagged_message;
146145
LF_DEBUG(FED, "Callback on FedConnBundle %p for message of size=%u with tag:" PRINTF_TAG, self, msg->payload.size,
@@ -222,16 +221,16 @@ void FederatedConnectionBundle_msg_received_cb(FederatedConnectionBundle *self,
222221
FederatedEnvironment *env_fed = (FederatedEnvironment *)self->parent->env;
223222
switch (msg->which_message) {
224223
case FederateMessage_tagged_message_tag:
225-
LF_DEBUG(FED, "Handeling tagged message");
224+
LF_DEBUG(FED, "Handling tagged message");
226225
FederatedConnectionBundle_handle_tagged_msg(self, msg);
227226
break;
228227
case FederateMessage_startup_coordination_tag:
229-
LF_DEBUG(FED, "Handeling start up message");
228+
LF_DEBUG(FED, "Handling start up message");
230229
env_fed->startup_coordinator->handle_message_callback(env_fed->startup_coordinator,
231230
&msg->message.startup_coordination, self->index);
232231
break;
233232
case FederateMessage_clock_sync_msg_tag:
234-
LF_DEBUG(FED, "Handeling clock sync message");
233+
LF_DEBUG(FED, "Handling clock sync message");
235234
if (env_fed->do_clock_sync) {
236235
env_fed->clock_sync->handle_message_callback(env_fed->clock_sync, &msg->message.clock_sync_msg, self->index);
237236
} else {

src/startup_coordinator.c

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,25 @@ static void StartupCoordinator_handle_start_time_response(StartupCoordinator *se
456456
}
457457
}
458458

459+
static void StartupCoordinator_handle_join_time_announcement(const StartupCoordinator *self,
460+
const StartupEvent *payload) {
461+
if (payload->neighbor_index != NEIGHBOR_INDEX_SELF) {
462+
463+
const FederatedEnvironment *env = (FederatedEnvironment *)self->env;
464+
for (size_t i = 0; i < env->net_bundles_size; i++) {
465+
if (env->net_bundles[i]->index == (size_t)payload->neighbor_index) {
466+
467+
// we found the correct connection bundle to this federate now we set last known tag to the joining time.
468+
const FederatedConnectionBundle *bundle = env->net_bundles[i];
469+
for (size_t j = 0; j < bundle->inputs_size; j++) {
470+
tag_t joining_time = {.time = payload->msg.message.joining_time_announcement.joining_time, .microstep = 0};
471+
bundle->inputs[i]->last_known_tag = joining_time;
472+
}
473+
}
474+
}
475+
}
476+
}
477+
459478
/** Invoked by scheduler when handling any system event destined for StartupCoordinator. */
460479
static void StartupCoordinator_handle_system_event(SystemEventHandler *_self, SystemEvent *event) {
461480
StartupCoordinator *self = (StartupCoordinator *)_self;
@@ -487,8 +506,8 @@ static void StartupCoordinator_handle_system_event(SystemEventHandler *_self, Sy
487506
break;
488507

489508
case StartupCoordination_joining_time_announcement_tag:
490-
LF_INFO(FED, "Handle: Announcement of Joining Tag");
491-
// TODO: here we then need to set the last message tag on all input ports connected to this transient federate
509+
LF_INFO(FED, "Handle: Joining Time Announcement");
510+
StartupCoordinator_handle_join_time_announcement(self, payload);
492511
break;
493512
}
494513

0 commit comments

Comments
 (0)