Skip to content

Commit 3fd5c84

Browse files
authored
Merge pull request #6718 from hoopoepg/topic/pci-flush-on-quiet-v4.0
SPML/UCX: added synchronized flush on quiet - v4.0
2 parents 4a7f6a4 + 69923e7 commit 3fd5c84

File tree

5 files changed

+115
-4
lines changed

5 files changed

+115
-4
lines changed

oshmem/mca/atomic/ucx/atomic_ucx_cswap.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ int mca_atomic_ucx_cswap(shmem_ctx_t ctx,
4545
UCP_ATOMIC_FETCH_OP_CSWAP, cond, prev, size,
4646
rva, ucx_mkey->rkey,
4747
opal_common_ucx_empty_complete_cb);
48+
49+
if (OPAL_LIKELY(!UCS_PTR_IS_ERR(status_ptr))) {
50+
mca_spml_ucx_remote_op_posted(ucx_ctx, pe);
51+
}
52+
4853
return opal_common_ucx_wait_request(status_ptr, ucx_ctx->ucp_worker,
4954
"ucp_atomic_fetch_nb");
5055
}

oshmem/mca/atomic/ucx/atomic_ucx_module.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ int mca_atomic_ucx_op(shmem_ctx_t ctx,
5151
status = ucp_atomic_post(ucx_ctx->ucp_peers[pe].ucp_conn,
5252
op, value, size, rva,
5353
ucx_mkey->rkey);
54+
55+
if (OPAL_LIKELY(UCS_OK == status)) {
56+
mca_spml_ucx_remote_op_posted(ucx_ctx, pe);
57+
}
58+
5459
return ucx_status_to_oshmem(status);
5560
}
5661

oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 82 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ mca_spml_ucx_t mca_spml_ucx = {
8080
.num_disconnect = 1,
8181
.heap_reg_nb = 0,
8282
.enabled = 0,
83-
.get_mkey_slow = NULL
83+
.get_mkey_slow = NULL,
84+
.synchronized_quiet = false
8485
};
8586

8687
mca_spml_ucx_ctx_t mca_spml_ucx_ctx_default = {
@@ -216,6 +217,40 @@ static void dump_address(int pe, char *addr, size_t len)
216217

217218
static char spml_ucx_transport_ids[1] = { 0 };
218219

220+
int mca_spml_ucx_init_put_op_mask(mca_spml_ucx_ctx_t *ctx, size_t nprocs)
221+
{
222+
int res;
223+
224+
if (mca_spml_ucx.synchronized_quiet) {
225+
ctx->put_proc_indexes = malloc(nprocs * sizeof(*ctx->put_proc_indexes));
226+
if (NULL == ctx->put_proc_indexes) {
227+
return OSHMEM_ERR_OUT_OF_RESOURCE;
228+
}
229+
230+
OBJ_CONSTRUCT(&ctx->put_op_bitmap, opal_bitmap_t);
231+
res = opal_bitmap_init(&ctx->put_op_bitmap, nprocs);
232+
if (OPAL_SUCCESS != res) {
233+
free(ctx->put_proc_indexes);
234+
ctx->put_proc_indexes = NULL;
235+
return res;
236+
}
237+
238+
ctx->put_proc_count = 0;
239+
}
240+
241+
return OSHMEM_SUCCESS;
242+
}
243+
244+
int mca_spml_ucx_clear_put_op_mask(mca_spml_ucx_ctx_t *ctx)
245+
{
246+
if (mca_spml_ucx.synchronized_quiet && ctx->put_proc_indexes) {
247+
OBJ_DESTRUCT(&ctx->put_op_bitmap);
248+
free(ctx->put_proc_indexes);
249+
}
250+
251+
return OSHMEM_SUCCESS;
252+
}
253+
219254
int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
220255
{
221256
size_t i, j, n;
@@ -235,6 +270,11 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
235270
goto error;
236271
}
237272

273+
rc = mca_spml_ucx_init_put_op_mask(&mca_spml_ucx_ctx_default, nprocs);
274+
if (OSHMEM_SUCCESS != rc) {
275+
goto error;
276+
}
277+
238278
err = ucp_worker_get_address(mca_spml_ucx_ctx_default.ucp_worker, &wk_local_addr, &wk_addr_len);
239279
if (err != UCS_OK) {
240280
goto error;
@@ -297,6 +337,8 @@ int mca_spml_ucx_add_procs(ompi_proc_t** procs, size_t nprocs)
297337
free(mca_spml_ucx.remote_addrs_tbl[i]);
298338
}
299339
}
340+
341+
mca_spml_ucx_clear_put_op_mask(&mca_spml_ucx_ctx_default);
300342
if (mca_spml_ucx_ctx_default.ucp_peers)
301343
free(mca_spml_ucx_ctx_default.ucp_peers);
302344
if (mca_spml_ucx.remote_addrs_tbl)
@@ -583,6 +625,11 @@ static int mca_spml_ucx_ctx_create_common(long options, mca_spml_ucx_ctx_t **ucx
583625
goto error;
584626
}
585627

628+
rc = mca_spml_ucx_init_put_op_mask(ucx_ctx, nprocs);
629+
if (OSHMEM_SUCCESS != rc) {
630+
goto error2;
631+
}
632+
586633
for (i = 0; i < nprocs; i++) {
587634
ep_params.field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS;
588635
ep_params.address = (ucp_address_t *)(mca_spml_ucx.remote_addrs_tbl[i]);
@@ -621,6 +668,8 @@ static int mca_spml_ucx_ctx_create_common(long options, mca_spml_ucx_ctx_t **ucx
621668
}
622669
}
623670

671+
mca_spml_ucx_clear_put_op_mask(ucx_ctx);
672+
624673
if (ucx_ctx->ucp_peers)
625674
free(ucx_ctx->ucp_peers);
626675

@@ -715,6 +764,7 @@ int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_add
715764
void *rva;
716765
spml_ucx_mkey_t *ucx_mkey;
717766
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
767+
int res;
718768
#if HAVE_DECL_UCP_PUT_NB
719769
ucs_status_ptr_t request;
720770
#else
@@ -725,12 +775,18 @@ int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_add
725775
#if HAVE_DECL_UCP_PUT_NB
726776
request = ucp_put_nb(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
727777
(uint64_t)rva, ucx_mkey->rkey, opal_common_ucx_empty_complete_cb);
728-
return opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker, "ucp_put_nb");
778+
res = opal_common_ucx_wait_request(request, ucx_ctx->ucp_worker, "ucp_put_nb");
729779
#else
730780
status = ucp_put(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
731781
(uint64_t)rva, ucx_mkey->rkey);
732-
return ucx_status_to_oshmem(status);
782+
res = ucx_status_to_oshmem(status);
733783
#endif
784+
785+
if (OPAL_LIKELY(OSHMEM_SUCCESS == res)) {
786+
mca_spml_ucx_remote_op_posted(ucx_ctx, dst);
787+
}
788+
789+
return res;
734790
}
735791

736792
int mca_spml_ucx_put_nb(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_addr, int dst, void **handle)
@@ -744,6 +800,10 @@ int mca_spml_ucx_put_nb(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_
744800
status = ucp_put_nbi(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
745801
(uint64_t)rva, ucx_mkey->rkey);
746802

803+
if (OPAL_LIKELY(status >= 0)) {
804+
mca_spml_ucx_remote_op_posted(ucx_ctx, dst);
805+
}
806+
747807
return ucx_status_to_oshmem_nb(status);
748808
}
749809

@@ -767,9 +827,28 @@ int mca_spml_ucx_fence(shmem_ctx_t ctx)
767827

768828
int mca_spml_ucx_quiet(shmem_ctx_t ctx)
769829
{
830+
int flush_get_data;
770831
int ret;
832+
unsigned i;
833+
int idx;
771834
mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;
772835

836+
if (mca_spml_ucx.synchronized_quiet) {
837+
for (i = 0; i < ucx_ctx->put_proc_count; i++) {
838+
idx = ucx_ctx->put_proc_indexes[i];
839+
ret = mca_spml_ucx_get_nb(ctx,
840+
ucx_ctx->ucp_peers[idx].mkeys->super.super.va_base,
841+
sizeof(flush_get_data), &flush_get_data, idx, NULL);
842+
if (OMPI_SUCCESS != ret) {
843+
oshmem_shmem_abort(-1);
844+
return ret;
845+
}
846+
847+
opal_bitmap_clear_bit(&ucx_ctx->put_op_bitmap, idx);
848+
}
849+
ucx_ctx->put_proc_count = 0;
850+
}
851+
773852
opal_atomic_wmb();
774853

775854
ret = opal_common_ucx_worker_flush(ucx_ctx->ucp_worker);

oshmem/mca/spml/ucx/spml_ucx.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
#include "opal/class/opal_free_list.h"
3535
#include "opal/class/opal_list.h"
36+
#include "opal/class/opal_bitmap.h"
3637

3738
#include "orte/runtime/orte_globals.h"
3839
#include "opal/mca/common/ucx/common_ucx.h"
@@ -70,6 +71,9 @@ struct mca_spml_ucx_ctx {
7071
ucp_worker_h ucp_worker;
7172
ucp_peer_t *ucp_peers;
7273
long options;
74+
opal_bitmap_t put_op_bitmap;
75+
int *put_proc_indexes;
76+
unsigned put_proc_count;
7377
};
7478
typedef struct mca_spml_ucx_ctx mca_spml_ucx_ctx_t;
7579

@@ -104,7 +108,7 @@ struct mca_spml_ucx {
104108
mca_spml_ucx_ctx_t *aux_ctx;
105109
pthread_spinlock_t async_lock;
106110
int aux_refcnt;
107-
111+
bool synchronized_quiet;
108112
};
109113
typedef struct mca_spml_ucx mca_spml_ucx_t;
110114

@@ -171,6 +175,9 @@ extern int spml_ucx_ctx_progress(void);
171175
extern int spml_ucx_progress_aux_ctx(void);
172176
void mca_spml_ucx_async_cb(int fd, short event, void *cbdata);
173177

178+
int mca_spml_ucx_init_put_op_mask(mca_spml_ucx_ctx_t *ctx, size_t nprocs);
179+
int mca_spml_ucx_clear_put_op_mask(mca_spml_ucx_ctx_t *ctx);
180+
174181
static inline void mca_spml_ucx_aux_lock(void)
175182
{
176183
if (mca_spml_ucx.async_progress) {
@@ -224,6 +231,16 @@ static inline int ucx_status_to_oshmem_nb(ucs_status_t status)
224231
#endif
225232
}
226233

234+
static inline void mca_spml_ucx_remote_op_posted(mca_spml_ucx_ctx_t *ctx, int dst)
235+
{
236+
if (OPAL_UNLIKELY(mca_spml_ucx.synchronized_quiet)) {
237+
if (!opal_bitmap_is_set_bit(&ctx->put_op_bitmap, dst)) {
238+
ctx->put_proc_indexes[ctx->put_proc_count++] = dst;
239+
opal_bitmap_set_bit(&ctx->put_op_bitmap, dst);
240+
}
241+
}
242+
}
243+
227244
#define MCA_SPML_UCX_CTXS_ARRAY_SIZE 64
228245
#define MCA_SPML_UCX_CTXS_ARRAY_INC 64
229246

oshmem/mca/spml/ucx/spml_ucx_component.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ static int mca_spml_ucx_component_register(void)
128128
"Asynchronous progress tick granularity (in usec)",
129129
&mca_spml_ucx.async_tick);
130130

131+
mca_spml_ucx_param_register_bool("synchronized_quiet", 0,
132+
"Use synchronized quiet on shmem_quiet or shmem_barrier_all operations",
133+
&mca_spml_ucx.synchronized_quiet);
134+
131135
opal_common_ucx_mca_var_register(&mca_spml_ucx_component.spmlm_version);
132136

133137
return OSHMEM_SUCCESS;
@@ -329,6 +333,7 @@ static void _ctx_cleanup(mca_spml_ucx_ctx_t *ctx)
329333
mca_spml_ucx.num_disconnect,
330334
ctx->ucp_worker);
331335
free(del_procs);
336+
mca_spml_ucx_clear_put_op_mask(ctx);
332337
free(ctx->ucp_peers);
333338
}
334339

0 commit comments

Comments
 (0)