Skip to content

Commit aa27b09

Browse files
authored
Merge pull request #10536 from devreal/osc-ucx-rputget-flushnb-main
osc/ucx: implement rput and rget using ucp_worker_flush_nb
2 parents 323db65 + eee891f commit aa27b09

File tree

3 files changed

+85
-23
lines changed

3 files changed

+85
-23
lines changed

ompi/mca/osc/ucx/osc_ucx_comm.c

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1139,21 +1139,28 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
11391139
return ret;
11401140
}
11411141

1142-
ret = opal_common_ucx_wpmem_fence(mem);
1143-
if (ret != OMPI_SUCCESS) {
1144-
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
1145-
return OMPI_ERROR;
1146-
}
1147-
11481142
mca_osc_ucx_component.num_incomplete_req_ops++;
1149-
/* TODO: investigate whether ucp_worker_flush_nb is a better choice here */
1150-
ret = opal_common_ucx_wpmem_fetch_nb(module->state_mem, UCP_ATOMIC_FETCH_OP_FADD,
1151-
0, target, &(module->req_result),
1152-
sizeof(uint64_t), remote_addr & (~0x7),
1153-
req_completion, ucx_req);
1143+
ret = opal_common_ucx_wpmem_flush_ep_nb(mem, target, req_completion, ucx_req);
1144+
11541145
if (ret != OMPI_SUCCESS) {
11551146
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
11561147
return ret;
1148+
1149+
/* fallback to using an atomic op to acquire a request handle */
1150+
ret = opal_common_ucx_wpmem_fence(mem);
1151+
if (ret != OMPI_SUCCESS) {
1152+
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
1153+
return OMPI_ERROR;
1154+
}
1155+
1156+
ret = opal_common_ucx_wpmem_fetch_nb(mem, UCP_ATOMIC_FETCH_OP_FADD,
1157+
0, target, &(module->req_result),
1158+
sizeof(uint64_t), remote_addr & (~0x7),
1159+
req_completion, ucx_req);
1160+
if (ret != OMPI_SUCCESS) {
1161+
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
1162+
return ret;
1163+
}
11571164
}
11581165

11591166
*request = &ucx_req->super;
@@ -1194,21 +1201,28 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
11941201
return ret;
11951202
}
11961203

1197-
ret = opal_common_ucx_wpmem_fence(mem);
1198-
if (ret != OMPI_SUCCESS) {
1199-
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
1200-
return OMPI_ERROR;
1201-
}
1202-
12031204
mca_osc_ucx_component.num_incomplete_req_ops++;
1204-
/* TODO: investigate whether ucp_worker_flush_nb is a better choice here */
1205-
ret = opal_common_ucx_wpmem_fetch_nb(module->state_mem, UCP_ATOMIC_FETCH_OP_FADD,
1206-
0, target, &(module->req_result),
1207-
sizeof(uint64_t), remote_addr & (~0x7),
1208-
req_completion, ucx_req);
1205+
ret = opal_common_ucx_wpmem_flush_ep_nb(mem, target, req_completion, ucx_req);
1206+
12091207
if (ret != OMPI_SUCCESS) {
12101208
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
12111209
return ret;
1210+
1211+
/* fallback to using an atomic op to acquire a request handle */
1212+
ret = opal_common_ucx_wpmem_fence(mem);
1213+
if (ret != OMPI_SUCCESS) {
1214+
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
1215+
return OMPI_ERROR;
1216+
}
1217+
1218+
ret = opal_common_ucx_wpmem_fetch_nb(mem, UCP_ATOMIC_FETCH_OP_FADD,
1219+
0, target, &(module->req_result),
1220+
sizeof(uint64_t), remote_addr & (~0x7),
1221+
req_completion, ucx_req);
1222+
if (ret != OMPI_SUCCESS) {
1223+
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
1224+
return ret;
1225+
}
12121226
}
12131227

12141228
*request = &ucx_req->super;

opal/mca/common/ucx/common_ucx_wpool.c

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -761,7 +761,7 @@ OPAL_DECLSPEC int opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, in
761761
((opal_common_ucx_request_t *) req)->winfo = winfo;
762762
}
763763

764-
if (OPAL_COMMON_UCX_FLUSH_B) {
764+
if (OPAL_COMMON_UCX_FLUSH_B == type) {
765765
rc = opal_common_ucx_wait_request_mt(req, "ucp_ep_flush_nb");
766766
} else {
767767
*req_ptr = req;
@@ -820,13 +820,57 @@ OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
820820
if (rc != OPAL_SUCCESS) {
821821
MCA_COMMON_UCX_ERROR("opal_common_ucx_flush failed: %d", rc);
822822
rc = OPAL_ERROR;
823+
break;
823824
}
824825
}
825826
opal_mutex_unlock(&ctx->mutex);
826827

827828
return rc;
828829
}
829830

831+
832+
OPAL_DECLSPEC int opal_common_ucx_wpmem_flush_ep_nb(opal_common_ucx_wpmem_t *mem,
833+
int target,
834+
opal_common_ucx_user_req_handler_t user_req_cb,
835+
void *user_req_ptr)
836+
{
837+
#if HAVE_DECL_UCP_EP_FLUSH_NB
838+
int rc = OPAL_SUCCESS;
839+
ucp_ep_h ep = NULL;
840+
ucp_rkey_h rkey = NULL;
841+
opal_common_ucx_winfo_t *winfo = NULL;
842+
843+
if (NULL == mem) {
844+
return OPAL_SUCCESS;
845+
}
846+
847+
rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
848+
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
849+
MCA_COMMON_UCX_ERROR("tlocal_fetch failed: %d", rc);
850+
return rc;
851+
}
852+
853+
opal_mutex_lock(&winfo->mutex);
854+
opal_common_ucx_request_t *req;
855+
req = ucp_worker_flush_nb(winfo->worker, 0, opal_common_ucx_req_completion);
856+
if (UCS_PTR_IS_PTR(req)) {
857+
req->ext_req = user_req_ptr;
858+
req->ext_cb = user_req_cb;
859+
req->winfo = winfo;
860+
} else {
861+
if (user_req_cb != NULL) {
862+
(*user_req_cb)(user_req_ptr);
863+
}
864+
}
865+
opal_mutex_unlock(&winfo->mutex);
866+
return rc;
867+
#else
868+
return OPAL_ERR_NOT_SUPPORTED;
869+
#endif // HAVE_DECL_UCP_EP_FLUSH_NB
870+
871+
}
872+
873+
830874
OPAL_DECLSPEC int opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem)
831875
{
832876
ucs_status_t status = UCS_OK;

opal/mca/common/ucx/common_ucx_wpool.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,10 @@ OPAL_DECLSPEC void opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem);
247247

248248
OPAL_DECLSPEC int opal_common_ucx_wpmem_flush(opal_common_ucx_wpmem_t *mem,
249249
opal_common_ucx_flush_scope_t scope, int target);
250+
/**
 * Start a non-blocking flush of the worker associated with (mem, target).
 *
 * If the flush completes immediately, user_req_cb (when non-NULL) is called
 * with user_req_ptr before this function returns; otherwise both are stored
 * on the pending UCX request (presumably for the completion callback to
 * invoke later -- confirm against opal_common_ucx_req_completion).
 *
 * Returns OPAL_SUCCESS without doing anything when mem is NULL, propagates
 * the error code of a failed worker lookup, and returns
 * OPAL_ERR_NOT_SUPPORTED when HAVE_DECL_UCP_EP_FLUSH_NB is not set.
 */
OPAL_DECLSPEC int opal_common_ucx_wpmem_flush_ep_nb(opal_common_ucx_wpmem_t *mem,
251+
int target,
252+
opal_common_ucx_user_req_handler_t user_req_cb,
253+
void *user_req_ptr);
250254
OPAL_DECLSPEC int opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem);
251255

252256
OPAL_DECLSPEC int opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, int target,

0 commit comments

Comments
 (0)