Skip to content

Commit 824afac

Browse files
committed
UCX common: add non-blocking compare-and-swap
Signed-off-by: Joseph Schuchart <[email protected]>
1 parent 5f786bc commit 824afac

File tree

2 files changed

+76
-15
lines changed

2 files changed

+76
-15
lines changed

opal/mca/common/ucx/common_ucx.h

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -206,22 +206,33 @@ int opal_common_ucx_atomic_cswap(ucp_ep_h ep, uint64_t compare,
206206
uint64_t remote_addr, ucp_rkey_h rkey,
207207
ucp_worker_h worker)
208208
{
209-
uint64_t tmp = value;
210-
int ret;
211-
212-
ret = opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, &tmp,
213-
op_size, remote_addr, rkey, worker);
214-
if (OPAL_LIKELY(OPAL_SUCCESS == ret)) {
215-
/* in case if op_size is constant (like sizeof(type)) then this condition
216-
* is evaluated in compile time */
217-
if (op_size == sizeof(uint64_t)) {
218-
*(uint64_t*)result = tmp;
219-
} else {
220-
assert(op_size == sizeof(uint32_t));
221-
*(uint32_t*)result = tmp;
222-
}
209+
if (op_size == sizeof(uint64_t)) {
210+
*(uint64_t*)result = value;
211+
} else {
212+
assert(op_size == sizeof(uint32_t));
213+
*(uint32_t*)result = value;
223214
}
224-
return ret;
215+
216+
return opal_common_ucx_atomic_fetch(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, result,
217+
op_size, remote_addr, rkey, worker);
218+
}
219+
220+
static inline
221+
ucs_status_ptr_t opal_common_ucx_atomic_cswap_nb(ucp_ep_h ep, uint64_t compare,
222+
uint64_t value, void *result, size_t op_size,
223+
uint64_t remote_addr, ucp_rkey_h rkey,
224+
ucp_send_callback_t req_handler,
225+
ucp_worker_h worker)
226+
{
227+
if (op_size == sizeof(uint64_t)) {
228+
*(uint64_t*)result = value;
229+
} else {
230+
assert(op_size == sizeof(uint32_t));
231+
*(uint32_t*)result = value;
232+
}
233+
234+
return opal_common_ucx_atomic_fetch_nb(ep, UCP_ATOMIC_FETCH_OP_CSWAP, compare, result,
235+
op_size, remote_addr, rkey, req_handler, worker);
225236
}
226237

227238
END_C_DECLS

opal/mca/common/ucx/common_ucx_wpool.h

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,56 @@ opal_common_ucx_wpmem_cmpswp(opal_common_ucx_wpmem_t *mem, uint64_t compare,
418418
return rc;
419419
}
420420

421+
422+
static inline int
423+
opal_common_ucx_wpmem_cmpswp_nb(opal_common_ucx_wpmem_t *mem, uint64_t compare,
424+
uint64_t value, int target, void *buffer, size_t len,
425+
uint64_t rem_addr,
426+
opal_common_ucx_user_req_handler_t user_req_cb,
427+
void *user_req_ptr)
428+
{
429+
ucp_ep_h ep;
430+
ucp_rkey_h rkey;
431+
opal_common_ucx_winfo_t *winfo = NULL;
432+
opal_common_ucx_request_t *req;
433+
int rc = OPAL_SUCCESS;
434+
435+
rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
436+
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
437+
MCA_COMMON_UCX_ERROR("opal_common_ucx_tlocal_fetch failed: %d", rc);
438+
return rc;
439+
}
440+
441+
/* Perform the operation */
442+
opal_mutex_lock(&winfo->mutex);
443+
req = opal_common_ucx_atomic_cswap_nb(ep, compare, value,
444+
buffer, len,
445+
rem_addr, rkey, opal_common_ucx_req_completion,
446+
winfo->worker);
447+
448+
if (UCS_PTR_IS_PTR(req)) {
449+
req->ext_req = user_req_ptr;
450+
req->ext_cb = user_req_cb;
451+
req->winfo = winfo;
452+
} else {
453+
if (user_req_cb != NULL) {
454+
(*user_req_cb)(user_req_ptr);
455+
}
456+
}
457+
458+
459+
rc = _periodical_flush_nb(mem, winfo, target);
460+
if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
461+
MCA_COMMON_UCX_VERBOSE(1, "_incr_and_check_inflight_ops failed: %d", rc);
462+
return rc;
463+
}
464+
465+
opal_mutex_unlock(&winfo->mutex);
466+
467+
return rc;
468+
}
469+
470+
421471
static inline int
422472
opal_common_ucx_wpmem_post(opal_common_ucx_wpmem_t *mem, ucp_atomic_post_op_t opcode,
423473
uint64_t value, int target, size_t len, uint64_t rem_addr)

0 commit comments

Comments
 (0)