Skip to content

Commit 137346c

Browse files
committed
UCX: treat sizes and disp-units same between shared and regular windows
Signed-off-by: Joseph Schuchart <[email protected]>
1 parent e5cc49a commit 137346c

File tree

2 files changed

+73
-96
lines changed

2 files changed

+73
-96
lines changed

ompi/mca/osc/ucx/osc_ucx.h

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ typedef struct ompi_osc_ucx_module {
121121
struct ompi_communicator_t *comm;
122122
int flavor;
123123
size_t size;
124-
size_t *sizes; /* used if !same_size*/
124+
size_t *sizes; /* used if not every process has the same size */
125125
uint64_t *addrs;
126126
uint64_t *state_addrs;
127127
uint64_t *comm_world_ranks;
@@ -173,7 +173,7 @@ typedef struct ompi_osc_ucx_lock {
173173
} ompi_osc_ucx_lock_t;
174174

175175
/* Select the entry of mca_osc_ucx_component.endpoints that corresponds to
 * `rank_`, translated through `_module`'s comm_world_ranks table. Both macro
 * arguments are parenthesized so that expressions such as `&mod` or `a + b`
 * expand with the intended precedence. */
#define OSC_UCX_GET_EP(_module, rank_) (mca_osc_ucx_component.endpoints[(_module)->comm_world_ranks[(rank_)]])
176-
#define OSC_UCX_GET_DISP(module_, rank_) ((module_->disp_unit < 0) ? module_->disp_units[rank_] : module_->disp_unit)
176+
/* Displacement unit for `rank_` in `module_`; forwards to ompi_osc_ucx_get_disp_unit(). */
#define OSC_UCX_GET_DISP(module_, rank_) ompi_osc_ucx_get_disp_unit((module_), (rank_))
177177

178178
#define OSC_UCX_GET_DEFAULT_EP(_ep_ptr, _module, _target) \
179179
if (opal_common_ucx_thread_enabled) { \
@@ -277,4 +277,24 @@ int ompi_osc_find_attached_region_position(ompi_osc_dynamic_win_info_t *dynamic_
277277
int ompi_osc_ucx_dynamic_lock(ompi_osc_ucx_module_t *module, int target);
278278
int ompi_osc_ucx_dynamic_unlock(ompi_osc_ucx_module_t *module, int target);
279279

280+
/* returns the size at the peer */
281+
static inline int ompi_osc_ucx_get_size(ompi_osc_ucx_module_t *module, int rank)
282+
{
283+
if (module->sizes) {
284+
return module->sizes[rank];
285+
} else {
286+
return module->size;
287+
}
288+
}
289+
290+
/* returns the displacement unit for the given peer */
291+
static inline int ompi_osc_ucx_get_disp_unit(ompi_osc_ucx_module_t *module, int rank)
292+
{
293+
if (module->disp_units) {
294+
return module->disp_units[rank];
295+
} else {
296+
return module->disp_unit;
297+
}
298+
}
299+
280300
#endif /* OMPI_OSC_UCX_H */

ompi/mca/osc/ucx/osc_ucx_component.c

Lines changed: 51 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ static int ompi_osc_ucx_shared_query_peer(ompi_osc_ucx_module_t *module, int pee
487487
if (UCS_OK != ucp_rkey_ptr(rkey, module->addrs[peer], &addr_p)) {
488488
return OMPI_ERR_NOT_AVAILABLE;
489489
}
490-
*size = module->same_size ? module->size : module->sizes[peer];
490+
*size = ompi_osc_ucx_get_size(module, peer);
491491
*((void**) baseptr) = addr_p;
492492
*disp_unit = (module->disp_unit < 0) ? module->disp_units[peer] : module->disp_unit;
493493

@@ -508,39 +508,30 @@ int ompi_osc_ucx_shared_query(struct ompi_win_t *win, int rank, size_t *size,
508508

509509
if (MPI_PROC_NULL == rank) {
510510
for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
511-
if (0 != module->sizes[i]) {
511+
if (0 != ompi_osc_ucx_get_size(module, i)) {
512512
if (OMPI_SUCCESS == ompi_osc_ucx_shared_query_peer(module, i, size, disp_unit, baseptr)) {
513513
return OMPI_SUCCESS;
514514
}
515515
}
516516
}
517517
} else {
518-
if (0 != module->sizes[rank]) {
518+
if (0 != ompi_osc_ucx_get_size(module, rank)) {
519519
return ompi_osc_ucx_shared_query_peer(module, rank, size, disp_unit, baseptr);
520520
}
521521
}
522522
return OMPI_ERR_NOT_SUPPORTED;
523523

524524
} else if (MPI_PROC_NULL != rank) { // shared memory window with given rank
525-
*size = module->sizes[rank];
525+
*size = ompi_osc_ucx_get_size(module, rank);
526526
*((void**) baseptr) = (void *)module->shmem_addrs[rank];
527-
if (module->disp_unit == -1) {
528-
*disp_unit = module->disp_units[rank];
529-
} else {
530-
*disp_unit = module->disp_unit;
531-
}
527+
*disp_unit = ompi_osc_ucx_get_disp_unit(module, rank);
532528
} else { // shared memory window with MPI_PROC_NULL
533-
int i = 0;
534-
535-
for (i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
536-
if (0 != module->sizes[i]) {
537-
*size = module->sizes[i];
529+
for (int i = 0 ; i < ompi_comm_size(module->comm) ; ++i) {
530+
size_t peer_size = ompi_osc_ucx_get_size(module, i);
531+
if (0 != size) {
532+
*size = peer_size;
538533
*((void**) baseptr) = (void *)module->shmem_addrs[i];
539-
if (module->disp_unit == -1) {
540-
*disp_unit = module->disp_units[rank];
541-
} else {
542-
*disp_unit = module->disp_unit;
543-
}
534+
*disp_unit = ompi_osc_ucx_get_disp_unit(module, rank);
544535
break;
545536
}
546537
}
@@ -566,7 +557,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, pt
566557
uint64_t my_info[3] = {0};
567558
char *recv_buf = NULL;
568559
void *dynamic_base = NULL;
569-
unsigned long total, *rbuf;
560+
unsigned long adjusted_size = size;
570561
int flag;
571562
size_t pagesize;
572563
bool unlink_needed = false;
@@ -680,12 +671,42 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, pt
680671
module->acc_single_intrinsic = check_config_value_bool ("acc_single_intrinsic", info);
681672
module->skip_sync_check = false;
682673

674+
if (flavor == MPI_WIN_FLAVOR_SHARED) {
675+
opal_output_verbose(MCA_BASE_VERBOSE_DEBUG, ompi_osc_base_framework.framework_output,
676+
"allocating shared memory region of size %ld\n", (long) size);
677+
/* get the pagesize */
678+
pagesize = opal_getpagesize();
679+
680+
/* Note that the alloc_shared_noncontig info key only has
681+
* meaning during window creation. Once the window is
682+
* created, we can't move memory around without making
683+
* everything miserable. So we intentionally do not subscribe
684+
* to updates on the info key, because there's no useful
685+
* update to occur. */
686+
module->noncontig_shared_win = false;
687+
if (OMPI_SUCCESS != opal_info_get_bool(info, "alloc_shared_noncontig",
688+
&module->noncontig_shared_win, &flag)) {
689+
err = OMPI_ERR_BAD_PARAM;
690+
goto error;
691+
}
692+
693+
if (module->noncontig_shared_win) {
694+
opal_output_verbose(MCA_BASE_VERBOSE_DEBUG, ompi_osc_base_framework.framework_output,
695+
"allocating window using non-contiguous strategy");
696+
adjusted_size = ((size - 1) / pagesize + 1) * pagesize;
697+
} else {
698+
opal_output_verbose(MCA_BASE_VERBOSE_DEBUG, ompi_osc_base_framework.framework_output,
699+
"allocating window using contiguous strategy");
700+
adjusted_size = size;
701+
}
702+
}
703+
683704
/* share everyone's displacement units. Only do an allgather if
684705
strictly necessary, since it requires O(p) state. */
685706
values[0] = disp_unit;
686707
values[1] = -disp_unit;
687-
values[2] = size;
688-
values[3] = -(int64_t)size;
708+
values[2] = adjusted_size;
709+
values[3] = -(long)adjusted_size;
689710

690711
ret = module->comm->c_coll->coll_allreduce(MPI_IN_PLACE, values, 4, MPI_LONG,
691712
MPI_MIN, module->comm,
@@ -710,7 +731,6 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, pt
710731
}
711732

712733
if (!same_disp_unit || !same_size) {
713-
714734
ret = module->comm->c_coll->coll_allgather(values, val_count * sizeof(long), MPI_BYTE,
715735
(void *)my_info, sizeof(long) * val_count, MPI_BYTE,
716736
module->comm,
@@ -743,7 +763,6 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, pt
743763
module->sizes[i] = (size_t)values[i*val_count + val_count-1];
744764
}
745765
}
746-
747766
}
748767

749768
ret = opal_common_ucx_wpctx_create(mca_osc_ucx_component.wpool, comm_size,
@@ -755,50 +774,14 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, pt
755774

756775
if (flavor == MPI_WIN_FLAVOR_SHARED) {
757776
/* create the segment */
758-
opal_output_verbose(MCA_BASE_VERBOSE_DEBUG, ompi_osc_base_framework.framework_output,
759-
"allocating shared memory region of size %ld\n", (long) size);
760-
/* get the pagesize */
761-
pagesize = opal_getpagesize();
762-
763-
rbuf = malloc(sizeof(unsigned long) * comm_size);
764-
if (NULL == rbuf) return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
765-
766-
/* Note that the alloc_shared_noncontig info key only has
767-
* meaning during window creation. Once the window is
768-
* created, we can't move memory around without making
769-
* everything miserable. So we intentionally do not subscribe
770-
* to updates on the info key, because there's no useful
771-
* update to occur. */
772-
module->noncontig_shared_win = false;
773-
if (OMPI_SUCCESS != opal_info_get_bool(info, "alloc_shared_noncontig",
774-
&module->noncontig_shared_win, &flag)) {
775-
free(rbuf);
776-
goto error;
777-
}
778-
779-
if (module->noncontig_shared_win) {
780-
opal_output_verbose(MCA_BASE_VERBOSE_DEBUG, ompi_osc_base_framework.framework_output,
781-
"allocating window using non-contiguous strategy");
782-
total = ((size - 1) / pagesize + 1) * pagesize;
783-
} else {
784-
opal_output_verbose(MCA_BASE_VERBOSE_DEBUG, ompi_osc_base_framework.framework_output,
785-
"allocating window using contiguous strategy");
786-
total = size;
787-
}
788-
ret = module->comm->c_coll->coll_allgather(&total, 1, MPI_UNSIGNED_LONG,
789-
rbuf, 1, MPI_UNSIGNED_LONG,
790-
module->comm,
791-
module->comm->c_coll->coll_allgather_module);
792-
if (OMPI_SUCCESS != ret) return ret;
793777

794-
total = 0;
778+
unsigned long total = 0;
795779
for (i = 0 ; i < comm_size ; ++i) {
796-
total += rbuf[i];
780+
total += ompi_osc_ucx_get_size(module, i);
797781
}
798782

799783
module->segment_base = NULL;
800784
module->shmem_addrs = NULL;
801-
module->sizes = NULL;
802785

803786
if (total != 0) {
804787
/* use opal/shmem directly to create a shared memory segment */
@@ -809,14 +792,12 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, pt
809792
OMPI_PROC_MY_NAME->jobid, (int) OMPI_PROC_MY_NAME->vpid,
810793
ompi_comm_print_cid(module->comm));
811794
if (ret < 0) {
812-
free(rbuf);
813795
return OMPI_ERR_OUT_OF_RESOURCE;
814796
}
815797

816798
ret = opal_shmem_segment_create (&module->seg_ds, data_file, total);
817799
free(data_file);
818800
if (OPAL_SUCCESS != ret) {
819-
free(rbuf);
820801
goto error;
821802
}
822803

@@ -826,20 +807,18 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, pt
826807
ret = module->comm->c_coll->coll_bcast (&module->seg_ds, sizeof (module->seg_ds), MPI_BYTE, 0,
827808
module->comm, module->comm->c_coll->coll_bcast_module);
828809
if (OMPI_SUCCESS != ret) {
829-
free(rbuf);
830810
goto error;
831811
}
832812

833813
module->segment_base = opal_shmem_segment_attach (&module->seg_ds);
834814
if (NULL == module->segment_base) {
835-
free(rbuf);
815+
ret = OMPI_ERR_OUT_OF_RESOURCE;
836816
goto error;
837817
}
838818

839819
/* wait for all processes to attach */
840820
ret = module->comm->c_coll->coll_barrier (module->comm, module->comm->c_coll->coll_barrier_module);
841821
if (OMPI_SUCCESS != ret) {
842-
free(rbuf);
843822
goto error;
844823
}
845824

@@ -854,47 +833,26 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, pt
854833
* different between different processes. To use direct load/store,
855834
* shmem_addrs can be used, however, for RDMA, virtual address of
856835
* remote process that will be stored in module->addrs should be used */
857-
module->sizes = malloc(sizeof(size_t) * comm_size);
858-
if (NULL == module->sizes) {
859-
free(rbuf);
860-
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
861-
goto error;
862-
}
863836
module->shmem_addrs = malloc(sizeof(uint64_t) * comm_size);
864837
if (NULL == module->shmem_addrs) {
865838
free(module->sizes);
866-
free(rbuf);
867839
ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE;
868840
goto error;
869841
}
870842

871843

872844
for (i = 0, total = 0; i < comm_size ; ++i) {
873-
module->sizes[i] = rbuf[i];
874-
if (module->sizes[i] || !module->noncontig_shared_win) {
845+
size_t size = ompi_osc_ucx_get_size(module, i);
846+
if (size || !module->noncontig_shared_win) {
875847
module->shmem_addrs[i] = ((uint64_t) module->segment_base) + total;
876-
total += rbuf[i];
848+
total += size;
877849
} else {
878850
module->shmem_addrs[i] = (uint64_t)NULL;
879851
}
880852
}
881853

882-
free(rbuf);
883-
884-
module->size = module->sizes[ompi_comm_rank(module->comm)];
854+
module->size = ompi_osc_ucx_get_size(module, ompi_comm_rank(module->comm));
885855
*base = (void *)module->shmem_addrs[ompi_comm_rank(module->comm)];
886-
} else {
887-
/* non-shared memory: exchange sizes and addresses so they can be queried for shared memory */
888-
for (i = 0; i < comm_size; i++) {
889-
ompi_proc_t *peer = ompi_comm_peer_lookup(module->comm, i);
890-
peer->
891-
if (ompi_comm_peer_lookup(module->comm, i) == NULL) {
892-
OSC_UCX_ERROR("Failed to lookup peer %d in communicator %s", i, ompi_comm_print_cid(module->comm));
893-
ret = OMPI_ERR_COMM_FAILURE;
894-
goto error;
895-
}
896-
}
897-
898856
}
899857

900858
void **mem_base = base;
@@ -1030,6 +988,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, pt
1030988
error:
1031989
if (module->disp_units) free(module->disp_units);
1032990
if (module->comm) ompi_comm_free(&module->comm);
991+
if (module->sizes) ompi_comm_free(&module->sizes);
1033992
free(module);
1034993
module = NULL;
1035994

@@ -1256,8 +1215,6 @@ int ompi_osc_ucx_free(struct ompi_win_t *win) {
12561215
opal_shmem_segment_detach(&module->seg_ds);
12571216
if (module->shmem_addrs != NULL)
12581217
free(module->shmem_addrs);
1259-
if (module->sizes != NULL)
1260-
free(module->sizes);
12611218
}
12621219

12631220
if (module->flavor == MPI_WIN_FLAVOR_DYNAMIC) {

0 commit comments

Comments
 (0)