Skip to content

Commit 0a4759c

Browse files
wehyy and roiedanino
authored
UCT/IB/UD: update stale dest ep_id and stale acks from before connection reset (#11092)
* UCT/IB/UD: revise stale ep_id and stale acks from before connection reset
* AUTHORS: update
* UCT/IB/UD: unify codeline for readability
* UCT/IB/UD: unify codeline for readability
* TEST/GTEST/UCT/IB: added a test for checking dest ep id is being updated

Signed-off-by: Roie Danino <rdanino@nvidia.com>

---------

Signed-off-by: Roie Danino <rdanino@nvidia.com>
Co-authored-by: Roie Danino <rdanino@nvidia.com>
1 parent a346463 commit 0a4759c

File tree

3 files changed

+164
-12
lines changed

3 files changed

+164
-12
lines changed

AUTHORS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ Zhongkai Zhang <zhzhang@habana.ai>
120120
Zhu Yanjun <yanjunz@mellanox.com>
121121
Zihao Zhao <zizhao@nvidia.com>
122122
lzhang2 <cherry.zhang@intel.com>
123+
michaelzli <michaelzli@tencent.com>
123124

124125
In addition we would like to acknowledge the following members of UCX community
125126
for their participation in annual face-to-face meeting, design discussions, and

src/uct/ib/ud/base/ud_ep.c

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,15 @@ uct_ud_ep_process_ack(uct_ud_iface_t *iface, uct_ud_ep_t *ep,
728728
return;
729729
}
730730

731+
/* Ignore stale ACKs for unsent packets (e.g., after endpoint reset).
732+
* Valid ACK PSN must be in (acked_psn, psn). */
733+
if (ucs_unlikely(UCT_UD_PSN_COMPARE(ep->tx.psn, <=, ack_psn))) {
734+
ucs_debug("ep %p: ignoring invalid ack_psn=%u (tx.psn=%u acked_psn=%u)"
735+
" - possibly stale from previous connection",
736+
ep, ack_psn, ep->tx.psn, ep->tx.acked_psn);
737+
return;
738+
}
739+
731740
ep->tx.acked_psn = ack_psn;
732741
ucs_assertv(UCT_UD_PSN_COMPARE(ep->tx.acked_psn, <, ep->tx.psn),
733742
"ep %p: flags=0x%x acked_psn=%u must be smaller than"
@@ -829,21 +838,30 @@ static void uct_ud_ep_rx_creq(uct_ud_iface_t *iface, uct_ud_neth_t *neth)
829838
ep->rx.ooo_pkts.head_sn = neth->psn;
830839
uct_ud_peer_copy(&ep->peer, ucs_unaligned_ptr(&ctl->peer));
831840
uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_CREP);
832-
} else if (ep->dest_ep_id == UCT_UD_EP_NULL_ID) {
833-
/* simultaneous CREQ */
841+
} else if (uct_ib_unpack_uint24(ctl->conn_req.ep_addr.ep_id) != ep->dest_ep_id) {
842+
if (ep->dest_ep_id == UCT_UD_EP_NULL_ID) {
843+
/* simultaneous CREQ */
844+
ucs_debug("simultaneous CREQ ep=%p"
845+
"(iface=%p conn_sn=%d ep_id=%d, dest_ep_id=%d rx_psn=%u)",
846+
ep, iface, ep->conn_sn, ep->ep_id,
847+
ep->dest_ep_id, ep->rx.ooo_pkts.head_sn);
848+
if (UCT_UD_PSN_COMPARE(ep->tx.psn, >, UCT_UD_INITIAL_PSN)) {
849+
/* our own creq was sent, treat incoming creq as ack and remove our
850+
* own from tx window
851+
*/
852+
uct_ud_ep_process_ack(iface, ep, UCT_UD_INITIAL_PSN, 0);
853+
}
854+
} else {
855+
/* stale EP reuse */
856+
ucs_debug("iface=%p: detected stale EP reuse (ep=%p conn_sn=%d "
857+
"old_dest_ep_id=%d new_ep_id=%d), updating dest_ep_id",
858+
iface, ep, ep->conn_sn, ep->dest_ep_id,
859+
uct_ib_unpack_uint24(ctl->conn_req.ep_addr.ep_id));
860+
}
861+
/* Update dest_ep_id */
834862
uct_ud_ep_set_dest_ep_id(ep, uct_ib_unpack_uint24(ctl->conn_req.ep_addr.ep_id));
835863
ep->rx.ooo_pkts.head_sn = neth->psn;
836864
uct_ud_peer_copy(&ep->peer, ucs_unaligned_ptr(&ctl->peer));
837-
ucs_debug("simultaneous CREQ ep=%p"
838-
"(iface=%p conn_sn=%d ep_id=%d, dest_ep_id=%d rx_psn=%u)",
839-
ep, iface, ep->conn_sn, ep->ep_id,
840-
ep->dest_ep_id, ep->rx.ooo_pkts.head_sn);
841-
if (UCT_UD_PSN_COMPARE(ep->tx.psn, >, UCT_UD_INITIAL_PSN)) {
842-
/* our own creq was sent, treat incoming creq as ack and remove our
843-
* own from tx window
844-
*/
845-
uct_ud_ep_process_ack(iface, ep, UCT_UD_INITIAL_PSN, 0);
846-
}
847865
uct_ud_ep_ctl_op_add(iface, ep, UCT_UD_EP_OP_CREP);
848866
}
849867

test/gtest/uct/ib/test_ud.cc

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -920,6 +920,139 @@ UCS_TEST_SKIP_COND_P(test_ud, ctls_loss,
920920

921921
UCT_INSTANTIATE_UD_TEST_CASE(test_ud)
922922

923+
#if UCT_UD_EP_DEBUG_HOOKS
924+
class test_ud_stale_ack : public test_ud {
925+
public:
926+
static ucs_status_t inject_stale_ack_psn(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
927+
{
928+
if (m_stale_ack_psn_to_inject != 0) {
929+
neth->ack_psn = m_stale_ack_psn_to_inject;
930+
}
931+
return UCS_OK;
932+
}
933+
934+
static ucs_status_t capture_rx_ack_psn(uct_ud_ep_t *ep, uct_ud_neth_t *neth)
935+
{
936+
m_received_ack_psn = neth->ack_psn;
937+
return UCS_OK;
938+
}
939+
940+
static void set_stale_ack_psn(uct_ud_psn_t psn) {
941+
m_stale_ack_psn_to_inject = psn;
942+
}
943+
944+
static uct_ud_psn_t get_stale_ack_psn() {
945+
return m_stale_ack_psn_to_inject;
946+
}
947+
948+
static uct_ud_psn_t get_received_ack_psn() {
949+
return m_received_ack_psn;
950+
}
951+
952+
private:
953+
/* Stale ACK PSN to inject - simulates ACK from before endpoint reset */
954+
static volatile uct_ud_psn_t m_stale_ack_psn_to_inject;
955+
/* Captured ACK PSN from received packet */
956+
static volatile uct_ud_psn_t m_received_ack_psn;
957+
};
958+
959+
volatile uct_ud_psn_t test_ud_stale_ack::m_stale_ack_psn_to_inject = 0;
960+
volatile uct_ud_psn_t test_ud_stale_ack::m_received_ack_psn = 0;
961+
962+
UCS_TEST_SKIP_COND_P(test_ud_stale_ack, stale_ack_after_reset,
963+
!check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
964+
constexpr uct_ud_psn_t STALE_ACK_PSN = 135;
965+
constexpr uct_ud_psn_t WINDOW_SIZE = 1024;
966+
967+
disable_async(m_e1);
968+
disable_async(m_e2);
969+
connect();
970+
set_tx_win(m_e1, WINDOW_SIZE);
971+
set_tx_win(m_e2, WINDOW_SIZE);
972+
973+
uct_ud_ep_t *ud_ep1 = ep(m_e1);
974+
975+
/* Send some data to advance PSN */
976+
for (int i = 0; i < 5; i++) {
977+
EXPECT_UCS_OK(tx(m_e1));
978+
}
979+
flush();
980+
981+
/* Simulate endpoint reset on m_e1:
982+
* Reset PSN to low values as if endpoint was just created. */
983+
uct_ud_enter(iface(m_e1));
984+
ud_ep1->tx.psn = 3; /* Next PSN to send */
985+
ud_ep1->tx.acked_psn = 0; /* Last ACKed PSN */
986+
ucs_queue_head_init(&ud_ep1->tx.window); /* Clear TX window */
987+
uct_ud_leave(iface(m_e1));
988+
989+
/* Set up TX hook on m_e2 to inject stale ack_psn in outgoing packets.
990+
* This simulates a delayed/stale packet arriving after reset. */
991+
set_stale_ack_psn(STALE_ACK_PSN);
992+
ep(m_e2)->tx.tx_hook = test_ud_stale_ack::inject_stale_ack_psn;
993+
ep(m_e1)->rx.rx_hook = test_ud_stale_ack::capture_rx_ack_psn;
994+
995+
/* m_e2 sends a packet. Due to the hook, it will contain ack_psn=135. */
996+
EXPECT_UCS_OK(tx(m_e2));
997+
short_progress_loop();
998+
999+
/* Verify the hook actually injected the stale ack_psn */
1000+
EXPECT_EQ(STALE_ACK_PSN, get_received_ack_psn())
1001+
<< "Invalid value after the hook";
1002+
1003+
set_stale_ack_psn(0);
1004+
ep(m_e2)->tx.tx_hook = uct_ud_ep_null_hook;
1005+
ep(m_e1)->rx.rx_hook = uct_ud_ep_null_hook;
1006+
1007+
/* Verify endpoint state wasn't corrupted by the stale ACK */
1008+
EXPECT_EQ(0, ud_ep1->tx.acked_psn); /* Should NOT be STALE_ACK_PSN */
1009+
}
1010+
1011+
UCT_INSTANTIATE_UD_TEST_CASE(test_ud_stale_ack)
1012+
1013+
/* Test that simulates stale EP reuse: when an EP with an existing dest_ep_id receives a CREQ with
1014+
 * a different ep_id, its dest_ep_id is updated to the new value. */
1015+
UCS_TEST_SKIP_COND_P(test_ud, stale_dest_ep_id_update,
1016+
!check_caps(UCT_IFACE_FLAG_AM_SHORT)) {
1017+
constexpr uint32_t STALE_DEST_EP_ID = 0xBEEF;
1018+
constexpr uint32_t REMOTE_EP_ID = 1;
1019+
1020+
/* Create a dummy EP on m_e1 first, so the actual EP will have ep_id=1
1021+
* (not 0, which could be confused with NULL/default values) */
1022+
m_e1->create_ep(0);
1023+
1024+
/* Start connection from m_e1 to m_e2 - block m_e2's TX to delay CREP */
1025+
iface(m_e2)->tx.available = 0;
1026+
1027+
m_e1->connect_to_iface(1, *m_e2);
1028+
1029+
/* Let CREQ be received and passive EP created on m_e2 */
1030+
short_progress_loop();
1031+
1032+
/* m_e2 side: connect back, which will reuse the passive EP */
1033+
m_e2->connect_to_iface(0, *m_e1);
1034+
1035+
/* Now manually set a "stale" dest_ep_id on m_e2's EP to simulate
1036+
* the scenario where the peer had a different ep_id before reset */
1037+
ep(m_e2)->dest_ep_id = STALE_DEST_EP_ID;
1038+
1039+
/* Allow m_e2 to send - CREP will be sent, and any pending CREQs processed */
1040+
iface(m_e2)->tx.available = 128;
1041+
1042+
/* Wait for m_e2's dest_ep_id to be updated from the stale value */
1043+
wait_for_value(&ep(m_e2)->dest_ep_id, REMOTE_EP_ID, true,
1044+
TEST_UD_LINGER_TIMEOUT_IN_SEC);
1045+
1046+
EXPECT_EQ(REMOTE_EP_ID, ep(m_e1, REMOTE_EP_ID)->ep_id);
1047+
EXPECT_NE(UCT_UD_EP_NULL_ID, ep(m_e1, REMOTE_EP_ID)->dest_ep_id);
1048+
EXPECT_NE(STALE_DEST_EP_ID, ep(m_e2)->dest_ep_id)
1049+
<< "dest_ep_id should have been updated from stale value";
1050+
EXPECT_EQ(ep(m_e1, REMOTE_EP_ID)->ep_id, ep(m_e2)->dest_ep_id)
1051+
<< "dest_ep_id should match the remote EP's actual ep_id";
1052+
EXPECT_EQ(REMOTE_EP_ID, ep(m_e2)->dest_ep_id)
1053+
<< "dest_ep_id should be 1 (not 0)";
1054+
}
1055+
#endif
9231056

9241057
class test_ud_peer_failure : public ud_base_test {
9251058
public:

0 commit comments

Comments
 (0)