2424 memcpy(((char*)(_dst)) + (_off), _src, _len); \
2525 (_off) += (_len);
2626
/* Service mutex guarding the component-wide state; it is taken by
 * _osc_ucx_init_lock()/_osc_ucx_init_unlock() only when
 * mca_osc_ucx_component.enable_mpi_threads is set. */
27+ opal_mutex_t mca_osc_service_mutex = OPAL_MUTEX_STATIC_INIT ;
28+ static void _osc_ucx_init_lock (void )
29+ {
30+ if (mca_osc_ucx_component .enable_mpi_threads ) {
31+ opal_mutex_lock (& mca_osc_service_mutex );
32+ }
33+ }
34+ static void _osc_ucx_init_unlock (void )
35+ {
36+ if (mca_osc_ucx_component .enable_mpi_threads ) {
37+ opal_mutex_unlock (& mca_osc_service_mutex );
38+ }
39+ }
40+
41+
/* Forward declarations of file-local (static) component functions. */
2742static int component_open (void );
2843static int component_register (void );
2944static int component_init (bool enable_progress_threads , bool enable_mpi_threads );
@@ -192,6 +207,9 @@ static void ompi_osc_ucx_unregister_progress()
192207{
193208 int ret ;
194209
210+ /* May be called concurrently - protect */
211+ _osc_ucx_init_lock ();
212+
195213 mca_osc_ucx_component .num_modules -- ;
196214 OSC_UCX_ASSERT (mca_osc_ucx_component .num_modules >= 0 );
197215 if (0 == mca_osc_ucx_component .num_modules ) {
@@ -200,6 +218,8 @@ static void ompi_osc_ucx_unregister_progress()
200218 OSC_UCX_VERBOSE (1 , "opal_progress_unregister failed: %d" , ret );
201219 }
202220 }
221+
222+ _osc_ucx_init_unlock ();
203223}
204224
205225static int component_select (struct ompi_win_t * win , void * * base , size_t size , int disp_unit ,
@@ -226,7 +246,14 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
226246 return OMPI_ERR_NOT_SUPPORTED ;
227247 }
228248
249+ /* May be called concurrently - protect */
250+ _osc_ucx_init_lock ();
251+
229252 if (mca_osc_ucx_component .env_initialized == false) {
253+ /* Lazy initialization of the global state.
254+ * Not all MPI applications use one-sided functionality, so this state
255+ * is initialized here on first use rather than in component_init().
256+ */
230257
231258 OBJ_CONSTRUCT (& mca_osc_ucx_component .requests , opal_free_list_t );
232259 ret = opal_free_list_init (& mca_osc_ucx_component .requests ,
@@ -236,30 +263,52 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
236263 0 , 0 , 8 , 0 , 8 , NULL , 0 , NULL , NULL , NULL );
237264 if (OMPI_SUCCESS != ret ) {
238265 OSC_UCX_VERBOSE (1 , "opal_free_list_init failed: %d" , ret );
239- goto error ;
266+ goto select_unlock ;
240267 }
241268
242269 ret = opal_common_ucx_wpool_init (mca_osc_ucx_component .wpool ,
243270 ompi_proc_world_size (),
244271 mca_osc_ucx_component .enable_mpi_threads );
245272 if (OMPI_SUCCESS != ret ) {
246273 OSC_UCX_VERBOSE (1 , "opal_common_ucx_wpool_init failed: %d" , ret );
247- goto error ;
274+ goto select_unlock ;
248275 }
249276
277+ /* Make sure that all memory updates performed above are globally
278+ * observable before (mca_osc_ucx_component.env_initialized = true)
279+ */
250280 mca_osc_ucx_component .env_initialized = true;
251281 env_initialized = true;
252282 }
253283
284+ /* Account for the number of active "modules" = MPI windows */
285+ mca_osc_ucx_component .num_modules ++ ;
286+
287+ /* If this is the first window to be registered - register the progress
288+ * callback
289+ */
290+ OSC_UCX_ASSERT (mca_osc_ucx_component .num_modules > 0 );
291+ if (1 == mca_osc_ucx_component .num_modules ) {
292+ ret = opal_progress_register (progress_callback );
293+ if (OMPI_SUCCESS != ret ) {
294+ OSC_UCX_VERBOSE (1 , "opal_progress_register failed: %d" , ret );
295+ goto error ;
296+ }
297+ }
298+
299+ select_unlock :
300+ _osc_ucx_init_unlock ();
301+ if (ret ) {
302+ goto error ;
303+ }
304+
254305 /* create module structure */
255306 module = (ompi_osc_ucx_module_t * )calloc (1 , sizeof (ompi_osc_ucx_module_t ));
256307 if (module == NULL ) {
257308 ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE ;
258309 goto error_nomem ;
259310 }
260311
261- mca_osc_ucx_component .num_modules ++ ;
262-
263312 /* fill in the function pointer part */
264313 memcpy (module , & ompi_osc_ucx_module_template , sizeof (ompi_osc_base_module_t ));
265314
@@ -413,19 +462,15 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
413462 goto error ;
414463 }
415464
416- OSC_UCX_ASSERT (mca_osc_ucx_component .num_modules > 0 );
417- if (1 == mca_osc_ucx_component .num_modules ) {
418- ret = opal_progress_register (progress_callback );
419- if (OMPI_SUCCESS != ret ) {
420- OSC_UCX_VERBOSE (1 , "opal_progress_register failed: %d" , ret );
421- goto error ;
422- }
423- }
424465 return ret ;
425466
426- error :
467+ error :
427468 if (module -> disp_units ) free (module -> disp_units );
428469 if (module -> comm ) ompi_comm_free (& module -> comm );
470+ /* The module count is updated and (if needed) the progress callback is
471+ * registered right before memory is allocated for the module,
472+ * so a non-NULL module is used here as an indirect sign that this was done.
473+ */
429474 if (module ) {
430475 free (module );
431476 ompi_osc_ucx_unregister_progress ();
0 commit comments