2121 memcpy(((char*)(_dst)) + (_off), _src, _len); \
2222 (_off) += (_len);
2323
/*
 * Component-wide mutex, statically initialized with OPAL_MUTEX_STATIC_INIT.
 * It is taken and released only through _osc_ucx_init_lock() and
 * _osc_ucx_init_unlock(), which guard the num_modules decrement and progress
 * unregistration in ompi_osc_ucx_unregister_progress(), and the environment
 * initialization, num_modules increment and progress registration in
 * component_select().
 * NOTE(review): the object is not declared static, so it has external
 * linkage - confirm whether any other translation unit actually needs it.
 */
24+ opal_mutex_t mca_osc_service_mutex = OPAL_MUTEX_STATIC_INIT ;
/*
 * Acquire mca_osc_service_mutex, but only when the component was told that
 * MPI threads are enabled (mca_osc_ucx_component.enable_mpi_threads);
 * otherwise this is a no-op. Takes no arguments and returns nothing.
 * Every call is expected to be paired with _osc_ucx_init_unlock().
 * NOTE(review): file-scope identifiers that begin with an underscore are
 * reserved by the C standard; consider a rename together with all call sites.
 */
25+ static void _osc_ucx_init_lock (void )
26+ {
/* Skip the lock entirely when MPI threads are not enabled. */
27+ if (mca_osc_ucx_component .enable_mpi_threads ) {
28+ opal_mutex_lock (& mca_osc_service_mutex );
29+ }
30+ }
/*
 * Release mca_osc_service_mutex, but only when
 * mca_osc_ucx_component.enable_mpi_threads is set; otherwise this is a no-op.
 * Counterpart of _osc_ucx_init_lock(). Takes no arguments and returns nothing.
 * NOTE(review): the flag is re-read here rather than remembered from the
 * lock call, so this assumes enable_mpi_threads does not change while the
 * mutex is held - confirm against where the flag is assigned.
 */
31+ static void _osc_ucx_init_unlock (void )
32+ {
/* Mirror the condition used when locking so lock/unlock stay balanced. */
33+ if (mca_osc_ucx_component .enable_mpi_threads ) {
34+ opal_mutex_unlock (& mca_osc_service_mutex );
35+ }
36+ }
37+
2438static int component_open (void );
2539static int component_register (void );
2640static int component_init (bool enable_progress_threads , bool enable_mpi_threads );
@@ -254,6 +268,9 @@ static void ompi_osc_ucx_unregister_progress()
254268{
255269 int ret ;
256270
271+ /* May be called concurrently - protect */
272+ _osc_ucx_init_lock ();
273+
257274 mca_osc_ucx_component .num_modules -- ;
258275 OSC_UCX_ASSERT (mca_osc_ucx_component .num_modules >= 0 );
259276 if (0 == mca_osc_ucx_component .num_modules ) {
@@ -262,6 +279,8 @@ static void ompi_osc_ucx_unregister_progress()
262279 OSC_UCX_VERBOSE (1 , "opal_progress_unregister failed: %d" , ret );
263280 }
264281 }
282+
283+ _osc_ucx_init_unlock ();
265284}
266285
267286static int component_select (struct ompi_win_t * win , void * * base , size_t size , int disp_unit ,
@@ -295,6 +314,8 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
295314 return OMPI_ERR_NOT_SUPPORTED ;
296315 }
297316
317+ _osc_ucx_init_lock ();
318+
298319 if (mca_osc_ucx_component .env_initialized == false) {
299320 ucp_config_t * config = NULL ;
300321 ucp_params_t context_params ;
@@ -304,7 +325,8 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
304325 status = ucp_config_read ("MPI" , NULL , & config );
305326 if (UCS_OK != status ) {
306327 OSC_UCX_VERBOSE (1 , "ucp_config_read failed: %d" , status );
307- return OMPI_ERROR ;
328+ ret = OMPI_ERROR ;
329+ goto select_unlock ;
308330 }
309331
310332 OBJ_CONSTRUCT (& mca_osc_ucx_component .requests , opal_free_list_t );
@@ -315,7 +337,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
315337 0 , 0 , 8 , 0 , 8 , NULL , 0 , NULL , NULL , NULL );
316338 if (OMPI_SUCCESS != ret ) {
317339 OSC_UCX_VERBOSE (1 , "opal_free_list_init failed: %d" , ret );
318- goto error ;
340+ goto select_unlock ;
319341 }
320342
321343 /* initialize UCP context */
@@ -337,7 +359,7 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
337359 if (UCS_OK != status ) {
338360 OSC_UCX_VERBOSE (1 , "ucp_init failed: %d" , status );
339361 ret = OMPI_ERROR ;
340- goto error ;
362+ goto select_unlock ;
341363 }
342364
343365 assert (mca_osc_ucx_component .ucp_worker == NULL );
@@ -349,29 +371,53 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
349371 & (mca_osc_ucx_component .ucp_worker ));
350372 if (UCS_OK != status ) {
351373 OSC_UCX_VERBOSE (1 , "ucp_worker_create failed: %d" , status );
352- ret = OMPI_ERROR ;
353- goto error_nomem ;
374+ ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE ;
375+ goto select_unlock ;
354376 }
355377
356378 /* query UCP worker attributes */
357379 worker_attr .field_mask = UCP_WORKER_ATTR_FIELD_THREAD_MODE ;
358380 status = ucp_worker_query (mca_osc_ucx_component .ucp_worker , & worker_attr );
359381 if (UCS_OK != status ) {
360382 OSC_UCX_VERBOSE (1 , "ucp_worker_query failed: %d" , status );
361- ret = OMPI_ERROR ;
362- goto error_nomem ;
383+ ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE ;
384+ goto select_unlock ;
363385 }
364386
365387 if (mca_osc_ucx_component .enable_mpi_threads == true &&
366388 worker_attr .thread_mode != UCS_THREAD_MODE_MULTI ) {
367389 OSC_UCX_VERBOSE (1 , "ucx does not support multithreading" );
368- ret = OMPI_ERROR ;
369- goto error_nomem ;
390+ ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE ;
391+ goto select_unlock ;
370392 }
371393
372394 mca_osc_ucx_component .env_initialized = true;
373395 env_initialized = true;
374396 }
397+
398+ mca_osc_ucx_component .num_modules ++ ;
399+
400+ OSC_UCX_ASSERT (mca_osc_ucx_component .num_modules > 0 );
401+ if (1 == mca_osc_ucx_component .num_modules ) {
402+ ret = opal_progress_register (progress_callback );
403+ if (OMPI_SUCCESS != ret ) {
404+ OSC_UCX_VERBOSE (1 , "opal_progress_register failed: %d" , ret );
405+ goto select_unlock ;
406+ }
407+ }
408+
409+ select_unlock :
410+ _osc_ucx_init_unlock ();
411+ switch (ret ) {
412+ case OMPI_SUCCESS :
413+ break ;
414+ case OMPI_ERROR :
415+ goto error ;
416+ case OMPI_ERR_TEMP_OUT_OF_RESOURCE :
417+ goto error_nomem ;
418+ default :
419+ goto error ;
420+ }
375421
376422 /* create module structure */
377423 module = (ompi_osc_ucx_module_t * )calloc (1 , sizeof (ompi_osc_ucx_module_t ));
@@ -380,7 +426,6 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
380426 goto error_nomem ;
381427 }
382428
383- mca_osc_ucx_component .num_modules ++ ;
384429
385430 /* fill in the function pointer part */
386431 memcpy (module , & ompi_osc_ucx_module_template , sizeof (ompi_osc_base_module_t ));
@@ -648,14 +693,6 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
648693 goto error ;
649694 }
650695
651- OSC_UCX_ASSERT (mca_osc_ucx_component .num_modules > 0 );
652- if (1 == mca_osc_ucx_component .num_modules ) {
653- ret = opal_progress_register (progress_callback );
654- if (OMPI_SUCCESS != ret ) {
655- OSC_UCX_VERBOSE (1 , "opal_progress_register failed: %d" , ret );
656- goto error ;
657- }
658- }
659696 return ret ;
660697
661698 error :
0 commit comments