Commit 5a54327

UCP/PERF: Fixed races in progress (#10935)
* UCP/PERF: Fixed race in progress
* UCP/GTEST: Added concurrent litmus test
* UCP/GDAKI: Fixed race in FC calculation
* UCP/GDAKI: CQ size = 2xTX_QP
* UCP/GDAKI: Improved FC perf
* UCP/GDAKI: Fixed race in progress/reservation with 2 counters
* UCP/GTEST: Fixed test infra
* UCP/GDA: Fixed uninitialized bug
* UCP/GDA: Cleanup & minor changes
* UCP/GTEST: Added stress test for multi
* UCP/GDA: Fixed CQ overflow by validating the CQE after read
* UCP/GTEST: Cosmetic changes
* UCP: Addressed PR comments
* UCP/GTEST: Reduce MAX_THREADS to 128, skip on valgrind
* UCP: Single atomic read in progress
* UCP/GDA: Sync 64 bits of wqe_base
* UCP/GDAKI: Fix for uninit wqe_base
* UCP/GTEST: Disable stress tests temporarily
1 parent 03898fe commit 5a54327

7 files changed (+244, -128 lines)


src/uct/ib/mlx5/gdaki/gdaki.c

Lines changed: 10 additions & 2 deletions
@@ -176,8 +176,16 @@ static UCS_CLASS_INIT_FUNC(uct_rc_gdaki_ep_t, const uct_ep_params_t *params)
     dev_ep.sq_num = self->qp.super.qp_num;
     dev_ep.sq_wqe_daddr = UCS_PTR_BYTE_OFFSET(self->ep_gpu,
                                               qp_attr.umem_offset);
-    dev_ep.sq_wqe_num = qp_attr.max_tx;
-    dev_ep.sq_dbrec = &self->ep_gpu->qp_dbrec[MLX5_SND_DBR];
+    dev_ep.sq_rsvd_index = 0;
+    dev_ep.sq_ready_index = 0;
+    dev_ep.sq_wqe_pi = 0;
+    dev_ep.sq_wqe_num = qp_attr.max_tx;
+    /* FC mask is used to determine if WQE should be posted with completion.
+     * qp_attr.max_tx must be a power of 2. */
+    dev_ep.sq_fc_mask = (qp_attr.max_tx >> 1) - 1;
+    dev_ep.avail_count = qp_attr.max_tx;
+    dev_ep.sq_dbrec = &self->ep_gpu->qp_dbrec[MLX5_SND_DBR];
+
     dev_ep.cqe_daddr = UCS_PTR_BYTE_OFFSET(self->ep_gpu, cq_attr.umem_offset);
     dev_ep.cqe_num = cq_attr.cq_size;
     dev_ep.sq_db = self->sq_db;
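
A note on the new flow-control fields (editorial sketch, not part of the diff): because qp_attr.max_tx must be a power of two, sq_fc_mask = max_tx/2 - 1, and the device-side check (wqe_idx & sq_fc_mask) == 1 in uct_rc_mlx5_gda_fc below requests a CQE once every max_tx/2 posted WQEs. A minimal host-side illustration, assuming max_tx = 8:

/* Illustration only: which WQE indices request a completion when max_tx = 8.
 * fc_mask = (8 >> 1) - 1 = 3, so indices 1, 5, 9, 13, ... are signaled,
 * i.e. one CQE per max_tx / 2 = 4 posted WQEs. */
#include <stdio.h>

int main(void)
{
    unsigned max_tx  = 8; /* must be a power of 2 */
    unsigned fc_mask = (max_tx >> 1) - 1;
    unsigned wqe_idx;

    for (wqe_idx = 0; wqe_idx < 16; wqe_idx++) {
        if ((wqe_idx & fc_mask) == 1) {
            printf("WQE %u posted with CQ_UPDATE\n", wqe_idx);
        }
    }
    return 0;
}

Doubling the CQ relative to the TX QP depth ("UCP/GDAKI: CQ size = 2xTX_QP" above) then leaves headroom for both the periodic and the per-operation completions; this rationale is inferred from the commit list, not stated in the diff itself.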

src/uct/ib/mlx5/gdaki/gdaki.cuh

Lines changed: 60 additions & 99 deletions
@@ -93,71 +93,33 @@ template<ucs_device_level_t level> UCS_F_DEVICE void uct_rc_mlx5_gda_sync(void)
     }
 }

-UCS_F_DEVICE uint64_t uct_rc_mlx5_gda_max_alloc_wqe_base(
-        uct_rc_gdaki_dev_ep_t *ep, unsigned count)
+UCS_F_DEVICE uint64_t
+uct_rc_mlx5_gda_reserv_wqe_thread(uct_rc_gdaki_dev_ep_t *ep, unsigned count)
 {
-    /* TODO optimize by including sq_wqe_num in qp->sq_wqe_pi and updating it
-       when processing a new completion */
-    uint64_t pi = doca_gpu_dev_verbs_atomic_read<uint64_t,
-            DOCA_GPUNETIO_VERBS_RESOURCE_SHARING_MODE_GPU>(&ep->sq_wqe_pi);
-    return pi + ep->sq_wqe_num - count;
-}
-
-UCS_F_DEVICE uint64_t uct_rc_mlx5_gda_reserv_wqe_thread(
-        uct_rc_gdaki_dev_ep_t *ep, unsigned count)
-{
-    /* Do not attempt to reserve if the available space is less than the
-     * requested count, to avoid starvation of threads trying to rollback the
-     * reservation with atomicCAS. */
-    uint64_t max_wqe_base = uct_rc_mlx5_gda_max_alloc_wqe_base(ep, count);
-    if (ep->sq_rsvd_index > max_wqe_base) {
+    /* Try to reserve optimistically */
+    int32_t prev = atomicAdd(&ep->avail_count, -(int32_t)count);
+    if (prev < (int32_t)count) {
+        /* Rollback */
+        atomicAdd(&ep->avail_count, count);
         return UCT_RC_GDA_RESV_WQE_NO_RESOURCE;
     }

-    uint64_t wqe_base = atomicAdd(reinterpret_cast<unsigned long long*>(
-                                          &ep->sq_rsvd_index),
-                                  static_cast<unsigned long long>(count));
-
-    /*
-     * Attempt to reserve 'count' WQEs by atomically incrementing the reserved
-     * index. If the reservation exceeds the available space in the work queue,
-     * enter a rollback loop.
-     *
-     * Rollback Logic:
-     * - Calculate the next potential index (wqe_next) after attempting the
-     *   reservation.
-     * - Use atomic CAS to check if the current reserved index matches wqe_next.
-     *   If it does, revert the reservation by resetting the reserved index to
-     *   wqe_base.
-     * - A successful CAS indicates no other thread has modified the reserved
-     *   index, allowing the rollback to complete, and the function returns
-     *   UCT_RC_GDA_RESV_WQE_NO_RESOURCE to signal insufficient resources.
-     * - If CAS fails, it means another thread has modified the reserved index.
-     *   The loop continues to reevaluate resource availability to determine if
-     *   the reservation can now be satisfied, possibly due to other operations
-     *   freeing up resources.
-     */
-    while (wqe_base > max_wqe_base) {
-        uint64_t wqe_next = wqe_base + count;
-        if (atomicCAS(reinterpret_cast<unsigned long long*>(&ep->sq_rsvd_index),
-                      wqe_next, wqe_base) == wqe_next) {
-            return UCT_RC_GDA_RESV_WQE_NO_RESOURCE;
-        }
-
-        max_wqe_base = uct_rc_mlx5_gda_max_alloc_wqe_base(ep, count);
-    }
-
-    return wqe_base;
+    /* We own count elements, now can safely increment the reserved index */
+    return atomicAdd(reinterpret_cast<unsigned long long*>(&ep->sq_rsvd_index),
+                     count);
 }

 template<ucs_device_level_t level>
 UCS_F_DEVICE void
 uct_rc_mlx5_gda_reserv_wqe(uct_rc_gdaki_dev_ep_t *ep, unsigned count,
                            unsigned lane_id, uint64_t &wqe_base)
 {
+    wqe_base = 0;
+
     if (lane_id == 0) {
         wqe_base = uct_rc_mlx5_gda_reserv_wqe_thread(ep, count);
     }
+
     if (level == UCS_DEVICE_LEVEL_WARP) {
         wqe_base = __shfl_sync(0xffffffff, wqe_base, 0);
     } else if (level == UCS_DEVICE_LEVEL_BLOCK) {
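
Reviewer's sketch of the reservation change above (not part of the diff): the old code advanced sq_rsvd_index first and undid over-reservation with an atomicCAS rollback loop, which could starve threads under contention. The new code splits the job between a credit counter (avail_count, replenished by the progress path) and a monotonic ticket counter (sq_rsvd_index) that only advances once the credits are owned. A standalone toy kernel showing the same pattern, with hypothetical names:

/* Toy model of the two-counter reservation (illustration only; g_avail_count,
 * g_rsvd_index and reserve() are made-up names, not UCX APIs). */
#include <cstdint>
#include <cstdio>

__device__ int32_t  g_avail_count = 64; /* credits: free SQ slots */
__device__ uint64_t g_rsvd_index  = 0;  /* ticket: monotonic, never rolled back */

__device__ uint64_t reserve(unsigned count)
{
    /* Optimistically take credits */
    int32_t prev = atomicAdd(&g_avail_count, -(int32_t)count);
    if (prev < (int32_t)count) {
        /* Not enough credits: return them; the ticket counter is untouched */
        atomicAdd(&g_avail_count, (int32_t)count);
        return UINT64_MAX; /* stands in for UCT_RC_GDA_RESV_WQE_NO_RESOURCE */
    }

    /* Credits owned: the ticket counter only ever moves forward */
    return atomicAdd(reinterpret_cast<unsigned long long*>(&g_rsvd_index),
                     count);
}

__global__ void producer(unsigned count)
{
    uint64_t base = reserve(count);
    if (base != UINT64_MAX) {
        printf("thread %u owns WQE slots [%llu, %llu)\n", threadIdx.x,
               (unsigned long long)base, (unsigned long long)(base + count));
    }
}

Because a failed reservation only returns credits and never touches sq_rsvd_index, concurrent threads cannot invalidate each other's reservations, which removes the CAS livelock window of the old rollback loop.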
@@ -231,6 +193,12 @@ UCS_F_DEVICE void uct_rc_mlx5_gda_db(uct_rc_gdaki_dev_ep_t *ep,
                 &ep->sq_lock);
 }

+UCS_F_DEVICE bool
+uct_rc_mlx5_gda_fc(const uct_rc_gdaki_dev_ep_t *ep, uint16_t wqe_idx)
+{
+    return (wqe_idx & ep->sq_fc_mask) == 1;
+}
+
 template<ucs_device_level_t level>
 UCS_F_DEVICE ucs_status_t uct_rc_mlx5_gda_ep_single(
         uct_rc_gdaki_dev_ep_t *ep, const uct_device_mem_element_t *tl_mem_elem,
@@ -240,23 +208,20 @@ UCS_F_DEVICE ucs_status_t uct_rc_mlx5_gda_ep_single(
         uint64_t add)
 {
     uct_rc_gda_completion_t *comp = &tl_comp->rc_gda;
-    unsigned cflag = 0;
+    unsigned cflag                = 0;
     uint64_t wqe_base;
-    uint64_t wqe_idx;
     unsigned lane_id;
     unsigned num_lanes;
-    uint32_t fc;

     uct_rc_mlx5_gda_exec_init<level>(lane_id, num_lanes);
     uct_rc_mlx5_gda_reserv_wqe<level>(ep, 1, lane_id, wqe_base);
     if (wqe_base == UCT_RC_GDA_RESV_WQE_NO_RESOURCE) {
         return UCS_ERR_NO_RESOURCE;
     }

-    fc = doca_gpu_dev_verbs_wqe_idx_inc_mask(ep->sq_wqe_pi, ep->sq_wqe_num / 2);
-    wqe_idx = wqe_base & 0xffff;
     if (lane_id == 0) {
-        if ((comp != nullptr) || (wqe_idx == fc)) {
+        uint16_t wqe_idx = (uint16_t)wqe_base;
+        if ((comp != nullptr) || uct_rc_mlx5_gda_fc(ep, wqe_idx)) {
             cflag = DOCA_GPUNETIO_MLX5_WQE_CTRL_CQ_UPDATE;
             if (comp != nullptr) {
                 comp->wqe_idx = wqe_base;
@@ -325,15 +290,13 @@ UCS_F_DEVICE ucs_status_t uct_rc_mlx5_gda_ep_put_multi(
     auto mem_list = reinterpret_cast<const uct_rc_gdaki_device_mem_element_t*>(
             tl_mem_list);
     uct_rc_gda_completion_t *comp = &tl_comp->rc_gda;
-
-    int count = mem_list_count;
-    int counter_index = count - 1;
-    bool atomic = false;
+    int count         = mem_list_count;
+    int counter_index = count - 1;
+    bool atomic       = false;
     uint64_t wqe_idx;
     unsigned cflag;
     unsigned lane_id;
     unsigned num_lanes;
-    uint32_t fc;
     uint64_t wqe_base;
     size_t length;
     void *address;
@@ -357,7 +320,6 @@ UCS_F_DEVICE ucs_status_t uct_rc_mlx5_gda_ep_put_multi(
         return UCS_ERR_NO_RESOURCE;
     }

-    fc = doca_gpu_dev_verbs_wqe_idx_inc_mask(ep->sq_wqe_pi, ep->sq_wqe_num / 2);
     wqe_idx = doca_gpu_dev_verbs_wqe_idx_inc_mask(wqe_base, lane_id);
     for (uint32_t i = lane_id; i < count; i += num_lanes) {
         if (i == counter_index) {
@@ -379,7 +341,7 @@ UCS_F_DEVICE ucs_status_t uct_rc_mlx5_gda_ep_put_multi(

         cflag = 0;
         if (((comp != nullptr) && (i == count - 1)) ||
-            ((comp == nullptr) && (wqe_idx == fc))) {
+            ((comp == nullptr) && uct_rc_mlx5_gda_fc(ep, wqe_idx))) {
             cflag = DOCA_GPUNETIO_MLX5_WQE_CTRL_CQ_UPDATE;
             if (comp != nullptr) {
                 comp->wqe_idx = wqe_base;
@@ -420,13 +382,12 @@ UCS_F_DEVICE ucs_status_t uct_rc_mlx5_gda_ep_put_multi_partial(
     auto mem_list = reinterpret_cast<const uct_rc_gdaki_device_mem_element_t*>(
             tl_mem_list);
     uct_rc_gda_completion_t *comp = &tl_comp->rc_gda;
-    unsigned count = mem_list_count;
-    bool atomic = false;
+    unsigned count = mem_list_count;
+    bool atomic    = false;
     uint64_t wqe_idx;
     unsigned lane_id;
     unsigned num_lanes;
     unsigned cflag;
-    uint32_t fc;
     uint64_t wqe_base;
     size_t length;
     void *address;
@@ -451,7 +412,6 @@ UCS_F_DEVICE ucs_status_t uct_rc_mlx5_gda_ep_put_multi_partial(
         return UCS_ERR_NO_RESOURCE;
     }

-    fc = doca_gpu_dev_verbs_wqe_idx_inc_mask(ep->sq_wqe_pi, ep->sq_wqe_num / 2);
     wqe_idx = doca_gpu_dev_verbs_wqe_idx_inc_mask(wqe_base, lane_id);
     for (uint32_t i = lane_id; i < count; i += num_lanes) {
         if (i == mem_list_count) {
@@ -475,7 +435,7 @@ UCS_F_DEVICE ucs_status_t uct_rc_mlx5_gda_ep_put_multi_partial(

         cflag = 0;
         if (((comp != nullptr) && (i == count - 1)) ||
-            ((comp == nullptr) && (wqe_idx == fc))) {
+            ((comp == nullptr) && uct_rc_mlx5_gda_fc(ep, wqe_idx))) {
             cflag = DOCA_GPUNETIO_MLX5_WQE_CTRL_CQ_UPDATE;
             if (comp != nullptr) {
                 comp->wqe_idx = wqe_base;
@@ -502,20 +462,6 @@ UCS_F_DEVICE ucs_status_t uct_rc_mlx5_gda_ep_put_multi_partial(
     return UCS_INPROGRESS;
 }

-UCS_F_DEVICE uint16_t uct_rc_mlx5_gda_bswap16(uint16_t x)
-{
-    uint32_t ret;
-    asm volatile("{\n\t"
-                 ".reg .b32 mask;\n\t"
-                 ".reg .b32 ign;\n\t"
-                 "mov.b32 mask, 0x1;\n\t"
-                 "prmt.b32 %0, %1, ign, mask;\n\t"
-                 "}"
-                 : "=r"(ret)
-                 : "r"((uint32_t)x));
-    return ret;
-}
-
 UCS_F_DEVICE void
 uct_rc_mlx5_gda_qedump(const char *pfx, void *buff, ssize_t len)
 {
@@ -535,16 +481,17 @@ uct_rc_mlx5_gda_qedump(const char *pfx, void *buff, ssize_t len)

 UCS_F_DEVICE void uct_rc_mlx5_gda_progress_thread(uct_rc_gdaki_dev_ep_t *ep)
 {
-    void *cqe = ep->cqe_daddr;
-    size_t cqe_num = ep->cqe_num;
-    uint64_t cqe_idx = ep->cqe_ci;
-    const size_t cqe_sz = DOCA_GPUNETIO_VERBS_CQE_SIZE;
-    uint32_t idx = cqe_idx & (cqe_num - 1);
-    void *curr_cqe = (uint8_t*)cqe + idx * cqe_sz;
-    auto *cqe64 = reinterpret_cast<mlx5_cqe64*>(curr_cqe);
-    uint8_t op_owner;
-
-    op_owner = READ_ONCE(cqe64->op_own);
+    void *cqe        = ep->cqe_daddr;
+    size_t cqe_num   = ep->cqe_num;
+    uint64_t cqe_idx = ep->cqe_ci;
+    uint32_t idx     = cqe_idx & (cqe_num - 1);
+    void *curr_cqe   = (uint8_t*)cqe + (idx * DOCA_GPUNETIO_VERBS_CQE_SIZE);
+    auto *cqe64      = reinterpret_cast<mlx5_cqe64*>(curr_cqe);
+
+    /* Read last 3 fields with a single atomic operation */
+    uint32_t *data_ptr = (uint32_t *)&cqe64->wqe_counter;
+    uint32_t data      = READ_ONCE(*data_ptr);
+    uint8_t op_owner   = data >> 24;
     if ((op_owner & MLX5_CQE_OWNER_MASK) ^ !!(cqe_idx & cqe_num)) {
         return;
     }
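
Why a single 32-bit read (editorial note): in the standard mlx5_cqe64 layout the tail bytes are wqe_counter (big-endian uint16), signature, then op_own, so one aligned 32-bit load captures the owner bit and the WQE counter from the same CQE generation; two separate loads could pair a freshly overwritten op_own with a stale wqe_counter, which is the torn read this commit closes (and what made the PTX bswap16 helper above removable). A worked example of the extraction arithmetic, assuming that layout and a little-endian GPU:

/* Illustration only: decode op_own and wqe_counter from one 32-bit load.
 * Tail bytes in CQE memory order: 0x12 0x34 0xAB 0x5C
 *   -> wqe_counter (BE) = 0x1234, signature = 0xAB, op_own = 0x5C */
#include <stdint.h>
#include <stdio.h>

int main(void)
{
    const uint8_t bytes[4] = {0x12, 0x34, 0xAB, 0x5C};
    uint32_t data = (uint32_t)bytes[0] | ((uint32_t)bytes[1] << 8) |
                    ((uint32_t)bytes[2] << 16) | ((uint32_t)bytes[3] << 24);

    uint8_t op_owner  = data >> 24;                 /* 0x5C */
    uint32_t data_cpu = __builtin_bswap32(data);    /* 0x1234AB5C */
    uint16_t wqe_cnt  = (data_cpu >> 16) & 0xffff;  /* 0x1234, host order */

    printf("op_own=0x%02x wqe_cnt=0x%04x\n", op_owner, wqe_cnt);
    return 0;
}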
@@ -555,16 +502,30 @@ UCS_F_DEVICE void uct_rc_mlx5_gda_progress_thread(uct_rc_gdaki_dev_ep_t *ep)
         return;
     }

-    uint8_t opcode = op_owner >> DOCA_GPUNETIO_VERBS_MLX5_CQE_OPCODE_SHIFT;
-    uint16_t wqe_cnt = uct_rc_mlx5_gda_bswap16(cqe64->wqe_counter);
-    uint16_t wqe_idx = wqe_cnt & (ep->sq_wqe_num - 1);
+    uint8_t opcode    = op_owner >> DOCA_GPUNETIO_VERBS_MLX5_CQE_OPCODE_SHIFT;
+    uint32_t data_cpu = doca_gpu_dev_verbs_bswap32(data);
+    uint16_t wqe_cnt  = (data_cpu >> 16) & 0xffff;
+    uint16_t wqe_idx  = wqe_cnt & (ep->sq_wqe_num - 1);

     cuda::atomic_ref<uint64_t, cuda::thread_scope_device> pi_ref(ep->sq_wqe_pi);
-    uint64_t sq_wqe_pi = ep->sq_wqe_pi;
-    sq_wqe_pi = ((wqe_cnt - sq_wqe_pi) & 0xffff) + sq_wqe_pi + 1;
+    uint64_t sq_wqe_pi = pi_ref.load(cuda::std::memory_order_relaxed);
+    uint64_t new_wqe_pi;
+
+    do {
+        /* Skip CQE if it's older than current producer index, could be already
+         * processed by another thread. This handles CQE wrap-around. */
+        if ((int16_t)(wqe_cnt - (uint16_t)sq_wqe_pi) < 0) {
+            return;
+        }
+
+        uint16_t completed_delta = wqe_cnt - (uint16_t)sq_wqe_pi;
+        new_wqe_pi = sq_wqe_pi + completed_delta + 1;
+    } while (!pi_ref.compare_exchange_weak(sq_wqe_pi, new_wqe_pi,
+                                           cuda::std::memory_order_release,
+                                           cuda::std::memory_order_relaxed));

     if (opcode == MLX5_CQE_REQ) {
-        pi_ref.fetch_max(sq_wqe_pi);
+        atomicAdd(&ep->avail_count, (int32_t)(new_wqe_pi - sq_wqe_pi));
         return;
     }

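The CAS loop above rebuilds the 64-bit producer index from the 16-bit counter carried in the CQE: the signed 16-bit distance decides whether the CQE is new (advance sq_wqe_pi, then credit avail_count with the number of newly completed WQEs) or stale (already consumed by another thread, possibly across a counter wrap). A small host-side illustration of the wrap arithmetic (names are local to this example):

/* Illustration only: 16-bit wrap-around logic of the progress CAS loop. */
#include <stdint.h>
#include <stdio.h>

static void check(uint64_t sq_wqe_pi, uint16_t wqe_cnt)
{
    uint16_t delta = (uint16_t)(wqe_cnt - (uint16_t)sq_wqe_pi);

    if ((int16_t)delta < 0) {
        printf("pi=%llu cnt=%u -> stale CQE, skip\n",
               (unsigned long long)sq_wqe_pi, wqe_cnt);
    } else {
        printf("pi=%llu cnt=%u -> new pi=%llu\n",
               (unsigned long long)sq_wqe_pi, wqe_cnt,
               (unsigned long long)(sq_wqe_pi + delta + 1));
    }
}

int main(void)
{
    check(65534, 2);     /* counter wrapped: distance +4, new pi = 65539 */
    check(65534, 65530); /* distance -4: already processed, skip */
    return 0;
}
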
src/uct/ib/mlx5/gdaki/gdaki_dev.h

Lines changed: 2 additions & 0 deletions
@@ -30,6 +30,8 @@ typedef struct {
     uint32_t cqe_num;
     uint16_t sq_wqe_num;
     uint32_t sq_num;
+    uint16_t sq_fc_mask;
+    int32_t avail_count;
 } uct_rc_gdaki_dev_ep_t;


test/gtest/common/cuda.h

Lines changed: 5 additions & 0 deletions
@@ -36,6 +36,11 @@ template<typename T> class device_result_ptr {
         return *m_ptr;
     }

+    T *operator->() const
+    {
+        return m_ptr.get();
+    }
+
     T *device_ptr()
     {
         T *device_ptr;
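
The added operator-> forwards to the host-side mapping held by m_ptr, letting a test read result fields without an explicit dereference. A hypothetical usage sketch (result_t and launch_test_kernel are invented for illustration; only device_result_ptr comes from the commit):

/* Hypothetical gtest fragment */
struct result_t {
    ucs_status_t status;
    uint64_t     count;
};

device_result_ptr<result_t> result;
launch_test_kernel<<<1, 1>>>(result.device_ptr());
cudaDeviceSynchronize();
EXPECT_EQ(UCS_OK, result->status); /* previously: (*result).status */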
