1010 * University of Stuttgart. All rights reserved.
1111 * Copyright (c) 2004-2005 The Regents of the University of California.
1212 * All rights reserved.
13- * Copyright (c) 2006-2014 Los Alamos National Security, LLC. All rights
13+ * Copyright (c) 2006-2016 Los Alamos National Security, LLC. All rights
1414 * reserved.
1515 * Copyright (c) 2015-2016 Research Organization for Information Science
1616 * and Technology (RIST). All rights reserved.
@@ -55,10 +55,15 @@ int opal_progress_spin_count = 10000;
5555static opal_atomic_lock_t progress_lock ;
5656
5757/* callbacks to progress */
58- static opal_progress_callback_t * callbacks = NULL ;
58+ static volatile opal_progress_callback_t * callbacks = NULL ;
5959static size_t callbacks_len = 0 ;
6060static size_t callbacks_size = 0 ;
6161
62+ static volatile opal_progress_callback_t * callbacks_lp = NULL ;
63+ static size_t callbacks_lp_len = 0 ;
64+ static size_t callbacks_lp_size = 0 ;
65+ static uint64_t callbacks_lp_mask = 0x7 ;
66+
6267/* do we want to call sched_yield() if nothing happened */
6368bool opal_progress_yield_when_idle = false;
6469
@@ -89,6 +94,9 @@ static int debug_output = -1;
8994 */
/* Placeholder callback: does no work and returns 0. */
static int fake_cb(void)
{
    return 0;
}
9196
97+ static int _opal_progress_unregister (opal_progress_callback_t cb , volatile opal_progress_callback_t * callback_array ,
98+ size_t * callback_array_len );
99+
92100/* init the progress engine - called from orte_init */
93101int
94102opal_progress_init (void )
@@ -105,6 +113,30 @@ opal_progress_init(void)
105113 }
106114#endif
107115
116+
117+ callbacks_lp_mask = opal_progress_lp_call_ratio - 1 ;
118+
119+ callbacks_size = callbacks_lp_size = 8 ;
120+
121+ callbacks = malloc (callbacks_size * sizeof (callbacks [0 ]));
122+ callbacks_lp = malloc (callbacks_lp_size * sizeof (callbacks_lp [0 ]));
123+
124+ if (NULL == callbacks || NULL == callbacks_lp ) {
125+ free ((void * ) callbacks );
126+ free ((void * ) callbacks_lp );
127+ callbacks_size = callbacks_lp_size = 0 ;
128+ callbacks = callbacks_lp = NULL ;
129+ return OPAL_ERR_OUT_OF_RESOURCE ;
130+ }
131+
132+ for (size_t i = 0 ; i < callbacks_size ; ++ i ) {
133+ callbacks [i ] = fake_cb ;
134+ }
135+
136+ for (size_t i = 0 ; i < callbacks_lp_size ; ++ i ) {
137+ callbacks_lp [i ] = fake_cb ;
138+ }
139+
108140 OPAL_OUTPUT ((debug_output , "progress: initialized event flag to: %x" ,
109141 opal_progress_event_flag ));
110142 OPAL_OUTPUT ((debug_output , "progress: initialized yield_when_idle to: %s" ,
@@ -126,10 +158,13 @@ opal_progress_finalize(void)
126158
127159 callbacks_len = 0 ;
128160 callbacks_size = 0 ;
129- if (NULL != callbacks ) {
130- free (callbacks );
131- callbacks = NULL ;
132- }
161+ free ((void * ) callbacks );
162+ callbacks = NULL ;
163+
164+ callbacks_lp_len = 0 ;
165+ callbacks_lp_size = 0 ;
166+ free ((void * ) callbacks_lp );
167+ callbacks_lp = NULL ;
133168
134169 opal_atomic_unlock (& progress_lock );
135170
@@ -151,6 +186,7 @@ opal_progress_finalize(void)
151186void
152187opal_progress (void )
153188{
189+ static volatile uint64_t num_calls = 0 ;
154190 size_t i ;
155191 int events = 0 ;
156192
@@ -189,6 +225,13 @@ opal_progress(void)
189225 events += (callbacks [i ])();
190226 }
191227
228+ if ((OPAL_THREAD_ADD64 ((volatile int64_t * ) & num_calls , 1 ) & callbacks_lp_mask ) == 0 ) {
229+ /* run low priority callbacks only when no callbacks_lp_mask bit is set in the call counter (every mask + 1 calls when mask + 1 is a power of two) */
230+ for (i = 0 ; i < callbacks_lp_len ; ++ i ) {
231+ events += (callbacks_lp [i ])();
232+ }
233+ }
234+
192235#if OPAL_HAVE_SCHED_YIELD
193236 if (opal_progress_yield_when_idle && events <= 0 ) {
194237 /* If there is nothing to do - yield the processor - otherwise
@@ -310,71 +353,130 @@ opal_progress_set_event_poll_rate(int polltime)
310353#endif
311354}
312355
356+ static int opal_progress_find_cb (opal_progress_callback_t cb , volatile opal_progress_callback_t * cbs ,
357+ size_t cbs_len )
358+ {
359+ for (size_t i = 0 ; i < cbs_len ; ++ i ) {
360+ if (cbs [i ] == cb ) {
361+ return (int ) i ;
362+ }
363+ }
313364
314- int
315- opal_progress_register (opal_progress_callback_t cb )
365+ return OPAL_ERR_NOT_FOUND ;
366+ }
367+
368+ static int _opal_progress_register (opal_progress_callback_t cb , volatile opal_progress_callback_t * * cbs ,
369+ size_t * cbs_size , size_t * cbs_len )
316370{
317371 int ret = OPAL_SUCCESS ;
318- size_t index ;
319372
320- opal_atomic_lock (& progress_lock );
373+ if (OPAL_ERR_NOT_FOUND != opal_progress_find_cb (cb , * cbs , * cbs_len )) {
374+ return OPAL_SUCCESS ;
375+ }
321376
322377 /* see if we need to allocate more space */
323- if (callbacks_len + 1 > callbacks_size ) {
324- opal_progress_callback_t * tmp ;
325- tmp = (opal_progress_callback_t * )realloc (callbacks , sizeof (opal_progress_callback_t ) * (callbacks_size + 4 ));
378+ if (* cbs_len + 1 > * cbs_size ) {
379+ opal_progress_callback_t * tmp , * old ;
380+
381+ tmp = (opal_progress_callback_t * ) malloc (sizeof (tmp [0 ]) * 2 * * cbs_size );
326382 if (tmp == NULL ) {
327- ret = OPAL_ERR_TEMP_OUT_OF_RESOURCE ;
328- goto cleanup ;
383+ return OPAL_ERR_TEMP_OUT_OF_RESOURCE ;
384+ }
385+
386+ if (* cbs ) {
387+ /* copy old callbacks */
388+ memcpy (tmp , (void * ) * cbs , sizeof (tmp [0 ]) * * cbs_size );
329389 }
330- /* registering fake callbacks to fill callbacks[] */
331- for ( index = callbacks_len + 1 ; index < callbacks_size + 4 ; index ++ ) {
332- tmp [index ] = & fake_cb ;
390+
391+ for ( size_t i = * cbs_len ; i < 2 * * cbs_size ; ++ i ) {
392+ tmp [i ] = fake_cb ;
333393 }
334394
335- callbacks = tmp ;
336- callbacks_size += 4 ;
395+ opal_atomic_wmb ();
396+
397+ /* swap out callback array */
398+ old = opal_atomic_swap_ptr (cbs , tmp );
399+
400+ opal_atomic_wmb ();
401+
402+ free (old );
403+ * cbs_size *= 2 ;
337404 }
338405
339- callbacks [callbacks_len ++ ] = cb ;
406+ cbs [0 ][* cbs_len ] = cb ;
407+ ++ * cbs_len ;
340408
341- cleanup :
409+ opal_atomic_wmb ();
410+
411+ return ret ;
412+ }
413+
414+ int opal_progress_register (opal_progress_callback_t cb )
415+ {
416+ int ret ;
417+
418+ opal_atomic_lock (& progress_lock );
419+
420+ (void ) _opal_progress_unregister (cb , callbacks_lp , & callbacks_lp_len );
421+
422+ ret = _opal_progress_register (cb , & callbacks , & callbacks_size , & callbacks_len );
342423
343424 opal_atomic_unlock (& progress_lock );
344425
345426 return ret ;
346427}
347428
348- int
349- opal_progress_unregister (opal_progress_callback_t cb )
429+ int opal_progress_register_lp (opal_progress_callback_t cb )
350430{
351- size_t i ;
352- int ret = OPAL_ERR_NOT_FOUND ;
431+ int ret ;
353432
354433 opal_atomic_lock (& progress_lock );
355434
356- for (i = 0 ; i < callbacks_len ; ++ i ) {
357- if (cb == callbacks [i ]) {
358- callbacks [i ] = & fake_cb ;
359- ret = OPAL_SUCCESS ;
360- break ;
361- }
435+ (void ) _opal_progress_unregister (cb , callbacks , & callbacks_len );
436+
437+ ret = _opal_progress_register (cb , & callbacks_lp , & callbacks_lp_size , & callbacks_lp_len );
438+
439+ opal_atomic_unlock (& progress_lock );
440+
441+ return ret ;
442+ }
443+
444+ static int _opal_progress_unregister (opal_progress_callback_t cb , volatile opal_progress_callback_t * callback_array ,
445+ size_t * callback_array_len )
446+ {
447+ int ret = opal_progress_find_cb (cb , callback_array , * callback_array_len );
448+ if (OPAL_ERR_NOT_FOUND == ret ) {
449+ return ret ;
362450 }
363451
364452 /* If we found the function we're unregistering: If callbacks_len
365453 is 0, we're not goig to do anything interesting anyway, so
366454 skip. If callbacks_len is 1, it will soon be 0, so no need to
367- do any repacking. size_t can be unsigned, so 0 - 1 is bad for
368- a loop condition :). */
369- if (OPAL_SUCCESS == ret ) {
370- if (callbacks_len > 1 ) {
371- /* now tightly pack the array */
372- for ( ; i < callbacks_len - 1 ; ++ i ) {
373- callbacks [i ] = callbacks [i + 1 ];
374- }
375- }
376- callbacks [callbacks_len - 1 ] = & fake_cb ;
377- callbacks_len -- ;
455+ do any repacking. */
456+ for (size_t i = (size_t ) ret ; i < * callback_array_len - 1 ; ++ i ) {
457+ /* copy callbacks atomically since another thread may be in
458+ * opal_progress(). */
459+ (void ) opal_atomic_swap_ptr (callback_array + i , callback_array [i + 1 ]);
460+ }
461+
462+ callback_array [* callback_array_len ] = fake_cb ;
463+ -- * callback_array_len ;
464+
465+ return OPAL_SUCCESS ;
466+ }
467+
468+ int opal_progress_unregister (opal_progress_callback_t cb )
469+ {
470+ int ret ;
471+
472+ opal_atomic_lock (& progress_lock );
473+
474+ ret = _opal_progress_unregister (cb , callbacks , & callbacks_len );
475+
476+ if (OPAL_SUCCESS != ret ) {
477+ /* if not in the high-priority array try to remove from the lp array.
478+ * a callback will never be in both. */
479+ ret = _opal_progress_unregister (cb , callbacks_lp , & callbacks_lp_len );
378480 }
379481
380482 opal_atomic_unlock (& progress_lock );
0 commit comments