@@ -286,3 +286,151 @@ OPAL_DECLSPEC int opal_common_ucx_del_procs(opal_common_ucx_del_proc_t *procs, s
286286 return OPAL_SUCCESS ;
287287}
288288
289+ /***********************************************************************/
290+
/*
 * Per-thread cleanup hook. opal_common_ucx_wpool_init() passes this function
 * to pthread_key_create() as the destructor for _tlocal_key.
 *
 * The body is currently a placeholder; the intended work is:
 *   - clean up all rkeys in the window table;
 *   - return all workers to the idle pool.
 */
static inline void _cleanup_tlocal(void *arg)
{
    /* Nothing is implemented yet, so the thread-local value is not used. */
    (void) arg;
}
296+
297+ static ucp_worker_h _create_ctx_worker (opal_common_ucx_wpool_t * wpool )
298+ {
299+ ucp_worker_params_t worker_params ;
300+ ucp_worker_h worker ;
301+ ucs_status_t status ;
302+
303+ memset (& worker_params , 0 , sizeof (worker_params ));
304+ worker_params .field_mask = UCP_WORKER_PARAM_FIELD_THREAD_MODE ;
305+ worker_params .thread_mode = UCS_THREAD_MODE_SINGLE ;
306+ status = ucp_worker_create (wpool -> ucp_ctx , & worker_params , & worker );
307+ if (UCS_OK != status ) {
308+ return NULL ;
309+ }
310+
311+ return worker ;
312+ }
313+
314+ static void _wpool_add_to_idle (opal_common_ucx_wpool_t * wpool ,
315+ _worker_engine_t * wkr )
316+ {
317+ _idle_list_item_t * item ;
318+
319+ if (wkr -> comm_size != 0 ) {
320+ int i ;
321+ for (i = 0 ; i < wkr -> comm_size ; i ++ ) {
322+ ucp_ep_destroy (wkr -> endpoints [i ]);
323+ }
324+ free (wkr -> endpoints );
325+ wkr -> endpoints = NULL ;
326+ wkr -> comm_size = 0 ;
327+ }
328+
329+ item = OBJ_NEW (_idle_list_item_t );
330+ item -> ptr = wkr ;
331+
332+ opal_mutex_lock (& wpool -> mutex );
333+ opal_list_append (& wpool -> idle_workers , & item -> super );
334+ opal_mutex_unlock (& wpool -> mutex );
335+ }
336+
337+ static _worker_engine_t * _wpool_remove_from_idle (opal_common_ucx_wpool_t * wpool )
338+ {
339+ _worker_engine_t * wkr = NULL ;
340+ _idle_list_item_t * item = NULL ;
341+
342+ opal_mutex_lock (& wpool -> mutex );
343+ if (!opal_list_is_empty (& wpool -> idle_workers )) {
344+ item = (_idle_list_item_t * )opal_list_get_first (& wpool -> idle_workers );
345+ opal_list_remove_item (& wpool -> idle_workers , & item -> super );
346+ }
347+ opal_mutex_unlock (& wpool -> mutex );
348+
349+ if (item != NULL ) {
350+ wkr = item -> ptr ;
351+ OBJ_RELEASE (item );
352+ }
353+
354+ return wkr ;
355+ }
356+
357+
358+ OPAL_DECLSPEC int opal_common_ucx_wpool_init (opal_common_ucx_wpool_t * wpool ,
359+ int proc_world_size ,
360+ ucp_request_init_callback_t req_init_ptr ,
361+ size_t req_size )
362+ {
363+ ucp_config_t * config = NULL ;
364+ ucp_params_t context_params ;
365+ _worker_engine_t * wkr ;
366+ ucs_status_t status ;
367+ int ret = OPAL_SUCCESS ;
368+
369+ wpool -> cur_ctxid = wpool -> cur_memid = 0 ;
370+ OBJ_CONSTRUCT (& wpool -> mutex , opal_mutex_t );
371+
372+ status = ucp_config_read ("MPI" , NULL , & config );
373+ if (UCS_OK != status ) {
374+ MCA_COMMON_UCX_VERBOSE (1 , "ucp_config_read failed: %d" , status );
375+ return OPAL_ERROR ;
376+ }
377+
378+ /* initialize UCP context */
379+ memset (& context_params , 0 , sizeof (context_params ));
380+ context_params .field_mask = UCP_PARAM_FIELD_FEATURES |
381+ UCP_PARAM_FIELD_MT_WORKERS_SHARED |
382+ UCP_PARAM_FIELD_ESTIMATED_NUM_EPS |
383+ UCP_PARAM_FIELD_REQUEST_INIT |
384+ UCP_PARAM_FIELD_REQUEST_SIZE ;
385+ context_params .features = UCP_FEATURE_RMA | UCP_FEATURE_AMO32 | UCP_FEATURE_AMO64 ;
386+ context_params .mt_workers_shared = 1 ;
387+ context_params .estimated_num_eps = proc_world_size ;
388+ context_params .request_init = req_init_ptr ;
389+ context_params .request_size = req_size ;
390+
391+ status = ucp_init (& context_params , config , & wpool -> ucp_ctx );
392+ ucp_config_release (config );
393+ if (UCS_OK != status ) {
394+ MCA_COMMON_UCX_VERBOSE (1 , "ucp_init failed: %d" , status );
395+ ret = OPAL_ERROR ;
396+ goto err_ucp_init ;
397+ }
398+
399+ /* create recv worker and add to idle pool */
400+ OBJ_CONSTRUCT (& wpool -> idle_workers , opal_list_t );
401+ wpool -> recv_worker = _create_ctx_worker (wpool );
402+ if (wpool -> recv_worker == NULL ) {
403+ MCA_COMMON_UCX_VERBOSE (1 , "_create_ctx_worker failed" );
404+ ret = OPAL_ERROR ;
405+ goto err_worker_create ;
406+ }
407+
408+ wkr = OBJ_NEW (_worker_engine_t );
409+ OBJ_CONSTRUCT (& wkr -> mutex , opal_mutex_t );
410+ wkr -> worker = wpool -> recv_worker ;
411+ wkr -> endpoints = NULL ;
412+ wkr -> comm_size = 0 ;
413+
414+ _wpool_add_to_idle (wpool , wkr );
415+
416+ status = ucp_worker_get_address (wpool -> recv_worker ,
417+ & wpool -> recv_waddr , & wpool -> recv_waddr_len );
418+ if (status != UCS_OK ) {
419+ MCA_COMMON_UCX_VERBOSE (1 , "ucp_worker_get_address failed: %d" , status );
420+ ret = OPAL_ERROR ;
421+ goto err_get_addr ;
422+ }
423+
424+ pthread_key_create (& _tlocal_key , _cleanup_tlocal );
425+
426+ return ret ;
427+
428+ err_get_addr :
429+ if (NULL != wpool -> recv_worker ) {
430+ ucp_worker_destroy (wpool -> recv_worker );
431+ }
432+ err_worker_create :
433+ ucp_cleanup (wpool -> ucp_ctx );
434+ err_ucp_init :
435+ return ret ;
436+ }
0 commit comments