2424 memcpy(((char*)(_dst)) + (_off), _src, _len); \
2525 (_off) += (_len);
2626
27+ opal_mutex_t mca_osc_service_mutex = OPAL_MUTEX_STATIC_INIT ;
28+ static void _osc_ucx_init_lock (void )
29+ {
30+ if (mca_osc_ucx_component .enable_mpi_threads ) {
31+ opal_mutex_lock (& mca_osc_service_mutex );
32+ }
33+ }
34+ static void _osc_ucx_init_unlock (void )
35+ {
36+ if (mca_osc_ucx_component .enable_mpi_threads ) {
37+ opal_mutex_unlock (& mca_osc_service_mutex );
38+ }
39+ }
40+
41+
2742static int component_open (void );
2843static int component_register (void );
2944static int component_init (bool enable_progress_threads , bool enable_mpi_threads );
@@ -141,6 +156,9 @@ static int component_init(bool enable_progress_threads, bool enable_mpi_threads)
141156
142157static int component_finalize (void ) {
143158 opal_common_ucx_mca_deregister ();
159+ if (mca_osc_ucx_component .env_initialized ) {
160+ opal_common_ucx_wpool_finalize (mca_osc_ucx_component .wpool );
161+ }
144162 opal_common_ucx_wpool_free (mca_osc_ucx_component .wpool );
145163 return OMPI_SUCCESS ;
146164}
@@ -189,6 +207,9 @@ static void ompi_osc_ucx_unregister_progress()
189207{
190208 int ret ;
191209
210+ /* May be called concurrently - protect */
211+ _osc_ucx_init_lock ();
212+
192213 mca_osc_ucx_component .num_modules -- ;
193214 OSC_UCX_ASSERT (mca_osc_ucx_component .num_modules >= 0 );
194215 if (0 == mca_osc_ucx_component .num_modules ) {
@@ -197,6 +218,8 @@ static void ompi_osc_ucx_unregister_progress()
197218 OSC_UCX_VERBOSE (1 , "opal_progress_unregister failed: %d" , ret );
198219 }
199220 }
221+
222+ _osc_ucx_init_unlock ();
200223}
201224
202225static int component_select (struct ompi_win_t * win , void * * base , size_t size , int disp_unit ,
@@ -223,7 +246,14 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
223246 return OMPI_ERR_NOT_SUPPORTED ;
224247 }
225248
249+ /* May be called concurrently - protect */
250+ _osc_ucx_init_lock ();
251+
226252 if (mca_osc_ucx_component .env_initialized == false) {
253+ /* Lazy initialization of the global state.
254+ * As not all of the MPI applications are using One-Sided functionality
255+ * we don't want to initialize in the component_init()
256+ */
227257
228258 OBJ_CONSTRUCT (& mca_osc_ucx_component .requests , opal_free_list_t );
229259 ret = opal_free_list_init (& mca_osc_ucx_component .requests ,
@@ -233,30 +263,52 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
233263 0 , 0 , 8 , 0 , 8 , NULL , 0 , NULL , NULL , NULL );
234264 if (OMPI_SUCCESS != ret ) {
235265 OSC_UCX_VERBOSE (1 , "opal_free_list_init failed: %d" , ret );
236- goto error ;
266+ goto select_unlock ;
237267 }
238268
239269 ret = opal_common_ucx_wpool_init (mca_osc_ucx_component .wpool ,
240270 ompi_proc_world_size (),
241271 mca_osc_ucx_component .enable_mpi_threads );
242272 if (OMPI_SUCCESS != ret ) {
243273 OSC_UCX_VERBOSE (1 , "opal_common_ucx_wpool_init failed: %d" , ret );
244- goto error ;
274+ goto select_unlock ;
245275 }
246276
277+ /* Make sure that all memory updates performed above are globally
278+ * observable before (mca_osc_ucx_component.env_initialized = true)
279+ */
247280 mca_osc_ucx_component .env_initialized = true;
248281 env_initialized = true;
249282 }
250283
284+ /* Account for the number of active "modules" = MPI windows */
285+ mca_osc_ucx_component .num_modules ++ ;
286+
287+ /* If this is the first window to be registered - register the progress
288+ * callback
289+ */
290+ OSC_UCX_ASSERT (mca_osc_ucx_component .num_modules > 0 );
291+ if (1 == mca_osc_ucx_component .num_modules ) {
292+ ret = opal_progress_register (progress_callback );
293+ if (OMPI_SUCCESS != ret ) {
294+ OSC_UCX_VERBOSE (1 , "opal_progress_register failed: %d" , ret );
295+ goto error ;
296+ }
297+ }
298+
299+ select_unlock :
300+ _osc_ucx_init_unlock ();
301+ if (ret ) {
302+ goto error ;
303+ }
304+
251305 /* create module structure */
252306 module = (ompi_osc_ucx_module_t * )calloc (1 , sizeof (ompi_osc_ucx_module_t ));
253307 if (module == NULL ) {
254308 ret = OMPI_ERR_TEMP_OUT_OF_RESOURCE ;
255309 goto error_nomem ;
256310 }
257311
258- mca_osc_ucx_component .num_modules ++ ;
259-
260312 /* fill in the function pointer part */
261313 memcpy (module , & ompi_osc_ucx_module_template , sizeof (ompi_osc_base_module_t ));
262314
@@ -410,19 +462,15 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in
410462 goto error ;
411463 }
412464
413- OSC_UCX_ASSERT (mca_osc_ucx_component .num_modules > 0 );
414- if (1 == mca_osc_ucx_component .num_modules ) {
415- ret = opal_progress_register (progress_callback );
416- if (OMPI_SUCCESS != ret ) {
417- OSC_UCX_VERBOSE (1 , "opal_progress_register failed: %d" , ret );
418- goto error ;
419- }
420- }
421465 return ret ;
422466
423- error :
467+ error :
424468 if (module -> disp_units ) free (module -> disp_units );
425469 if (module -> comm ) ompi_comm_free (& module -> comm );
470+ /* We update the modules count and (if need) registering a callback right
471+ * prior to memory allocation for the module.
472+ * So we use it as an indirect sign here
473+ */
426474 if (module ) {
427475 free (module );
428476 ompi_osc_ucx_unregister_progress ();
@@ -575,8 +623,6 @@ int ompi_osc_ucx_free(struct ompi_win_t *win) {
575623 }
576624
577625 opal_common_ucx_wpctx_release (module -> ctx );
578-
579- opal_common_ucx_wpool_finalize (mca_osc_ucx_component .wpool );
580626
581627 if (module -> disp_units ) free (module -> disp_units );
582628 ompi_comm_free (& module -> comm );
0 commit comments