@@ -32,6 +32,8 @@ __thread int initialized = 0;
3232#endif
3333
3434bool opal_common_ucx_thread_enabled = false;
35+ opal_atomic_int64_t opal_common_ucx_ep_counts = 0 ;
36+ opal_atomic_int64_t opal_common_ucx_unpacked_rkey_counts = 0 ;
3537
3638static _ctx_record_t * _tlocal_add_ctx_rec (opal_common_ucx_ctx_t * ctx );
3739static inline _ctx_record_t * _tlocal_get_ctx_rec (opal_tsd_tracked_key_t tls_key );
@@ -102,6 +104,7 @@ static void _winfo_destructor(opal_common_ucx_winfo_t *winfo)
102104 for (i = 0 ; i < winfo -> comm_size ; i ++ ) {
103105 if (NULL != winfo -> endpoints [i ]) {
104106 ucp_ep_destroy (winfo -> endpoints [i ]);
107+ OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD (opal_common_ucx_ep_counts , -1 );
105108 }
106109 assert (winfo -> inflight_ops [i ] == 0 );
107110 }
@@ -326,9 +329,26 @@ static opal_common_ucx_winfo_t *_wpool_get_winfo(opal_common_ucx_wpool_t *wpool,
326329 return winfo ;
327330}
328331
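332+ /* Free the winfo's endpoints and inflight_ops arrays (destroying live endpoints first when opal_common_ucx_thread_enabled is set), then remove the winfo from active workers and add it to idle workers */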
329333static void _wpool_put_winfo (opal_common_ucx_wpool_t * wpool , opal_common_ucx_winfo_t * winfo )
330334{
331335 opal_mutex_lock (& wpool -> mutex );
336+ if (winfo -> comm_size != 0 ) {
337+ size_t i ;
338+ if (opal_common_ucx_thread_enabled ) {
339+ for (i = 0 ; i < winfo -> comm_size ; i ++ ) {
340+ if (NULL != winfo -> endpoints [i ]) {
341+ ucp_ep_destroy (winfo -> endpoints [i ]);
342+ OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD (opal_common_ucx_ep_counts , -1 );
343+ }
344+ assert (winfo -> inflight_ops [i ] == 0 );
345+ }
346+ }
347+ free (winfo -> endpoints );
348+ free (winfo -> inflight_ops );
349+ }
350+ winfo -> endpoints = NULL ;
351+ winfo -> comm_size = 0 ;
332352 opal_list_remove_item (& wpool -> active_workers , & winfo -> super );
333353 opal_list_prepend (& wpool -> idle_workers , & winfo -> super );
334354 opal_mutex_unlock (& wpool -> mutex );
@@ -632,6 +652,7 @@ static int _tlocal_ctx_connect(_ctx_record_t *ctx_rec, int target)
632652 memset (& ep_params , 0 , sizeof (ucp_ep_params_t ));
633653 ep_params .field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS ;
634654
655+ assert (winfo -> endpoints [target ] == NULL );
635656 opal_mutex_lock (& winfo -> mutex );
636657 displ = gctx -> recv_worker_displs [target ];
637658 ep_params .address = (ucp_address_t * ) & (gctx -> recv_worker_addrs [displ ]);
@@ -641,7 +662,9 @@ static int _tlocal_ctx_connect(_ctx_record_t *ctx_rec, int target)
641662 opal_mutex_unlock (& winfo -> mutex );
642663 return OPAL_ERROR ;
643664 }
665+ OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD (opal_common_ucx_ep_counts , 1 );
644666 opal_mutex_unlock (& winfo -> mutex );
667+ assert (winfo -> endpoints [target ] != NULL );
645668 return OPAL_SUCCESS ;
646669}
647670
@@ -662,6 +685,7 @@ static void _tlocal_mem_rec_cleanup(_mem_record_t *mem_rec)
662685 for (i = 0 ; i < mem_rec -> gmem -> ctx -> comm_size ; i ++ ) {
663686 if (mem_rec -> rkeys [i ]) {
664687 ucp_rkey_destroy (mem_rec -> rkeys [i ]);
688+ OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD (opal_common_ucx_unpacked_rkey_counts , -1 );
665689 }
666690 }
667691 opal_mutex_unlock (& mem_rec -> winfo -> mutex );
@@ -701,6 +725,7 @@ static int _tlocal_mem_create_rkey(_mem_record_t *mem_rec, ucp_ep_h ep, int targ
701725
702726 opal_mutex_lock (& mem_rec -> winfo -> mutex );
703727 status = ucp_ep_rkey_unpack (ep , & gmem -> mem_addrs [displ ], & mem_rec -> rkeys [target ]);
728+ OPAL_COMMON_UCX_DEBUG_ATOMIC_ADD (opal_common_ucx_unpacked_rkey_counts , 1 );
704729 opal_mutex_unlock (& mem_rec -> winfo -> mutex );
705730 if (status != UCS_OK ) {
706731 MCA_COMMON_UCX_VERBOSE (1 , "ucp_ep_rkey_unpack failed: %d" , status );
0 commit comments