@@ -93,29 +93,99 @@ template<ucs_device_level_t level> UCS_F_DEVICE void uct_rc_mlx5_gda_sync(void)
     }
 }
 
-UCS_F_DEVICE uint64_t
-uct_rc_mlx5_gda_reserv_wqe_thread(uct_rc_gdaki_dev_ep_t *ep, unsigned count)
+UCS_F_DEVICE uint16_t uct_rc_mlx5_gda_bswap16(uint16_t x)
 {
-    /* Try to reserve optimistically */
-    int32_t prev = atomicAdd(&ep->avail_count, -(int32_t)count);
-    if (prev < (int32_t)count) {
-        /* Rollback */
-        atomicAdd(&ep->avail_count, count);
+    uint32_t ret;
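+    /* prmt with selector 0x1 places byte 1 of x into result byte 0 and byte 0
+     * into result byte 1, swapping the two low bytes; the upper bytes of the
+     * 32-bit result are discarded by the uint16_t return type */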
+    asm volatile("{\n\t"
+                 ".reg .b32 mask;\n\t"
+                 ".reg .b32 ign;\n\t"
+                 "mov.b32 mask, 0x1;\n\t"
+                 "prmt.b32 %0, %1, ign, mask;\n\t"
+                 "}"
+                 : "=r"(ret)
+                 : "r"((uint32_t)x));
+    return ret;
+}
+
+UCS_F_DEVICE uint64_t uct_rc_mlx5_gda_parse_cqe(uct_rc_gdaki_dev_ep_t *ep,
+                                                uint16_t *wqe_cnt,
+                                                uint8_t *opcode)
+{
+    auto *cqe64 = reinterpret_cast<mlx5_cqe64*>(ep->cqe_daddr);
+    uint32_t *data_ptr = (uint32_t*)&cqe64->wqe_counter;
+    uint32_t data = READ_ONCE(*data_ptr);
+    uint64_t rsvd_idx = READ_ONCE(ep->sq_rsvd_index);
+
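+    /* wqe_counter occupies the low 16 bits of the loaded word and is stored
+     * big-endian in the CQE; byte-swap it to host order */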
+    *wqe_cnt = uct_rc_mlx5_gda_bswap16(data);
+    if (opcode != nullptr) {
+        *opcode = data >> 28;
+    }
+
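+    /* Map the 16-bit CQE counter onto the 64-bit reserved-index domain: the
+     * result is the largest value not exceeding sq_rsvd_index that is
+     * congruent to *wqe_cnt modulo 2^16 */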
+    return rsvd_idx - ((rsvd_idx - *wqe_cnt) & 0xffff);
+}
+
+UCS_F_DEVICE uint64_t uct_rc_mlx5_gda_max_alloc_wqe_base(
+        uct_rc_gdaki_dev_ep_t *ep, unsigned count)
+{
+    uint16_t wqe_cnt;
+    uint64_t pi;
+
+    pi = uct_rc_mlx5_gda_parse_cqe(ep, &wqe_cnt, nullptr);
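+    /* Largest base index for which base + count - 1 does not exceed
+     * pi + sq_wqe_num, i.e. at most sq_wqe_num WQEs outstanding beyond the
+     * index reported by the last CQE */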
+    return pi + ep->sq_wqe_num + 1 - count;
+}
+
+UCS_F_DEVICE uint64_t uct_rc_mlx5_gda_reserv_wqe_thread(
+        uct_rc_gdaki_dev_ep_t *ep, unsigned count)
+{
+    /* Do not attempt to reserve if the available space is less than the
+     * requested count, to avoid starving threads that are trying to roll back
+     * a reservation with atomicCAS. */
+    uint64_t max_wqe_base = uct_rc_mlx5_gda_max_alloc_wqe_base(ep, count);
+    if (ep->sq_rsvd_index > max_wqe_base) {
         return UCT_RC_GDA_RESV_WQE_NO_RESOURCE;
     }
 
-    /* We own count elements, now can safely increment the reserved index */
-    return atomicAdd(reinterpret_cast<unsigned long long*>(&ep->sq_rsvd_index),
-                     count);
+    uint64_t wqe_base = atomicAdd(reinterpret_cast<unsigned long long*>(
+                                          &ep->sq_rsvd_index),
+                                  static_cast<unsigned long long>(count));
+
+    /*
+     * Attempt to reserve 'count' WQEs by atomically incrementing the reserved
+     * index. If the reservation exceeds the available space in the work queue,
+     * enter a rollback loop.
+     *
+     * Rollback Logic:
+     * - Calculate the next potential index (wqe_next) after attempting the
+     *   reservation.
+     * - Use atomic CAS to check if the current reserved index matches wqe_next.
+     *   If it does, revert the reservation by resetting the reserved index to
+     *   wqe_base.
+     * - A successful CAS indicates no other thread has modified the reserved
+     *   index, allowing the rollback to complete, and the function returns
+     *   UCT_RC_GDA_RESV_WQE_NO_RESOURCE to signal insufficient resources.
+     * - If CAS fails, it means another thread has modified the reserved index.
+     *   The loop continues to reevaluate resource availability to determine if
+     *   the reservation can now be satisfied, possibly due to other operations
+     *   freeing up resources.
+     */
+    while (wqe_base > max_wqe_base) {
+        uint64_t wqe_next = wqe_base + count;
+        if (atomicCAS(reinterpret_cast<unsigned long long*>(&ep->sq_rsvd_index),
+                      wqe_next, wqe_base) == wqe_next) {
+            return UCT_RC_GDA_RESV_WQE_NO_RESOURCE;
+        }
+
+        max_wqe_base = uct_rc_mlx5_gda_max_alloc_wqe_base(ep, count);
+    }
+
+    return wqe_base;
 }
 
 template<ucs_device_level_t level>
 UCS_F_DEVICE void
 uct_rc_mlx5_gda_reserv_wqe(uct_rc_gdaki_dev_ep_t *ep, unsigned count,
                            unsigned lane_id, uint64_t &wqe_base)
 {
-    wqe_base = 0;
-
     if (lane_id == 0) {
         wqe_base = uct_rc_mlx5_gda_reserv_wqe_thread(ep, count);
     }
@@ -211,7 +281,7 @@ UCS_F_DEVICE void uct_rc_mlx5_gda_db(uct_rc_gdaki_dev_ep_t *ep,
 UCS_F_DEVICE bool
 uct_rc_mlx5_gda_fc(const uct_rc_gdaki_dev_ep_t *ep, uint16_t wqe_idx)
 {
-    return (wqe_idx & ep->sq_fc_mask) == 1;
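+    /* True once per flow-control interval: when the masked low bits of the
+     * WQE index are all zero */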
+    return !(wqe_idx & ep->sq_fc_mask);
 }
 
 template<ucs_device_level_t level>
@@ -494,82 +564,9 @@ uct_rc_mlx5_gda_qedump(const char *pfx, void *buff, ssize_t len)
     }
 }
 
-UCS_F_DEVICE void uct_rc_mlx5_gda_progress_thread(uct_rc_gdaki_dev_ep_t *ep)
-{
-    void *cqe = ep->cqe_daddr;
-    size_t cqe_num = ep->cqe_num;
-    uint64_t cqe_idx = ep->cqe_ci;
-    uint32_t idx = cqe_idx & (cqe_num - 1);
-    void *curr_cqe = (uint8_t*)cqe + (idx * DOCA_GPUNETIO_VERBS_CQE_SIZE);
-    auto *cqe64 = reinterpret_cast<mlx5_cqe64*>(curr_cqe);
-
-    /* Read last 3 fields with a single atomic operation */
-    uint32_t *data_ptr = (uint32_t*)&cqe64->wqe_counter;
-    uint32_t data = READ_ONCE(*data_ptr);
-    uint8_t op_owner = data >> 24;
-    if ((op_owner & MLX5_CQE_OWNER_MASK) ^ !!(cqe_idx & cqe_num)) {
-        return;
-    }
-
-    cuda::atomic_ref<uint64_t, cuda::thread_scope_device> ref(ep->cqe_ci);
-    if (!ref.compare_exchange_strong(cqe_idx, cqe_idx + 1,
-                                     cuda::std::memory_order_relaxed)) {
-        return;
-    }
-
-    uint8_t opcode = op_owner >> DOCA_GPUNETIO_VERBS_MLX5_CQE_OPCODE_SHIFT;
-    uint32_t data_cpu = doca_gpu_dev_verbs_bswap32(data);
-    uint16_t wqe_cnt = (data_cpu >> 16) & 0xffff;
-    uint16_t wqe_idx = wqe_cnt & (ep->sq_wqe_num - 1);
-
-    cuda::atomic_ref<uint64_t, cuda::thread_scope_device> pi_ref(ep->sq_wqe_pi);
-    uint64_t sq_wqe_pi = pi_ref.load(cuda::std::memory_order_relaxed);
-    uint64_t new_wqe_pi;
-
-    do {
-        /* Skip CQE if it's older than current producer index, could be already
-         * processed by another thread. This handles CQE wrap-around. */
-        if ((int16_t)(wqe_cnt - (uint16_t)sq_wqe_pi) < 0) {
-            return;
-        }
-
-        uint16_t completed_delta = wqe_cnt - (uint16_t)sq_wqe_pi;
-        new_wqe_pi = sq_wqe_pi + completed_delta + 1;
-    } while (!pi_ref.compare_exchange_weak(sq_wqe_pi, new_wqe_pi,
-                                           cuda::std::memory_order_release,
-                                           cuda::std::memory_order_relaxed));
-
-    if (opcode == MLX5_CQE_REQ) {
-        atomicAdd(&ep->avail_count, (int32_t)(new_wqe_pi - sq_wqe_pi));
-        return;
-    }
-
-    auto err_cqe = reinterpret_cast<mlx5_err_cqe_ex*>(cqe64);
-    auto wqe_ptr = uct_rc_mlx5_gda_get_wqe_ptr(ep, wqe_idx);
-    ucs_device_error("CQE[%d] with syndrome:%x vendor:%x hw:%x "
-                     "wqe_idx:0x%x qp:0x%x",
-                     idx, err_cqe->syndrome, err_cqe->vendor_err_synd,
-                     err_cqe->hw_err_synd, wqe_idx,
-                     doca_gpu_dev_verbs_bswap32(err_cqe->s_wqe_opcode_qpn) &
-                     0xffffff);
-    uct_rc_mlx5_gda_qedump("WQE", wqe_ptr, 64);
-    uct_rc_mlx5_gda_qedump("CQE", cqe64, 64);
-    pi_ref.fetch_max(sq_wqe_pi | UCT_RC_GDA_WQE_ERR);
-}
-
 template<ucs_device_level_t level>
 UCS_F_DEVICE void uct_rc_mlx5_gda_ep_progress(uct_device_ep_h tl_ep)
 {
-    uct_rc_gdaki_dev_ep_t *ep = (uct_rc_gdaki_dev_ep_t*)tl_ep;
-    unsigned num_lanes;
-    unsigned lane_id;
-
-    uct_rc_mlx5_gda_exec_init<level>(lane_id, num_lanes);
-    if (lane_id == 0) {
-        uct_rc_mlx5_gda_progress_thread(ep);
-    }
-
-    uct_rc_mlx5_gda_sync<level>();
 }
 
 template<ucs_device_level_t level>
@@ -578,13 +575,21 @@ UCS_F_DEVICE ucs_status_t uct_rc_mlx5_gda_ep_check_completion(
 {
     uct_rc_gdaki_dev_ep_t *ep = reinterpret_cast<uct_rc_gdaki_dev_ep_t*>(tl_ep);
     uct_rc_gda_completion_t *comp = &tl_comp->rc_gda;
-    uint64_t sq_wqe_pi = ep->sq_wqe_pi;
+    uint16_t wqe_cnt;
+    uint8_t opcode;
+    uint64_t pi;
+
+    pi = uct_rc_mlx5_gda_parse_cqe(ep, &wqe_cnt, &opcode);
 
-    if ((sq_wqe_pi & UCT_RC_GDA_WQE_MASK) <= comp->wqe_idx) {
+    if (pi < comp->wqe_idx) {
         return UCS_INPROGRESS;
     }
 
-    if (sq_wqe_pi & UCT_RC_GDA_WQE_ERR) {
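+    /* On a requester error CQE, dump the failing WQE and the CQE contents
+     * before reporting the error */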
+    if (opcode == MLX5_CQE_REQ_ERR) {
+        uint16_t wqe_idx = wqe_cnt & (ep->sq_wqe_num - 1);
+        auto wqe_ptr = uct_rc_mlx5_gda_get_wqe_ptr(ep, wqe_idx);
+        uct_rc_mlx5_gda_qedump("WQE", wqe_ptr, 64);
+        uct_rc_mlx5_gda_qedump("CQE", ep->cqe_daddr, 64);
         return UCS_ERR_IO_ERROR;
     }
 