Skip to content

Commit 04e0ae8

Browse files
committed
Merge pull request open-mpi#765 from yosefe/topic/pml-ucx-implement-cancel
pml_ucx: implement cancel, and add small optimizations.
2 parents 341e3ae + b710770 commit 04e0ae8

File tree

3 files changed

+45
-9
lines changed

3 files changed

+45
-9
lines changed

ompi/mca/pml/ucx/pml_ucx.c

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -386,6 +386,7 @@ int mca_pml_ucx_recv(void *buf, size_t count, ompi_datatype_t *datatype, int src
386386
return OMPI_ERROR;
387387
}
388388

389+
ucp_worker_progress(ompi_pml_ucx.ucp_worker);
389390
while (!req->req_complete) {
390391
opal_progress();
391392
}
@@ -492,10 +493,11 @@ int mca_pml_ucx_send(const void *buf, size_t count, ompi_datatype_t *datatype, i
492493
mca_pml_ucx_get_datatype(datatype),
493494
PML_UCX_MAKE_SEND_TAG(tag, comm),
494495
mca_pml_ucx_send_completion);
495-
if (req == NULL) {
496+
if (OPAL_LIKELY(req == NULL)) {
496497
return OMPI_SUCCESS;
497498
} else if (!UCS_PTR_IS_ERR(req)) {
498499
PML_UCX_VERBOSE(8, "got request %p", (void*)req);
500+
ucp_worker_progress(ompi_pml_ucx.ucp_worker);
499501
ompi_request_wait(&req, MPI_STATUS_IGNORE);
500502
return OMPI_SUCCESS;
501503
} else {
@@ -698,6 +700,7 @@ int mca_pml_ucx_start(size_t count, ompi_request_t** requests)
698700
PML_UCX_VERBOSE(8, "temporary request %p will complete persistent request %p",
699701
(void*)tmp_req, (void*)preq);
700702
tmp_req->req_complete_cb_data = preq;
703+
preq->tmp_req = tmp_req;
701704
}
702705
OPAL_THREAD_UNLOCK(&ompi_request_lock);
703706
} else {

ompi/mca/pml/ucx/pml_ucx_request.c

Lines changed: 40 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,12 @@ static int mca_pml_ucx_request_free(ompi_request_t **rptr)
2525
return OMPI_SUCCESS;
2626
}
2727

28+
static int mca_pml_ucx_request_cancel(ompi_request_t *req, int flag)
29+
{
30+
ucp_request_cancel(ompi_pml_ucx.ucp_worker, req);
31+
return OMPI_SUCCESS;
32+
}
33+
2834
void mca_pml_ucx_send_completion(void *request, ucs_status_t status)
2935
{
3036
ompi_request_t *req = request;
@@ -55,12 +61,19 @@ void mca_pml_ucx_recv_completion(void *request, ucs_status_t status,
5561
OPAL_THREAD_UNLOCK(&ompi_request_lock);
5662
}
5763

58-
void mca_pml_ucx_persistent_requset_complete(mca_pml_ucx_persistent_request_t *preq,
64+
static void mca_pml_ucx_persistent_request_detach(mca_pml_ucx_persistent_request_t *preq,
65+
ompi_request_t *tmp_req)
66+
{
67+
tmp_req->req_complete_cb_data = NULL;
68+
preq->tmp_req = NULL;
69+
}
70+
71+
void mca_pml_ucx_persistent_request_complete(mca_pml_ucx_persistent_request_t *preq,
5972
ompi_request_t *tmp_req)
6073
{
6174
preq->ompi.req_status = tmp_req->req_status;
6275
ompi_request_complete(&preq->ompi, true);
63-
tmp_req->req_complete_cb_data = NULL;
76+
mca_pml_ucx_persistent_request_detach(preq, tmp_req);
6477
mca_pml_ucx_request_reset(tmp_req);
6578
ucp_request_release(tmp_req);
6679
}
@@ -73,7 +86,8 @@ static inline void mca_pml_ucx_preq_completion(ompi_request_t *tmp_req)
7386
ompi_request_complete(tmp_req, false);
7487
preq = (mca_pml_ucx_persistent_request_t*)tmp_req->req_complete_cb_data;
7588
if (preq != NULL) {
76-
mca_pml_ucx_persistent_requset_complete(preq, tmp_req);
89+
PML_UCX_ASSERT(preq->tmp_req != NULL);
90+
mca_pml_ucx_persistent_request_complete(preq, tmp_req);
7791
}
7892
OPAL_THREAD_UNLOCK(&ompi_request_lock);
7993
}
@@ -120,7 +134,8 @@ void mca_pml_ucx_request_init(void *request)
120134
ompi_request_t* ompi_req = request;
121135
OBJ_CONSTRUCT(ompi_req, ompi_request_t);
122136
mca_pml_ucx_request_init_common(ompi_req, false, OMPI_REQUEST_ACTIVE,
123-
mca_pml_ucx_request_free, NULL);
137+
mca_pml_ucx_request_free,
138+
mca_pml_ucx_request_cancel);
124139
}
125140

126141
void mca_pml_ucx_request_cleanup(void *request)
@@ -133,18 +148,35 @@ void mca_pml_ucx_request_cleanup(void *request)
133148

134149
static int mca_pml_ucx_persistent_request_free(ompi_request_t **rptr)
135150
{
136-
mca_pml_ucx_persistent_request_t* req = (mca_pml_ucx_persistent_request_t*)*rptr;
151+
mca_pml_ucx_persistent_request_t* preq = (mca_pml_ucx_persistent_request_t*)*rptr;
152+
ompi_request_t *tmp_req = preq->tmp_req;
137153

154+
preq->ompi.req_state = OMPI_REQUEST_INVALID;
155+
if (tmp_req != NULL) {
156+
mca_pml_ucx_persistent_request_detach(preq, tmp_req);
157+
ucp_request_release(tmp_req);
158+
}
159+
PML_UCX_FREELIST_RETURN(&ompi_pml_ucx.persistent_reqs, &preq->ompi.super);
138160
*rptr = MPI_REQUEST_NULL;
139-
req->ompi.req_state = OMPI_REQUEST_INVALID;
140-
PML_UCX_FREELIST_RETURN(&ompi_pml_ucx.persistent_reqs, &req->ompi.super);
161+
return OMPI_SUCCESS;
162+
}
163+
164+
static int mca_pml_ucx_persistent_request_cancel(ompi_request_t *req, int flag)
165+
{
166+
mca_pml_ucx_persistent_request_t* preq = (mca_pml_ucx_persistent_request_t*)req;
167+
168+
if (preq->tmp_req != NULL) {
169+
ucp_request_cancel(ompi_pml_ucx.ucp_worker, preq->tmp_req);
170+
}
141171
return OMPI_SUCCESS;
142172
}
143173

144174
static void mca_pml_ucx_persisternt_request_construct(mca_pml_ucx_persistent_request_t* req)
145175
{
146176
mca_pml_ucx_request_init_common(&req->ompi, true, OMPI_REQUEST_INACTIVE,
147-
mca_pml_ucx_persistent_request_free, NULL);
177+
mca_pml_ucx_persistent_request_free,
178+
mca_pml_ucx_persistent_request_cancel);
179+
req->tmp_req = NULL;
148180
}
149181

150182
static void mca_pml_ucx_persisternt_request_destruct(mca_pml_ucx_persistent_request_t* req)

ompi/mca/pml/ucx/pml_ucx_request.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -89,6 +89,7 @@ enum {
8989

9090
struct pml_ucx_persistent_request {
9191
ompi_request_t ompi;
92+
ompi_request_t *tmp_req;
9293
unsigned flags;
9394
void *buffer;
9495
size_t count;

0 commit comments

Comments
 (0)