2222
2323#include "osc_ucx.h"
2424#include "osc_ucx_request.h"
25+ #include "opal/util/sys_limits.h"
2526
2627#define memcpy_off (_dst , _src , _len , _off ) \
2728 memcpy(((char*)(_dst)) + (_off), _src, _len); \
@@ -81,6 +82,7 @@ ompi_osc_ucx_component_t mca_osc_ucx_component = {
8182
8283ompi_osc_ucx_module_t ompi_osc_ucx_module_template = {
8384 {
85+ .osc_win_shared_query = ompi_osc_ucx_shared_query ,
8486 .osc_win_attach = ompi_osc_ucx_win_attach ,
8587 .osc_win_detach = ompi_osc_ucx_win_detach ,
8688 .osc_free = ompi_osc_ucx_free ,
@@ -182,6 +184,19 @@ static int component_register(void) {
182184
183185 opal_common_ucx_mca_var_register (& mca_osc_ucx_component .super .osc_version );
184186
187+ if (0 == access ("/dev/shm" , W_OK )) {
188+ mca_osc_ucx_component .backing_directory = "/dev/shm" ;
189+ } else {
190+ mca_osc_ucx_component .backing_directory = ompi_process_info .proc_session_dir ;
191+ }
192+
193+ (void ) mca_base_component_var_register (& mca_osc_ucx_component .super .osc_version , "backing_directory" ,
194+ "Directory to place backing files for memory windows. "
195+ "This directory should be on a local filesystem such as /tmp or "
196+ "/dev/shm (default: (linux) /dev/shm, (others) session directory)" ,
197+ MCA_BASE_VAR_TYPE_STRING , NULL , 0 , 0 , OPAL_INFO_LVL_3 ,
198+ MCA_BASE_VAR_SCOPE_READONLY , & mca_osc_ucx_component .backing_directory );
199+
185200 return OMPI_SUCCESS ;
186201}
187202
@@ -210,7 +225,6 @@ static int component_finalize(void) {
210225
/* Report the priority of this component for a window-creation request.
 * On the retained lines none of the arguments are examined: the function
 * simply returns mca_osc_ucx_component.priority.  The line carrying the
 * diff removal marker (`-`) is the former early exit that returned -1 when
 * flavor was MPI_WIN_FLAVOR_SHARED. */
211226static int component_query (struct ompi_win_t * win , void * * base , size_t size , int disp_unit ,
212227 struct ompi_communicator_t * comm , struct opal_info_t * info , int flavor ) {
213- if (MPI_WIN_FLAVOR_SHARED == flavor ) return -1 ;
214228 return mca_osc_ucx_component .priority ;
215229}
216230
@@ -299,14 +313,54 @@ static const char* ompi_osc_ucx_set_no_lock_info(opal_infosubscriber_t *obj, con
299313 return module -> no_locks ? "true" : "false" ;
300314}
301315
316+ int ompi_osc_ucx_shared_query (struct ompi_win_t * win , int rank , size_t * size ,
317+ int * disp_unit , void * baseptr )
318+ {
319+ ompi_osc_ucx_module_t * module =
320+ (ompi_osc_ucx_module_t * ) win -> w_osc_module ;
321+
322+ if (module -> flavor != MPI_WIN_FLAVOR_SHARED ) {
323+ return MPI_ERR_WIN ;
324+ }
325+
326+ if (MPI_PROC_NULL != rank ) {
327+ * size = module -> sizes [rank ];
328+ * ((void * * ) baseptr ) = module -> shmem_addrs [rank ];
329+ if (module -> disp_unit == -1 ) {
330+ * disp_unit = module -> disp_units [rank ];
331+ } else {
332+ * disp_unit = module -> disp_unit ;
333+ }
334+ } else {
335+ int i = 0 ;
336+
337+ * size = 0 ;
338+ * ((void * * ) baseptr ) = NULL ;
339+ * disp_unit = 0 ;
340+ for (i = 0 ; i < ompi_comm_size (module -> comm ) ; ++ i ) {
341+ if (0 != module -> sizes [i ]) {
342+ * size = module -> sizes [i ];
343+ * ((void * * ) baseptr ) = module -> shmem_addrs [i ];
344+ if (module -> disp_unit == -1 ) {
345+ * disp_unit = module -> disp_units [rank ];
346+ } else {
347+ * disp_unit = module -> disp_unit ;
348+ }
349+ break ;
350+ }
351+ }
352+ }
353+
354+ return OMPI_SUCCESS ;
355+ }
356+
302357static int component_select (struct ompi_win_t * win , void * * base , size_t size , int disp_unit ,
303358 struct ompi_communicator_t * comm , struct opal_info_t * info ,
304359 int flavor , int * model ) {
305360 ompi_osc_ucx_module_t * module = NULL ;
306361 char * name = NULL ;
307362 long values [2 ];
308363 int ret = OMPI_SUCCESS ;
309- //ucs_status_t status;
310364 int i , comm_size = ompi_comm_size (comm );
311365 bool env_initialized = false;
312366 void * state_base = NULL ;
@@ -316,12 +370,10 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
316370 int my_mem_addr_size ;
317371 void * my_info = NULL ;
318372 char * recv_buf = NULL ;
319-
320- /* the osc/sm component is the exclusive provider for support for
321- * shared memory windows */
322- if (flavor == MPI_WIN_FLAVOR_SHARED ) {
323- return OMPI_ERR_NOT_SUPPORTED ;
324- }
373+ unsigned long total , * rbuf ;
374+ int flag ;
375+ size_t pagesize ;
376+ bool unlink_needed = false;
325377
326378 /* May be called concurrently - protect */
327379 _osc_ucx_init_lock ();
@@ -442,17 +494,151 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
442494 goto error ;
443495 }
444496
445- if (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE ) {
497+ if (flavor == MPI_WIN_FLAVOR_SHARED ) {
498+ /* create the segment */
499+ opal_output_verbose (MCA_BASE_VERBOSE_DEBUG , ompi_osc_base_framework .framework_output ,
500+ "allocating shared memory region of size %ld\n" , (long ) size );
501+ /* get the pagesize */
502+ pagesize = opal_getpagesize ();
503+
504+ rbuf = malloc (sizeof (unsigned long ) * comm_size );
505+ if (NULL == rbuf ) return OMPI_ERR_TEMP_OUT_OF_RESOURCE ;
506+
507+ /* Note that the alloc_shared_noncontig info key only has
508+ * meaning during window creation. Once the window is
509+ * created, we can't move memory around without making
510+ * everything miserable. So we intentionally do not subscribe
511+ * to updates on the info key, because there's no useful
512+ * update to occur. */
513+ module -> noncontig_shared_win = false;
514+ if (OMPI_SUCCESS != opal_info_get_bool (info , "alloc_shared_noncontig" ,
515+ & module -> noncontig_shared_win , & flag )) {
516+ goto error ;
517+ }
518+
519+ if (module -> noncontig_shared_win ) {
520+ opal_output_verbose (MCA_BASE_VERBOSE_DEBUG , ompi_osc_base_framework .framework_output ,
521+ "allocating window using non-contiguous strategy" );
522+ total = ((size - 1 ) / pagesize + 1 ) * pagesize ;
523+ } else {
524+ opal_output_verbose (MCA_BASE_VERBOSE_DEBUG , ompi_osc_base_framework .framework_output ,
525+ "allocating window using contiguous strategy" );
526+ total = size ;
527+ }
528+ ret = module -> comm -> c_coll -> coll_allgather (& total , 1 , MPI_UNSIGNED_LONG ,
529+ rbuf , 1 , MPI_UNSIGNED_LONG ,
530+ module -> comm ,
531+ module -> comm -> c_coll -> coll_allgather_module );
532+ if (OMPI_SUCCESS != ret ) return ret ;
533+
534+ total = 0 ;
535+ for (i = 0 ; i < comm_size ; ++ i ) {
536+ total += rbuf [i ];
537+ }
538+
539+ module -> segment_base = NULL ;
540+ module -> shmem_addrs = NULL ;
541+ module -> sizes = NULL ;
542+
543+ if (total != 0 ) {
544+ /* use opal/shmem directly to create a shared memory segment */
545+ if (0 == ompi_comm_rank (module -> comm )) {
546+ char * data_file ;
547+ ret = opal_asprintf (& data_file , "%s" OPAL_PATH_SEP "osc_ucx.%s.%x.%d.%s" ,
548+ mca_osc_ucx_component .backing_directory , ompi_process_info .nodename ,
549+ OMPI_PROC_MY_NAME -> jobid , (int ) OMPI_PROC_MY_NAME -> vpid ,
550+ ompi_comm_print_cid (module -> comm ));
551+ if (ret < 0 ) {
552+ free (rbuf );
553+ return OMPI_ERR_OUT_OF_RESOURCE ;
554+ }
555+
556+ ret = opal_shmem_segment_create (& module -> seg_ds , data_file , total );
557+ free (data_file );
558+ if (OPAL_SUCCESS != ret ) {
559+ free (rbuf );
560+ goto error ;
561+ }
562+
563+ unlink_needed = true;
564+ }
565+
566+ ret = module -> comm -> c_coll -> coll_bcast (& module -> seg_ds , sizeof (module -> seg_ds ), MPI_BYTE , 0 ,
567+ module -> comm , module -> comm -> c_coll -> coll_bcast_module );
568+ if (OMPI_SUCCESS != ret ) {
569+ free (rbuf );
570+ goto error ;
571+ }
572+
573+ module -> segment_base = opal_shmem_segment_attach (& module -> seg_ds );
574+ if (NULL == module -> segment_base ) {
575+ free (rbuf );
576+ goto error ;
577+ }
578+
579+ /* wait for all processes to attach */
580+ ret = module -> comm -> c_coll -> coll_barrier (module -> comm , module -> comm -> c_coll -> coll_barrier_module );
581+ if (OMPI_SUCCESS != ret ) {
582+ free (rbuf );
583+ goto error ;
584+ }
585+
586+ if (0 == ompi_comm_rank (module -> comm )) {
587+ opal_shmem_unlink (& module -> seg_ds );
588+ unlink_needed = false;
589+ }
590+ }
591+
592+ /* Although module->segment_base refers to the same physical memory
593+ * in every process, its value is a virtual address and can therefore
594+ * differ between processes. For direct load/store, shmem_addrs can be
595+ * used; for RDMA, the virtual address of the remote process, which
596+ * will be stored in module->addrs, should be used instead */
597+ module -> sizes = malloc (sizeof (size_t ) * comm_size );
598+ if (NULL == module -> sizes ) {
599+ free (rbuf );
600+ ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE ;
601+ goto error ;
602+ }
603+ module -> shmem_addrs = malloc (sizeof (uint64_t ) * comm_size );
604+ if (NULL == module -> shmem_addrs ) {
605+ free (module -> sizes );
606+ free (rbuf );
607+ ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE ;
608+ goto error ;
609+ }
610+
611+
612+ for (i = 0 , total = 0 ; i < comm_size ; ++ i ) {
613+ module -> sizes [i ] = rbuf [i ];
614+ if (module -> sizes [i ] || !module -> noncontig_shared_win ) {
615+ module -> shmem_addrs [i ] = ((uint64_t ) module -> segment_base ) + total ;
616+ total += rbuf [i ];
617+ } else {
618+ module -> shmem_addrs [i ] = (uint64_t )NULL ;
619+ }
620+ }
621+
622+ free (rbuf );
623+
624+ module -> size = module -> sizes [ompi_comm_rank (module -> comm )];
625+ * base = module -> shmem_addrs [ompi_comm_rank (module -> comm )];
626+ }
627+
628+ if (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE ||
629+ flavor == MPI_WIN_FLAVOR_SHARED ) {
446630 switch (flavor ) {
447631 case MPI_WIN_FLAVOR_ALLOCATE :
448632 mem_type = OPAL_COMMON_UCX_MEM_ALLOCATE_MAP ;
449633 break ;
450634 case MPI_WIN_FLAVOR_CREATE :
451635 mem_type = OPAL_COMMON_UCX_MEM_MAP ;
452636 break ;
637+ case MPI_WIN_FLAVOR_SHARED :
638+ mem_type = OPAL_COMMON_UCX_MEM_MAP ;
639+ break ;
453640 }
454-
455- ret = opal_common_ucx_wpmem_create (module -> ctx , base , size ,
641+ ret = opal_common_ucx_wpmem_create (module -> ctx , base , module -> size ,
456642 mem_type , & exchange_len_info ,
457643 OPAL_COMMON_UCX_WPMEM_ADDR_EXCHANGE_FULL ,
458644 (void * )module -> comm ,
@@ -483,7 +669,8 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
483669 goto error ;
484670 }
485671
486- if (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE ) {
672+ if (flavor == MPI_WIN_FLAVOR_ALLOCATE || flavor == MPI_WIN_FLAVOR_CREATE ||
673+ flavor == MPI_WIN_FLAVOR_SHARED ) {
487674 memcpy (my_info , base , sizeof (uint64_t ));
488675 } else {
489676 memcpy (my_info , & zero , sizeof (uint64_t ));
@@ -563,6 +750,9 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
563750 mca_osc_ucx_component .env_initialized = false;
564751 }
565752
753+ if (0 == ompi_comm_rank (module -> comm ) && unlink_needed ) {
754+ opal_shmem_unlink (& module -> seg_ds );
755+ }
566756 ompi_osc_ucx_unregister_progress ();
567757 return ret ;
568758}
@@ -700,6 +890,15 @@ int ompi_osc_ucx_free(struct ompi_win_t *win) {
700890 return ret ;
701891 }
702892
893+ if (module -> flavor == MPI_WIN_FLAVOR_SHARED ) {
894+ if (module -> segment_base != NULL )
895+ opal_shmem_segment_detach (& module -> seg_ds );
896+ if (module -> shmem_addrs != NULL )
897+ free (module -> shmem_addrs );
898+ if (module -> sizes != NULL )
899+ free (module -> sizes );
900+ }
901+
703902 /* MPI_Win_free should detach any memory attached to dynamic windows */
704903 for (i = 0 ; i < module -> state .dynamic_win_count ; i ++ ) {
705904 assert (module -> local_dynamic_win_info [i ].refcnt == 1 );
0 commit comments