@@ -1130,6 +1130,7 @@ static void rx_session_on_ack_mandate(const rx_session_t* const self,
11301130 const uint64_t transfer_id ,
11311131 const udpard_bytes_t payload_head )
11321132{
1133+ UDPARD_ASSERT (self -> owner -> invoked );
11331134 udpard_rx_subscription_t * const subscription =
11341135 (self -> owner == & rx -> p2p_port ) ? NULL : (udpard_rx_subscription_t * )self -> owner ;
11351136 const udpard_rx_ack_mandate_t mandate = {
@@ -1143,6 +1144,7 @@ static void rx_session_on_ack_mandate(const rx_session_t* const self,
11431144/// The payload ownership is transferred to the application.
11441145static void rx_session_on_message (const rx_session_t * const self , udpard_rx_t * const rx , rx_slot_t * const slot )
11451146{
1147+ UDPARD_ASSERT (self -> owner -> invoked );
11461148 udpard_rx_subscription_t * const subscription =
11471149 (self -> owner == & rx -> p2p_port ) ? NULL : (udpard_rx_subscription_t * )self -> owner ;
11481150 const udpard_rx_transfer_t transfer = {
@@ -1309,6 +1311,58 @@ static void rx_session_ordered_scan_slots(rx_session_t* const self,
13091311 }
13101312}
13111313
1314+ /// Finds an existing in-progress slot with the specified transfer-ID, or allocates a new one.
1315+ /// Allocation always succeeds so the result is never NULL, but it may cause early ejection of an interned DONE slot.
1316+ static rx_slot_t * rx_session_get_slot (rx_session_t * const self ,
1317+ udpard_rx_t * const rx ,
1318+ const udpard_us_t ts ,
1319+ const uint64_t transfer_id )
1320+ {
1321+ // First, check if one is in progress already; resume it if so.
1322+ for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1323+ if ((self -> slots [i ].state == rx_slot_busy ) && (self -> slots [i ].transfer_id == transfer_id )) {
1324+ return & self -> slots [i ];
1325+ }
1326+ }
1327+ // This appears to be a new transfer, so we will need to allocate a new slot for it.
1328+ for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1329+ if (self -> slots [i ].state == rx_slot_idle ) {
1330+ return & self -> slots [i ];
1331+ }
1332+ }
1333+ // All slots are currently occupied; find the oldest slot to sacrifice, which may be busy or done.
1334+ rx_slot_t * slot = NULL ;
1335+ udpard_us_t oldest_ts = HEAT_DEATH ;
1336+ for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1337+ UDPARD_ASSERT (self -> slots [i ].state != rx_slot_idle ); // Checked this already.
1338+ if (self -> slots [i ].ts_max < oldest_ts ) {
1339+ oldest_ts = self -> slots [i ].ts_max ;
1340+ slot = & self -> slots [i ];
1341+ }
1342+ }
1343+ UDPARD_ASSERT ((slot != NULL ) && ((slot -> state == rx_slot_busy ) || (slot -> state == rx_slot_done )));
1344+ // If it's busy, it is probably just a stale transfer, so it's a no-brainer to evict it.
1345+ // If it's done, we have to force the reordering window to close early to free up a slot without transfer loss.
1346+ if (slot -> state == rx_slot_busy ) {
1347+ rx_slot_reset (slot , self -> owner -> memory .fragment ); // Just a stale transfer, it's probably dead anyway.
1348+ } else {
1349+ UDPARD_ASSERT (slot -> state == rx_slot_done );
1350+ // The oldest slot is DONE; we cannot just reset it, we must force an early ejection.
1351+ // The slot to eject will be chosen based on the transfer-ID, which may not be the oldest slot.
1352+ // Then we repeat the search looking for any IDLE slot, which must succeed now.
1353+ rx_session_ordered_scan_slots (self , rx , ts , true); // A slot will be ejected (we don't know which one).
1354+ slot = NULL ;
1355+ for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1356+ if (self -> slots [i ].state == rx_slot_idle ) {
1357+ slot = & self -> slots [i ];
1358+ break ;
1359+ }
1360+ }
1361+ }
1362+ UDPARD_ASSERT ((slot != NULL ) && (slot -> state == rx_slot_idle ));
1363+ return slot ;
1364+ }
1365+
13121366typedef enum
13131367{
13141368 rx_session_transfer_new , ///< Should be accepted --- part of a transfer not yet received.
@@ -1342,6 +1396,7 @@ static void rx_session_update(rx_session_t* const self,
13421396 const udpard_mem_deleter_t payload_deleter ,
13431397 const uint_fast8_t ifindex )
13441398{
1399+ UDPARD_ASSERT (self -> owner -> invoked );
13451400 UDPARD_ASSERT (self -> remote .uid == frame .meta .sender_uid );
13461401 UDPARD_ASSERT (frame .meta .topic_hash == self -> owner -> topic_hash ); // must be checked by the caller beforehand
13471402
@@ -1378,54 +1433,10 @@ static void rx_session_update(rx_session_t* const self,
13781433 }
13791434
13801435 // It appears that we need to accept this frame. We need to find a suitable slot for that.
1381- rx_slot_t * slot = NULL ;
1382- for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) { // First, check if one is in progress already; resume it if so.
1383- if ((self -> slots [i ].state == rx_slot_busy ) && (self -> slots [i ].transfer_id == frame .meta .transfer_id )) {
1384- slot = & self -> slots [i ];
1385- break ;
1386- }
1387- }
1388- if (slot == NULL ) { // This appears to be a new transfer, so we will need to allocate a new slot for it.
1389- for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1390- if (self -> slots [i ].state == rx_slot_idle ) {
1391- slot = & self -> slots [i ];
1392- break ;
1393- }
1394- }
1395- }
1396- if (slot == NULL ) { // All slots are currently occupied; sacrifice the oldest slot, which may be busy or done.
1397- udpard_us_t oldest_ts = HEAT_DEATH ;
1398- for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1399- UDPARD_ASSERT (self -> slots [i ].state != rx_slot_idle ); // Checked this already.
1400- if (self -> slots [i ].ts_max < oldest_ts ) {
1401- oldest_ts = self -> slots [i ].ts_max ;
1402- slot = & self -> slots [i ];
1403- }
1404- }
1405- // If it's busy, it is probably just a stale transfer, so it's a no-brainer to evict it.
1406- // If it's done, we have to force the reordering window to close early to free up a slot without transfer loss.
1407- UDPARD_ASSERT ((slot != NULL ) && ((slot -> state == rx_slot_busy ) || (slot -> state == rx_slot_done )));
1408- if (slot -> state == rx_slot_done ) {
1409- rx_session_ordered_scan_slots (self , rx , ts , true); // A slot will be ejected (maybe another one).
1410- slot = NULL ; // Repeat the search. It is certain that now we have at least one idle slot.
1411- for (size_t i = 0 ; i < RX_SLOT_COUNT ; i ++ ) {
1412- if (self -> slots [i ].state == rx_slot_idle ) {
1413- slot = & self -> slots [i ];
1414- break ;
1415- }
1416- }
1417- } else {
1418- UDPARD_ASSERT (slot -> state == rx_slot_busy );
1419- rx_slot_reset (slot , self -> owner -> memory .fragment ); // Just a stale transfer, it's probably dead anyway.
1420- }
1421- UDPARD_ASSERT ((slot != NULL ) && (slot -> state == rx_slot_idle ));
1422- }
1423- UDPARD_ASSERT (slot != NULL );
1436+ rx_slot_t * const slot = rx_session_get_slot (self , rx , ts , frame .meta .transfer_id );
1437+ UDPARD_ASSERT ((slot != NULL ) && (slot -> state != rx_slot_done ));
14241438 UDPARD_ASSERT ((slot -> state == rx_slot_idle ) ||
14251439 ((slot -> state == rx_slot_busy ) && (slot -> transfer_id == frame .meta .transfer_id )));
1426- UDPARD_ASSERT (slot -> state != rx_slot_done );
1427-
1428- // Update the slot state and check if it completes the transfer.
14291440 rx_slot_update (slot ,
14301441 ts ,
14311442 self -> owner -> memory .fragment ,
@@ -1471,6 +1482,7 @@ bool udpard_rx_new(udpard_rx_t* const self,
14711482 .reordering_window = UDPARD_REORDERING_WINDOW_UNORDERED ,
14721483 .memory = p2p_port_memory ,
14731484 .index_session_by_remote_uid = NULL ,
1485+ .invoked = false,
14741486 };
14751487 self -> list_session_by_animation = (udpard_list_t ){ NULL , NULL };
14761488 self -> index_session_by_reordering = NULL ;
@@ -1487,6 +1499,8 @@ bool udpard_rx_new(udpard_rx_t* const self,
14871499
14881500void udpard_rx_poll (udpard_rx_t * const self , const udpard_us_t now )
14891501{
1502+ UDPARD_ASSERT (!self -> p2p_port .invoked );
1503+ self -> p2p_port .invoked = true;
14901504 // Retire timed out sessions. We retire at most one per poll to avoid burstiness because session retirement
14911505 // may potentially free up a lot of memory at once.
14921506 {
@@ -1505,4 +1519,5 @@ void udpard_rx_poll(udpard_rx_t* const self, const udpard_us_t now)
15051519 }
15061520 rx_session_ordered_scan_slots (ses , self , now , false);
15071521 }
1522+ self -> p2p_port .invoked = false;
15081523}
0 commit comments