/*
 * Fixed capacity limits used by the UCX one-sided (osc) component.
 * NOTE(review): the meanings below are inferred from the names only --
 * confirm against the code that consumes them.
 */
#define OMPI_OSC_UCX_ATTACH_MAX 48         /* presumably: max regions attachable to a dynamic window */
#define OMPI_OSC_UCX_MEM_ADDR_MAX_LEN 1024 /* presumably: max length of a packed memory address */
2929
30+
3031typedef struct ompi_osc_ucx_component {
3132 ompi_osc_base_component_t super ;
3233 opal_common_ucx_wpool_t * wpool ;
3334 bool enable_mpi_threads ;
3435 opal_free_list_t requests ; /* request free list for the r* communication variants */
36+ opal_free_list_t accumulate_requests ; /* request free list for the r* communication variants */
3537 bool env_initialized ; /* UCX environment is initialized or not */
36- int num_incomplete_req_ops ;
38+ int comm_world_size ;
39+ ucp_ep_h * endpoints ;
3740 int num_modules ;
3841 bool no_locks ; /* Default value of the no_locks info key for new windows */
3942 bool acc_single_intrinsic ;
@@ -44,6 +47,16 @@ typedef struct ompi_osc_ucx_component {
4447
4548OMPI_DECLSPEC extern ompi_osc_ucx_component_t mca_osc_ucx_component ;
4649
/*
 * Add one to _module->ctx->num_incomplete_req_ops through
 * opal_atomic_add_fetch_size_t().
 *
 * The do/while(0) deliberately has NO trailing semicolon: the caller's own
 * `;` terminates the statement, so the macro expands to exactly one
 * statement and is safe in an unbraced if/else.  The argument is
 * parenthesized so expressions such as `&mod` bind correctly.
 */
#define OSC_UCX_INCREMENT_OUTSTANDING_NB_OPS(_module)                             \
    do {                                                                          \
        opal_atomic_add_fetch_size_t(&(_module)->ctx->num_incomplete_req_ops, 1); \
    } while (0)
54+
/*
 * Subtract one from _module->ctx->num_incomplete_req_ops through
 * opal_atomic_add_fetch_size_t().
 *
 * The delta is written as -1.  NOTE(review): if the helper's delta parameter
 * is size_t (as its name suggests), this relies on unsigned wrap-around to
 * subtract -- confirm, or switch to a dedicated subtract helper if one exists.
 *
 * The do/while(0) deliberately has NO trailing semicolon so the macro is a
 * single statement and is safe in an unbraced if/else.
 */
#define OSC_UCX_DECREMENT_OUTSTANDING_NB_OPS(_module)                              \
    do {                                                                           \
        opal_atomic_add_fetch_size_t(&(_module)->ctx->num_incomplete_req_ops, -1); \
    } while (0)
59+
4760typedef enum ompi_osc_ucx_epoch {
4861 NONE_EPOCH ,
4962 FENCE_EPOCH ,
@@ -69,7 +82,8 @@ typedef struct ompi_osc_ucx_epoch_type {
/*
 * Byte offsets expressed as multiples of sizeof(uint64_t).
 * NOTE(review): presumably these index the 64-bit fields of the window state
 * structure -- confirm against that structure's layout.
 */
#define OSC_UCX_STATE_COMPLETE_COUNT_OFFSET (sizeof(uint64_t) * 3)
#define OSC_UCX_STATE_POST_INDEX_OFFSET (sizeof(uint64_t) * 4)
#define OSC_UCX_STATE_POST_STATE_OFFSET (sizeof(uint64_t) * 5)
/*
 * NOTE(review): the window-count offset moved from (5 + POST_PEER_MAX) to
 * (6 + POST_PEER_MAX), which suggests the post-state area spans
 * OMPI_OSC_UCX_POST_PEER_MAX slots starting at slot 5.  If so, slot 6 lies
 * inside that area and the dynamic lock would be expected at
 * (5 + OMPI_OSC_UCX_POST_PEER_MAX) instead.  Value left unchanged; verify
 * against the state structure before relying on it.
 */
#define OSC_UCX_STATE_DYNAMIC_LOCK_OFFSET (sizeof(uint64_t) * 6)
#define OSC_UCX_STATE_DYNAMIC_WIN_CNT_OFFSET (sizeof(uint64_t) * (6 + OMPI_OSC_UCX_POST_PEER_MAX))
7387
7488typedef struct ompi_osc_dynamic_win_info {
7589 uint64_t base ;
@@ -102,6 +116,7 @@ typedef struct ompi_osc_ucx_module {
102116 size_t size ;
103117 uint64_t * addrs ;
104118 uint64_t * state_addrs ;
119+ uint64_t * comm_world_ranks ;
105120 int disp_unit ; /* if disp_unit >= 0, then everyone has the same
106121 * disp unit size; if disp_unit == -1, then we
107122 * need to look at disp_units */
@@ -125,6 +140,7 @@ typedef struct ompi_osc_ucx_module {
125140 opal_common_ucx_wpmem_t * mem ;
126141 opal_common_ucx_wpmem_t * state_mem ;
127142
143+ bool skip_sync_check ;
128144 bool noncontig_shared_win ;
129145 size_t * sizes ;
130146 /* in shared windows, shmem_addrs can be used for direct load store to
@@ -147,9 +163,18 @@ typedef struct ompi_osc_ucx_lock {
147163 bool is_nocheck ;
148164} ompi_osc_ucx_lock_t ;
149165
/*
 * Endpoint slot for window rank `rank_`: the rank is first mapped through
 * _module->comm_world_ranks[], and the result indexes the component-wide
 * mca_osc_ucx_component.endpoints[] array.  The expansion is an lvalue, so
 * its address may be taken.
 */
#define OSC_UCX_GET_EP(_module, rank_) \
    (mca_osc_ucx_component.endpoints[(_module)->comm_world_ranks[(rank_)]])

/*
 * Displacement unit for `rank_`: the per-rank entry disp_units[rank_] when
 * module_->disp_unit is negative, otherwise the shared module_->disp_unit.
 * `module_` is evaluated more than once -- pass a side-effect-free expression.
 */
#define OSC_UCX_GET_DISP(module_, rank_) \
    (((module_)->disp_unit < 0) ? (module_)->disp_units[(rank_)] : (module_)->disp_unit)
152168
/*
 * Store the default endpoint pointer for `_target` into `_ep_ptr`:
 *   - NULL when opal_common_ucx_thread_enabled is true;
 *   - otherwise the address of the endpoint slot that OSC_UCX_GET_EP yields
 *     for (_module, _target).
 *
 * Wrapped in do/while(0) without a trailing semicolon so that the expansion
 * is a single statement; a bare if/else here would capture a following
 * `else` at the call site.
 */
#define OSC_UCX_GET_DEFAULT_EP(_ep_ptr, _module, _target)                 \
    do {                                                                  \
        if (opal_common_ucx_thread_enabled) {                             \
            (_ep_ptr) = NULL;                                             \
        } else {                                                          \
            (_ep_ptr) = (ucp_ep_h *) &(OSC_UCX_GET_EP(_module, _target)); \
        }                                                                 \
    } while (0)
175+
/*
 * Declared here, defined elsewhere.
 * NOTE(review): presumably the number of outstanding operations after which
 * a flush is issued -- confirm against the code that reads it.
 */
extern size_t ompi_osc_ucx_outstanding_ops_flush_threshold;
177+
/*
 * Shared-window query for one rank of `win` (declaration only).
 * NOTE(review): `size`, `disp_unit` and `baseptr` look like output
 * parameters and the int result like a status code -- confirm against the
 * implementation.
 */
int ompi_osc_ucx_shared_query(struct ompi_win_t *win, int rank, size_t *size,
                              int *disp_unit, void *baseptr);
/*
 * Attach the memory range [base, base + len) to `win` (declaration only).
 * NOTE(review): range semantics and the meaning of the int result are
 * inferred from the parameter names -- confirm against the implementation.
 */
int ompi_osc_ucx_win_attach(struct ompi_win_t *win, void *base, size_t len);
@@ -169,6 +194,11 @@ int ompi_osc_ucx_accumulate(const void *origin_addr, int origin_count,
169194 int target , ptrdiff_t target_disp , int target_count ,
170195 struct ompi_datatype_t * target_dt ,
171196 struct ompi_op_t * op , struct ompi_win_t * win );
/*
 * Accumulate entry point with the `_nb` suffix (declaration only).
 * NOTE(review): presumably the non-blocking counterpart of
 * ompi_osc_ucx_accumulate; the origin/target/op/win parameters are assumed
 * to carry the same meaning -- confirm against the implementation.
 */
int ompi_osc_ucx_accumulate_nb(const void *origin_addr, int origin_count,
                               struct ompi_datatype_t *origin_dt,
                               int target, ptrdiff_t target_disp, int target_count,
                               struct ompi_datatype_t *target_dt,
                               struct ompi_op_t *op, struct ompi_win_t *win);
172202int ompi_osc_ucx_compare_and_swap (const void * origin_addr , const void * compare_addr ,
173203 void * result_addr , struct ompi_datatype_t * dt ,
174204 int target , ptrdiff_t target_disp ,
@@ -184,6 +214,13 @@ int ompi_osc_ucx_get_accumulate(const void *origin_addr, int origin_count,
184214 int target_rank , ptrdiff_t target_disp ,
185215 int target_count , struct ompi_datatype_t * target_datatype ,
186216 struct ompi_op_t * op , struct ompi_win_t * win );
/*
 * Get-accumulate entry point with the `_nb` suffix (declaration only).
 * NOTE(review): presumably the non-blocking counterpart of
 * ompi_osc_ucx_get_accumulate, with `result_addr`/`result_count`/
 * `result_datatype` describing the buffer that receives the fetched data --
 * confirm against the implementation.
 */
int ompi_osc_ucx_get_accumulate_nb(const void *origin_addr, int origin_count,
                                   struct ompi_datatype_t *origin_datatype,
                                   void *result_addr, int result_count,
                                   struct ompi_datatype_t *result_datatype,
                                   int target_rank, ptrdiff_t target_disp,
                                   int target_count, struct ompi_datatype_t *target_datatype,
                                   struct ompi_op_t *op, struct ompi_win_t *win);
187224int ompi_osc_ucx_rput (const void * origin_addr , int origin_count ,
188225 struct ompi_datatype_t * origin_dt ,
189226 int target , ptrdiff_t target_disp , int target_count ,
@@ -228,10 +265,7 @@ int ompi_osc_ucx_flush_local_all(struct ompi_win_t *win);
228265int ompi_osc_find_attached_region_position (ompi_osc_dynamic_win_info_t * dynamic_wins ,
229266 int min_index , int max_index ,
230267 uint64_t base , size_t len , int * insert );
231- extern inline bool ompi_osc_need_acc_lock (ompi_osc_ucx_module_t * module , int target );
232- extern inline int ompi_osc_state_lock (ompi_osc_ucx_module_t * module , int target ,
233- bool * lock_acquired , bool force_lock );
234- extern inline int ompi_osc_state_unlock (ompi_osc_ucx_module_t * module , int target ,
235- bool lock_acquired , void * free_ptr );
268+ int ompi_osc_ucx_dynamic_lock (ompi_osc_ucx_module_t * module , int target );
269+ int ompi_osc_ucx_dynamic_unlock (ompi_osc_ucx_module_t * module , int target );
236270
237271#endif /* OMPI_OSC_UCX_H */
0 commit comments