2727 return OMPI_ERROR; \
2828 }
2929
/*
 * CHECK_DYNAMIC_WIN: when _module->flavor is MPI_WIN_FLAVOR_DYNAMIC, calls
 * get_dynamic_win_info() and stores its result in _ret; if that result is
 * not OMPI_SUCCESS the macro executes `return _ret;` in the function that
 * expanded it.
 *
 * The lines marked "-" are the four-argument form; the lines marked "+" add
 * a fifth argument, _lock_required, which is forwarded unchanged to
 * get_dynamic_win_info().
 *
 * NOTE(review): the expansion is a bare `if { ... }` rather than a
 * do { ... } while (0) wrapper, and the embedded `return` leaves the calling
 * function directly, so nothing after the call site runs on that path.
 */
30- #define CHECK_DYNAMIC_WIN (_remote_addr , _module , _target , _ret ) \
31- if (_module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { \
32- _ret = get_dynamic_win_info(_remote_addr, _module, _target); \
33- if (_ret != OMPI_SUCCESS) { \
34- return _ret; \
35- } \
30+ #define CHECK_DYNAMIC_WIN (_remote_addr , _module , _target , _ret , _lock_required ) \
31+ if (_module->flavor == MPI_WIN_FLAVOR_DYNAMIC) { \
32+ _ret = get_dynamic_win_info(_remote_addr, _module, _target, _lock_required); \
33+ if (_ret != OMPI_SUCCESS) { \
34+ return _ret; \
35+ } \
3636 }
3737
3838typedef struct ucx_iovec {
@@ -251,89 +251,8 @@ static inline int ddt_put_get(ompi_osc_ucx_module_t *module,
251251 return ret ;
252252}
253253
/*
 * need_acc_lock: tells the caller whether the accumulate lock must be taken
 * for `target`.
 *
 * Looks up `target` (cast to uint32_t) in module->outstanding_locks.
 * Returns false only when the lookup produced a non-NULL entry whose type is
 * LOCK_EXCLUSIVE; returns true in every other case.
 *
 * The return code of the hash-table lookup is not checked; the decision
 * relies on `lock` having been initialised to NULL before the call.
 */
254- static inline bool need_acc_lock (ompi_osc_ucx_module_t * module , int target )
255- {
256- ompi_osc_ucx_lock_t * lock = NULL ;
257- opal_hash_table_get_value_uint32 (& module -> outstanding_locks ,
258- (uint32_t ) target , (void * * ) & lock );
259-
260- /* if there is an exclusive lock there is no need to acquire the accumulate lock */
261- return !(NULL != lock && LOCK_EXCLUSIVE == lock -> type );
262- }
263-
/*
 * start_atomicity: takes the accumulate lock on `target` when
 * need_acc_lock() says it is required.
 *
 * module        - OSC UCX module; state_addrs[target] plus
 *                 OSC_UCX_STATE_ACC_LOCK_OFFSET gives the remote lock address.
 * target        - rank whose accumulate lock is requested.
 * lock_acquired - out: set to true after the loop below exits via `break`,
 *                 set to false when need_acc_lock() returned false. It is
 *                 NOT written on the error return inside the loop.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERROR when the compare-and-swap call reports
 * a failure (the original code is only logged, not propagated).
 */
264- static inline int start_atomicity (
265- ompi_osc_ucx_module_t * module ,
266- int target ,
267- bool * lock_acquired ) {
268- uint64_t result_value = -1 ;
269- uint64_t remote_addr = (module -> state_addrs )[target ] + OSC_UCX_STATE_ACC_LOCK_OFFSET ;
270- int ret = OMPI_SUCCESS ;
271-
272- if (need_acc_lock (module , target )) {
/* Retry without a bound until result_value reads TARGET_LOCK_UNLOCKED.
 * Presumably result_value receives the value previously stored at
 * remote_addr, so this means our swap to TARGET_LOCK_EXCLUSIVE took
 * effect -- TODO confirm against opal_common_ucx_wpmem_cmpswp. */
273- for (;;) {
274- ret = opal_common_ucx_wpmem_cmpswp (module -> state_mem ,
275- TARGET_LOCK_UNLOCKED , TARGET_LOCK_EXCLUSIVE ,
276- target , & result_value , sizeof (result_value ),
277- remote_addr );
278- if (ret != OMPI_SUCCESS ) {
279- OSC_UCX_VERBOSE (1 , "opal_common_ucx_mem_cmpswp failed: %d" , ret );
280- return OMPI_ERROR ;
281- }
282- if (result_value == TARGET_LOCK_UNLOCKED ) {
283- break ;
284- }
285-
/* Lock not obtained on this attempt: drive progress on the component's
 * worker pool before trying again. */
286- opal_common_ucx_wpool_progress (mca_osc_ucx_component .wpool );
287- }
288-
289- * lock_acquired = true;
290- } else {
291- * lock_acquired = false;
292- }
293-
294- return OMPI_SUCCESS ;
295- }
296-
/*
 * end_atomicity: counterpart of start_atomicity; releases the accumulate
 * lock on `target` if it was taken and frees an optional scratch buffer.
 *
 * module        - OSC UCX module; state_addrs[target] plus
 *                 OSC_UCX_STATE_ACC_LOCK_OFFSET gives the remote lock address.
 * target        - rank whose accumulate lock is released.
 * lock_acquired - true when the caller holds the accumulate lock.
 * free_ptr      - buffer to free() before returning; may be NULL.
 *
 * When lock_acquired is true: fences module->mem, then atomically swaps
 * TARGET_LOCK_UNLOCKED into the remote lock word.
 * When lock_acquired is false and free_ptr is non-NULL: flushes the endpoint
 * to `target` so the buffer is no longer in use when it is freed.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERROR if the fence, swap or flush failed.
 *
 * NOTE(review): if the fence fails the function returns before reaching
 * free(free_ptr), so the buffer is not released on that path.
 */
297- static inline int end_atomicity (
298- ompi_osc_ucx_module_t * module ,
299- int target ,
300- bool lock_acquired ,
301- void * free_ptr ) {
302- uint64_t remote_addr = (module -> state_addrs )[target ] + OSC_UCX_STATE_ACC_LOCK_OFFSET ;
303- int ret = OMPI_SUCCESS ;
304-
305- if (lock_acquired ) {
306- uint64_t result_value = 0 ;
307- /* fence any still active operations */
308- ret = opal_common_ucx_wpmem_fence (module -> mem );
309- if (ret != OMPI_SUCCESS ) {
310- OSC_UCX_VERBOSE (1 , "opal_common_ucx_mem_fence failed: %d" , ret );
311- return OMPI_ERROR ;
312- }
313-
314- ret = opal_common_ucx_wpmem_fetch (module -> state_mem ,
315- UCP_ATOMIC_FETCH_OP_SWAP , TARGET_LOCK_UNLOCKED ,
316- target , & result_value , sizeof (result_value ),
317- remote_addr );
/* NOTE(review): this assert is evaluated before `ret` from the swap is
 * checked, so on a failed swap it tests result_value's initial 0 (or
 * whatever the call left there) rather than a fetched lock state. */
318- assert (result_value == TARGET_LOCK_EXCLUSIVE );
319- } else if (NULL != free_ptr ){
320- /* flush before freeing the buffer */
321- ret = opal_common_ucx_ctx_flush (module -> ctx , OPAL_COMMON_UCX_SCOPE_EP , target );
322- }
323- /* TODO: encapsulate in a request and make the release non-blocking */
324- if (NULL != free_ptr ) {
325- free (free_ptr );
326- }
/* NOTE(review): `ret` may come from the swap above or from the flush in the
 * else-branch, yet the message below always names mem_fetch. */
327- if (ret != OMPI_SUCCESS ) {
328- OSC_UCX_VERBOSE (1 , "opal_common_ucx_mem_fetch failed: %d" , ret );
329- return OMPI_ERROR ;
330- }
331-
332- return ret ;
333- }
334-
335254static inline int get_dynamic_win_info (uint64_t remote_addr , ompi_osc_ucx_module_t * module ,
336- int target ) {
255+ int target , bool lock_required ) {
337256 uint64_t remote_state_addr = (module -> state_addrs )[target ] + OSC_UCX_STATE_DYNAMIC_WIN_CNT_OFFSET ;
338257 size_t remote_state_len = sizeof (uint64_t ) + sizeof (ompi_osc_dynamic_win_info_t ) * OMPI_OSC_UCX_ATTACH_MAX ;
339258 char * temp_buf = calloc (remote_state_len , 1 );
@@ -343,6 +262,17 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
343262 int insert = -1 ;
344263 int ret ;
345264
265+ bool lock_acquired = false;
266+ if (lock_required ) {
267+ /* The acc-lock must be taken even if this process already holds an
268+ * exclusive lock, so the lock is forced: the remote process protects its
269+ * window attach/detach operations with the acc-lock. */
270+ ret = ompi_osc_state_lock (module , target , & lock_acquired , true);
271+ if (ret != OMPI_SUCCESS ) {
272+ return ret ;
273+ }
274+ }
275+
346276 ret = opal_common_ucx_wpmem_putget (module -> state_mem , OPAL_COMMON_UCX_GET , target ,
347277 (void * )((intptr_t )temp_buf ),
348278 remote_state_len , remote_state_addr );
@@ -360,32 +290,36 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
360290
361291 memcpy (& win_count , temp_buf , sizeof (uint64_t ));
362292 if (win_count > OMPI_OSC_UCX_ATTACH_MAX ) {
363- return MPI_ERR_RMA_RANGE ;
293+ ret = MPI_ERR_RMA_RANGE ;
294+ goto cleanup ;
364295 }
365296
366297 temp_dynamic_wins = (ompi_osc_dynamic_win_info_t * )(temp_buf + sizeof (uint64_t ));
367298 contain = ompi_osc_find_attached_region_position (temp_dynamic_wins , 0 , win_count - 1 ,
368299 remote_addr , 1 , & insert );
369- if (contain < 0 || contain >= win_count ) {
370- return MPI_ERR_RMA_RANGE ;
300+ if (contain < 0 || contain >= (int )win_count ) {
301+ OSC_UCX_ERROR ("Dynamic window index not found contain: %d win_count: %d\n" ,
302+ contain , win_count );
303+ ret = MPI_ERR_RMA_RANGE ;
304+ goto cleanup ;
371305 }
372306
373307 assert (module -> mem != NULL );
374308
375309 _mem_record_t * mem_rec = NULL ;
376310 ret = opal_tsd_tracked_key_get (& module -> mem -> tls_key , (void * * ) & mem_rec );
377311 if (OPAL_SUCCESS != ret ) {
378- return ret ;
312+ goto cleanup ;
379313 }
380314
381315 if (mem_rec == NULL ) {
382316 ret = opal_common_ucx_tlocal_fetch_spath (module -> mem , target );
383317 if (OPAL_SUCCESS != ret ) {
384- return ret ;
318+ goto cleanup ;
385319 }
386320 ret = opal_tsd_tracked_key_get (& module -> mem -> tls_key , (void * * ) & mem_rec );
387321 if (OPAL_SUCCESS != ret ) {
388- return ret ;
322+ goto cleanup ;
389323 }
390324
391325 }
@@ -408,12 +342,15 @@ static inline int get_dynamic_win_info(uint64_t remote_addr, ompi_osc_ucx_module
408342
409343 if (ret != UCS_OK ) {
410344 MCA_COMMON_UCX_VERBOSE (1 , "ucp_ep_rkey_unpack failed: %d" , ret );
411- return OPAL_ERROR ;
345+ ret = OPAL_ERROR ;
346+ goto cleanup ;
412347 }
413348
414349cleanup :
415350 free (temp_buf );
416351
352+ ompi_osc_state_unlock (module , target , lock_acquired , NULL );
353+
417354 return ret ;
418355}
419356
@@ -486,7 +423,7 @@ static int do_atomic_op_intrinsic(
486423
487424 uint64_t remote_addr = (module -> addrs [target ]) + target_disp * OSC_UCX_GET_DISP (module , target );
488425
489- CHECK_DYNAMIC_WIN (remote_addr , module , target , ret );
426+ CHECK_DYNAMIC_WIN (remote_addr , module , target , ret , true );
490427
491428 ucp_atomic_fetch_op_t opcode ;
492429 bool is_no_op = false;
@@ -555,7 +492,7 @@ int ompi_osc_ucx_put(const void *origin_addr, int origin_count, struct ompi_data
555492 return ret ;
556493 }
557494
558- CHECK_DYNAMIC_WIN (remote_addr , module , target , ret );
495+ CHECK_DYNAMIC_WIN (remote_addr , module , target , ret , true );
559496
560497 if (!target_count ) {
561498 return OMPI_SUCCESS ;
@@ -605,13 +542,12 @@ int ompi_osc_ucx_get(void *origin_addr, int origin_count,
605542 return ret ;
606543 }
607544
608- CHECK_DYNAMIC_WIN (remote_addr , module , target , ret );
545+ CHECK_DYNAMIC_WIN (remote_addr , module , target , ret , true );
609546
610547 if (!target_count ) {
611548 return OMPI_SUCCESS ;
612549 }
613550
614-
615551 ompi_datatype_get_true_extent (origin_dt , & origin_lb , & origin_extent );
616552 ompi_datatype_get_true_extent (target_dt , & target_lb , & target_extent );
617553
@@ -673,13 +609,14 @@ int accumulate_req(const void *origin_addr, int origin_count,
673609 target_disp , NULL , ucx_req );
674610 }
675611
676- CHECK_DYNAMIC_WIN (remote_addr , module , target , ret );
677-
678- ret = start_atomicity (module , target , & lock_acquired );
612+ /* Start atomicity by acquiring acc lock */
613+ ret = ompi_osc_state_lock (module , target , & lock_acquired , false);
679614 if (ret != OMPI_SUCCESS ) {
680615 return ret ;
681616 }
682617
618+ CHECK_DYNAMIC_WIN (remote_addr , module , target , ret , !lock_acquired );
619+
683620 if (op == & ompi_mpi_op_replace .op ) {
684621 ret = ompi_osc_ucx_put (origin_addr , origin_count , origin_dt , target ,
685622 target_disp , target_count , target_dt , win );
@@ -781,7 +718,7 @@ int accumulate_req(const void *origin_addr, int origin_count,
781718 ompi_request_complete (& ucx_req -> super , true);
782719 }
783720
784- return end_atomicity (module , target , lock_acquired , free_ptr );
721+ return ompi_osc_state_unlock (module , target , lock_acquired , free_ptr );
785722}
786723
787724int ompi_osc_ucx_accumulate (const void * origin_addr , int origin_count ,
@@ -804,13 +741,14 @@ do_atomic_compare_and_swap(const void *origin_addr, const void *compare_addr,
804741 size_t dt_bytes ;
805742 opal_common_ucx_wpmem_t * mem = module -> mem ;
806743 if (!module -> acc_single_intrinsic ) {
807- ret = start_atomicity (module , target , & lock_acquired );
744+ /* Start atomicity by acquiring acc lock */
745+ ret = ompi_osc_state_lock (module , target , & lock_acquired , false);
808746 if (ret != OMPI_SUCCESS ) {
809747 return ret ;
810748 }
811749 }
812750
813- CHECK_DYNAMIC_WIN (remote_addr , module , target , ret );
751+ CHECK_DYNAMIC_WIN (remote_addr , module , target , ret , ! lock_acquired );
814752
815753 ompi_datatype_type_size (dt , & dt_bytes );
816754 uint64_t compare_val = opal_common_ucx_load_uint64 (compare_addr , dt_bytes );
@@ -823,7 +761,7 @@ do_atomic_compare_and_swap(const void *origin_addr, const void *compare_addr,
823761 return ret ;
824762 }
825763
826- return end_atomicity (module , target , lock_acquired , NULL );
764+ return ompi_osc_state_unlock (module , target , lock_acquired , NULL );
827765}
828766
829767int ompi_osc_ucx_compare_and_swap (const void * origin_addr , const void * compare_addr ,
@@ -842,8 +780,6 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
842780 return ret ;
843781 }
844782
845- CHECK_DYNAMIC_WIN (remote_addr , module , target , ret );
846-
847783 ompi_datatype_type_size (dt , & dt_bytes );
848784 if (ompi_osc_base_is_atomic_size_supported (remote_addr , dt_bytes )) {
849785 // fast path using UCX atomic operations
@@ -854,11 +790,14 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
854790
855791 /* fall back to get-compare-put */
856792
857- ret = start_atomicity (module , target , & lock_acquired );
793+ /* Start atomicity by acquiring acc lock */
794+ ret = ompi_osc_state_lock (module , target , & lock_acquired , false);
858795 if (ret != OMPI_SUCCESS ) {
859796 return ret ;
860797 }
861798
799+ CHECK_DYNAMIC_WIN (remote_addr , module , target , ret , !lock_acquired );
800+
862801 ret = opal_common_ucx_wpmem_putget (mem , OPAL_COMMON_UCX_GET , target ,
863802 result_addr , dt_bytes , remote_addr );
864803 if (OPAL_SUCCESS != ret ) {
@@ -881,7 +820,7 @@ int ompi_osc_ucx_compare_and_swap(const void *origin_addr, const void *compare_a
881820 }
882821 }
883822
884- return end_atomicity (module , target , lock_acquired , NULL );
823+ return ompi_osc_state_unlock (module , target , lock_acquired , NULL );
885824}
886825
887826int ompi_osc_ucx_fetch_and_op (const void * origin_addr , void * result_addr ,
@@ -907,13 +846,14 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
907846 bool lock_acquired = false;
908847
909848 if (!module -> acc_single_intrinsic ) {
910- ret = start_atomicity (module , target , & lock_acquired );
849+ /* Start atomicity by acquiring acc lock */
850+ ret = ompi_osc_state_lock (module , target , & lock_acquired , false);
911851 if (ret != OMPI_SUCCESS ) {
912852 return ret ;
913853 }
914854 }
915855
916- CHECK_DYNAMIC_WIN (remote_addr , module , target , ret );
856+ CHECK_DYNAMIC_WIN (remote_addr , module , target , ret , ! lock_acquired );
917857
918858 value = origin_addr ? opal_common_ucx_load_uint64 (origin_addr , dt_bytes ) : 0 ;
919859
@@ -934,7 +874,7 @@ int ompi_osc_ucx_fetch_and_op(const void *origin_addr, void *result_addr,
934874 return ret ;
935875 }
936876
937- return end_atomicity (module , target , lock_acquired , NULL );
877+ return ompi_osc_state_unlock (module , target , lock_acquired , NULL );
938878 } else {
939879 return ompi_osc_ucx_get_accumulate (origin_addr , 1 , dt , result_addr , 1 , dt ,
940880 target , target_disp , 1 , dt , op , win );
@@ -970,13 +910,14 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
970910 target_disp , result_addr , ucx_req );
971911 }
972912
973- CHECK_DYNAMIC_WIN (remote_addr , module , target , ret );
974-
975- ret = start_atomicity (module , target , & lock_acquired );
913+ /* Start atomicity by acquiring acc lock */
914+ ret = ompi_osc_state_lock (module , target , & lock_acquired , false);
976915 if (ret != OMPI_SUCCESS ) {
977916 return ret ;
978917 }
979918
919+ CHECK_DYNAMIC_WIN (remote_addr , module , target , ret , !lock_acquired );
920+
980921 ret = ompi_osc_ucx_get (result_addr , result_count , result_dt , target ,
981922 target_disp , target_count , target_dt , win );
982923 if (ret != OMPI_SUCCESS ) {
@@ -1087,7 +1028,7 @@ int get_accumulate_req(const void *origin_addr, int origin_count,
10871028 }
10881029
10891030
1090- return end_atomicity (module , target , lock_acquired , free_addr );
1031+ return ompi_osc_state_unlock (module , target , lock_acquired , free_addr );
10911032}
10921033
10931034int ompi_osc_ucx_get_accumulate (const void * origin_addr , int origin_count ,
@@ -1119,7 +1060,7 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
11191060 return ret ;
11201061 }
11211062
1122- CHECK_DYNAMIC_WIN (remote_addr , module , target , ret );
1063+ CHECK_DYNAMIC_WIN (remote_addr , module , target , ret , true );
11231064
11241065 OMPI_OSC_UCX_REQUEST_ALLOC (win , ucx_req );
11251066 assert (NULL != ucx_req );
@@ -1175,7 +1116,7 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
11751116 return ret ;
11761117 }
11771118
1178- CHECK_DYNAMIC_WIN (remote_addr , module , target , ret );
1119+ CHECK_DYNAMIC_WIN (remote_addr , module , target , ret , true );
11791120
11801121 OMPI_OSC_UCX_REQUEST_ALLOC (win , ucx_req );
11811122 assert (NULL != ucx_req );
0 commit comments