Skip to content

Commit 1b58e3d

Browse files
author
Tomislav Janjusic
committed
oshmem/ucx: Improves performance for non-blocking put/get operations.
Improves performance when an excessive number of non-blocking operations are posted, by periodically calling progress on the UCX workers. Co-authored with: Artem Y. Polyakov <[email protected]>, Manjunath Gorentla Venkata <[email protected]> Signed-off-by: Tomislav Janjusic <[email protected]>
1 parent d14d8ad commit 1b58e3d

File tree

3 files changed

+109
-0
lines changed

3 files changed

+109
-0
lines changed

oshmem/mca/spml/ucx/spml_ucx.c

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,30 @@ int mca_spml_ucx_get_nb(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_
774774
return ucx_status_to_oshmem_nb(status);
775775
}
776776

777+
/**
 * Non-blocking get that periodically drives the UCX worker.
 *
 * Posts the transfer with ucp_get_nbi() and increments the per-context
 * counter nb_progress_cnt.  Once the counter exceeds
 * mca_spml_ucx.nb_get_progress_thresh, ucp_worker_progress() is called on the
 * context's worker at most mca_spml_ucx.nb_ucp_worker_progress times.
 *
 * @param ctx       context handle; cast to mca_spml_ucx_ctx_t
 * @param src_addr  address handed to mca_spml_ucx_get_mkey() to resolve the
 *                  remote key and remote virtual address on peer @p src
 * @param size      number of bytes to transfer
 * @param dst_addr  local buffer passed to ucp_get_nbi()
 * @param src       index of the peer in ucx_ctx->ucp_peers
 * @param handle    not used by this function
 *
 * @return the ucp_get_nbi() status translated by ucx_status_to_oshmem_nb()
 */
int mca_spml_ucx_get_nb_wprogress(shmem_ctx_t ctx, void *src_addr, size_t size, void *dst_addr, int src, void **handle)
{
    /* Same width as mca_spml_ucx.nb_ucp_worker_progress (unsigned long), so
     * the loop bound is always reachable and the index cannot wrap. */
    unsigned long i;
    void *rva;
    ucs_status_t status;
    spml_ucx_mkey_t *ucx_mkey;
    mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;

    ucx_mkey = mca_spml_ucx_get_mkey(ctx, src, src_addr, &rva, &mca_spml_ucx);
    status = ucp_get_nbi(ucx_ctx->ucp_peers[src].ucp_conn, dst_addr, size,
                         (uint64_t)rva, ucx_mkey->rkey);

    if (++ucx_ctx->nb_progress_cnt > mca_spml_ucx.nb_get_progress_thresh) {
        for (i = 0; i < mca_spml_ucx.nb_ucp_worker_progress; i++) {
            /* A zero return ends the loop and restarts the count.  If the
             * iteration budget runs out first, the counter stays above the
             * threshold and the next call progresses again. */
            if (!ucp_worker_progress(ucx_ctx->ucp_worker)) {
                ucx_ctx->nb_progress_cnt = 0;
                break;
            }
        }
    }

    return ucx_status_to_oshmem_nb(status);
}
800+
777801
int mca_spml_ucx_put(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_addr, int dst)
778802
{
779803
void *rva;
@@ -822,7 +846,33 @@ int mca_spml_ucx_put_nb(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_
822846
return ucx_status_to_oshmem_nb(status);
823847
}
824848

849+
/**
 * Non-blocking put that periodically drives the UCX worker.
 *
 * Posts the transfer with ucp_put_nbi(); when the returned status is
 * non-negative the operation is recorded through
 * mca_spml_ucx_remote_op_posted().  The per-context counter nb_progress_cnt
 * is then incremented, and once it exceeds
 * mca_spml_ucx.nb_put_progress_thresh, ucp_worker_progress() is called on the
 * context's worker at most mca_spml_ucx.nb_ucp_worker_progress times.
 *
 * @param ctx       context handle; cast to mca_spml_ucx_ctx_t
 * @param dst_addr  address handed to mca_spml_ucx_get_mkey() to resolve the
 *                  remote key and remote virtual address on peer @p dst
 * @param size      number of bytes to transfer
 * @param src_addr  local buffer passed to ucp_put_nbi()
 * @param dst       index of the peer in ucx_ctx->ucp_peers
 * @param handle    not used by this function
 *
 * @return the ucp_put_nbi() status translated by ucx_status_to_oshmem_nb()
 */
int mca_spml_ucx_put_nb_wprogress(shmem_ctx_t ctx, void* dst_addr, size_t size, void* src_addr, int dst, void **handle)
{
    /* Same width as mca_spml_ucx.nb_ucp_worker_progress (unsigned long), so
     * the loop bound is always reachable and the index cannot wrap. */
    unsigned long i;
    void *rva;
    ucs_status_t status;
    spml_ucx_mkey_t *ucx_mkey;
    mca_spml_ucx_ctx_t *ucx_ctx = (mca_spml_ucx_ctx_t *)ctx;

    ucx_mkey = mca_spml_ucx_get_mkey(ctx, dst, dst_addr, &rva, &mca_spml_ucx);
    status = ucp_put_nbi(ucx_ctx->ucp_peers[dst].ucp_conn, src_addr, size,
                         (uint64_t)rva, ucx_mkey->rkey);

    if (OPAL_LIKELY(status >= 0)) {
        mca_spml_ucx_remote_op_posted(ucx_ctx, dst);
    }

    if (++ucx_ctx->nb_progress_cnt > mca_spml_ucx.nb_put_progress_thresh) {
        for (i = 0; i < mca_spml_ucx.nb_ucp_worker_progress; i++) {
            /* A zero return ends the loop and restarts the count.  If the
             * iteration budget runs out first, the counter stays above the
             * threshold and the next call progresses again. */
            if (!ucp_worker_progress(ucx_ctx->ucp_worker)) {
                ucx_ctx->nb_progress_cnt = 0;
                break;
            }
        }
    }

    return ucx_status_to_oshmem_nb(status);
}
826876

827877
int mca_spml_ucx_fence(shmem_ctx_t ctx)
828878
{
@@ -880,6 +930,8 @@ int mca_spml_ucx_quiet(shmem_ctx_t ctx)
880930
}
881931
}
882932

933+
ucx_ctx->nb_progress_cnt = 0;
934+
883935
return OSHMEM_SUCCESS;
884936
}
885937

oshmem/mca/spml/ucx/spml_ucx.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ struct mca_spml_ucx_ctx {
7171
ucp_peer_t *ucp_peers;
7272
long options;
7373
opal_bitmap_t put_op_bitmap;
74+
unsigned long nb_progress_cnt;
7475
int *put_proc_indexes;
7576
unsigned put_proc_count;
7677
};
@@ -108,6 +109,10 @@ struct mca_spml_ucx {
108109
pthread_spinlock_t async_lock;
109110
int aux_refcnt;
110111
bool synchronized_quiet;
112+
unsigned long nb_progress_thresh_global;
113+
unsigned long nb_put_progress_thresh;
114+
unsigned long nb_get_progress_thresh;
115+
unsigned long nb_ucp_worker_progress;
111116
};
112117
typedef struct mca_spml_ucx mca_spml_ucx_t;
113118

@@ -122,13 +127,21 @@ extern int mca_spml_ucx_get(shmem_ctx_t ctx,
122127
size_t size,
123128
void* src_addr,
124129
int src);
130+
125131
extern int mca_spml_ucx_get_nb(shmem_ctx_t ctx,
126132
void* dst_addr,
127133
size_t size,
128134
void* src_addr,
129135
int src,
130136
void **handle);
131137

138+
/**
 * Non-blocking get variant that also calls ucp_worker_progress() once the
 * per-context operation counter exceeds nb_get_progress_thresh.
 *
 * Parameter names follow the definition in spml_ucx.c: the second argument
 * is the address used to resolve the remote key on peer @p src, the fourth
 * is the local buffer handed to ucp_get_nbi().
 */
extern int mca_spml_ucx_get_nb_wprogress(shmem_ctx_t ctx,
                                         void *src_addr,
                                         size_t size,
                                         void *dst_addr,
                                         int src,
                                         void **handle);
144+
132145
extern int mca_spml_ucx_put(shmem_ctx_t ctx,
133146
void* dst_addr,
134147
size_t size,
@@ -142,6 +155,13 @@ extern int mca_spml_ucx_put_nb(shmem_ctx_t ctx,
142155
int dst,
143156
void **handle);
144157

158+
/**
 * Non-blocking put variant that also calls ucp_worker_progress() once the
 * per-context operation counter exceeds nb_put_progress_thresh.
 *
 * @p dst_addr is the address used to resolve the remote key on peer @p dst;
 * @p src_addr is the local buffer handed to ucp_put_nbi().
 */
extern int mca_spml_ucx_put_nb_wprogress(shmem_ctx_t ctx,
                                         void *dst_addr,
                                         size_t size,
                                         void *src_addr,
                                         int dst,
                                         void **handle);
164+
145165
extern int mca_spml_ucx_recv(void* buf, size_t size, int src);
146166
extern int mca_spml_ucx_send(void* buf,
147167
size_t size,

oshmem/mca/spml/ucx/spml_ucx_component.c

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,20 @@ mca_spml_base_component_2_0_0_t mca_spml_ucx_component = {
6060
.spmlm_finalize = mca_spml_ucx_component_fini
6161
};
6262

63+
static inline void mca_spml_ucx_param_register_ulong(const char* param_name,
64+
unsigned long default_value,
65+
const char *help_msg,
66+
unsigned long *storage)
67+
{
68+
*storage = default_value;
69+
(void) mca_base_component_var_register(&mca_spml_ucx_component.spmlm_version,
70+
param_name,
71+
help_msg,
72+
MCA_BASE_VAR_TYPE_UNSIGNED_LONG, NULL, 0, 0,
73+
OPAL_INFO_LVL_9,
74+
MCA_BASE_VAR_SCOPE_READONLY,
75+
storage);
76+
}
6377

6478
static inline void mca_spml_ucx_param_register_int(const char* param_name,
6579
int default_value,
@@ -132,6 +146,22 @@ static int mca_spml_ucx_component_register(void)
132146
"Use synchronized quiet on shmem_quiet or shmem_barrier_all operations",
133147
&mca_spml_ucx.synchronized_quiet);
134148

149+
mca_spml_ucx_param_register_ulong("nb_progress_thresh_global", 0,
150+
"Number of nb_put or nb_get operations before ucx progress is triggered. Disabled by default (0)",
151+
&mca_spml_ucx.nb_progress_thresh_global);
152+
153+
mca_spml_ucx_param_register_ulong("nb_put_progress_thresh", mca_spml_ucx.nb_progress_thresh_global,
154+
"Number of nb_put operations before ucx progress is triggered. Disabled by default (0), setting this value will override nb_progress_thresh_global",
155+
&mca_spml_ucx.nb_put_progress_thresh);
156+
157+
mca_spml_ucx_param_register_ulong("nb_get_progress_thresh", mca_spml_ucx.nb_progress_thresh_global,
158+
"Number of nb_get operations before ucx progress is triggered. Disabled by default (0), setting this value will override nb_progress_thresh_global ",
159+
&mca_spml_ucx.nb_get_progress_thresh);
160+
161+
mca_spml_ucx_param_register_ulong("nb_ucp_worker_progress", 32,
162+
"Maximum number of ucx worker progress calls if triggered during nb_put or nb_get",
163+
&mca_spml_ucx.nb_ucp_worker_progress);
164+
135165
opal_common_ucx_mca_var_register(&mca_spml_ucx_component.spmlm_version);
136166

137167
return OSHMEM_SUCCESS;
@@ -294,6 +324,13 @@ static int spml_ucx_init(void)
294324
mca_spml_ucx.aux_ctx = NULL;
295325
mca_spml_ucx.aux_refcnt = 0;
296326

327+
if (mca_spml_ucx.nb_put_progress_thresh) {
328+
mca_spml_ucx.super.spml_put_nb = &mca_spml_ucx_put_nb_wprogress;
329+
}
330+
if (mca_spml_ucx.nb_get_progress_thresh) {
331+
mca_spml_ucx.super.spml_get_nb = &mca_spml_ucx_get_nb_wprogress;
332+
}
333+
297334
oshmem_ctx_default = (shmem_ctx_t) &mca_spml_ucx_ctx_default;
298335

299336
return OSHMEM_SUCCESS;

0 commit comments

Comments
 (0)