@@ -62,10 +62,6 @@ typedef unsigned char byte_t; ///< For compatibility with platforms where byte s
6262/// as new transfers. Must be a multiple of 64 bits.
6363#define RX_TRANSFER_ID_WINDOW_BITS 512U
6464
65- /// Defines the number of most recently received transfer IDs for which the reception history is stored.
66- /// This is used to avoid accepting duplicates that are outside of the RX_TRANSFER_ID_WINDOW_BITS range.
67- #define RX_TRANSFER_ID_HISTORY_LENGTH 8U
68-
6965#define UDP_PORT 9382U
7066#define IPv4_MCAST_PREFIX 0xEF000000UL
7167#define IPv4_MCAST_SUFFIX_MASK 0x007FFFFFUL
@@ -1001,6 +997,15 @@ static void rx_transfer_id_window_slide(rx_transfer_id_window_t* const self, con
1001997 self -> head = new_head ;
1002998}
1003999
1000+ /// Copy the contents of one transfer-ID window to another.
1001+ static void rx_transfer_id_window_copy (rx_transfer_id_window_t * const dst , const rx_transfer_id_window_t * const src )
1002+ {
1003+ dst -> head = src -> head ;
1004+ for (size_t i = 0 ; i < (RX_TRANSFER_ID_WINDOW_BITS / 64U ); i ++ ) {
1005+ dst -> bitset [i ] = src -> bitset [i ];
1006+ }
1007+ }
1008+
10041009/// Mark the specified past transfer-ID as set. No effect if this transfer-ID is outside of the window.
10051010static void rx_transfer_id_window_set (rx_transfer_id_window_t * const self , const uint64_t transfer_id )
10061011{
@@ -1139,19 +1144,19 @@ typedef struct
11391144 udpard_list_member_t list_by_animation ;
11401145 udpard_us_t last_animated_ts ;
11411146
1142- /// To weed out duplicates and to retransmit lost ACKs, we keep track of which transfers have been received .
1147+ /// To weed out duplicates and to retransmit lost ACKs, we keep track of which transfers have been observed .
11431148 ///
1144- /// The window head points at the newest transfer-ID ejected to the application (which may not be the same as
1149+ /// A window head points at the newest transfer-ID ejected to the application (which may not be the same as
11451150 /// the newest successfully received transfer-ID in case of interning). This is essential for the ORDERED mode
11461151 /// because the head position tells us which transfers can no longer be accepted due to reordering window closure.
11471152 ///
1148- /// The history array stores the list of N recently received transfer-IDs, with the least recently used
1149- /// being replaced first. This is used to support the edge case when the remote sends transfers with greatly
1150- /// different transfer-ID values in a short time, causing large jumps in the transfer-ID window head;
1151- /// this is important because the window can only store a relatively short range of transfer-IDs (a few hundred).
1152- rx_transfer_id_window_t window ;
1153- uint64_t history [ RX_TRANSFER_ID_HISTORY_LENGTH ] ;
1154- uint_fast8_t history_index ;
1153+ /// We keep two such windows. This is needed to support the case when the remote node restarts
1154+ /// while there are some old transfers still in flight in the network. The old transfers may arrive after the
1155+ /// restart and must still be handled correctly with respect to duplicate rejection and ordering.
1156+ /// One example where more than one window is needed to uphold the ORDERED mode guarantees is when the remote
1157+ /// sends transfers #1, #3, #10000, then #2; transfer #2 shall be rejected because it is out of order.
1158+ rx_transfer_id_window_t tidwin ;
1159+ rx_transfer_id_window_t tidwin_old ;
11551160
11561161 bool initialized ; ///< Set after the first frame is seen.
11571162
@@ -1213,7 +1218,6 @@ static udpard_tree_t* cavl_factory_rx_session_by_remote_uid(void* const user)
12131218 out -> remote .uid = args -> remote_uid ;
12141219 out -> owner = args -> owner ;
12151220 out -> last_animated_ts = args -> now ;
1216- out -> history_index = 0 ;
12171221 out -> initialized = false;
12181222 enlist_head (args -> sessions_by_animation , & out -> list_by_animation );
12191223 }
@@ -1244,13 +1248,17 @@ static void rx_session_eject(rx_session_t* const self,
12441248{
12451249 UDPARD_ASSERT (slot -> state == rx_slot_done );
12461250
1247- // Mark the transfer as ejected in the history.
1251+ // Update the history.
1252+ // The window is backed up if a large jump occurs, indicating that the remote node has restarted.
1253+ // The backup copy is needed to properly handle possibly delayed transfers from the old epoch.
12481254 if (slide_window ) {
1249- rx_transfer_id_window_slide (& self -> window , slot -> transfer_id );
1255+ const uint64_t dist = rx_transfer_id_forward_distance (self -> tidwin .head , slot -> transfer_id );
1256+ if (dist > (RX_TRANSFER_ID_WINDOW_BITS / 2U )) {
1257+ rx_transfer_id_window_copy (& self -> tidwin_old , & self -> tidwin );
1258+ }
1259+ rx_transfer_id_window_slide (& self -> tidwin , slot -> transfer_id );
12501260 }
1251- rx_transfer_id_window_set (& self -> window , slot -> transfer_id );
1252- self -> history [self -> history_index ] = slot -> transfer_id ;
1253- self -> history_index = (self -> history_index + 1U ) % RX_TRANSFER_ID_HISTORY_LENGTH ;
1261+ rx_transfer_id_window_set (& self -> tidwin , slot -> transfer_id );
12541262
12551263 // Construct the arguments and invoke the callback.
12561264 const udpard_rx_transfer_t transfer = {
@@ -1288,7 +1296,7 @@ static void rx_session_ordered_scan_slots(rx_session_t* const self,
12881296 // We need to repeat the scan because each ejection may open up the window for the next in-sequence transfer.
12891297 for (size_t iter = 0 ; iter < RX_SLOT_COUNT ; iter ++ ) {
12901298 // Find the slot closest to the next in-sequence transfer-ID.
1291- const uint64_t tid_expected = self -> window .head + 1U ;
1299+ const uint64_t tid_expected = self -> tidwin .head + 1U ;
12921300 uint64_t min_tid_dist = UINT64_MAX ;
12931301 rx_slot_t * slot = NULL ;
12941302 for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
@@ -1330,14 +1338,16 @@ static void rx_session_ordered_scan_slots(rx_session_t* const self,
13301338 // Some of the in-progress slots may be obsoleted by this move, which will be taken care of later.
13311339 UDPARD_ASSERT ((slot != NULL ) && (slot -> state == rx_slot_done ));
13321340 rx_session_eject (self , rx , slot , true);
1333- }
13341341
1335- // Having updated the state, ensure that in-progress slots, if any, have not ended up within the accepted window.
1336- // This is essential for the ORDERED mode.
1337- for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1338- if ((self -> slots [i ].state == rx_slot_busy ) &&
1339- rx_transfer_id_window_contains (& self -> window , self -> slots [i ].transfer_id )) {
1340- rx_slot_reset (& self -> slots [i ], self -> owner -> memory .fragment );
1342+ // Ensure that in-progress slots, if any, have not ended up within the accepted window after the update.
1343+ // This is essential for the ORDERED mode.
1344+ for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1345+ slot = & self -> slots [i ];
1346+ if ((slot -> state == rx_slot_busy ) &&
1347+ (rx_transfer_id_window_contains (& self -> tidwin , slot -> transfer_id ) ||
1348+ rx_transfer_id_window_contains (& self -> tidwin_old , slot -> transfer_id ))) {
1349+ rx_slot_reset (slot , self -> owner -> memory .fragment );
1350+ }
13411351 }
13421352 }
13431353}
@@ -1414,24 +1424,22 @@ static rx_session_transfer_status_t rx_session_get_transfer_status(const rx_sess
14141424 const uint64_t transfer_id )
14151425{
14161426 // Check those that have been already ejected to the application.
1417- if (rx_transfer_id_window_test (& self -> window , transfer_id )) {
1427+ if (rx_transfer_id_window_test (& self -> tidwin , transfer_id ) ||
1428+ rx_transfer_id_window_test (& self -> tidwin_old , transfer_id )) {
14181429 return rx_session_transfer_ejected ;
14191430 }
1420- // Check the history in case of large transfer-ID jumps.
1421- for (size_t i = 0 ; i < RX_TRANSFER_ID_HISTORY_LENGTH ; i ++ ) {
1422- if (self -> history [i ] == transfer_id ) {
1423- return rx_session_transfer_ejected ;
1424- }
1425- }
14261431 // Check interned transfers waiting for reordering window closure.
14271432 for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
14281433 if ((self -> slots [i ].state == rx_slot_done ) && (self -> slots [i ].transfer_id == transfer_id )) {
14291434 return rx_session_transfer_interned ; // received successfully but the application is yet to see it
14301435 }
14311436 }
14321437 // The transfer is not received; check if it's within the dup transfer-ID window.
1433- return rx_transfer_id_window_contains (& self -> window , transfer_id ) ? rx_session_transfer_late
1434- : rx_session_transfer_new ;
1438+ // If it is, then it cannot be ejected under the ORDERED mode any longer.
1439+ return (rx_transfer_id_window_contains (& self -> tidwin , transfer_id ) ||
1440+ rx_transfer_id_window_contains (& self -> tidwin_old , transfer_id ))
1441+ ? rx_session_transfer_late
1442+ : rx_session_transfer_new ;
14351443}
14361444
14371445/// The ORDERED mode implementation. May delay incoming transfers to maintain strict transfer-ID ordering.
@@ -1538,10 +1546,8 @@ static void rx_session_update(rx_session_t* const self,
15381546 // Any transfers with prior transfer-ID values arriving later will be rejected, which is acceptable.
15391547 if (!self -> initialized ) {
15401548 self -> initialized = true;
1541- rx_transfer_id_window_slide (& self -> window , frame .meta .transfer_id - 1U );
1542- for (size_t i = 0 ; i < RX_TRANSFER_ID_HISTORY_LENGTH ; i ++ ) {
1543- self -> history [i ] = frame .meta .transfer_id - 1U ;
1544- }
1549+ rx_transfer_id_window_slide (& self -> tidwin , frame .meta .transfer_id - 1U );
1550+ rx_transfer_id_window_copy (& self -> tidwin_old , & self -> tidwin );
15451551 }
15461552
15471553 // Accept the frame depending on the selected reassembly mode.
0 commit comments