diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 16bf0e616..4f60f161b 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -122,4 +122,5 @@ define(USER_THREADS) define(SCHEDULER) define(LF_SOURCE_DIRECTORY) define(LF_PACKAGE_DIRECTORY) +define(LF_FEDERATES_BIN_DIRECTORY) define(LF_FILE_SEPARATOR) diff --git a/core/federated/RTI/enclave.c b/core/federated/RTI/enclave.c index 9af2d60ac..56f9c0334 100644 --- a/core/federated/RTI/enclave.c +++ b/core/federated/RTI/enclave.c @@ -63,6 +63,9 @@ void logical_tag_complete(enclave_t* enclave, tag_t completed) { tag_advance_grant_t tag_advance_grant_if_safe(enclave_t* e) { tag_advance_grant_t result = {.tag = NEVER_TAG, .is_provisional = false}; + // Check how many upstream federates are connected + int num_connected_upstream = 0; + // Find the earliest LTC of upstream enclaves (M). tag_t min_upstream_completed = FOREVER_TAG; @@ -72,6 +75,8 @@ tag_advance_grant_t tag_advance_grant_if_safe(enclave_t* e) { // Ignore this enclave if it no longer connected. if (upstream->state == NOT_CONNECTED) continue; + num_connected_upstream++; + // Adjust by the "after" delay. // Note that "no delay" is encoded as NEVER, // whereas one microstep delay is encoded as 0LL. 
@@ -81,16 +86,25 @@ tag_advance_grant_t tag_advance_grant_if_safe(enclave_t* e) { min_upstream_completed = candidate; } } + LF_PRINT_LOG("Minimum upstream LTC for federate/enclave %d is " PRINTF_TAG "(adjusted by after delay).", e->id, min_upstream_completed.time - start_time, min_upstream_completed.microstep); - if (lf_tag_compare(min_upstream_completed, e->last_granted) > 0 + + if (num_connected_upstream == 0) { + // When none of the upstream federates is connected (case of transients), + if (lf_tag_compare(e->next_event, FOREVER_TAG) != 0) { + result.tag = e->next_event; + return result; + } + } else if ( + lf_tag_compare(min_upstream_completed, e->last_granted) > 0 && lf_tag_compare(min_upstream_completed, e->next_event) >= 0 // The enclave has to advance its tag ) { result.tag = min_upstream_completed; return result; - } + } // Can't make progress based only on upstream LTCs. // If all (transitive) upstream enclaves of the enclave @@ -257,6 +271,7 @@ tag_advance_grant_t next_event_tag(enclave_t* e, tag_t next_event_tag) { void notify_advance_grant_if_safe(enclave_t* e) { tag_advance_grant_t grant = tag_advance_grant_if_safe(e); + if (lf_tag_compare(grant.tag, NEVER_TAG) != 0) { if (grant.is_provisional) { notify_provisional_tag_advance_grant(e, grant.tag); diff --git a/core/federated/RTI/rti.c b/core/federated/RTI/rti.c index 34826d7e7..798c0b62c 100644 --- a/core/federated/RTI/rti.c +++ b/core/federated/RTI/rti.c @@ -113,10 +113,14 @@ int main(int argc, const char* argv[]) { lf_print("Tracing the RTI execution in %s file.", rti_trace_file_name); } + lf_print("Starting RTI for a total of %d federates, with %d being transient, in federation ID %s", \ + _f_rti->number_of_enclaves, + _f_rti->number_of_transient_federates, + _f_rti->federation_id); - lf_print("Starting RTI for %d federates in federation ID %s.", _f_rti->number_of_enclaves, _f_rti->federation_id); assert(_f_rti->number_of_enclaves < UINT16_MAX); - + assert(_f_rti->number_of_transient_federates < 
UINT16_MAX); + // Allocate memory for the federates _f_rti->enclaves = (federate_t**)calloc(_f_rti->number_of_enclaves, sizeof(federate_t*)); for (uint16_t i = 0; i < _f_rti->number_of_enclaves; i++) { @@ -129,6 +133,5 @@ int main(int argc, const char* argv[]) { int socket_descriptor = start_rti_server(_f_rti->user_specified_port); wait_for_federates(socket_descriptor); - lf_print("RTI is exiting."); return 0; } diff --git a/core/federated/RTI/rti_lib.c b/core/federated/RTI/rti_lib.c index 4cab7bc15..7bc0c5782 100644 --- a/core/federated/RTI/rti_lib.c +++ b/core/federated/RTI/rti_lib.c @@ -29,11 +29,18 @@ // Global variables defined in tag.c: extern instant_t start_time; -/** - * Reference to federate_rti_t instance. - */ +// Reference to federate_rti_t instance. federation_rti_t *_f_rti; +// Reference to the federate instance to support hot swap +federate_t * hot_swap_federate; + +// Indicates if a hot swap process is in progress +bool hot_swap_in_progress = false; + +// Indicates that the old federate has stopped +bool hot_swap_old_resigned = false; + lf_mutex_t rti_mutex; lf_cond_t received_start_times; lf_cond_t sent_start_time; @@ -161,6 +168,8 @@ int create_server(int32_t specified_port, uint16_t port, socket_type_t socket_ty return socket_descriptor; } + + void notify_tag_advance_grant(enclave_t* e, tag_t tag) { if (e->state == NOT_CONNECTED || lf_tag_compare(tag, e->last_granted) <= 0 @@ -174,29 +183,22 @@ void notify_tag_advance_grant(enclave_t* e, tag_t tag) { // Need to wait here. 
lf_cond_wait(&sent_start_time); } - size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t); - unsigned char buffer[message_length]; - buffer[0] = MSG_TYPE_TAG_ADVANCE_GRANT; - encode_int64(tag.time, &(buffer[1])); - encode_int32((int32_t)tag.microstep, &(buffer[1 + sizeof(int64_t)])); - if (_f_rti->tracing_enabled) { - tracepoint_rti_to_federate(_f_rti->trace, send_TAG, e->id, &tag); - } - // This function is called in notify_advance_grant_if_safe(), which is a long - // function. During this call, the socket might close, causing the following write_to_socket - // to fail. Consider a failure here a soft failure and update the federate's status. - ssize_t bytes_written = write_to_socket(((federate_t*)e)->socket, message_length, buffer); - if (bytes_written < (ssize_t)message_length) { - lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id); - if (bytes_written < 0) { - e->state = NOT_CONNECTED; - // FIXME: We need better error handling, but don't stop other execution here. + // Check if sending the tag advance grant needs to be delayed or not + // Delay is needed when a federate has, at least one, absent upstream transient + int num_absent_upstram_transients = 0; + for (int j = 0; j < e->num_upstream; j++) { + federate_t *upstream = (federate_t*)(_f_rti->enclaves[e->upstream[j]]); + // Do Ignore this enclave if it no longer connected. + if ((upstream->enclave.state == NOT_CONNECTED) && (upstream->is_transient)) { + num_absent_upstram_transients++; } + } + + if (num_absent_upstram_transients > 0) { + notify_tag_advance_grant_delayed(e, tag); } else { - e->last_granted = tag; - LF_PRINT_LOG("RTI sent to federate %d the tag advance grant (TAG) " PRINTF_TAG ".", - e->id, tag.time - start_time, tag.microstep); + notify_tag_advance_grant_immediate(e, tag); } } @@ -213,59 +215,22 @@ void notify_provisional_tag_advance_grant(enclave_t* e, tag_t tag) { // Need to wait here. 
lf_cond_wait(&sent_start_time); } - size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t); - unsigned char buffer[message_length]; - buffer[0] = MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT; - encode_int64(tag.time, &(buffer[1])); - encode_int32((int32_t)tag.microstep, &(buffer[1 + sizeof(int64_t)])); - if (_f_rti->tracing_enabled){ - tracepoint_rti_to_federate(_f_rti->trace, send_PTAG, e->id, &tag); + // Check if sending the tag advance grant needs to be delayed or not + // Delay is needed when a federate has, at least one, absent upstream transient + int num_absent_upstram_transients = 0; + for (int j = 0; j < e->num_upstream; j++) { + federate_t *upstream = (federate_t*)(_f_rti->enclaves[e->upstream[j]]); + // Do Ignore this enclave if it no longer connected. + if ((upstream->enclave.state == NOT_CONNECTED) && (upstream->is_transient)) { + num_absent_upstram_transients++; + } } - // This function is called in notify_advance_grant_if_safe(), which is a long - // function. During this call, the socket might close, causing the following write_to_socket - // to fail. Consider a failure here a soft failure and update the federate's status. - ssize_t bytes_written = write_to_socket(((federate_t*)e)->socket, message_length, buffer); - if (bytes_written < (ssize_t)message_length) { - lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id); - if (bytes_written < 0) { - e->state = NOT_CONNECTED; - // FIXME: We need better error handling, but don't stop other execution here. - } + if (num_absent_upstram_transients > 0) { + notify_provisional_tag_advance_grant_delayed(e, tag); } else { - e->last_provisionally_granted = tag; - LF_PRINT_LOG("RTI sent to federate %d the Provisional Tag Advance Grant (PTAG) " PRINTF_TAG ".", - e->id, tag.time - start_time, tag.microstep); - - // Send PTAG to all upstream federates, if they have not had - // a later or equal PTAG or TAG sent previously and if their transitive - // NET is greater than or equal to the tag. 
- // NOTE: This could later be replaced with a TNET mechanism once - // we have an available encoding of causality interfaces. - // That might be more efficient. - for (int j = 0; j < e->num_upstream; j++) { - federate_t* upstream = _f_rti->enclaves[e->upstream[j]]; - - // Ignore this federate if it has resigned. - if (upstream->enclave.state == NOT_CONNECTED) continue; - // To handle cycles, need to create a boolean array to keep - // track of which upstream federates have been visited. - bool* visited = (bool*)calloc(_f_rti->number_of_enclaves, sizeof(bool)); // Initializes to 0. - - // Find the (transitive) next event tag upstream. - tag_t upstream_next_event = transitive_next_event( - &(upstream->enclave), upstream->enclave.next_event, visited); - free(visited); - // If these tags are equal, then - // a TAG or PTAG should have already been granted, - // in which case, another will not be sent. But it - // may not have been already granted. - if (lf_tag_compare(upstream_next_event, tag) >= 0) { - notify_provisional_tag_advance_grant(&(upstream->enclave), tag); - } - - } + notify_provisional_tag_advance_grant_immediate(e, tag); } } @@ -414,8 +379,15 @@ void handle_timed_message(federate_t* sending_federate, unsigned char* buffer) { fed->enclave.last_provisionally_granted.microstep ); return; + } else { + if(lf_tag_compare(intended_tag, fed->effective_start_tag) < 0) { + // Do not forward the message if the federate is connected, but its + // start_time is not reached yet + lf_mutex_unlock(&rti_mutex); + return; + } } - + // Forward the message or message chunk. 
int destination_socket = fed->socket; @@ -587,12 +559,14 @@ void _lf_rti_broadcast_stop_time_to_federates_locked() { void mark_federate_requesting_stop(federate_t* fed) { if (!fed->requested_stop) { - // Assume that the federate - // has requested stop - _f_rti->num_enclaves_handling_stop++; + // Assume that the federate has requested stop + // Increment the number of federates handling stop only if it is persistent + if (fed->is_transient == false) { + _f_rti->num_enclaves_handling_stop++; + } fed->requested_stop = true; } - if (_f_rti->num_enclaves_handling_stop == _f_rti->number_of_enclaves) { + if (_f_rti->num_enclaves_handling_stop == _f_rti->number_of_enclaves - _f_rti->number_of_transient_federates) { // We now have information about the stop time of all // federates. _lf_rti_broadcast_stop_time_to_federates_locked(); @@ -638,7 +612,7 @@ void handle_stop_request_message(federate_t* fed) { // for a stop, add it to the tally. mark_federate_requesting_stop(fed); - if (_f_rti->num_enclaves_handling_stop == _f_rti->number_of_enclaves) { + if (_f_rti->num_enclaves_handling_stop == _f_rti->number_of_enclaves - _f_rti->number_of_transient_federates) { // We now have information about the stop time of all // federates. This is extremely unlikely, but it can occur // all federates call lf_request_stop() at the same tag. @@ -706,8 +680,6 @@ void handle_stop_request_reply(federate_t* fed) { lf_mutex_unlock(&rti_mutex); } -////////////////////////////////////////////////// - void handle_address_query(uint16_t fed_id) { federate_t *fed = _f_rti->enclaves[fed_id]; // Use buffer both for reading and constructing the reply. 
@@ -790,54 +762,166 @@ void handle_timestamp(federate_t *my_fed) { LF_PRINT_DEBUG("RTI received timestamp message with time: " PRINTF_TIME ".", timestamp); lf_mutex_lock(&rti_mutex); - _f_rti->num_feds_proposed_start++; - if (timestamp > _f_rti->max_start_time) { - _f_rti->max_start_time = timestamp; - } - if (_f_rti->num_feds_proposed_start == _f_rti->number_of_enclaves) { - // All federates have proposed a start time. - lf_cond_broadcast(&received_start_times); - } else { - // Some federates have not yet proposed a start time. - // wait for a notification. - while (_f_rti->num_feds_proposed_start < _f_rti->number_of_enclaves) { - // FIXME: Should have a timeout here? - lf_cond_wait(&received_start_times); + + // Processing the TIMESTAMP depends on whether it is the startup phase (all + // persistent federates joined) or not. + if (_f_rti->phase == startup_phase) { // This is equivalent to: _f_rti->num_feds_proposed_start < (_f_rti->number_of_enclaves - _f_rti->number_of_transient_federates) + if (timestamp > _f_rti->max_start_time) { + _f_rti->max_start_time = timestamp; + } + // Check that persistent federates did propose a start_time + if (!my_fed->is_transient) { + _f_rti->num_feds_proposed_start++; + } + if (_f_rti->num_feds_proposed_start == (_f_rti->number_of_enclaves - _f_rti->number_of_transient_federates)) { + // All federates have proposed a start time. + lf_cond_broadcast(&received_start_times); + _f_rti->phase = execution_phase; + } else { + // Some federates have not yet proposed a start time. + // wait for a notification. + while (_f_rti->num_feds_proposed_start < (_f_rti->number_of_enclaves - _f_rti->number_of_transient_federates)) { + // FIXME: Should have a timeout here? + lf_cond_wait(&received_start_times); + } } - } - lf_mutex_unlock(&rti_mutex); + lf_mutex_unlock(&rti_mutex); - // Send back to the federate the maximum time plus an offset on a TIMESTAMP - // message. 
- unsigned char start_time_buffer[MSG_TYPE_TIMESTAMP_LENGTH]; - start_time_buffer[0] = MSG_TYPE_TIMESTAMP; - // Add an offset to this start time to get everyone starting together. - start_time = _f_rti->max_start_time + DELAY_START; - encode_int64(swap_bytes_if_big_endian_int64(start_time), &start_time_buffer[1]); + // Add an offset to this start time to get everyone starting together. + start_time = _f_rti->max_start_time + DELAY_START; + + // Send the start_time + my_fed->effective_start_tag = (tag_t){.time = start_time, .microstep = 0u}; + send_start_tag(my_fed, start_time, my_fed->effective_start_tag); + } else if (_f_rti->phase == execution_phase) { + // A transient has joined after the startup phase + // At this point, we already hold the mutex + + // This is rather a possible extreme corner case, where a transient sends its timestamp, and only + // enters the if section after all persistents have joined. + if (timestamp < start_time) { + timestamp = start_time; + } + + //// Algorithm for computing the effective_start_time of a joining transient + // The effective_start_time will be the max among all the following tags: + // - At tag: (joining time, 0 microstep) + // - The latest completed logical tag + 1 microstep + // - The latest granted tag + 1 microstep, of every downstream federate + // - The latest provisionnaly granted tag + 1 microstep, of every downstream federate + + my_fed->effective_start_tag = (tag_t){.time = timestamp, .microstep = 0u}; + + if (lf_tag_compare(my_fed->enclave.completed, my_fed->effective_start_tag) > 0) { + my_fed->effective_start_tag = my_fed->enclave.completed; + my_fed->effective_start_tag.microstep++; + } + + // Iterate over the downstream federates + for (int j = 0; j < my_fed->enclave.num_downstream; j++) { + federate_t* downstream = _f_rti->enclaves[my_fed->enclave.downstream[j]]; + + // Ignore this federate if it has resigned. 
+ if (downstream->enclave.state == NOT_CONNECTED) { + continue; + } + + // Get the max over the TAG of the downstreams + if (lf_tag_compare(downstream->enclave.last_granted, my_fed->effective_start_tag) > 0) { + my_fed->effective_start_tag = downstream->enclave.last_granted; + my_fed->effective_start_tag.microstep++; + } + + // Get the max over the PTAG of the downstreams + if (lf_tag_compare(downstream->enclave.last_provisionally_granted, my_fed->effective_start_tag) > 0) { + my_fed->effective_start_tag = downstream->enclave.last_provisionally_granted; + my_fed->effective_start_tag.microstep++; + } + } + // For every downstream that has a pending grant that is higher than the + // effective_start_tag of the federate, cancel it + for (int j = 0; j < my_fed->enclave.num_downstream; j++) { + federate_t* downstream = _f_rti->enclaves[my_fed->enclave.downstream[j]]; + + // Ignore this federate if it has resigned. + if (downstream->enclave.state == NOT_CONNECTED) { + continue; + } + + // Check the pending tag grant, if any, and keep it only if it is + // sooner than the effective start tag + if ( + lf_tag_compare(downstream->pending_grant, NEVER_TAG) != 0 + && lf_tag_compare(downstream->pending_grant, my_fed->effective_start_tag) > 0 + ) { + downstream->pending_grant = NEVER_TAG; + } + // Same for the possible pending provisional tag grant + if ( + lf_tag_compare(downstream->pending_provisional_grant, NEVER_TAG) != 0 + && lf_tag_compare(downstream->pending_provisional_grant, my_fed->effective_start_tag) > 0 + ) { + downstream->pending_provisional_grant = NEVER_TAG; + } + } + + lf_mutex_unlock(&rti_mutex); + + // Once the effective start time is set, send it to the joining transient, + // together with the start time of the federation. 
+ + // Send the start time + send_start_tag(my_fed, start_time, my_fed->effective_start_tag); + } +} + +void send_start_tag(federate_t* my_fed, instant_t federation_start_time, tag_t federate_start_tag) { + // Send back to the federate the maximum time plus an offset on a TIMESTAMP_START + // message. + // In the startup phase, federates will receive identical start_time and + // effective_start_tag + unsigned char start_time_buffer[MSG_TYPE_TIMESTAMP_START_LENGTH]; + start_time_buffer[0] = MSG_TYPE_TIMESTAMP_START; + + encode_int64(swap_bytes_if_big_endian_int64(federation_start_time), &start_time_buffer[1]); + encode_tag(&(start_time_buffer[1 + sizeof(instant_t)]), federate_start_tag); + if (_f_rti->tracing_enabled) { - tag_t tag = {.time = start_time, .microstep = 0}; - tracepoint_rti_to_federate(_f_rti->trace, send_TIMESTAMP, my_fed->enclave.id, &tag); + tracepoint_rti_to_federate(_f_rti->trace, send_TIMESTAMP, my_fed->enclave.id, &federate_start_tag); } + ssize_t bytes_written = write_to_socket( - my_fed->socket, MSG_TYPE_TIMESTAMP_LENGTH, + my_fed->socket, MSG_TYPE_TIMESTAMP_START_LENGTH, start_time_buffer ); if (bytes_written < MSG_TYPE_TIMESTAMP_LENGTH) { lf_print_error("Failed to send the starting time to federate %d.", my_fed->enclave.id); } - lf_mutex_lock(&rti_mutex); - // Update state for the federate to indicate that the MSG_TYPE_TIMESTAMP - // message has been sent. That MSG_TYPE_TIMESTAMP message grants time advance to + // Update state for the federate to indicate that MSG_TYPE_TIMESTAMP_START + // message has been sent. MSG_TYPE_TIMESTAMP_START grants time advance to // the federate to the start time. 
+ lf_mutex_lock(&rti_mutex); my_fed->enclave.state = GRANTED; - lf_cond_broadcast(&sent_start_time); - LF_PRINT_LOG("RTI sent start time " PRINTF_TIME " to federate %d.", start_time, my_fed->enclave.id); + + // If it the startup phase, then broadcast that the start_time was sent + if (_f_rti->num_feds_proposed_start < (_f_rti->number_of_enclaves - _f_rti->number_of_transient_federates)) { + lf_cond_broadcast(&sent_start_time); + } + lf_mutex_unlock(&rti_mutex); + + LF_PRINT_LOG("RTI sent start time " PRINTF_TIME " to federate %d." + " The effective start tag is " PRINTF_TAG ".", + federation_start_time, + my_fed->enclave.id, + federate_start_tag.time, + federate_start_tag.microstep); } + void send_physical_clock(unsigned char message_type, federate_t* fed, socket_type_t socket_type) { if (fed->enclave.state == NOT_CONNECTED) { lf_print_warning("Clock sync: RTI failed to send physical time to federate %d. Socket not connected.", @@ -1014,6 +1098,12 @@ void handle_federate_resign(federate_t *my_fed) { lf_print("Federate %d has resigned.", my_fed->enclave.id); + + // // Signal the hot swap mechanism, if needed + // if (hot_swap_in_progress && hot_swap_federate->enclave.id == my_fed->enclave.id) { + // hot_swap_old_resigned = true; + // } + // Check downstream federates to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep // track of which upstream federates have been visited. @@ -1060,7 +1150,6 @@ void* federate_thread_TCP(void* fed) { break; case MSG_TYPE_RESIGN: handle_federate_resign(my_fed); - return NULL; break; case MSG_TYPE_NEXT_EVENT_TAG: handle_next_event_tag(my_fed); @@ -1091,6 +1180,26 @@ void* federate_thread_TCP(void* fed) { // Nothing more to do. Close the socket and exit. 
close(my_fed->socket); // from unistd.h + // Manual clean, in case of a transient federate + if (my_fed->is_transient) { + free_in_transit_message_q(my_fed->in_transit_message_tags); + lf_print("RTI: Transient Federate %d thread exited.", my_fed->enclave.id); + + // Update the number of connected transient federates + lf_mutex_lock(&rti_mutex); + _f_rti->number_of_connected_transient_federates--; + + // Reset the status of the leaving federate + reset_transient_federate(my_fed); + + lf_mutex_unlock(&rti_mutex); + } + + // Signal the hot swap mechanism, if needed + if (hot_swap_in_progress && hot_swap_federate->enclave.id == my_fed->enclave.id) { + hot_swap_old_resigned = true; + } + return NULL; } @@ -1106,8 +1215,8 @@ void send_reject(int socket_id, unsigned char error_code) { } int32_t receive_and_check_fed_id_message(int socket_id, struct sockaddr_in* client_fd) { - // Buffer for message ID, federate ID, and federation ID length. - size_t length = 1 + sizeof(uint16_t) + 1; // Message ID, federate ID, length of fedration ID. + // Buffer for message ID, federate ID, type, and federation ID length. + size_t length = 1 + sizeof(uint16_t) + 1 + 1; // Message ID, federate ID, type, length of fedration ID. unsigned char buffer[length]; // Read bytes from the socket. We need 4 bytes. @@ -1115,7 +1224,7 @@ int32_t receive_and_check_fed_id_message(int socket_id, struct sockaddr_in* clie read_from_socket_errexit(socket_id, length, buffer, "RTI failed to read from accepted socket."); uint16_t fed_id = _f_rti->number_of_enclaves; // Initialize to an invalid value. - + bool is_transient = false; // First byte received is the message type. if (buffer[0] != MSG_TYPE_FED_IDS) { if(buffer[0] == MSG_TYPE_P2P_SENDING_FED_ID || buffer[0] == MSG_TYPE_P2P_TAGGED_MESSAGE) { @@ -1138,10 +1247,15 @@ int32_t receive_and_check_fed_id_message(int socket_id, struct sockaddr_in* clie } else { // Received federate ID. 
fed_id = extract_uint16(buffer + 1); - LF_PRINT_DEBUG("RTI received federate ID: %d.", fed_id); + is_transient = (buffer[sizeof(uint16_t) + 1] == 1)? true : false; + if(is_transient) { + LF_PRINT_LOG("RTI received federate ID: %d, which is transient.", fed_id); + } else { + LF_PRINT_LOG("RTI received federate ID: %d, which is persistent.", fed_id); + } // Read the federation ID. First read the length, which is one byte. - size_t federation_id_length = (size_t)buffer[sizeof(uint16_t) + 1]; + size_t federation_id_length = (size_t)buffer[sizeof(uint16_t) + 2]; char federation_id_received[federation_id_length + 1]; // One extra for null terminator. // Next read the actual federation ID. // FIXME: This should not exit on error, but rather just reject the connection. @@ -1178,18 +1292,63 @@ int32_t receive_and_check_fed_id_message(int socket_id, struct sockaddr_in* clie send_reject(socket_id, FEDERATE_ID_OUT_OF_RANGE); return -1; } else { + // Identify if it is a new connection or a hot swap + // Reject if: + // - duplicate of a connected persistent federate + // - or hot_swap is already in progress (Only 1 hot swap at a time!), for that + // particular federate + // - or it is a hot swap, but it is not the execution phase yet + if ((_f_rti->enclaves[fed_id])->enclave.state != NOT_CONNECTED) { - lf_print_error("RTI received duplicate federate ID: %d.", fed_id); - if (_f_rti->tracing_enabled) { - tracepoint_rti_to_federate(_f_rti->trace, send_REJECT, fed_id, NULL); + if (!is_transient) { + lf_print_error("RTI received unallowed duplicate persistent federate ID: %d.", fed_id); + if (_f_rti->tracing_enabled) { + tracepoint_rti_to_federate(_f_rti->trace, send_REJECT, fed_id, NULL); + } + send_reject(socket_id, FEDERATE_ID_IN_USE); + lf_print(">>> Rejected, because a duplicate of a persistent."); + return -1; + } else if ( + hot_swap_in_progress + || _f_rti->phase != execution_phase + ) { + lf_print_warning("Reject for %d, because Hot swap is already in progress for federate 
%d.",fed_id, hot_swap_federate->enclave.id); + if (_f_rti->tracing_enabled) { + tracepoint_rti_to_federate(_f_rti->trace, send_REJECT, fed_id, NULL); + } + send_reject(socket_id, FEDERATE_ID_IN_USE); + return -1; + } - send_reject(socket_id, FEDERATE_ID_IN_USE); - return -1; } } } } - federate_t* fed = _f_rti->enclaves[fed_id]; + + // If the federate is already connected (making the request a duplicate), and that + // the federate is transient, and it is the execution phase, then mark that a hot + // swap is in progress and initialize the hot_swap_federate. + // Otherwise, proceed with a normal transient connection + federate_t* fed; + if ( + (_f_rti->enclaves[fed_id])->enclave.state != NOT_CONNECTED + && is_transient + && (_f_rti->enclaves[fed_id])->is_transient + && _f_rti->phase == execution_phase + && !hot_swap_in_progress + ) { + // Allocate memory for the new federate and initialize it + hot_swap_federate = (federate_t *)malloc(sizeof(federate_t)); + initialize_federate(hot_swap_federate, fed_id); + + // Set that hot swap is in progress + hot_swap_in_progress = true; + fed = hot_swap_federate; + lf_print("RTI: Hot Swap starting for federate %d.", fed_id); + } else { + fed = _f_rti->enclaves[fed_id]; + } + // The MSG_TYPE_FED_IDS message has the right federation ID. // Assign the address information for federate. // The IP address is stored here as an in_addr struct (in .server_ip_addr) that can be useful @@ -1214,6 +1373,8 @@ int32_t receive_and_check_fed_id_message(int socket_id, struct sockaddr_in* clie // because it is waiting for the start time to be // sent by the RTI before beginning its execution. fed->enclave.state = PENDING; + fed->is_transient = is_transient; + fed->enclave.id = fed_id; LF_PRINT_DEBUG("RTI responding with MSG_TYPE_ACK to federate %d.", fed_id); // Send an MSG_TYPE_ACK message. 
@@ -1244,7 +1405,12 @@ int receive_connection_information(int socket_id, uint16_t fed_id) { send_reject(socket_id, UNEXPECTED_MESSAGE); return 0; } else { - federate_t* fed = _f_rti->enclaves[fed_id]; + federate_t* fed; + if (hot_swap_in_progress) { + fed = hot_swap_federate; + } else { + fed = _f_rti->enclaves[fed_id]; + } // Read the number of upstream and downstream connections fed->enclave.num_upstream = extract_int32(&(connection_info_header[1])); fed->enclave.num_downstream = extract_int32(&(connection_info_header[1 + sizeof(int32_t)])); @@ -1310,7 +1476,12 @@ int receive_udp_message_and_set_up_clock_sync(int socket_id, uint16_t fed_id) { send_reject(socket_id, UNEXPECTED_MESSAGE); return 0; } else { - federate_t *fed = _f_rti->enclaves[fed_id]; + federate_t* fed; + if (hot_swap_in_progress) { + fed = hot_swap_federate; + } else { + fed = _f_rti->enclaves[fed_id]; + } if (_f_rti->clock_sync_global_status >= clock_sync_init) {// If no initial clock sync, no need perform initial clock sync. uint16_t federate_UDP_port_number = extract_uint16(&(response[1])); @@ -1432,8 +1603,11 @@ bool authenticate_federate(int socket) { } #endif +// FIXME: The socket descriptor here is not used. Should be removed? void connect_to_federates(int socket_descriptor) { - for (int i = 0; i < _f_rti->number_of_enclaves; i++) { + // This loop will accept both, persistent and transient federates. + // For transient, however, i will be decreased + for (int i = 0 ; i < _f_rti->number_of_enclaves - _f_rti->number_of_transient_federates ; i++) { // Wait for an incoming connection request. struct sockaddr client_fd; uint32_t client_length = sizeof(client_fd); @@ -1467,6 +1641,7 @@ void connect_to_federates(int socket_descriptor) { // The first message from the federate should contain its ID and the federation ID. 
int32_t fed_id = receive_and_check_fed_id_message(socket_id, (struct sockaddr_in*)&client_fd); + if (fed_id >= 0 && receive_connection_information(socket_id, (uint16_t)fed_id) && receive_udp_message_and_set_up_clock_sync(socket_id, (uint16_t)fed_id)) { @@ -1477,14 +1652,20 @@ void connect_to_federates(int socket_descriptor) { // synchronization messages. federate_t *fed = _f_rti->enclaves[fed_id]; lf_thread_create(&(fed->thread_id), federate_thread_TCP, fed); - + + if (_f_rti->enclaves[fed_id]->is_transient) { + _f_rti->number_of_connected_transient_federates++; + assert(_f_rti->number_of_connected_transient_federates <= _f_rti->number_of_transient_federates); + i--; + lf_print("RTI: Transient federate %d joined.", fed->enclave.id); + } } else { // Received message was rejected. Try again. i--; } } - // All federates have connected. - LF_PRINT_DEBUG("All federates have connected to RTI."); + // All (persistent) federates have connected. + LF_PRINT_DEBUG("All (persistent) federates have connected to RTI."); if (_f_rti->clock_sync_global_status >= clock_sync_on) { // Create the thread that performs periodic PTP clock synchronization sessions @@ -1539,6 +1720,10 @@ void initialize_federate(federate_t* fed, uint16_t id) { strncpy(fed->server_hostname ,"localhost", INET_ADDRSTRLEN); fed->server_ip_addr.s_addr = 0; fed->server_port = -1; + fed->is_transient = true; + fed->effective_start_tag = NEVER_TAG; + fed->pending_grant = NEVER_TAG; + fed->pending_provisional_grant = NEVER_TAG; } int32_t start_rti_server(uint16_t port) { @@ -1563,28 +1748,61 @@ void wait_for_federates(int socket_descriptor) { // Wait for connections from federates and create a thread for each. connect_to_federates(socket_descriptor); - // All federates have connected. - lf_print("RTI: All expected federates have connected. Starting execution."); + // All persistent federates have connected. + lf_print("RTI: All expected (persistent) federates have connected. 
Starting execution."); + if (_f_rti->number_of_transient_federates > 0) { + lf_print("RTI: Transient Federates can join and leave the federation at anytime."); + } - // The socket server will not continue to accept connections after all the federates - // have joined. + // The socket server will only continue to accept connections from transient + // federates. // In case some other federation's federates are trying to join the wrong // federation, need to respond. Start a separate thread to do that. lf_thread_t responder_thread; - lf_thread_create(&responder_thread, respond_to_erroneous_connections, NULL); + lf_thread_t transient_thread; + // If the federation does not include transient federates, then respond to + // erroneous connections. Otherwise, continue to accept transients joining and + // respond to duplicate joining requests. + if (_f_rti->number_of_transient_federates == 0) { + lf_thread_create(&responder_thread, respond_to_erroneous_connections, NULL); + } else if (_f_rti->number_of_transient_federates > 0) { + lf_thread_create(&transient_thread, connect_to_transient_federates_thread, NULL); + } - // Wait for federate threads to exit. + // Wait for persistent federate threads to exit. 
void* thread_exit_status; - for (int i = 0; i < _f_rti->number_of_enclaves; i++) { - federate_t* fed = _f_rti->enclaves[i]; - lf_print("RTI: Waiting for thread handling federate %d.", fed->enclave.id); - lf_thread_join(fed->thread_id, &thread_exit_status); - free_in_transit_message_q(fed->in_transit_message_tags); - lf_print("RTI: Federate %d thread exited.", fed->enclave.id); + for (int i = 0 ; i < _f_rti->number_of_enclaves ; i++) { + if (!_f_rti->enclaves[i]->is_transient) { + federate_t* fed = _f_rti->enclaves[i]; + lf_print("RTI: Waiting for thread handling persistent federate %d.", fed->enclave.id); + lf_thread_join(fed->thread_id, &thread_exit_status); + free_in_transit_message_q(fed->in_transit_message_tags); + lf_print("RTI: Federate %d thread exited.", fed->enclave.id); + } } - _f_rti->all_federates_exited = true; + lf_print("All persistent threads exited."); + _f_rti->phase = shutdown_phase; + + // Wait for transient federate threads to exit. + // NOTE: It is important to separate the waiting of persistent federates from + // the transient federates. The reason is that if, for example, federate 0 is + // transient, and it did leave in the middle of a federation execution, then + // we will no more wait for the thread of a future joining instance to lf_thread_join. + if (_f_rti->number_of_transient_federates > 0) { + for (int i = 0 ; i < _f_rti->number_of_enclaves ; i++) { + if (_f_rti->enclaves[i]->is_transient) { + lf_print("RTI: Waiting for thread handling transient federate %d.", _f_rti->enclaves[i]->enclave.id); + lf_thread_join(_f_rti->enclaves[i]->thread_id, &thread_exit_status); + free_in_transit_message_q(_f_rti->enclaves[i]->in_transit_message_tags); + lf_print("RTI: Federate %d thread exited.", _f_rti->enclaves[i]->enclave.id); + } + } + } + _f_rti->all_federates_exited = true; + lf_print("All transient threads exited."); + // Shutdown and close the socket so that the accept() call in // respond_to_erroneous_connections returns. 
That thread should then // check _f_rti->all_federates_exited and it should exit. @@ -1611,7 +1829,9 @@ void usage(int argc, const char* argv[]) { lf_print(" -i, --id "); lf_print(" The ID of the federation that this RTI will control."); lf_print(" -n, --number_of_federates "); - lf_print(" The number of federates in the federation that this RTI will control."); + lf_print(" The number of federates in the federation that this RTI will control.\n"); + lf_print(" -nt, --number_of_transient_federates "); + lf_print(" The number of transient federates in the federation that this RTI will control.\n"); lf_print(" -p, --port "); lf_print(" The port number to use for the RTI. Must be larger than 0 and smaller than %d. Default is %d.", UINT16_MAX, STARTING_PORT); lf_print(" -c, --clock_sync [off|init|on] [period ] [exchanges-per-interval ]"); @@ -1721,6 +1941,21 @@ int process_args(int argc, const char* argv[]) { } _f_rti->number_of_enclaves = (int32_t)num_federates; // FIXME: Loses numbers on 64-bit machines lf_print("RTI: Number of federates: %d", _f_rti->number_of_enclaves); + } else if (strcmp(argv[i], "-nt") == 0 || strcmp(argv[i], "--number_of_transient_federates") == 0) { + if (argc < i + 2) { + lf_print_error("--number_of_transient_federates needs an integer argument."); + usage(argc, argv); + return 0; + } + i++; + long num_transient_federates = strtol(argv[i], NULL, 10); + if (num_transient_federates == LONG_MAX || num_transient_federates == LONG_MIN) { + lf_print_error("--number_of_transient_federates needs a valid positive or null integer argument."); + usage(argc, argv); + return 0; + } + _f_rti->number_of_transient_federates = (int32_t)num_transient_federates; // FIXME: Loses numbers on 64-bit machines + lf_print("RTI: Number of transient federates: %d", _f_rti->number_of_transient_federates); } else if (strcmp(argv[i], "-p") == 0 || strcmp(argv[i], "--port") == 0) { if (argc < i + 2) { lf_print_error( @@ -1772,6 +2007,11 @@ int process_args(int argc, const 
char* argv[]) { usage(argc, argv); return 0; } + if (_f_rti->number_of_transient_federates > _f_rti->number_of_enclaves) { + lf_print_error("--number_of_transient_federates cannot be higher than the number of federates."); + usage(argc, argv); + return 0; + } return 1; } @@ -1786,6 +2026,7 @@ void initialize_RTI(){ _f_rti->num_feds_proposed_start = 0, _f_rti->all_federates_exited = false, _f_rti->federation_id = "Unidentified Federation", + _f_rti->phase = startup_phase, _f_rti->user_specified_port = 0, _f_rti->final_port_TCP = 0, _f_rti->socket_descriptor_TCP = -1, @@ -1797,4 +2038,308 @@ void initialize_RTI(){ _f_rti->authentication_enabled = false, _f_rti->tracing_enabled = false; _f_rti->stop_in_progress = false; -} \ No newline at end of file +} + +////////////////////////////////////////////////////////// + +void send_stop(federate_t * fed) { + // Reply with a stop granted to all federates + unsigned char outgoing_buffer[MSG_TYPE_STOP_LENGTH]; + outgoing_buffer[0] = MSG_TYPE_STOP; + lf_print("RTI sent MSG_TYPE_STOP to federate %d.", fed->enclave.id); + + if (_f_rti->tracing_enabled) { + tracepoint_rti_to_federate(_f_rti->trace, send_STOP, fed->enclave.id, NULL); + } + write_to_socket_errexit(fed->socket, MSG_TYPE_STOP_LENGTH, outgoing_buffer, + "RTI failed to send MSG_TYPE_STOP message to federate %d.", fed->enclave.id); + + LF_PRINT_LOG("RTI sent MSG_TYPE_STOP to federate %d.", fed->enclave.id); +} + +void* connect_to_transient_federates_thread() { + // This loop will continue to accept connections of transient federates, as + // soon as there is room + + while (!_f_rti->all_federates_exited) { + // Continue waiting for an incoming connection requests from transients + // to join, or for hot swap. + struct sockaddr client_fd; + uint32_t client_length = sizeof(client_fd); + // The following blocks until a federate connects. 
+ int socket_id = -1; + while(1) { + if (_f_rti->all_federates_exited) { + return NULL; + } + socket_id = accept(_f_rti->socket_descriptor_TCP, &client_fd, &client_length); + if (socket_id >= 0) { + // Got a socket + break; + } else { + // Try again + lf_print_warning("RTI failed to accept the socket. %s. Trying again.", strerror(errno)); + continue; + } + } + + // Send RTI hello when RTI -a option is on. + #ifdef __RTI_AUTH__ + if (_f_rti->authentication_enabled) { + if (!authenticate_federate(socket_id)) { + lf_print_warning("RTI failed to authenticate the incoming federate."); + // Ignore the federate that failed authentication. + continue; + } + } + #endif + + // The first message from the federate should contain its ID and the federation ID. + // The function also detects if a hot swap request is initiated. + int32_t fed_id = receive_and_check_fed_id_message(socket_id, (struct sockaddr_in*)&client_fd); + + if (fed_id >= 0 + && receive_connection_information(socket_id, (uint16_t)fed_id) + && receive_udp_message_and_set_up_clock_sync(socket_id, (uint16_t)fed_id)) + { + lf_mutex_lock(&rti_mutex); + if (hot_swap_in_progress) { + lf_print("RTI: Hot swap confirmed for federate %d.", fed_id); + + // Then send STOP + federate_t *fed_old = _f_rti->enclaves[fed_id]; + hot_swap_federate->enclave.completed = fed_old->enclave.completed; + + LF_PRINT_LOG("RTI: Send MSG_TYPE_STOP to old federate %d.", fed_id); + send_stop(fed_old); + lf_mutex_unlock(&rti_mutex); + + // Wait for the old federate to send MSG_TYPE_RESIGN + LF_PRINT_LOG("RTI: Waiting for old federate %d to send resign.", fed_id); + // FIXME: Should this have a timeout? + while(!hot_swap_old_resigned); + + // The latest LTC is the tag at which the old federate resigned. This is useful + // for computing the effective_start_time of the new joining federate. + hot_swap_federate->enclave.completed = fed_old->enclave.completed; + + // Create a thread to communicate with the federate. 
+ // This has to be done after clock synchronization is finished + // or that thread may end up attempting to handle incoming clock + // synchronization messages. + lf_thread_create(&(hot_swap_federate->thread_id), federate_thread_TCP, hot_swap_federate); + + // Redirect the federate in _f_rti + _f_rti->enclaves[fed_id] = hot_swap_federate; + + // Free the old federate memory and reset the Hot wap indicators + // FIXME: Is this enough to free the memory allocated to the federate? + free(fed_old); + lf_mutex_lock(&rti_mutex); + hot_swap_in_progress = false; + lf_mutex_unlock(&rti_mutex); + + lf_print("RTI: Hot swap succeeded for federate %d.", fed_id); + } else { + lf_mutex_unlock(&rti_mutex); + + // Create a thread to communicate with the federate. + // This has to be done after clock synchronization is finished + // or that thread may end up attempting to handle incoming clock + // synchronization messages. + federate_t *fed = _f_rti->enclaves[fed_id]; + lf_thread_create(&(fed->thread_id), federate_thread_TCP, fed); + lf_print("RTI: Transient federate %d joined.", fed_id); + } + _f_rti->number_of_connected_transient_federates++; + } else { + // If a hot swap was initialed, but the connection information or/and clock + // synchronization fail, then reset hot_swap_in_profress, and free the memory + // allocated for hot_swap_federate + if (hot_swap_in_progress) { + lf_print("RTI: Hot swap canceled for federate %d.", fed_id); + lf_mutex_lock(&rti_mutex); + hot_swap_in_progress = false; + lf_mutex_unlock(&rti_mutex); + + // FIXME: Is this enough to free the memory of a federate_t data structure? + free(hot_swap_federate); + } + } + } +} + +void reset_transient_federate(federate_t* fed) { + fed->enclave.next_event = NEVER_TAG; + fed->enclave.state = NOT_CONNECTED; + // Reset of the federate-related attributes + fed->socket = -1; // No socket. 
+ fed->clock_synchronization_enabled = true; + fed->in_transit_message_tags = initialize_in_transit_message_q(); + strncpy(fed->server_hostname ,"localhost", INET_ADDRSTRLEN); + fed->server_ip_addr.s_addr = 0; + fed->server_port = -1; + fed->requested_stop = false; + fed->is_transient = true; + fed->effective_start_tag = NEVER_TAG; + fed->pending_grant = NEVER_TAG; + + // FIXME: Need to free the enclave's memory for updstreams, delays, and downstreams + // FIXME: There is room though to check if the interface has changed??? Do we allow this? +} + +void* pending_grant_thread(void* federate) { + federate_t* fed = (federate_t*)federate; + + interval_t sleep_interval = fed->pending_grant.time - lf_time_physical(); + if (sleep_interval > 0) { + lf_sleep(sleep_interval); + } + + lf_mutex_lock(&rti_mutex); + + // If the pending grant becomes NEVER_TAG, then this means that it should + // not be sent + if(lf_tag_compare(fed->pending_grant, NEVER_TAG) != 0) { + notify_tag_advance_grant_immediate(&(fed->enclave), fed->pending_grant); + fed->pending_grant = NEVER_TAG; + } + lf_mutex_unlock(&rti_mutex); +} + +void notify_tag_advance_grant_delayed(enclave_t* e, tag_t tag) { + federate_t* fed = (federate_t*)e; + + // Check wether there is already a pending grant + // And check the pending provisional grant as well + lf_mutex_lock(&rti_mutex); + if (lf_tag_compare(fed->pending_grant, NEVER_TAG) == 0) { + // If a tag is issued, then stop any possible provisional tag grant + fed->pending_grant = tag; + fed->pending_provisional_grant = NEVER_TAG; + lf_thread_create(&(fed->pending_grant_thread_id), pending_grant_thread, fed); + } else { + // If there is already a pending tag grant, then let it be sent first + // FIXME: Is this correct? 
+ } + lf_mutex_unlock(&rti_mutex); +} + +void notify_tag_advance_grant_immediate(enclave_t* e, tag_t tag) { + // Case where the TAG notification is immediate + size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t); + unsigned char buffer[message_length]; + buffer[0] = MSG_TYPE_TAG_ADVANCE_GRANT; + encode_int64(tag.time, &(buffer[1])); + encode_int32((int32_t)tag.microstep, &(buffer[1 + sizeof(int64_t)])); + + if (_f_rti->tracing_enabled) { + tracepoint_rti_to_federate(_f_rti->trace, send_TAG, e->id, &tag); + } + // This function is called in notify_advance_grant_if_safe(), which is a long + // function. During this call, the socket might close, causing the following write_to_socket + // to fail. Consider a failure here a soft failure and update the federate's status. + ssize_t bytes_written = write_to_socket(((federate_t*)e)->socket, message_length, buffer); + e->last_granted = tag; + if (bytes_written < (ssize_t)message_length) { + lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id); + if (bytes_written < 0) { + e->state = NOT_CONNECTED; + // FIXME: We need better error handling, but don't stop other execution here. 
+ } + } else { + // e->last_granted = tag; + LF_PRINT_LOG("RTI sent to federate %d the tag advance grant (TAG) " PRINTF_TAG ".", + e->id, tag.time - start_time, tag.microstep); + } +} + +void* pending_provisional_grant_thread(void* federate) { + federate_t* fed = (federate_t*)federate; + + interval_t sleep_interval = fed->pending_provisional_grant.time - lf_time_physical(); + if (sleep_interval > 0) { + lf_sleep(sleep_interval); + } + + lf_mutex_lock(&rti_mutex); + + // If the pending grant becomes NEVER_TAG, then this means that it should + // not be sent + if(lf_tag_compare(fed->pending_provisional_grant, NEVER_TAG) != 0) { + notify_provisional_tag_advance_grant_immediate(&(fed->enclave), fed->pending_provisional_grant); + fed->pending_provisional_grant = NEVER_TAG; + } + lf_mutex_unlock(&rti_mutex); +} + +void notify_provisional_tag_advance_grant_delayed(enclave_t* e, tag_t tag) { + federate_t* fed = (federate_t*)e; + + // Proceed with the delayed provisional tag grant notification only if + // there is no pending grant and no provisional pending grant + lf_mutex_lock(&rti_mutex); + if ( + (lf_tag_compare(fed->pending_grant, NEVER_TAG) == 0) + && (lf_tag_compare(fed->pending_provisional_grant, NEVER_TAG) >= 0) + ) { + fed->pending_provisional_grant = tag; + lf_thread_create(&(fed->pending_provisional_grant_thread_id), pending_provisional_grant_thread, fed); + } + lf_mutex_unlock(&rti_mutex); +} + +void notify_provisional_tag_advance_grant_immediate(enclave_t* e, tag_t tag) { + size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t); + unsigned char buffer[message_length]; + buffer[0] = MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT; + encode_int64(tag.time, &(buffer[1])); + encode_int32((int32_t)tag.microstep, &(buffer[1 + sizeof(int64_t)])); + + if (_f_rti->tracing_enabled){ + tracepoint_rti_to_federate(_f_rti->trace, send_PTAG, e->id, &tag); + } + // This function is called in notify_advance_grant_if_safe(), which is a long + // function. 
During this call, the socket might close, causing the following write_to_socket + // to fail. Consider a failure here a soft failure and update the federate's status. + ssize_t bytes_written = write_to_socket(((federate_t*)e)->socket, message_length, buffer); + if (bytes_written < (ssize_t)message_length) { + lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id); + if (bytes_written < 0) { + e->state = NOT_CONNECTED; + // FIXME: We need better error handling, but don't stop other execution here. + } + } else { + e->last_provisionally_granted = tag; + LF_PRINT_LOG("RTI sent to federate %d the Provisional Tag Advance Grant (PTAG) " PRINTF_TAG ".", + e->id, tag.time - start_time, tag.microstep); + + // Send PTAG to all upstream federates, if they have not had + // a later or equal PTAG or TAG sent previously and if their transitive + // NET is greater than or equal to the tag. + // NOTE: This could later be replaced with a TNET mechanism once + // we have an available encoding of causality interfaces. + // That might be more efficient. + for (int j = 0; j < e->num_upstream; j++) { + federate_t* upstream = _f_rti->enclaves[e->upstream[j]]; + + // Ignore this federate if it has resigned. + // if (upstream->enclave.state == NOT_CONNECTED) continue; + // To handle cycles, need to create a boolean array to keep + // track of which upstream federates have been visited. + bool* visited = (bool*)calloc(_f_rti->number_of_enclaves, sizeof(bool)); // Initializes to 0. + + // Find the (transitive) next event tag upstream. + tag_t upstream_next_event = transitive_next_event( + &(upstream->enclave), upstream->enclave.next_event, visited); + free(visited); + // If these tags are equal, then + // a TAG or PTAG should have already been granted, + // in which case, another will not be sent. But it + // may not have been already granted. 
+ if (lf_tag_compare(upstream_next_event, tag) >= 0) { + notify_provisional_tag_advance_grant(&(upstream->enclave), tag); + } + } + } +} diff --git a/core/federated/RTI/rti_lib.h b/core/federated/RTI/rti_lib.h index 9d7c63948..676255cff 100644 --- a/core/federated/RTI/rti_lib.h +++ b/core/federated/RTI/rti_lib.h @@ -67,6 +67,12 @@ typedef struct federate_t { // RTI has not been informed of the port number. struct in_addr server_ip_addr; // Information about the IP address of the socket // server of the federate. + bool is_transient; // Indicates whether the federate is transient or persistent. + tag_t effective_start_tag; // Records the start time of the federate, which is mainly useful for transient federates + tag_t pending_grant; // The pending tag advance grant + tag_t pending_provisional_grant; // The pending provisional tag advance grant + lf_thread_t pending_grant_thread_id; // The ID of the thread handling the pending tag grant + lf_thread_t pending_provisional_grant_thread_id; // The ID of the thread handling the pending provitional tag grant } federate_t; /** @@ -78,6 +84,15 @@ typedef enum clock_sync_stat { clock_sync_on } clock_sync_stat; +/** + * The RTI life cycle phase. + */ +typedef enum rti_phase { + startup_phase, + execution_phase, + shutdown_phase +} rti_phase; + /** * Structure that an RTI instance uses to keep track of its own and its * corresponding federates' state. @@ -136,6 +151,9 @@ typedef struct federation_rti_t { */ const char* federation_id; + // RTI current phase + rti_phase phase; + /************* TCP server information *************/ /** The desired port specified by the user on the command line. 
*/ uint16_t user_specified_port; @@ -178,6 +196,13 @@ typedef struct federation_rti_t { */ bool authentication_enabled; + // Number of transient federates in the federation + int32_t number_of_transient_federates; + + // Number of connected transient federates in the federation + int32_t number_of_connected_transient_federates; + + /** * Boolean indicating that a stop request is already in progress. */ @@ -335,9 +360,31 @@ void handle_address_ad(uint16_t federate_id); /** * A function to handle timestamp messages. * This function assumes the caller does not hold the mutex. + * + * The behavior here depends on whether the message is received within the + * startup phase or not. By startup phase, it is menat that all persistent federates + * have their start_time set (already started or about to start). + * + * @param my_fed the federate that sent a MSG_TYPE_TIMESTAMP message. */ void handle_timestamp(federate_t *my_fed); +/** + * Send to the start time to the federate my_fed. + * This function assumes the caller does not hold the mutex. + * + * If it is the startup phase, the start_time will be the maximum received timestamps + * plus an offset. The federate will then receive identical federation_start_time + * and federate_start_tag.time (the federate_start_tag.microstep will be 0). + * If, however, the startup phase is passed, the federate will receive different + * values than sateted above. + * + * @param my_fed the federate to send the start time to. + * @param federation_start_time the federation start_time + * @param federate_start_tag the federate effective start tag + */ +void send_start_tag(federate_t* my_fed, instant_t federation_start_time, tag_t federate_start_tag); + /** * Take a snapshot of the physical clock time and send * it to federate fed_id. @@ -479,6 +526,7 @@ void* respond_to_erroneous_connections(void* nothing); /** * Initialize the federate with the specified ID. + * @param fed A pointer to the federate * @param id The federate ID. 
*/ void initialize_federate(federate_t* fed, uint16_t id); @@ -527,4 +575,97 @@ int process_args(int argc, const char* argv[]); */ void initialize_RTI(); +////////////////////////////////////////////////////////// + +/** + * Once all persistent federates have connected, continue to wait for incoming + * connection requests from transient federates. + * Upon receiving it, it creates a thread to communicate with that federate. + * This thread continues to check whether the communication thread with a transient + * federate has exited, in which case it accepts other connections. + */ +void* connect_to_transient_federates_thread(); + +/** + * Reset the federate. The federate has to be transient. + * @param fed A pointer to the federate + */ +void reset_transient_federate(federate_t* fed); + +/** + * @brief a request for immediate stop to the federate + * + * @param fed: the deferate to stop + */ +void send_stop(federate_t * fed); + +/** + * @brief Thread that sleeps for a period of time, and then wakes up to check if + * a tag advance grant needs to be sent. That is, if the pending tag have not + * been reset to NEVER_TAG, the tag advance grant will be immediate. + * + * @param federate the fedarate whose tag advance grant needs to be delayed. + */ +void* pending_grant_thread(void* federate); + +/** + * Notify a tag advance grant (TAG) message to the specified federate after + * the physical time reaches the tag. A thread is created to this end. + * + * If a provisionl tag advance grant is pending, cancel it. If there is another + * pending tag advance grant, do not proceed with the thread creation. + * + * @param e The enclave. + * @param tag The tag to grant. + */ +void notify_tag_advance_grant_delayed(enclave_t* e, tag_t tag); + +/** + * Notify a tag advance grant (TAG) message to the specified federate immediately. + * + * This function will keep a record of this TAG in the enclave's last_granted + * field. + * + * @param e The enclave. 
+ * @param tag The tag to grant. + */ +void notify_tag_advance_grant_immediate(enclave_t* e, tag_t tag); + +/** + * Thread that sleeps for a period of time, and then wakes up to check if + * a provisional tag advance grant needs to be sent. That is, if the pending + * provisional tag have not been reset to NEVER_TAG, the provisional tag advance + * grant will be immediate. + * + * @param federate the federate whose provisional tag advance grant needs to be delayed. + */ +void* pending_provisional_grant_thread(void* federate); + +/** + * Notify a provisional tag advance grant (PTAG) message to the specified federate + * after the physical time reaches the tag. A thread is created to this end. + * + * If a tag advance grant or a provisional one is pending, then do not proceed + * with the thread creation. + * + * @param e The enclave. + * @param tag The provisional tag to grant. + */ +void notify_provisional_tag_advance_grant_delayed(enclave_t* e, tag_t tag); + +/** + * Notify a provisional tag advance grant (PTAG) message to the specified federate + * immediately. + * + * This function will keep a record of this TAG in the enclave's last_provisionally_granted + * field. + * + * @param e The enclave. + * @param tag The tag to grant. + */ +void notify_provisional_tag_advance_grant_immediate(enclave_t* e, tag_t tag); + + + +////////////////////////////////////////////////////////// #endif // RTI_LIB_H diff --git a/core/federated/clock-sync.c b/core/federated/clock-sync.c index e438d83ac..3e1e576f5 100644 --- a/core/federated/clock-sync.c +++ b/core/federated/clock-sync.c @@ -443,8 +443,8 @@ void handle_T4_clock_sync_message(unsigned char* buffer, int socket, instant_t r LF_PRINT_LOG("Clock sync:" " New offset: " PRINTF_TIME "." " Round trip delay to RTI (now): " PRINTF_TIME "." - " (AVG): " PRINTF_TIME "." - " (SD): " PRINTF_TIME "." + " (AVG): %ld." + " (SD): %lld." " Local round trip delay: " PRINTF_TIME "." 
" Test offset: " PRINTF_TIME ".", _lf_time_physical_clock_offset, diff --git a/core/federated/federate.c b/core/federated/federate.c index d49cd20a4..fdb2dd865 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -71,6 +71,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // Global variables defined in tag.c: extern instant_t _lf_last_reported_unadjusted_physical_time_ns; extern instant_t start_time; +extern tag_t effective_start_tag; // Error messages. char* ERROR_SENDING_HEADER = "ERROR sending header information to federate via RTI"; @@ -104,7 +105,8 @@ federate_instance_t _fed = { .received_stop_request_from_rti = false, .last_sent_LTC = (tag_t) {.time = NEVER, .microstep = 0u}, .last_sent_NET = (tag_t) {.time = NEVER, .microstep = 0u}, - .min_delay_from_physical_action_to_federate_output = NEVER + .min_delay_from_physical_action_to_federate_output = NEVER, + .is_transient = false }; @@ -565,7 +567,7 @@ void* handle_p2p_connections_from_federates(void* env_arg) { } LF_PRINT_LOG("Accepted new connection from remote federate."); - size_t header_length = 1 + sizeof(uint16_t) + 1; + size_t header_length = 1 + sizeof(uint16_t) + 1 + 1; unsigned char buffer[header_length]; ssize_t bytes_read = read_from_socket(socket_id, header_length, (unsigned char*)&buffer); if (bytes_read != (ssize_t)header_length || buffer[0] != MSG_TYPE_P2P_SENDING_FED_ID) { @@ -605,6 +607,7 @@ void* handle_p2p_connections_from_federates(void* env_arg) { // Extract the ID of the sending federate. uint16_t remote_fed_id = extract_uint16((unsigned char*)&(buffer[1])); + bool remote_fed_is_transient = buffer[1 + sizeof(uint16_t)]; LF_PRINT_DEBUG("Received sending federate ID %d.", remote_fed_id); // Trace the event when tracing is enabled @@ -840,7 +843,7 @@ void connect_to_federate(uint16_t remote_federate_id) { } } else { // Connect was successful. 
- size_t buffer_length = 1 + sizeof(uint16_t) + 1; + size_t buffer_length = 1 + sizeof(uint16_t) + 1 + 1; unsigned char buffer[buffer_length]; buffer[0] = MSG_TYPE_P2P_SENDING_FED_ID; if (_lf_my_fed_id > UINT16_MAX) { @@ -848,8 +851,9 @@ void connect_to_federate(uint16_t remote_federate_id) { lf_print_error_and_exit("Too many federates! More than %d.", UINT16_MAX); } encode_uint16((uint16_t)_lf_my_fed_id, (unsigned char*)&(buffer[1])); + buffer[1 + sizeof(uint16_t)] = _fed.is_transient ? 1 : 0; unsigned char federation_id_length = (unsigned char)strnlen(federation_metadata.federation_id, 255); - buffer[sizeof(uint16_t) + 1] = federation_id_length; + buffer[sizeof(uint16_t) + 2] = federation_id_length; // Trace the event when tracing is enabled tracepoint_federate_to_federate(_fed.trace, send_FED_ID, _lf_my_fed_id, remote_federate_id, NULL); write_to_socket_errexit(socket_id, @@ -1078,7 +1082,7 @@ void connect_to_rti(const char* hostname, int port) { // Have connected to an RTI, but not sure it's the right RTI. // Send a MSG_TYPE_FED_IDS message and wait for a reply. // Notify the RTI of the ID of this federate and its federation. - unsigned char buffer[4]; + unsigned char buffer[5]; #ifdef FEDERATED_AUTHENTICATED LF_PRINT_LOG("Connected to an RTI. Performing HMAC-based authentication using federation ID."); @@ -1094,15 +1098,17 @@ void connect_to_rti(const char* hostname, int port) { lf_print_error_and_exit("Too many federates! More than %d.", UINT16_MAX); } encode_uint16((uint16_t)_lf_my_fed_id, &buffer[1]); + // Next send the federate type (persistent or transient) + buffer[1 + sizeof(uint16_t)] = _fed.is_transient? 1 : 0; // Next send the federation ID length. // The federation ID is limited to 255 bytes. 
size_t federation_id_length = strnlen(federation_metadata.federation_id, 255); - buffer[1 + sizeof(uint16_t)] = (unsigned char)(federation_id_length & 0xff); + buffer[2 + sizeof(uint16_t)] = (unsigned char)(federation_id_length & 0xff); // Trace the event when tracing is enabled tracepoint_federate_to_rti(_fed.trace, send_FED_ID, _lf_my_fed_id, NULL); - write_to_socket_errexit(_fed.socket_TCP_RTI, 2 + sizeof(uint16_t), buffer, + write_to_socket_errexit(_fed.socket_TCP_RTI, 3 + sizeof(uint16_t), buffer, "Failed to send federate ID to RTI."); // Next send the federation ID itself. @@ -1173,27 +1179,30 @@ instant_t get_start_time_from_rti(instant_t my_physical_time) { // Send the timestamp marker first. _lf_send_time(MSG_TYPE_TIMESTAMP, my_physical_time, true); - // Read bytes from the socket. We need 9 bytes. + // Read bytes from the socket. We need 17 (1 + 8 + 8) bytes. // Buffer for message ID plus timestamp. - size_t buffer_length = 1 + sizeof(instant_t); + size_t buffer_length = MSG_TYPE_TIMESTAMP_START_LENGTH; unsigned char buffer[buffer_length]; read_from_socket_errexit(_fed.socket_TCP_RTI, buffer_length, buffer, - "Failed to read MSG_TYPE_TIMESTAMP message from RTI."); + "Failed to read MSG_TYPE_TIMESTAMP_START message from RTI."); LF_PRINT_DEBUG("Read 9 bytes."); // First byte received is the message ID. - if (buffer[0] != MSG_TYPE_TIMESTAMP) { - lf_print_error_and_exit("Expected a MSG_TYPE_TIMESTAMP message from the RTI. Got %u (see net_common.h).", - buffer[0]); + if (buffer[0] != MSG_TYPE_TIMESTAMP_START) { + lf_print_error_and_exit("Expected a MSG_TYPE_TIMESTAMP_START message from " + "the RTI. 
Got %u (see net_common.h).", + buffer[0]); } + // Read the federation start_time first, then the effective_start_tag after instant_t timestamp = extract_int64(&(buffer[1])); - - tag_t tag = {.time = timestamp, .microstep = 0}; - // Trace the event when tracing is enabled - tracepoint_federate_from_rti(_fed.trace, receive_TIMESTAMP, _lf_my_fed_id, &tag); - lf_print("Starting timestamp is: " PRINTF_TIME ".", timestamp); + effective_start_tag = extract_tag(&(buffer[9])); + + // Trace the event when tracing is enabled. + // Note that we report in the trace the effective_start_tag. + // This is rather a choice. To be changed, if needed, of course. + tracepoint_federate_from_rti(_fed.trace, receive_TIMESTAMP, _lf_my_fed_id, &effective_start_tag); LF_PRINT_LOG("Current physical time is: " PRINTF_TIME ".", lf_time_physical()); return timestamp; @@ -2259,6 +2268,21 @@ void handle_stop_granted_message() { } } +/** + * Handle a MSG_TYPE_STOP message from the RTI. + * + * This function simply calls lf_stop(). + */ +void handle_stop() { + // Trace the event when tracing is enabled + tracepoint_federate_from_rti(_fed.trace, receive_STOP, _lf_my_fed_id, NULL); + + lf_print("Received from RTI a MSG_TYPE_STOP at physical time " PRINTF_TIME ".", + lf_time_physical()); + + lf_stop(); +} + /** * Handle a MSG_TYPE_STOP_REQUEST message from the RTI. 
*/ @@ -2373,7 +2397,7 @@ void terminate_execution(environment_t* env) { tracepoint_federate_to_rti(_fed.trace, send_RESIGN, _lf_my_fed_id, &tag); ssize_t written = write_to_socket(_fed.socket_TCP_RTI, bytes_to_write, &(buffer[0])); if (written == bytes_to_write) { - LF_PRINT_LOG("Resigned."); + lf_print("Resigned %d", _lf_my_fed_id); } } lf_mutex_unlock(&outbound_socket_mutex); @@ -2551,6 +2575,9 @@ void* listen_to_rti_TCP(void* args) { case MSG_TYPE_STOP_GRANTED: handle_stop_granted_message(); break; + case MSG_TYPE_STOP: + handle_stop(); + break; case MSG_TYPE_PORT_ABSENT: handle_port_absent_message(_fed.socket_TCP_RTI, -1); break; @@ -2711,8 +2738,7 @@ tag_t _lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply LF_PRINT_DEBUG("Granted tag " PRINTF_TAG " because the federate has neither " "upstream nor downstream federates.", tag.time - start_time, tag.microstep); - return tag; - } + return tag; } // If time advance (TAG or PTAG) has already been granted for this tag // or a larger tag, then return immediately. 
@@ -2884,4 +2910,35 @@ void set_federation_id(const char* fid) { void set_federation_trace_object(trace_t * trace) { _fed.trace = trace; } + +void lf_stop() { + environment_t *env; + int num_env = _lf_get_environments(&env); + + for (int i = 0 ; i < num_env ; i++) { + tag_t new_stop_tag; + new_stop_tag.time = env[i].current_tag.time; + new_stop_tag.microstep = env[i].current_tag.microstep + 1; + _lf_set_stop_tag(&env[i], new_stop_tag); + } + + LF_PRINT_LOG("Federate is stopping."); +} + +char* lf_get_federates_bin_directory() { + return LF_FEDERATES_BIN_DIRECTORY; +} + +char* lf_get_federation_id() { + return federation_metadata.federation_id; +} + +instant_t lf_get_effective_start_time() { + return effective_start_tag.time; +} + +instant_t lf_get_start_time() { + return start_time; +} + #endif diff --git a/core/tag.c b/core/tag.c index 3e9c366ff..929d14580 100644 --- a/core/tag.c +++ b/core/tag.c @@ -38,6 +38,12 @@ typedef enum _lf_time_type { // Global variables declared in tag.h: instant_t start_time = NEVER; +/** + * Only useful for transient federates. It records the effective start tag, to + * be used at startup. Elapsed logical time calculations will use start_time. + */ +tag_t effective_start_tag = {.time = 0LL, .microstep = 0}; + //////////////// Global variables not declared in tag.h (must be declared extern if used elsewhere): /** diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index 749550bc3..1d327b6ea 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -56,6 +56,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
// Global variables defined in tag.c and shared across environments: extern instant_t _lf_last_reported_unadjusted_physical_time_ns; extern instant_t start_time; +extern tag_t effective_start_tag; /** * The maximum amount of time a worker thread should stall @@ -681,7 +682,7 @@ void _lf_initialize_start_tag(environment_t *env) { synchronize_with_other_federates(); // Resets start_time in federated execution according to the RTI. } // The start time will likely have changed. Adjust the current tag and stop tag. - env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; + env->current_tag = effective_start_tag; if (duration >= 0LL) { // A duration has been specified. Recalculate the stop time. env->stop_tag = ((tag_t) {.time = start_time + duration, .microstep = 0}); @@ -715,7 +716,7 @@ void _lf_initialize_start_tag(environment_t *env) { start_time, _lf_fed_STA_offset); // Ignore interrupts to this wait. We don't want to start executing until // physical time matches or exceeds the logical start time. - while (!wait_until(env, start_time, &env->event_q_changed)) {} + while (!wait_until(env, effective_start_tag.time, &env->event_q_changed)) {} LF_PRINT_DEBUG("Done waiting for start time " PRINTF_TIME ".", start_time); LF_PRINT_DEBUG("Physical time is ahead of current time by " PRINTF_TIME ". This should be small.", lf_time_physical() - start_time); diff --git a/core/trace.c b/core/trace.c index 1839e8f84..221f1c0d1 100644 --- a/core/trace.c +++ b/core/trace.c @@ -241,9 +241,60 @@ void flush_trace(trace_t* trace, int worker) { lf_critical_section_exit(GLOBAL_ENVIRONMENT); } +/** + * This utility function helps creating a new file trace, if already one exists. + * This is particularly useful for transient federates, since each joining one will + * have a different trace file. 
+ * @param n the integer to convert into a string + * @return the converted string + */ +char * convert_int_to_string(int n) { + // Count the number of digits in n + int n_ = n; + int number_of_digits = 0; + while (n_) { + number_of_digits++; + n_ /= 10; + } + + // Construct the array of chars to return + char *string_of_int; + string_of_int = (char *)malloc(number_of_digits + 1); + + // Extract the digits and convert them into chars + int index = 0; + for (int i = 0; i < number_of_digits ; i++) { + string_of_int[number_of_digits - i - 1] = n % 10 + '0'; + n /= 10; + } + // Add the null character and return + string_of_int[number_of_digits] = '\0'; + return (char *)string_of_int; +} + void start_trace(trace_t* trace) { // FIXME: location of trace file should be customizable. - trace->_lf_trace_file = fopen(trace->filename, "w"); + char filename_[100]; + strcpy(filename_, trace->filename); + int i = 0; + FILE *test_file_exists; + while (true) { + test_file_exists = fopen(filename_, "r"); + if (test_file_exists == NULL) { + break; + } + fclose(test_file_exists); + // Get the root of the original file name + memset(filename_, '\0', sizeof(filename_)); + strncpy(filename_, trace->filename, strlen(trace->filename) - 4); + // Add an index + char *ind = convert_int_to_string(i++); + strcat(filename_, ind); + // Add the file extension + strcat(filename_, ".lft"); + } + + trace->_lf_trace_file = fopen(filename_, "w"); if (trace->_lf_trace_file == NULL) { fprintf(stderr, "WARNING: Failed to open log file with error code %d." "No log will be written.\n", errno); diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index aca041e5b..5c40f1509 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -172,6 +172,12 @@ typedef struct federate_instance_t { */ bool has_downstream; + /** + * Indicator of whether this federate is transient. + * The default value of false may be overridden in _lf_initialize_trigger_objects. 
+ */ + bool is_transient; + /** * Used to prevent the federate from sending a REQUEST_STOP * message if it has already received a stop request from the RTI. @@ -472,4 +478,44 @@ void synchronize_with_other_federates(); */ void wait_until_port_status_known(environment_t* env, int portID, interval_t STAA); +/** + * Handle a MSG_TYPE_STOP message from the RTI. + * + * This function simply calls lf_stop(). + */ +void handle_stop(); + +/** + * @brief Stop the execution of a federate. + * Every enclave within the federate will stop at one microstep later than its + * current tag. Unlike lf_request_stop(), this process does not require any + * involvement from the RTI, nor does it necessitate any consensus. + * + * This function is particularly useful for testing transient federates. + */ +void lf_stop(); + +/** + * @brief Return the directory containing the executables of the individual + * federates. + */ +char* lf_get_federates_bin_directory(); + +/** + * @brief Returns the federation id. + * + * This function is useful for creating federates at runtime. + */ +char* lf_get_federation_id(); + +/** + * @brief Returns the effective start time of the federate. The start_time of persistent + * federates is equal to their effective_start_time. Transient federates, however, + * have an effective_start_time greater than or equal to their start_time. + */ +instant_t lf_get_effective_start_time(); + +/** @brief Returns the start time of the federate. */ +instant_t lf_get_start_time(); + #endif // FEDERATE_H diff --git a/include/core/federated/net_common.h b/include/core/federated/net_common.h index 38001cc0b..9b78622af 100644 --- a/include/core/federated/net_common.h +++ b/include/core/federated/net_common.h @@ -64,7 +64,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * When the federation IDs match, the RTI will respond with an * MSG_TYPE_ACK.
- * + * * The next message to the RTI will be a MSG_TYPE_NEIGHBOR_STRUCTURE message * that informs the RTI about connections between this federate and other * federates where messages are routed through the RTI. Currently, this only @@ -172,9 +172,9 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * each federate report a reading of its physical clock to the RTI on a * `MSG_TYPE_TIMESTAMP`. The RTI broadcasts the maximum of these readings plus * `DELAY_START` to all federates as the start time, again on a `MSG_TYPE_TIMESTAMP`. - * - * The next step depends on the coordination type. - * + * + * The next step depends on the coordination type. + * * Under centralized coordination, each federate will send a * `MSG_TYPE_NEXT_EVENT_TAG` to the RTI with the start tag. That is to say that * each federate has a valid event at the start tag (start time, 0) and it will @@ -185,7 +185,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * have to wait for a `MSG_TYPE_TAG_ADVANCE_GRANT` or a * `MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT` before it can advance to a * particular tag. - * + * * Under decentralized coordination, the coordination is governed by STA and * STAAs, as further explained in https://doi.org/10.48550/arXiv.2109.07771. * @@ -208,7 +208,6 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #define UDP_TIMEOUT_TIME SEC(1) - /** * Size of the buffer used for messages sent between federates. * This is used by both the federates and the rti, so message lengths @@ -380,12 +379,21 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. /** * Byte identifying a timestamp message, which is 64 bits long. * Each federate sends its starting physical time as a message of this - * type, and the RTI broadcasts to all the federates the starting logical - * time as a message of this type. - s*/ + * type. 
+ */ #define MSG_TYPE_TIMESTAMP 2 #define MSG_TYPE_TIMESTAMP_LENGTH (1 + sizeof(int64_t)) +/** + * As an answer to MSG_TYPE_TIMESTAMP, the RTI broadcasts to all persistent + * federates, or sends to a newly joining transient federate, a message of + * type MSG_TYPE_TIMESTAMP_START. It includes the starting time of the federation, + * together with the effective starting logical tag. The latter is useful for + * transient federates. + */ +#define MSG_TYPE_TIMESTAMP_START 50 +#define MSG_TYPE_TIMESTAMP_START_LENGTH (1 + sizeof(instant_t) + sizeof(instant_t) + sizeof(microstep_t)) + /** Byte identifying a message to forward to another federate. * The next two bytes will be the ID of the destination port. * The next two bytes are the destination federate ID. @@ -414,7 +422,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #define MSG_TYPE_TAGGED_MESSAGE 5 -/** +/** * Byte identifying a next event tag (NET) message sent from a federate in * centralized coordination. The next eight bytes will be the timestamp. The * next four bytes will be the microstep. This message from a federate tells the @@ -430,7 +438,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #define MSG_TYPE_NEXT_EVENT_TAG 6 -/** +/** * Byte identifying a time advance grant (TAG) sent by the RTI to a federate * in centralized coordination. This message is a promise by the RTI to the federate * that no later message sent to the federate will have a tag earlier than or @@ -440,7 +448,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #define MSG_TYPE_TAG_ADVANCE_GRANT 7 -/** +/** * Byte identifying a provisional time advance grant (PTAG) sent by the RTI to a federate * in centralized coordination.
This message is a promise by the RTI to the federate * that no later message sent to the federate will have a tag earlier than the tag @@ -450,7 +458,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #define MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT 8 -/** +/** * Byte identifying a logical tag complete (LTC) message sent by a federate * to the RTI. * The next eight bytes will be the timestep of the completed tag. @@ -490,18 +498,20 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * that specifies the stop time on all other federates, then every federate * depends on every other federate and time cannot be advanced. * Hence, the actual stop time may be nondeterministic. - * + * * If, on the other hand, the federate requesting the stop is upstream of every * other federate, then it should be possible to respect its requested stop tag. */ #define MSG_TYPE_STOP_REQUEST 10 #define MSG_TYPE_STOP_REQUEST_LENGTH (1 + sizeof(instant_t) + sizeof(microstep_t)) -#define ENCODE_STOP_REQUEST(buffer, time, microstep) do { \ - buffer[0] = MSG_TYPE_STOP_REQUEST; \ - encode_int64(time, &(buffer[1])); \ - assert(microstep >= 0); \ - encode_int32((int32_t)microstep, &(buffer[1 + sizeof(instant_t)])); \ -} while(0) +#define ENCODE_STOP_REQUEST(buffer, time, microstep) \ + do \ + { \ + buffer[0] = MSG_TYPE_STOP_REQUEST; \ + encode_int64(time, &(buffer[1])); \ + assert(microstep >= 0); \ + encode_int32((int32_t)microstep, &(buffer[1 + sizeof(instant_t)])); \ + } while (0) /** * Byte indicating a federate's reply to a MSG_TYPE_STOP_REQUEST that was sent @@ -513,28 +523,32 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
*/ #define MSG_TYPE_STOP_REQUEST_REPLY 11 #define MSG_TYPE_STOP_REQUEST_REPLY_LENGTH (1 + sizeof(instant_t) + sizeof(microstep_t)) -#define ENCODE_STOP_REQUEST_REPLY(buffer, time, microstep) do { \ - buffer[0] = MSG_TYPE_STOP_REQUEST_REPLY; \ - encode_int64(time, &(buffer[1])); \ - assert(microstep >= 0); \ - encode_int32((int32_t)microstep, &(buffer[1 + sizeof(instant_t)])); \ -} while(0) +#define ENCODE_STOP_REQUEST_REPLY(buffer, time, microstep) \ + do \ + { \ + buffer[0] = MSG_TYPE_STOP_REQUEST_REPLY; \ + encode_int64(time, &(buffer[1])); \ + assert(microstep >= 0); \ + encode_int32((int32_t)microstep, &(buffer[1 + sizeof(instant_t)])); \ + } while (0) /** * Byte sent by the RTI indicating that the stop request from some federate * has been granted. The payload is the tag at which all federates have * agreed that they can stop. - * The next 8 bytes will be the time at which the federates will stop. * + * The next 8 bytes will be the time at which the federates will stop. * * The next 4 bytes will be the microstep at which the federates will stop.. */ #define MSG_TYPE_STOP_GRANTED 12 #define MSG_TYPE_STOP_GRANTED_LENGTH (1 + sizeof(instant_t) + sizeof(microstep_t)) -#define ENCODE_STOP_GRANTED(buffer, time, microstep) do { \ - buffer[0] = MSG_TYPE_STOP_GRANTED; \ - encode_int64(time, &(buffer[1])); \ - assert(microstep >= 0); \ - encode_int32((int32_t)microstep, &(buffer[1 + sizeof(instant_t)])); \ -} while(0) +#define ENCODE_STOP_GRANTED(buffer, time, microstep) \ + do \ + { \ + buffer[0] = MSG_TYPE_STOP_GRANTED; \ + encode_int64(time, &(buffer[1])); \ + assert(microstep >= 0); \ + encode_int32((int32_t)microstep, &(buffer[1 + sizeof(instant_t)])); \ + } while (0) /////////// End of lf_request_stop() messages //////////////// @@ -572,7 +586,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. /** * Byte identifying a message to send directly to another federate. - * + * * The next two bytes will be the ID of the destination port. 
* The next two bytes are the destination federate ID. This is checked against * the _lf_my_fed_id of the receiving federate to ensure the message was intended for @@ -586,7 +600,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * This is a variant of @see MSG_TYPE_TAGGED_MESSAGE that is used in P2P connections between * federates. Having a separate message type for P2P connections between federates * will be useful in preventing crosstalk. - * + * * The next two bytes will be the ID of the destination port. * The next two bytes are the destination federate ID. This is checked against * the _lf_my_fed_id of the receiving federate to ensure the message was intended for @@ -641,11 +655,10 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #define MSG_TYPE_CLOCK_SYNC_CODED_PROBE 22 - /** * A port absent message, informing the receiver that a given port * will not have event for the current logical time. - * + * * The next 2 bytes is the port id. * The next 2 bytes will be the federate id of the destination federate. * This is needed for the centralized coordination so that the RTI knows where @@ -655,21 +668,19 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #define MSG_TYPE_PORT_ABSENT 23 - - /** * A message that informs the RTI about connections between this federate and * other federates where messages are routed through the RTI. Currently, this * only includes logical connections when the coordination is centralized. This * information is needed for the RTI to perform the centralized coordination. - * + * * @note Only information about the immediate neighbors is required. The RTI can * transitively obtain the structure of the federation based on each federate's * immediate neighbor information. * - * The next 4 bytes is the number of upstream federates. + * The next 4 bytes is the number of upstream federates. * The next 4 bytes is the number of downstream federates. 
- * + * Depending on the first four bytes, the next bytes are pairs of (fed ID (2 * bytes), delay (8 bytes)) for this federate's connection to upstream federates * (by direct connection). The delay is the minimum "after" delay of all @@ -685,6 +696,16 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define MSG_TYPE_NEIGHBOR_STRUCTURE 24 #define MSG_TYPE_NEIGHBOR_STRUCTURE_HEADER_SIZE 9 +/** + * Byte sent by the RTI ordering the federate to stop. Upon receiving the message, + * the federate will call lf_stop(), which will make it resign at its current_tag + * plus 1 microstep. + * This message consists of the type byte only (MSG_TYPE_STOP_LENGTH is 1); + * unlike MSG_TYPE_STOP_GRANTED, it carries no time or microstep payload. + */ +#define MSG_TYPE_STOP 30 +#define MSG_TYPE_STOP_LENGTH 1 + ///////////////////////////////////////////// //// Rejection codes diff --git a/include/core/trace.h b/include/core/trace.h index 357f22b11..0c70df9e9 100644 --- a/include/core/trace.h +++ b/include/core/trace.h @@ -120,6 +120,9 @@ typedef enum receive_ADR_AD, receive_ADR_QR, receive_UNIDENTIFIED, + // Transient + send_STOP, + receive_STOP, NUM_EVENT_TYPES } trace_event_t; @@ -183,6 +186,9 @@ static const char *trace_event_names[] = { "Receiving ADR_AD", "Receiving ADR_QR", "Receiving UNIDENTIFIED", + // Transient + "Sending STOP", + "Receiving STOP" }; // FIXME: Target property should specify the capacity of the trace buffer.
diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 5fde102e7..6d21bf098 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -single-threaded +transient-federates \ No newline at end of file diff --git a/util/tracing/visualization/fedsd.py b/util/tracing/visualization/fedsd.py index d9d44253b..142ce3589 100644 --- a/util/tracing/visualization/fedsd.py +++ b/util/tracing/visualization/fedsd.py @@ -28,6 +28,7 @@ .TAG { stroke: #08a578; fill: #08a578} \ .TIMESTAMP { stroke: grey; fill: grey } \ .FED_ID {stroke: #80DD99; fill: #80DD99 } \ + .STOP {stroke: #d0b7eb; fill: #d0b7eb} \ .ADV {stroke-linecap="round" ; stroke: "red" ; fill: "red"} \ text { \ font-size: smaller; \ @@ -81,7 +82,9 @@ "Receiving ADR_AD": "ADR_AD", "Receiving ADR_QR": "ADR_QR", "Receiving UNIDENTIFIED": "UNIDENTIFIED", - "Scheduler advancing time ends": "AdvLT" + "Scheduler advancing time ends": "AdvLT", + "Sending STOP": "STOP", + "Receiving STOP": "STOP" } prune_event_name.setdefault(" ", "UNIDENTIFIED") @@ -104,7 +107,7 @@ # Events matching at the sender and receiver ends depend on whether they are tagged # (the elapsed logical time and microstep have to be the same) or not. 
# Set of tagged events (messages) -non_tagged_messages = {'FED_ID', 'ACK', 'REJECT', 'ADR_RQ', 'ADR_AD', 'MSG', 'P2P_MSG'} +non_tagged_messages = {'FED_ID', 'ACK', 'REJECT', 'ADR_RQ', 'ADR_AD', 'MSG', 'P2P_MSG', 'STOP'} ################################################################################ @@ -203,7 +206,6 @@ def svg_string_draw_label(x1, y1, x2, y2, label) : else: rotation = 0 str_line = '\t'+label+'\n' - #print('rot = '+str(rotation)+' x1='+str(x1)+' y1='+str(y1)+' x2='+str(x2)+' y2='+str(y2)) return str_line @@ -491,11 +493,16 @@ def get_and_convert_lft_files(rti_lft_file, federates_lft_files): if (not fed_df.empty): # Get the federate id number fed_id = fed_df.iloc[-1]['self_id'] - # Add to the list of sequence diagram actors and add the name - actors.append(fed_id) - actors_names[fed_id] = Path(fed_trace).stem - # Derive the x coordinate of the actor - x_coor[fed_id] = (padding * 2) + (spacing * (len(actors) - 1)) + ### Check that the federate id has not been entered yet. + ### This is particularly useful for transient actors, when + ### they leave and join several times + if (actors.count(fed_id) == 0): + # Add to the list of sequence diagram actors and add the name + actors.append(fed_id) + actors_names[fed_id] = Path(fed_trace).stem + # Derive the x coordinate of the actor + x_coor[fed_id] = (padding * 2) + (spacing * (len(actors)-1)) + fed_df['x1'] = x_coor[fed_id] trace_df = pd.concat([trace_df, fed_df]) fed_df = fed_df[0:0] @@ -659,7 +666,7 @@ def get_and_convert_lft_files(rti_lft_file, federates_lft_files): # FIXME: Using microseconds is hardwired here. physical_time = f'{int(row["physical_time"]/1000):,}' - if (row['event'] in {'FED_ID', 'ACK', 'REJECT', 'ADR_RQ', 'ADR_AD', 'MSG', 'P2P_MSG'}): + if (row['event'] in {'FED_ID', 'ACK', 'REJECT', 'ADR_RQ', 'ADR_AD', 'MSG', 'P2P_MSG', 'STOP'}): label = row['event'] else: label = row['event'] + '(' + f'{int(row["logical_time"]):,}' + ', ' + str(row['microstep']) + ')'