Skip to content
This repository was archived by the owner on Sep 30, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions ompi/mca/bml/r2/bml_r2.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,22 +165,30 @@ static mca_bml_base_endpoint_t *mca_bml_r2_allocate_endpoint (ompi_proc_t *proc)
return bml_endpoint;
}

static void mca_bml_r2_register_progress (mca_btl_base_module_t *btl)
static void mca_bml_r2_register_progress (mca_btl_base_module_t *btl, bool hp)
{
if (NULL != btl->btl_component->btl_progress) {
bool found = false;
size_t p;

for (size_t p = 0 ; p < mca_bml_r2.num_btl_progress ; ++p) {
for (p = 0 ; p < mca_bml_r2.num_btl_progress ; ++p) {
if(mca_bml_r2.btl_progress[p] == btl->btl_component->btl_progress) {
found = true;
break;
}
}

if (found == false) {
mca_bml_r2.btl_progress[mca_bml_r2.num_btl_progress++] =
btl->btl_component->btl_progress;
opal_progress_register (btl->btl_component->btl_progress);
if (found == false || hp) {
if (found == false) {
mca_bml_r2.btl_progress[mca_bml_r2.num_btl_progress++] =
btl->btl_component->btl_progress;
}

if (hp) {
opal_progress_register (btl->btl_component->btl_progress);
} else {
opal_progress_register_lp (btl->btl_component->btl_progress);
}
}
}
}
Expand Down Expand Up @@ -403,7 +411,7 @@ static int mca_bml_r2_add_proc (struct ompi_proc_t *proc)
if (OMPI_SUCCESS != rc) {
btl->btl_del_procs (btl, 1, (opal_proc_t **) &proc, &btl_endpoint);
} else {
mca_bml_r2_register_progress (btl);
mca_bml_r2_register_progress (btl, true);
btl_in_use = true;
}
}
Expand Down Expand Up @@ -546,9 +554,7 @@ static int mca_bml_r2_add_procs( size_t nprocs,
btl_inuse++;
}

if (btl_inuse) {
mca_bml_r2_register_progress (btl);
}
mca_bml_r2_register_progress (btl, !!(btl_inuse));
}

free(btl_endpoints);
Expand Down
23 changes: 22 additions & 1 deletion opal/runtime/opal_params.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* reserved.
* Copyright (c) 2008-2015 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2009 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2010-2015 Los Alamos National Security, LLC.
* Copyright (c) 2010-2016 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2014 Hochschule Esslingen. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
Expand Down Expand Up @@ -45,6 +45,7 @@
#include "opal/dss/dss.h"
#include "opal/util/show_help.h"
#include "opal/util/timings.h"
#include "opal/util/bit_ops.h"

char *opal_signal_string = NULL;
char *opal_net_private_ipv4 = NULL;
Expand All @@ -66,6 +67,7 @@ int opal_leave_pinned = -1;
bool opal_leave_pinned_pipeline = false;
bool opal_abort_print_stack = false;
int opal_abort_delay = 0;
unsigned int opal_progress_lp_call_ratio = 8;

static bool opal_register_done = false;

Expand Down Expand Up @@ -279,6 +281,25 @@ int opal_register_params(void)
return ret;
}

opal_progress_lp_call_ratio = 8;
ret = mca_base_var_register("opal", "opal", NULL, "progress_lp_call_ratio",
"Ratio of calls to high-priority to low-priority progress "
"functions. Higher numbers decrease the frequency of the callback "
"rate. Must be a power of two (default: 8)",
MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, 0, 0,
OPAL_INFO_LVL_5,
MCA_BASE_VAR_SCOPE_READONLY,
&opal_progress_lp_call_ratio);
if (0 > ret) {
return ret;
}

if (opal_progress_lp_call_ratio & (opal_progress_lp_call_ratio - 1)) {
opal_output(0, "MCA variable progress_lp_call_ratio must be a power of two. value = %u",
opal_progress_lp_call_ratio);
return OPAL_ERR_BAD_PARAM;
}

opal_abort_print_stack = false;
ret = mca_base_var_register("opal", "opal", NULL, "abort_print_stack",
"If nonzero, print out a stack trace when abort is invoked",
Expand Down
6 changes: 6 additions & 0 deletions opal/runtime/opal_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ OPAL_DECLSPEC extern bool opal_abort_print_stack;
*/
OPAL_DECLSPEC extern int opal_abort_delay;

/**
* Ratio of calls to high-priority to low-priority progress functions.
* Must be a power of two.
*/
OPAL_DECLSPEC extern unsigned int opal_progress_lp_call_ratio;

#if OPAL_ENABLE_DEBUG
extern bool opal_progress_debug;
#endif
Expand Down
188 changes: 145 additions & 43 deletions opal/runtime/opal_progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006-2014 Los Alamos National Security, LLC. All rights
* Copyright (c) 2006-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2015-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
Expand Down Expand Up @@ -55,10 +55,15 @@ int opal_progress_spin_count = 10000;
static opal_atomic_lock_t progress_lock;

/* callbacks to progress */
static opal_progress_callback_t *callbacks = NULL;
static volatile opal_progress_callback_t *callbacks = NULL;
static size_t callbacks_len = 0;
static size_t callbacks_size = 0;

static volatile opal_progress_callback_t *callbacks_lp = NULL;
static size_t callbacks_lp_len = 0;
static size_t callbacks_lp_size = 0;
static uint64_t callbacks_lp_mask = 0x7;

/* do we want to call sched_yield() if nothing happened */
bool opal_progress_yield_when_idle = false;

Expand Down Expand Up @@ -89,6 +94,9 @@ static int debug_output = -1;
*/
static int fake_cb(void) { return 0; }

static int _opal_progress_unregister (opal_progress_callback_t cb, volatile opal_progress_callback_t *callback_array,
size_t *callback_array_len);

/* init the progress engine - called from orte_init */
int
opal_progress_init(void)
Expand All @@ -105,6 +113,30 @@ opal_progress_init(void)
}
#endif


callbacks_lp_mask = opal_progress_lp_call_ratio - 1;

callbacks_size = callbacks_lp_size = 8;

callbacks = malloc (callbacks_size * sizeof (callbacks[0]));
callbacks_lp = malloc (callbacks_lp_size * sizeof (callbacks_lp[0]));

if (NULL == callbacks || NULL == callbacks_lp) {
free ((void *) callbacks);
free ((void *) callbacks_lp);
callbacks_size = callbacks_lp_size = 0;
callbacks = callbacks_lp = NULL;
return OPAL_ERR_OUT_OF_RESOURCE;
}

for (size_t i = 0 ; i < callbacks_size ; ++i) {
callbacks[i] = fake_cb;
}

for (size_t i = 0 ; i < callbacks_lp_size ; ++i) {
callbacks_lp[i] = fake_cb;
}

OPAL_OUTPUT((debug_output, "progress: initialized event flag to: %x",
opal_progress_event_flag));
OPAL_OUTPUT((debug_output, "progress: initialized yield_when_idle to: %s",
Expand All @@ -126,10 +158,13 @@ opal_progress_finalize(void)

callbacks_len = 0;
callbacks_size = 0;
if (NULL != callbacks) {
free(callbacks);
callbacks = NULL;
}
free ((void *) callbacks);
callbacks = NULL;

callbacks_lp_len = 0;
callbacks_lp_size = 0;
free ((void *) callbacks_lp);
callbacks_lp = NULL;

opal_atomic_unlock(&progress_lock);

Expand All @@ -151,6 +186,7 @@ opal_progress_finalize(void)
void
opal_progress(void)
{
static volatile uint64_t num_calls = 0;
size_t i;
int events = 0;

Expand Down Expand Up @@ -189,6 +225,13 @@ opal_progress(void)
events += (callbacks[i])();
}

if ((OPAL_THREAD_ADD64((volatile int64_t *) &num_calls, 1) & callbacks_lp_mask) == 0) {
/* run low priority callbacks once every 8 calls to opal_progress() */
for (i = 0 ; i < callbacks_lp_len ; ++i) {
events += (callbacks_lp[i])();
}
}

#if OPAL_HAVE_SCHED_YIELD
if (opal_progress_yield_when_idle && events <= 0) {
/* If there is nothing to do - yield the processor - otherwise
Expand Down Expand Up @@ -310,71 +353,130 @@ opal_progress_set_event_poll_rate(int polltime)
#endif
}

static int opal_progress_find_cb (opal_progress_callback_t cb, volatile opal_progress_callback_t *cbs,
size_t cbs_len)
{
for (size_t i = 0 ; i < cbs_len ; ++i) {
if (cbs[i] == cb) {
return (int) i;
}
}

int
opal_progress_register(opal_progress_callback_t cb)
return OPAL_ERR_NOT_FOUND;
}

static int _opal_progress_register (opal_progress_callback_t cb, volatile opal_progress_callback_t **cbs,
size_t *cbs_size, size_t *cbs_len)
{
int ret = OPAL_SUCCESS;
size_t index;

opal_atomic_lock(&progress_lock);
if (OPAL_ERR_NOT_FOUND != opal_progress_find_cb (cb, *cbs, *cbs_len)) {
return OPAL_SUCCESS;
}

/* see if we need to allocate more space */
if (callbacks_len + 1 > callbacks_size) {
opal_progress_callback_t *tmp;
tmp = (opal_progress_callback_t*)realloc(callbacks, sizeof(opal_progress_callback_t) * (callbacks_size + 4));
if (*cbs_len + 1 > *cbs_size) {
opal_progress_callback_t *tmp, *old;

tmp = (opal_progress_callback_t *) malloc (sizeof (tmp[0]) * 2 * *cbs_size);
if (tmp == NULL) {
ret = OPAL_ERR_TEMP_OUT_OF_RESOURCE;
goto cleanup;
return OPAL_ERR_TEMP_OUT_OF_RESOURCE;
}

if (*cbs) {
/* copy old callbacks */
memcpy (tmp, (void *) *cbs, sizeof(tmp[0]) * *cbs_size);
}
/* registering fake callbacks to fill callbacks[] */
for( index = callbacks_len + 1 ; index < callbacks_size + 4 ; index++) {
tmp[index] = &fake_cb;

for (size_t i = *cbs_len ; i < 2 * *cbs_size ; ++i) {
tmp[i] = fake_cb;
}

callbacks = tmp;
callbacks_size += 4;
opal_atomic_wmb ();

/* swap out callback array */
old = opal_atomic_swap_ptr (cbs, tmp);

opal_atomic_wmb ();

free (old);
*cbs_size *= 2;
}

callbacks[callbacks_len++] = cb;
cbs[0][*cbs_len] = cb;
++*cbs_len;

cleanup:
opal_atomic_wmb ();

return ret;
}

int opal_progress_register (opal_progress_callback_t cb)
{
int ret;

opal_atomic_lock(&progress_lock);

(void) _opal_progress_unregister (cb, callbacks_lp, &callbacks_lp_len);

ret = _opal_progress_register (cb, &callbacks, &callbacks_size, &callbacks_len);

opal_atomic_unlock(&progress_lock);

return ret;
}

int
opal_progress_unregister(opal_progress_callback_t cb)
int opal_progress_register_lp (opal_progress_callback_t cb)
{
size_t i;
int ret = OPAL_ERR_NOT_FOUND;
int ret;

opal_atomic_lock(&progress_lock);

for (i = 0 ; i < callbacks_len ; ++i) {
if (cb == callbacks[i]) {
callbacks[i] = &fake_cb;
ret = OPAL_SUCCESS;
break;
}
(void) _opal_progress_unregister (cb, callbacks, &callbacks_len);

ret = _opal_progress_register (cb, &callbacks_lp, &callbacks_lp_size, &callbacks_lp_len);

opal_atomic_unlock(&progress_lock);

return ret;
}

static int _opal_progress_unregister (opal_progress_callback_t cb, volatile opal_progress_callback_t *callback_array,
size_t *callback_array_len)
{
int ret = opal_progress_find_cb (cb, callback_array, *callback_array_len);
if (OPAL_ERR_NOT_FOUND == ret) {
return ret;
}

/* If we found the function we're unregistering: If callbacks_len
is 0, we're not goig to do anything interesting anyway, so
skip. If callbacks_len is 1, it will soon be 0, so no need to
do any repacking. size_t can be unsigned, so 0 - 1 is bad for
a loop condition :). */
if (OPAL_SUCCESS == ret) {
if (callbacks_len > 1 ) {
/* now tightly pack the array */
for ( ; i < callbacks_len - 1 ; ++i) {
callbacks[i] = callbacks[i + 1];
}
}
callbacks[callbacks_len - 1] = &fake_cb;
callbacks_len--;
do any repacking. */
for (size_t i = (size_t) ret ; i < *callback_array_len - 1 ; ++i) {
/* copy callbacks atomically since another thread may be in
* opal_progress(). */
(void) opal_atomic_swap_ptr (callback_array + i, callback_array[i+1]);
}

callback_array[*callback_array_len] = fake_cb;
--*callback_array_len;

return OPAL_SUCCESS;
}

int opal_progress_unregister (opal_progress_callback_t cb)
{
int ret;

opal_atomic_lock(&progress_lock);

ret = _opal_progress_unregister (cb, callbacks, &callbacks_len);

if (OPAL_SUCCESS != ret) {
/* if not in the high-priority array try to remove from the lp array.
* a callback will never be in both. */
ret = _opal_progress_unregister (cb, callbacks_lp, &callbacks_lp_len);
}

opal_atomic_unlock(&progress_lock);
Expand Down
Loading