diff --git a/ompi/communicator/ft/comm_ft_revoke.c b/ompi/communicator/ft/comm_ft_revoke.c
index 0e4c3158afa..923e3a73ecf 100644
--- a/ompi/communicator/ft/comm_ft_revoke.c
+++ b/ompi/communicator/ft/comm_ft_revoke.c
@@ -18,6 +18,10 @@
 #include "ompi/communicator/communicator.h"
 #include "ompi/mca/pml/pml.h"
 
+#if OMPI_HAVE_MPI_EXT_CONTINUE
+#include "ompi/mpiext/continue/c/continuation.h"
+#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
+
 static int ompi_comm_revoke_local(ompi_communicator_t* comm, ompi_comm_rbcast_message_t* msg);
 
@@ -93,6 +97,14 @@ static int ompi_comm_revoke_local(ompi_communicator_t* comm, ompi_comm_rbcast_me
     MCA_PML_CALL(revoke_comm(comm, false));
     /* Signal the point-to-point stack to recheck requests */
     wait_sync_global_wakeup(MPI_ERR_REVOKED);
+
+#if OMPI_HAVE_MPI_EXT_CONTINUE
+    /* Continuations:
+     * Release continuations and mark them as failed.
+     */
+    ompi_continue_global_wakeup(MPI_ERR_PROC_FAILED);
+#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
+
     return true;
 }
diff --git a/ompi/errhandler/errhandler.c b/ompi/errhandler/errhandler.c
index 17fd48dbe9b..bd5eacf1001 100644
--- a/ompi/errhandler/errhandler.c
+++ b/ompi/errhandler/errhandler.c
@@ -40,6 +40,10 @@
 #include "opal/mca/backtrace/backtrace.h"
 #include "ompi/runtime/mpiruntime.h"
 
+#if OMPI_HAVE_MPI_EXT_CONTINUE
+#include "ompi/mpiext/continue/c/continuation.h"
+#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
+
 /*
  * Table for Fortran <-> C errhandler handle conversion
  */
@@ -415,6 +419,13 @@ int ompi_errhandler_proc_failed_internal(ompi_proc_t* ompi_proc, int status, boo
      */
     wait_sync_global_wakeup(PMIX_ERR_PROC_ABORTED == status? MPI_ERR_PROC_ABORTED: MPI_ERR_PROC_FAILED);
 
+#if OMPI_HAVE_MPI_EXT_CONTINUE
+    /* Continuations:
+     * Release continuations and mark them as failed.
+     */
+    ompi_continue_global_wakeup(MPI_ERR_PROC_FAILED);
+#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
+
     /* Collectives:
      * Propagate the error (this has been selected rather than the "roll
      * forward through errors in collectives" as this is less intrusive to the
diff --git a/ompi/errhandler/errhandler_invoke.c b/ompi/errhandler/errhandler_invoke.c
index d9d3ede4677..f4f58c66e76 100644
--- a/ompi/errhandler/errhandler_invoke.c
+++ b/ompi/errhandler/errhandler_invoke.c
@@ -34,6 +34,9 @@
 #include "ompi/errhandler/errhandler.h"
 #include "ompi/mpi/fortran/base/fint_2_int.h"
 
+#if OMPI_HAVE_MPI_EXT_CONTINUE
+#include "ompi/mpiext/continue/c/continuation.h"
+#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
 
 int ompi_errhandler_invoke(ompi_errhandler_t *errhandler, void *mpi_object,
                            int object_type, int err_code, const char *message)
@@ -174,6 +177,13 @@ int ompi_errhandler_request_invoke(int count,
         mpi_object = requests[i]->req_mpi_object;
         type = requests[i]->req_type;
 
+#if OMPI_HAVE_MPI_EXT_CONTINUE
+        if (OMPI_REQUEST_CONT == type) {
+            /* take the mpi object stored in the continuation request */
+            ompi_continue_get_error_info(requests[i], &mpi_object, &type);
+        }
+#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
+
         /* Since errors on requests cause them to not be freed (until we
            can examine them here), go through and free all requests with
            errors.  We only invoke the error on the *first* request
diff --git a/ompi/include/mpi.h.in b/ompi/include/mpi.h.in
index 0c26fa08d8f..05203c4fb0b 100644
--- a/ompi/include/mpi.h.in
+++ b/ompi/include/mpi.h.in
@@ -754,6 +754,8 @@ enum {
 #define MPI_ERR_SESSION 78
 #define MPI_ERR_VALUE_TOO_LARGE 79
 
+#define MPI_ERR_CONT 80
+
 /* Per MPI-3 p349 47, MPI_ERR_LASTCODE must be >= the last predefined
    MPI_ERR_ code.
Set the last code to allow some room for adding error codes without breaking ABI. */ diff --git a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c index a3db1458938..f15aa1870fd 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c +++ b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c @@ -369,7 +369,7 @@ int mca_pml_ob1_revoke_comm( struct ompi_communicator_t* ompi_comm, bool coll_on /* note this is not an ompi_proc, but a ob1_comm_proc, thus we don't * use ompi_proc_is_sentinel to verify if initialized. */ if( NULL == proc ) continue; - /* remove the frag from the unexpected list, add to the nack list + /* remove the frag from the unexpected list, add to the nack list * so that we can send the nack as needed to remote cancel the send * from outside the match lock. */ @@ -384,7 +384,7 @@ int mca_pml_ob1_revoke_comm( struct ompi_communicator_t* ompi_comm, bool coll_on } } /* same for the cantmatch queue/heap; this list is more complicated - * Keep it simple: we pop all of the complex list, put the bad items + * Keep it simple: we pop all of the complex list, put the bad items * in the nack_list, and keep the good items in the keep_list; * then we reinsert the good items in the cantmatch heaplist */ mca_pml_ob1_recv_frag_t* frag; diff --git a/ompi/mca/pml/ucx/pml_ucx_request.h b/ompi/mca/pml/ucx/pml_ucx_request.h index 8132f6b54ba..0ad22dd6f29 100644 --- a/ompi/mca/pml/ucx/pml_ucx_request.h +++ b/ompi/mca/pml/ucx/pml_ucx_request.h @@ -157,6 +157,7 @@ int mca_pml_ucx_request_cancel_send(ompi_request_t *req, int flag); static inline void mca_pml_ucx_request_reset(ompi_request_t *req) { + OMPI_REQUEST_INIT(req, req->req_persistent); req->req_complete = REQUEST_PENDING; } diff --git a/ompi/mpi/c/start.c b/ompi/mpi/c/start.c index 5bf202385f8..c05134516bc 100644 --- a/ompi/mpi/c/start.c +++ b/ompi/mpi/c/start.c @@ -78,8 +78,8 @@ int MPI_Start(MPI_Request *request) case OMPI_REQUEST_PML: case OMPI_REQUEST_COLL: case OMPI_REQUEST_PART: - if ( MPI_PARAM_CHECK && !((*request)->req_persistent && - OMPI_REQUEST_INACTIVE == (*request)->req_state)) { + case OMPI_REQUEST_CONT: + if ( MPI_PARAM_CHECK && !(*request)->req_persistent) { return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_REQUEST, FUNC_NAME); } diff --git a/ompi/mpi/c/startall.c b/ompi/mpi/c/startall.c index a733e7586cf..9af8823dce6 100644 --- a/ompi/mpi/c/startall.c +++ b/ompi/mpi/c/startall.c @@ -70,6 +70,7 @@ int MPI_Startall(int count, MPI_Request requests[]) (OMPI_REQUEST_PML != requests[i]->req_type && OMPI_REQUEST_COLL != requests[i]->req_type && OMPI_REQUEST_PART != requests[i]->req_type && + OMPI_REQUEST_CONT != requests[i]->req_type && OMPI_REQUEST_NOOP != requests[i]->req_type)) { rc = MPI_ERR_REQUEST; break; diff --git a/ompi/mpiext/continue/Makefile.am b/ompi/mpiext/continue/Makefile.am new file mode 100644 index 00000000000..70cd9a7c85a --- /dev/null +++ b/ompi/mpiext/continue/Makefile.am @@ -0,0 +1,22 @@ +# -*- shell-script -*- +# +# Copyright (c) 2021 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# This Makefile is not traversed during a normal "make all" in an OMPI +# build. It *is* traversed during "make dist", however. So you can +# put EXTRA_DIST targets in here. +# +# You can also use this as a convenience for building this MPI +# extension (i.e., "make all" in this directory to invoke "make all" +# in all the subdirectories). 
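The MPI_Start/MPI_Startall hunks above make OMPI_REQUEST_CONT a legal request type on the persistent-request start path, so a continuation request follows the usual inactive/active lifecycle. A minimal sketch of that lifecycle from user code, assuming the extension is built and exposed through <mpi-ext.h> (illustrative only, not part of this patch):

    #include <mpi.h>
    #include <mpi-ext.h>

    void cont_req_lifecycle(void)
    {
        MPI_Request cont_req;
        /* create an inactive continuation request: (flags, max_poll, info, req) */
        MPIX_Continue_init(0, 0, MPI_INFO_NULL, &cont_req);
        /* activate it; attached continuations may now run */
        MPI_Start(&cont_req);
        /* ... attach continuations via MPIX_Continue[all] ... */
        /* completes once all attached continuations have executed */
        MPI_Wait(&cont_req, MPI_STATUS_IGNORE);
        MPI_Request_free(&cont_req);
    }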
+ +SUBDIRS = c + diff --git a/ompi/mpiext/continue/c/Makefile.am b/ompi/mpiext/continue/c/Makefile.am new file mode 100644 index 00000000000..9e5b0cff253 --- /dev/null +++ b/ompi/mpiext/continue/c/Makefile.am @@ -0,0 +1,43 @@ +# +# Copyright (c) 2021 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# OMPI_BUILD_MPI_PROFILING is enabled when we want our generated MPI_* symbols +# to be replaced by PMPI_*. +# In this directory, we need it to be 0 + +AM_CPPFLAGS = -DOMPI_BUILD_MPI_PROFILING=0 -DOMPI_COMPILING_FORTRAN_WRAPPERS=0 + +include $(top_srcdir)/Makefile.ompi-rules + +noinst_LTLIBRARIES = libmpiext_continue_c.la + +# This is where the top-level header file (that is included in +# ) must be installed. +ompidir = $(ompiincludedir)/mpiext + +# This is the header file that is installed. +nodist_ompi_HEADERS = mpiext_continue_c.h + +libmpiext_continue_c_la_SOURCES = \ + continuation.c \ + continue.c \ + continueall.c \ + continue_init.c \ + mpiext_continue_module.c + +#libmpiext_continue_c_la_LDFLAGS = -module -avoid-version + +dist_ompidata_DATA = help-mpi-continue.txt + +ompi_HEADERS = $(headers) + +MAINTAINERCLEANFILES = $(nodist_libmpiext_continue_c_la_SOURCES) + diff --git a/ompi/mpiext/continue/c/continuation.c b/ompi/mpiext/continue/c/continuation.c new file mode 100644 index 00000000000..b1ce5131e00 --- /dev/null +++ b/ompi/mpiext/continue/c/continuation.c @@ -0,0 +1,1164 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2020 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2021 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + + +#include "ompi_config.h" +#include "opal/class/opal_fifo.h" +#include "opal/class/opal_list.h" +#include "opal/class/opal_free_list.h" +#include "opal/sys/atomic.h" +#include "opal/util/show_help.h" +#include "ompi/mpiext/continue/c/continuation.h" +#include "ompi/request/request.h" + +#include "ompi/communicator/communicator.h" +#include "ompi/file/file.h" +#include "ompi/win/win.h" + + +static opal_free_list_t ompi_continuation_freelist; +static opal_free_list_t ompi_request_cont_data_freelist; + +/* Forward-decl */ +typedef struct ompi_cont_request_t ompi_cont_request_t; + +static int ompi_continue_request_free(ompi_request_t** cont_req); + +static int ompi_continue_request_start(size_t count, ompi_request_t** cont_req_ptr); + +/** + * Continuation class containing the callback, callback data, status, + * and number of outstanding operation requests. 
+ */ +OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_continuation_t); + +struct ompi_continuation_t { + opal_free_list_item_t super; /**< Base type */ + struct ompi_cont_request_t *cont_req; /**< The continuation request this continuation is registered with */ + MPIX_Continue_cb_function *cont_cb; /**< The callback function to invoke */ + void *cont_data; /**< Continuation state provided by the user */ + MPI_Status *cont_status; /**< user-provided pointers to status objects */ + MPI_Request *cont_opreqs; /**< operation requests, user-provided buffer */ + int cont_num_opreqs; /**< the number of opreqs */ + opal_atomic_int32_t cont_num_active; /**< The number of active operation requests on this callback */ + opal_atomic_int32_t cont_failed; /**< the continution is failed */ + opal_atomic_int32_t cont_request_check; /**< flag set by the failed continuation handler to block + * completing threads from freeing their request */ + int cont_rc; /** return code to be passed to callback */ + bool cont_invoke_failed; /** if true, failed continuations will be invoked and passed the error code */ +}; + +/* Convenience typedef */ +typedef struct ompi_continuation_t ompi_continuation_t; + +static void ompi_continuation_construct(ompi_continuation_t* cont) +{ + cont->cont_req = NULL; + cont->cont_cb = NULL; + cont->cont_data = NULL; + cont->cont_num_active = 0; + cont->cont_num_opreqs = 0; + cont->cont_opreqs = NULL; + cont->cont_failed = 0; + cont->cont_request_check = 0; + cont->cont_rc = MPI_SUCCESS; + cont->cont_invoke_failed = false; +} + +static void ompi_continuation_destruct(ompi_continuation_t* cont) +{ + assert(cont->cont_req == NULL); + assert(cont->cont_cb == NULL); + assert(cont->cont_data == NULL); + assert(cont->cont_num_active == 0); +} + +OBJ_CLASS_INSTANCE( + ompi_continuation_t, + opal_free_list_item_t, + ompi_continuation_construct, + ompi_continuation_destruct); + +struct ompi_cont_errorinfo_t { + ompi_mpi_object_t mpi_object; + int type; +}; +typedef struct ompi_cont_errorinfo_t ompi_cont_errorinfo_t; + +/** + * Continuation request, derived from an OMPI request. Continuation request + * keep track of registered continuations and complete once no active + * continuations are registered. 
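The cont_num_active counter above implements a classic decrement-and-fire completion count: each completing operation atomically decrements it, and whichever completer brings it to zero schedules the callback. A stripped-down illustration of that idiom in portable C11 (hypothetical names, not OMPI code):

    #include <stdatomic.h>

    typedef struct {
        atomic_int num_active;   /* outstanding operation requests */
        void (*cb)(void *);      /* continuation callback */
        void *cb_data;           /* user state passed to the callback */
    } demo_cont_t;

    static void demo_op_completed(demo_cont_t *cont)
    {
        /* fetch_sub returns the previous value: 1 means we were the last */
        if (1 == atomic_fetch_sub_explicit(&cont->num_active, 1,
                                           memory_order_acq_rel)) {
            cont->cb(cont->cb_data);   /* continuation is now runnable */
        }
    }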
+ */ +OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_cont_request_t); +struct ompi_cont_request_t { + ompi_request_t super; + opal_list_item_t cont_list_item; /**< List item to store the continuation request in the list of requests */ + opal_atomic_lock_t cont_lock; /**< Lock used completing/restarting the cont request */ + bool cont_enqueue_complete; /**< Whether to enqueue immediately complete requests */ + opal_atomic_int32_t cont_num_active; /**< The number of active continuations registered with a continuation request */ + uint32_t continue_max_poll; /**< max number of local continuations to execute at once */ + opal_list_t *cont_complete_list; /**< List of complete continuations to be invoked during test */ + ompi_wait_sync_t *sync; /**< Sync object this continuation request is attached to */ + opal_list_t cont_complete_defer_list; /**< List of complete continuations deferred on inactive requests */ + opal_list_t cont_incomplete_list; /**< List of incomplete continuations, used if error checking is enabled */ + opal_list_t cont_failed_list; /**< List of failed continuations */ + int cont_flags; /**< flags provided by user */ + ompi_cont_errorinfo_t cont_errorinfo; /**< info on the error handler to use when an error is detected */ +}; + +static void ompi_cont_request_construct(ompi_cont_request_t* cont_req) +{ + OMPI_REQUEST_INIT(&cont_req->super, true); + OBJ_CONSTRUCT(&cont_req->cont_list_item, opal_list_item_t); + cont_req->super.req_type = OMPI_REQUEST_CONT; + cont_req->super.req_complete = REQUEST_COMPLETED; + cont_req->super.req_state = OMPI_REQUEST_INACTIVE; + cont_req->super.req_persistent = true; + cont_req->super.req_free = &ompi_continue_request_free; + cont_req->super.req_start = &ompi_continue_request_start; + cont_req->super.req_status = ompi_status_empty; /* always returns MPI_SUCCESS */ + opal_atomic_lock_init(&cont_req->cont_lock, false); + cont_req->cont_enqueue_complete = false; + opal_atomic_lock_init(&cont_req->cont_lock, false); + cont_req->cont_num_active = 0; + cont_req->continue_max_poll = UINT32_MAX; + cont_req->cont_complete_list = NULL; + cont_req->sync = NULL; + cont_req->cont_flags = 0; + OBJ_CONSTRUCT(&cont_req->cont_complete_defer_list, opal_list_t); + OBJ_CONSTRUCT(&cont_req->cont_incomplete_list, opal_list_t); + OBJ_CONSTRUCT(&cont_req->cont_failed_list, opal_list_t); +} + +static void ompi_cont_request_destruct(ompi_cont_request_t* cont_req) +{ + OMPI_REQUEST_FINI(&cont_req->super); + assert(cont_req->cont_num_active == 0); + if (NULL != cont_req->cont_complete_list) { + OPAL_LIST_RELEASE(cont_req->cont_complete_list); + cont_req->cont_complete_list = NULL; + } + OBJ_DESTRUCT(&cont_req->cont_complete_defer_list); + OBJ_DESTRUCT(&cont_req->cont_incomplete_list); + OBJ_DESTRUCT(&cont_req->cont_failed_list); +} + +OBJ_CLASS_INSTANCE( + ompi_cont_request_t, + ompi_request_t, + ompi_cont_request_construct, + ompi_cont_request_destruct); + +/** + * Data block associated with requests + * The same structure is used for continuation requests and operation + * requests with attached continuations. 
+ */ +OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_request_cont_data_t); + +struct ompi_request_cont_data_t { + opal_free_list_item_t super; + ompi_continuation_t *cont_obj; /**< User-defined continuation state */ + ompi_status_public_t *cont_status; /**< The status object to set before invoking continuation */ + int cont_idx; /**< Index in the user-provided request array */ +}; + +/* Convenience typedef */ +typedef struct ompi_request_cont_data_t ompi_request_cont_data_t; + +OBJ_CLASS_INSTANCE( + ompi_request_cont_data_t, + opal_free_list_item_t, + NULL, NULL); + +/** + * List of continuations eligible for execution + */ +static opal_list_t continuation_list; + +/** + * Mutex to protect the continuation_list + */ +static opal_mutex_t request_cont_lock; + +/** + * Flag indicating whether the progress callback has been registered. + */ +static bool progress_callback_registered = false; + +/** + * Thread-private data holding continuations to be executed by a given thread only + * and whether that thread is currently progressing continuations + */ +struct thread_local_data_t { + opal_list_t thread_progress_list; + opal_list_t tmplist; // temporary list used to reduce lock pressure + bool in_progress; // whether the thread is currently progressing continuations, to avoid recursion + bool is_initialized; // whether the thread_progress_list has been initialized +}; +typedef struct thread_local_data_t thread_local_data_t; + +static void init_tl_data(thread_local_data_t* tld) +{ + OBJ_CONSTRUCT(&tld->thread_progress_list, opal_list_t); + OBJ_CONSTRUCT(&tld->tmplist, opal_list_t); + tld->is_initialized = true; + +} + +static inline __opal_attribute_always_inline__ +thread_local_data_t* get_tl_data(void) +{ + static opal_thread_local thread_local_data_t tl_data = { .in_progress = false, .is_initialized = false }; + /* process global tl_data if threads are disabled */ + static thread_local_data_t gl_data = { .in_progress = false, .is_initialized = false }; + + thread_local_data_t* tld = &gl_data; + if (opal_using_threads()) { + tld = &tl_data; + } + + if (OPAL_UNLIKELY(!tld->is_initialized)) { + init_tl_data(tld); + } + return tld; +} + +/** + * List of continuation requests to be checked for failure with ULFM + */ +static opal_list_t cont_req_list; +static opal_mutex_t cont_req_list_mtx; + +static inline +int ompi_continue_progress_request_n(ompi_cont_request_t *cont_req, + uint32_t max_poll, + thread_local_data_t *tld); + +static inline +int ompi_continue_check_request_error_abort(ompi_request_t *req); + +static +void ompi_continue_cont_release(ompi_continuation_t *cont, int rc) +{ + ompi_cont_request_t *cont_req = cont->cont_req; + assert(OMPI_REQUEST_CONT == cont_req->super.req_type); + + if (OMPI_SUCCESS != rc) { + /* The continuation has failed, move it to the failed list and set the error + * on the continuation request. + * We do not mark the CR as complete, all other runnable continuations have + * to complete first. 
+ */ + opal_atomic_lock(&cont_req->cont_lock); + opal_list_append(&cont_req->cont_failed_list, &cont->super.super); + opal_atomic_unlock(&cont_req->cont_lock); + cont_req->super.req_status.MPI_ERROR = rc; + /* we have no object to associate this error with */ + cont_req->cont_errorinfo.mpi_object.comm = NULL; + cont_req->cont_errorinfo.type = OMPI_REQUEST_CONT; + } + + opal_atomic_lock(&cont_req->cont_lock); + int num_active = OPAL_THREAD_ADD_FETCH32(&cont_req->cont_num_active, -1); + if (num_active == 0) { + opal_atomic_wmb(); + //opal_atomic_lock(&cont_req->cont_lock); + if (!REQUEST_COMPLETE(&cont_req->super)) { + /* signal that all continuations were found complete */ + //printf("COMPLETE cont_req %p cont %p\n", cont_req, cont); + ompi_request_complete(&cont_req->super, true); + } + //opal_atomic_unlock(&cont_req->cont_lock); + } + opal_atomic_unlock(&cont_req->cont_lock); + + OBJ_RELEASE(cont_req); + + if (OMPI_SUCCESS == rc) { +#ifdef OPAL_ENABLE_DEBUG + cont->cont_cb = NULL; + cont->cont_data = NULL; + cont->cont_req = NULL; + opal_atomic_wmb(); +#endif // OPAL_ENABLE_DEBUG + opal_free_list_return(&ompi_continuation_freelist, &cont->super); + } +} + +/** + * Process a callback. Returns the callback object to the freelist. + */ +static inline +int ompi_continue_cont_invoke(ompi_continuation_t *cont) +{ +#ifndef NDEBUG + ompi_cont_request_t *cont_req = cont->cont_req; + assert(NULL != cont_req); + assert(OMPI_REQUEST_CONT == cont_req->super.req_type); +#endif // NDEBUG + + MPIX_Continue_cb_function *fn = cont->cont_cb; + void *cont_data = cont->cont_data; + int rc = fn(cont->cont_rc, cont_data); + ompi_continue_cont_release(cont, rc); + return rc; +} + +static +int ompi_continue_progress_n(const uint32_t max, thread_local_data_t *tld) +{ + + if (tld->in_progress) return 0; + + uint32_t completed = 0; + + + const bool using_threads = opal_using_threads(); + + /* execute thread-local continuations first + * (e.g., from continuation requests the current thread is waiting on) */ + if (!opal_list_is_empty(&tld->thread_progress_list)) { + ompi_cont_request_t *cont_req; + OPAL_LIST_FOREACH(cont_req, &tld->thread_progress_list, ompi_cont_request_t) { + completed += ompi_continue_progress_request_n(cont_req, max - completed, tld); + if (max <= completed) break; + } + } + + tld->in_progress = 1; + if (!opal_list_is_empty(&continuation_list)) { + /* global progress */ + //TODO: steal some requests and process them in chunks to reduce locking? 
+ while (max > completed && !opal_list_is_empty(&continuation_list)) { + ompi_continuation_t *cont; + if (using_threads) { + opal_mutex_atomic_lock(&request_cont_lock); + cont = (ompi_continuation_t*)opal_list_remove_first(&continuation_list); + opal_mutex_atomic_unlock(&request_cont_lock); + } else { + cont = (ompi_continuation_t*)opal_list_remove_first(&continuation_list); + } + if (NULL == cont) break; + ompi_continue_cont_invoke(cont); + + ++completed; + } + } + + tld->in_progress = 0; + + return completed; +} + +static int ompi_continue_progress_callback(void) +{ + thread_local_data_t *tld = get_tl_data(); + return ompi_continue_progress_n(1, tld); +} + +static int ompi_continue_wait_progress_callback(void) +{ + thread_local_data_t *tld = get_tl_data(); + return ompi_continue_progress_n(UINT32_MAX, tld); +} + +static inline +int ompi_continue_progress_request_n(ompi_cont_request_t *cont_req, uint32_t max_poll, thread_local_data_t *tld) +{ + if (tld->in_progress) return 0; + if (NULL == cont_req->cont_complete_list) { + /* nothing to progress in this request */ + return 0; + } + if (opal_list_is_empty(cont_req->cont_complete_list)) { + return 0; + } + + uint32_t completed = 0; + const bool using_threads = opal_using_threads(); + + tld->in_progress = 1; + + /* take the continuations from the local list */ + if (using_threads) { + opal_atomic_lock(&cont_req->cont_lock); + } + opal_list_join(&tld->tmplist, opal_list_get_begin(&tld->tmplist), cont_req->cont_complete_list); + if (using_threads) { + opal_atomic_unlock(&cont_req->cont_lock); + } + while (max_poll > completed && !opal_list_is_empty(&tld->tmplist)) { + ompi_continuation_t *cont; + cont = (ompi_continuation_t *) opal_list_remove_first(&tld->tmplist); + if (NULL == cont) break; + + ompi_continue_cont_invoke(cont); + ++completed; + } + + /* put any remaining continuations back into the local list */ + if (!opal_list_is_empty(&tld->tmplist)) { + if (using_threads) { + opal_atomic_lock(&cont_req->cont_lock); + } + opal_list_join(cont_req->cont_complete_list, opal_list_get_begin(cont_req->cont_complete_list), &tld->tmplist); + if (using_threads) { + opal_atomic_unlock(&cont_req->cont_lock); + } + } + + tld->in_progress = 0; + + return completed; +} + +int ompi_continue_progress_request(ompi_request_t *req) +{ + thread_local_data_t *tld = get_tl_data(); + ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req; + int rc = ompi_continue_progress_request_n(cont_req, cont_req->continue_max_poll, tld); + /* global progress, if the request isn't complete yet */ + if (!REQUEST_COMPLETE(req)) { + rc += ompi_continue_progress_n(cont_req->continue_max_poll - rc, tld); + } + return rc; +} + +/** + * Register the continuation request so that it will be progressed even if + * it is poll-only and the thread is waiting on the provided sync object. 
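Runnable continuations reach these loops through the regular OPAL progress engine: this file registers a hook via opal_progress_register() (see ompi_continue_progress_callback above), and each call into opal_progress() then drains a bounded number of continuations. A sketch of the hook shape under that assumption (hypothetical hook name; only the registration call below is taken from this file):

    #include "opal/runtime/opal_progress.h"

    /* a progress hook returns how many items it completed so the
     * progress engine can adapt its polling rate */
    static int my_cont_progress_hook(void)
    {
        int completed = 0;
        /* ... pop runnable continuations and invoke them,
         * as ompi_continue_progress_n() does ... */
        return completed;
    }

    static void register_hook_once(void)
    {
        opal_progress_register(&my_cont_progress_hook);
    }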
+ */ +int ompi_continue_register_request_progress(ompi_request_t *req, ompi_wait_sync_t *sync) +{ + ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req; + + thread_local_data_t *tld = get_tl_data(); + + if (NULL == cont_req->cont_complete_list) { + if (!REQUEST_COMPLETE(&cont_req->super)) { + /* progress requests to see if we can complete it already */ + ompi_continue_progress_n(UINT32_MAX, tld); + } + + return OMPI_SUCCESS; + } + + /* add the continuation request to the thread-local list, + * will be removed in ompi_continue_deregister_request_progress */ + opal_list_append(&tld->thread_progress_list, &cont_req->super.super.super); + + if (REQUEST_COMPLETE(&cont_req->super)) { + return OMPI_SUCCESS; + } + + /* progress request to see if we can complete it already */ + ompi_continue_progress_request_n(cont_req, UINT32_MAX, tld); + + if (!REQUEST_COMPLETE(req)) { + ompi_continue_progress_n(UINT32_MAX, tld); + } + + if (REQUEST_COMPLETE(req)) return OMPI_SUCCESS; + + /* register with the sync object */ + if (NULL != sync) { + sync->num_req_need_progress++; + sync->progress_cb = &ompi_continue_wait_progress_callback; + } + cont_req->sync = sync; + + return OMPI_SUCCESS; +} + +/** + * Remove the poll-only continuation request from the thread's progress list after + * it has completed. + */ +int ompi_continue_deregister_request_progress(ompi_request_t *req) +{ + ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req; + + if (NULL == cont_req->cont_complete_list) return OMPI_SUCCESS; + + thread_local_data_t *tld = get_tl_data(); + + /* let the sync know we're done, it may suspend the thread now */ + if (NULL != cont_req->sync) { + cont_req->sync->num_req_need_progress--; + } + + /* remove the continuation request from the thread-local progress list */ + opal_list_remove_item(&tld->thread_progress_list, &req->super.super); + + return OMPI_SUCCESS; +} + +int ompi_continuation_init(void) +{ + OBJ_CONSTRUCT(&request_cont_lock, opal_mutex_t); + OBJ_CONSTRUCT(&continuation_list, opal_list_t); + + OBJ_CONSTRUCT(&ompi_continuation_freelist, opal_free_list_t); + opal_free_list_init(&ompi_continuation_freelist, + sizeof(ompi_continuation_t), + opal_cache_line_size, + OBJ_CLASS(ompi_continuation_t), + 0, opal_cache_line_size, + 0, -1 , 8, NULL, 0, NULL, NULL, NULL); + + OBJ_CONSTRUCT(&ompi_request_cont_data_freelist, opal_free_list_t); + opal_free_list_init(&ompi_request_cont_data_freelist, + sizeof(ompi_request_cont_data_t), + opal_cache_line_size, + OBJ_CLASS(ompi_request_cont_data_t), + 0, opal_cache_line_size, + 0, -1 , 8, NULL, 0, NULL, NULL, NULL); + + OBJ_CONSTRUCT(&cont_req_list, opal_list_t); + OBJ_CONSTRUCT(&cont_req_list_mtx, opal_mutex_t); + + return OMPI_SUCCESS; +} + +int ompi_continuation_fini(void) +{ + if (progress_callback_registered) { + opal_progress_unregister(&ompi_continue_progress_callback); + } + + if (!opal_list_is_empty(&continuation_list)) { + opal_show_help("help-mpi-continue.txt", "continue:incomplete_shutdown", 1, + opal_list_get_size(&continuation_list)); + } + OBJ_DESTRUCT(&continuation_list); + + OBJ_DESTRUCT(&request_cont_lock); + OBJ_DESTRUCT(&ompi_continuation_freelist); + OBJ_DESTRUCT(&ompi_request_cont_data_freelist); + + OBJ_DESTRUCT(&cont_req_list); + OBJ_DESTRUCT(&cont_req_list_mtx); + + return OMPI_SUCCESS; +} + +/** + * Enqueue the continuation for later invocation. 
+ */ +static void +ompi_continue_enqueue_runnable(ompi_continuation_t *cont) +{ + bool req_volatile = (NULL == cont->cont_opreqs); + const bool using_threads = opal_using_threads(); + ompi_cont_request_t *cont_req = cont->cont_req; +recheck: + if (NULL != cont_req->cont_complete_list || cont_req->super.req_state == OMPI_REQUEST_INACTIVE) { + /* put the continuation into a local list, no need to go through the deferred list ever */ + if (using_threads) { opal_atomic_lock(&cont_req->cont_lock); } + if (!req_volatile) { + opal_list_remove_item(&cont_req->cont_incomplete_list, &cont->super.super); + } + if (NULL != cont_req->cont_complete_list) { + opal_list_append(cont_req->cont_complete_list, &cont->super.super); + } else if (cont_req->super.req_state == OMPI_REQUEST_INACTIVE) { + opal_list_append(&cont_req->cont_complete_defer_list, &cont->super.super); + } else { + /* someone started the request before we took the lock, go back and check again + * this should be rare so we don't care about taking the lock again */ + if (!req_volatile) { + /* put it back into the list of incomplete ops */ + opal_list_append(&cont_req->cont_incomplete_list, &cont->super.super); + } + if (using_threads) { opal_atomic_unlock(&cont_req->cont_lock); } + goto recheck; + } + if (using_threads) { opal_atomic_unlock(&cont_req->cont_lock); } + } else { + /* put the continuation into the global list */ + if (!req_volatile) { + if (using_threads) { opal_atomic_lock(&cont_req->cont_lock); } + opal_list_remove_item(&cont_req->cont_incomplete_list, &cont->super.super); + if (using_threads) { opal_atomic_unlock(&cont_req->cont_lock); } + } + if (using_threads) { opal_mutex_atomic_lock(&request_cont_lock); } + opal_list_append(&continuation_list, &cont->super.super); + if (using_threads) { opal_mutex_atomic_unlock(&request_cont_lock); } + } + + if (OPAL_UNLIKELY(!progress_callback_registered)) { + /* TODO: Ideally, we want to ensure that the callback is called *after* + * all the other progress callbacks are done so that any + * completions have happened before we attempt to execute + * callbacks. There doesn't seem to exist the infrastructure though. + */ + if (using_threads) { opal_mutex_atomic_lock(&request_cont_lock); } + if (!progress_callback_registered) { + opal_progress_register(&ompi_continue_progress_callback); + progress_callback_registered = true; + } + if (using_threads) { opal_mutex_atomic_unlock(&request_cont_lock); } + } +} + +/** + * Create and initialize a continuation object. 
+ */ +static inline +ompi_continuation_t *ompi_continue_cont_create( + int count, + ompi_cont_request_t *cont_req, + MPIX_Continue_cb_function *cont_cb, + void *cont_data, + MPI_Status *cont_status, + bool req_volatile) +{ + const bool using_threads = opal_using_threads(); + ompi_continuation_t *cont; + cont = (ompi_continuation_t *)opal_free_list_get(&ompi_continuation_freelist); + cont->cont_req = cont_req; + cont->cont_cb = cont_cb; + cont->cont_data = cont_data; + cont->cont_num_active = count; + cont->cont_status = cont_status; + + /* signal that the continuation request has a new continuation */ + OBJ_RETAIN(cont_req); + + if (using_threads) { + opal_atomic_lock(&cont_req->cont_lock); + } + int prev_num_active = OPAL_THREAD_ADD_FETCH32(&cont_req->cont_num_active, 1); + + /* if the continuation request was completed we mark it pending here */ + if (prev_num_active == 1) { + cont_req->super.req_complete = REQUEST_PENDING; + cont_req->super.req_complete_cb = NULL; + } + + /* if we don't have the requests we cannot handle oob errors, + * so don't bother keeping the continuation around */ + if (!req_volatile) { + opal_list_append(&cont_req->cont_incomplete_list, &cont->super.super); + } + if (using_threads) { + opal_atomic_unlock(&cont_req->cont_lock); + } + + return cont; +} + +static void handle_failed_cont(ompi_continuation_t *cont, int status, bool have_cont_req_lock) +{ + ompi_mpi_object_t error_object = {NULL}; + int error_object_type = OMPI_REQUEST_CONT; + ompi_cont_request_t *cont_req = cont->cont_req; + if (!have_cont_req_lock) { + opal_atomic_lock(&cont->cont_req->cont_lock); + } + /* add 1 here, so that no thread in the non-failure path in request_completion_cb + * thinks the continuation is ready for execution */ + OPAL_THREAD_ADD_FETCH32(&cont->cont_num_active, 1); + if (NULL != cont->cont_opreqs) { + /* block threads in request_completion_cb from releasing their requests */ + cont->cont_request_check = 1; + opal_atomic_wmb(); + /* detach all other requests*/ + for (int i = 0; i < cont->cont_num_opreqs; ++i) { + ompi_request_t *request = cont->cont_opreqs[i]; + if (MPI_REQUEST_NULL == request) continue; + ompi_request_cont_data_t *req_cont_data; + req_cont_data = (ompi_request_cont_data_t *)request->req_complete_cb_data; + if (NULL == req_cont_data) continue; + if (opal_atomic_compare_exchange_strong_ptr((opal_atomic_intptr_t*)&request->req_complete_cb_data, (intptr_t*)&req_cont_data, 0x0)) { + /* we acquired the request continuation data, free it */ + OPAL_THREAD_ADD_FETCH32(&cont->cont_num_active, -1); + req_cont_data->cont_status->MPI_ERROR = MPI_ERR_PENDING; + int error = MPI_SUCCESS; + if (REQUEST_COMPLETE(request) && MPI_SUCCESS != request->req_status.MPI_ERROR) { + error = request->req_status.MPI_ERROR; + } +#if OPAL_ENABLE_FT_MPI + /* PROC_FAILED_PENDING errors are also not completed yet */ + if (ompi_request_is_failed(request)) { + if (MPI_ERR_PROC_FAILED_PENDING == request->req_status.MPI_ERROR) { + error = req_cont_data->cont_status->MPI_ERROR = MPI_ERR_PROC_FAILED_PENDING; + } + } +#endif /* OPAL_ENABLE_FT_MPI */ + /* pick the first failed request to associate the error with */ + if (error != MPI_SUCCESS) { + if (NULL == error_object.comm) { + error_object = request->req_mpi_object; + error_object_type = request->req_type; + status = error; + } +#if OPAL_ENABLE_FT_MPI + /* Free request similar to ompi_errhandler_request_invoke */ + if (MPI_ERR_PROC_FAILED_PENDING != request->req_status.MPI_ERROR) +#endif /* OPAL_ENABLE_FT_MPI */ + { + /* free the request and reset 
it in the array */ + ompi_request_free(&request); + cont->cont_opreqs[i] = MPI_REQUEST_NULL; + } + } + opal_free_list_return(&ompi_request_cont_data_freelist, &req_cont_data->super); + } + } + /* wait for other threads in request_completion_cb to decrement the counter */ + cont->cont_request_check = 0; + while (cont->cont_num_active != 1) { } + } + opal_list_remove_item(&cont_req->cont_incomplete_list, &cont->super.super); + + if (cont->cont_invoke_failed) { + /* make sure all requests have completed and enqueue the continuation for execution */ + cont->cont_rc = status; + if (0 == OPAL_THREAD_ADD_FETCH32(&cont->cont_num_active, -1)) { + ompi_continue_enqueue_runnable(cont); + } + } else { + cont->cont_num_active = 0; + /* put the continuation into the list of failed continuations */ + opal_list_append(&cont_req->cont_failed_list, &cont->super.super); + int32_t num_active = OPAL_THREAD_ADD_FETCH32(&cont_req->cont_num_active, -1); + if (MPI_SUCCESS == cont_req->super.req_status.MPI_ERROR) { + cont_req->super.req_status.MPI_ERROR = status; + cont_req->cont_errorinfo.mpi_object = error_object; + cont_req->cont_errorinfo.type = error_object_type; + } + + if (0 == num_active) { + opal_atomic_wmb(); + ompi_request_complete(&cont_req->super, true); + } + } + if (!have_cont_req_lock) { + opal_atomic_unlock(&cont_req->cont_lock); + } +} + +static int request_completion_cb(ompi_request_t *request) +{ + assert(NULL != request->req_complete_cb_data); + ompi_request_cont_data_t *req_cont_data; + + /* atomically swap the pointer here to avoid race with ompi_continue_global_wakeup */ + req_cont_data = (ompi_request_cont_data_t *)OPAL_THREAD_SWAP_PTR(&request->req_complete_cb_data, 0x0); + + if (NULL == req_cont_data) { + /* the wakeup call took away our callback data */ + return 1; + } + + ompi_continuation_t *cont = req_cont_data->cont_obj; + req_cont_data->cont_obj = NULL; + + /* set the status object */ + if (NULL != req_cont_data->cont_status) { + OMPI_COPY_STATUS(req_cont_data->cont_status, request->req_status, true); + req_cont_data->cont_status = NULL; + } + + int32_t failed_tmp = 0; + if (request->req_status.MPI_ERROR == MPI_SUCCESS) { + + /* inactivate / free the request */ + if (request->req_persistent) { + request->req_state = OMPI_REQUEST_INACTIVE; + } else { + if (NULL != cont->cont_opreqs) { + cont->cont_opreqs[req_cont_data->cont_idx] = MPI_REQUEST_NULL; + } + /* wait for any thread in the failure handler to complete handling the requests */ + while (cont->cont_request_check) {} + /* we own the request so release it and let the caller know */ + ompi_request_free(&request); + } + opal_atomic_wmb(); + + if (1 == cont->cont_num_active || 0 == OPAL_THREAD_ADD_FETCH32(&cont->cont_num_active, -1)) { + /* the continuation is ready for execution */ + cont->cont_num_active = 0; + ompi_continue_enqueue_runnable(cont); + } + + } else if (opal_atomic_compare_exchange_strong_32(&cont->cont_failed, &failed_tmp, 1)) { + /* We're the first, go ahead and handle fault */ + handle_failed_cont(cont, request->req_status.MPI_ERROR, false); + } else { + /* someone else handles the fault, so just signal that we're done with the continuation object */ + if (1 == cont->cont_num_active || 0 == OPAL_THREAD_ADD_FETCH32(&cont->cont_num_active, -1)) { + /* we're responsible for enqueuing the continuation for execution if: + * 1) we don't have access to the requests (handle_failed_cont couldn't handle all pending requests); and + * 2) this is the last outstanding request + */ + cont->cont_num_active = 0; + 
ompi_continue_enqueue_runnable(cont); + } + } + + opal_free_list_return(&ompi_request_cont_data_freelist, &req_cont_data->super); + + return 1; +} + +/* release all continuations, either by checking the requests for failure or just marking + * the continuation as failed if the requests are not available */ +int ompi_continue_global_wakeup(int status) { + opal_mutex_atomic_lock(&cont_req_list_mtx); + opal_list_item_t *item; + while (NULL != (item = opal_list_remove_first(&cont_req_list))) { + + ompi_cont_request_t *cont_req = (ompi_cont_request_t *)(uintptr_t)item - offsetof(ompi_cont_request_t, cont_list_item); + opal_atomic_lock(&cont_req->cont_lock); + ompi_continuation_t *cont; + OPAL_LIST_FOREACH(cont, &cont_req->cont_incomplete_list, ompi_continuation_t) { + int32_t failed_tmp = 0; + if (NULL != cont->cont_opreqs) { + /* check for failed requests */ + for (int i = 0; i < cont->cont_num_opreqs; ++i) { + ompi_request_t *request = cont->cont_opreqs[i]; + if (MPI_REQUEST_NULL == request) continue; + if (ompi_request_is_failed(request) && opal_atomic_compare_exchange_strong_32(&cont->cont_failed, &failed_tmp, 1)) { + handle_failed_cont(cont, status, true); + break; + } + } + } else if (opal_atomic_compare_exchange_strong_32(&cont->cont_failed, &failed_tmp, 1)) { + /* we don't have the requests but still need to let the continuation request know */ + handle_failed_cont(cont, status, true); + } + } + opal_atomic_unlock(&cont_req->cont_lock); + } + + opal_mutex_atomic_unlock(&cont_req_list_mtx); + return OMPI_SUCCESS; +} + +int ompi_continue_attach( + ompi_request_t *continuation_request, + const int count, + ompi_request_t *requests[], + MPIX_Continue_cb_function *cont_cb, + void *cont_data, + int flags, + ompi_status_public_t statuses[]) +{ + if (OMPI_REQUEST_CONT != continuation_request->req_type) { + return OMPI_ERR_REQUEST; + } + + bool req_volatile = (flags & MPIX_CONT_REQBUF_VOLATILE); + bool defer_complete = (flags & MPIX_CONT_DEFER_COMPLETE); + bool invoke_failed = (flags & MPIX_CONT_INVOKE_FAILED) | (flags & MPIX_CONT_REQBUF_VOLATILE); + + ompi_cont_request_t *cont_req = (ompi_cont_request_t *)continuation_request; + ompi_continuation_t *cont = ompi_continue_cont_create(count, cont_req, cont_cb, + cont_data, statuses, req_volatile); + + bool reset_requests = true; + + if (req_volatile) { + /* we cannot use the request buffer afterwards */ + cont->cont_opreqs = NULL; + } else { + cont->cont_opreqs = requests; + cont->cont_num_opreqs = count; + reset_requests = false; + } + + cont->cont_invoke_failed = invoke_failed; + + /* memory barrier to make sure a thread completing a request see + * a correct continuation object */ + opal_atomic_wmb(); + + int32_t num_registered = 0; + for (int i = 0; i < count; ++i) { + ompi_request_t *request = requests[i]; + if (MPI_REQUEST_NULL == request) { + /* set the status for null-request */ + if (statuses != MPI_STATUSES_IGNORE) { + OMPI_COPY_STATUS(&statuses[i], ompi_status_empty, true); + } + } else { + if (&ompi_request_empty == request) { + /* empty request: do not modify, just copy out the status */ + if (statuses != MPI_STATUSES_IGNORE) { + OMPI_COPY_STATUS(&statuses[i], request->req_status, true); + } + requests[i] = MPI_REQUEST_NULL; + } else { + ompi_request_cont_data_t *req_cont_data; + req_cont_data = (ompi_request_cont_data_t *)request->req_complete_cb_data; + if (!req_cont_data) { + req_cont_data = (ompi_request_cont_data_t *)opal_free_list_get(&ompi_request_cont_data_freelist); + /* NOTE: request->req_complete_cb_data will be set in 
ompi_request_set_callback */ + } else { + assert(request->req_type == OMPI_REQUEST_CONT); + } + req_cont_data->cont_status = NULL; + if (statuses != MPI_STATUSES_IGNORE) { + req_cont_data->cont_status = &statuses[i]; + } + + req_cont_data->cont_idx = i; + + req_cont_data->cont_obj = cont; + + opal_atomic_wmb(); + + ompi_request_set_callback(request, &request_completion_cb, req_cont_data); + ++num_registered; + + /* take ownership of any non-persistent request */ + if (!request->req_persistent && reset_requests) + { + requests[i] = MPI_REQUEST_NULL; + } + } + } + } + + assert(count >= num_registered); + int num_complete = count - num_registered; + int32_t last_num_active = count; + if (0 == num_registered) { + /* all requests were complete */ + cont->cont_num_active = 0; + last_num_active = 0; + } else if (num_complete > 0) { + /* some requests were complete */ + last_num_active = OPAL_THREAD_ADD_FETCH32(&cont->cont_num_active, -num_complete); + } + if (0 == last_num_active) { + if (defer_complete || OMPI_REQUEST_INACTIVE == cont_req->super.req_state) { + /* enqueue for later processing */ + ompi_continue_enqueue_runnable(cont); + } else { + /** + * Execute the continuation immediately + */ + if (!req_volatile) { + const bool using_threads = opal_using_threads(); + if (using_threads) { + opal_atomic_lock(&cont_req->cont_lock); + } + opal_list_remove_item(&cont_req->cont_incomplete_list, &cont->super.super); + if (using_threads) { + opal_atomic_unlock(&cont_req->cont_lock); + } + } + ompi_continue_cont_invoke(cont); + } + } + + return OMPI_SUCCESS; +} + +/** + * Continuation request management + */ +int ompi_continue_allocate_request( + ompi_request_t **cont_req_ptr, + int flags, + int max_poll, + ompi_info_t *info) +{ + ompi_cont_request_t *cont_req = OBJ_NEW(ompi_cont_request_t); + + if (OPAL_LIKELY(NULL != cont_req)) { + + cont_req->cont_flags = flags; + + if (cont_req->cont_flags & MPIX_CONT_POLL_ONLY) { + cont_req->cont_complete_list = OBJ_NEW(opal_list_t); + } + + cont_req->continue_max_poll = max_poll; + if (0 == cont_req->continue_max_poll) { + cont_req->continue_max_poll = UINT32_MAX; + } + *cont_req_ptr = &cont_req->super; + + opal_mutex_atomic_lock(&cont_req_list_mtx); + opal_list_append(&cont_req_list, &cont_req->cont_list_item); + opal_mutex_atomic_unlock(&cont_req_list_mtx); + + return MPI_SUCCESS; + } + + return OMPI_ERR_OUT_OF_RESOURCE; +} + +static int ompi_continue_request_start(size_t count, ompi_request_t** cont_req_ptr) +{ + bool lock_continuation_list = false; + const bool using_threads = opal_using_threads(); + /* check whether we need the global continuation list mutex */ + if (using_threads) { + for (size_t i = 0; i < count; ++i) { + ompi_cont_request_t *cont_req = (ompi_cont_request_t*)cont_req_ptr[i]; + if (NULL == cont_req->cont_complete_list) { + lock_continuation_list = true; + break; + } + } + if (lock_continuation_list) { + opal_mutex_atomic_lock(&request_cont_lock); + } + } + + for (size_t i = 0; i < count; ++i) { + ompi_cont_request_t *cont_req = (ompi_cont_request_t*)cont_req_ptr[i]; + if (cont_req->super.req_state != OMPI_REQUEST_INACTIVE) { + if (lock_continuation_list) { + opal_mutex_atomic_unlock(&request_cont_lock); + } + return OMPI_ERR_REQUEST; + } + + /* lock here to ensure consistency with the update of req_state */ + if (using_threads) { + opal_atomic_lock(&cont_req->cont_lock); + } + //OMPI_REQUEST_INIT(&cont_req->super, true); + if (NULL == cont_req->cont_complete_list) { + opal_list_join(&continuation_list, + 
opal_list_get_begin(&continuation_list), + &cont_req->cont_complete_defer_list); + } + cont_req->super.req_state = OMPI_REQUEST_ACTIVE; + /* TODO: is it correct to mark the cont_req pending once we + * register a new operation request? + */ + //cont_req->super.req_complete = REQUEST_PENDING; + if (using_threads) { + opal_atomic_unlock(&cont_req->cont_lock); + } + } + + if (lock_continuation_list) { + opal_mutex_atomic_unlock(&request_cont_lock); + } + return OMPI_SUCCESS; +} + +static int ompi_continue_request_free(ompi_request_t** cont_req_ptr) +{ + ompi_cont_request_t *cont_req = (ompi_cont_request_t *)*cont_req_ptr; + assert(OMPI_REQUEST_CONT == cont_req->super.req_type); + + const bool using_threads = opal_using_threads(); + if (using_threads) { + opal_mutex_atomic_lock(&cont_req_list_mtx); + } + opal_list_remove_item(&cont_req_list, &cont_req->cont_list_item); + if (using_threads) { + opal_mutex_atomic_unlock(&cont_req_list_mtx); + } + + OBJ_RELEASE(cont_req); + *cont_req_ptr = &ompi_request_null.request; + return OMPI_SUCCESS; +} + + +int ompi_continue_get_failed( + MPI_Request req, + int *count, + void **cb_data) +{ + ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req; + + opal_atomic_lock(&cont_req->cont_lock); + + int i; + for (i = 0; i < *count; ++i) { + ompi_continuation_t *cont = (ompi_continuation_t*)opal_list_remove_first(&cont_req->cont_failed_list); + if (NULL == cont) { + break; + } + cb_data[i] = cont->cont_data; + OBJ_RELEASE(cont_req); + +#ifdef OPAL_ENABLE_DEBUG + cont->cont_cb = NULL; + cont->cont_data = NULL; + cont->cont_req = NULL; + opal_atomic_wmb(); +#endif // OPAL_ENABLE_DEBUG + opal_free_list_return(&ompi_continuation_freelist, &cont->super); + } + + if (opal_list_is_empty(&cont_req->cont_failed_list)) { + cont_req->super.req_status.MPI_ERROR = MPI_SUCCESS; + } + + *count = i; + opal_atomic_unlock(&cont_req->cont_lock); + + return OMPI_SUCCESS; +} + + +void ompi_continue_get_error_info( + struct ompi_request_t *req, + ompi_mpi_object_t *mpi_object, + int *mpi_object_type) +{ + ompi_cont_request_t *cont_req = (ompi_cont_request_t *)req; + *mpi_object = cont_req->cont_errorinfo.mpi_object; + *mpi_object_type = cont_req->cont_errorinfo.type; +} + + +static inline __opal_attribute_always_inline__ +int ompi_continue_check_errhandler_abort(ompi_errhandler_t* errhandler) +{ + /* it's safe to use binary OR here, safes a jmp */ + return ((errhandler == &ompi_mpi_errors_are_fatal.eh) | (errhandler == &ompi_mpi_errors_abort.eh)); +} + +static inline +int ompi_continue_check_request_error_abort(ompi_request_t *req) +{ + ompi_mpi_object_t obj = req->req_mpi_object; + switch (req->req_type) { + case OMPI_REQUEST_PART: + case OMPI_REQUEST_COLL: + case OMPI_REQUEST_PML: + case OMPI_REQUEST_COMM: // partitioned, coll, p2p, and comm duplication requests have a communicator set + return ompi_continue_check_errhandler_abort(obj.comm->error_handler); + case OMPI_REQUEST_IO: // file IO requests have a file set + return ompi_continue_check_errhandler_abort(obj.file->error_handler); + case OMPI_REQUEST_WIN: // RMA requests have a window set + return ompi_continue_check_errhandler_abort(obj.win->error_handler); + case OMPI_REQUEST_GEN: + case OMPI_REQUEST_CONT: // continuation and generalized requests fail on MPI_COMM_SELF + return ompi_continue_check_errhandler_abort(ompi_mpi_comm_self.comm.error_handler); + case OMPI_REQUEST_NOOP: + case OMPI_REQUEST_NULL: + case OMPI_REQUEST_MAX: + /** + * NOTE: not using the default label so a warning is triggered if a new request type is 
introduced
+         * NULL and NOOP requests cannot fail, so it is safe to report that they would abort
+         */
+        return true;
+    }
+}
diff --git a/ompi/mpiext/continue/c/continuation.h b/ompi/mpiext/continue/c/continuation.h
new file mode 100644
index 00000000000..d77e140637f
--- /dev/null
+++ b/ompi/mpiext/continue/c/continuation.h
@@ -0,0 +1,105 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2020 High Performance Computing Center Stuttgart,
+ *                    University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2021 The University of Tennessee and The University
+ *                    of Tennessee Research Foundation.  All rights
+ *                    reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#ifndef OMPI_CONTINUATION_H
+#define OMPI_CONTINUATION_H
+
+#include "ompi_config.h"
+#include "ompi/info/info.h"
+#include "ompi/request/mpi_object.h"
+#include "mpi.h"
+#include "ompi/mpiext/continue/c/mpiext_continue_c.h"
+
+
+struct ompi_request_t;
+struct ompi_wait_sync_t;
+
+BEGIN_C_DECLS
+
+/**
+ * Initialize the user-callback infrastructure.
+ */
+int ompi_continuation_init(void);
+
+/**
+ * Finalize the user-callback infrastructure.
+ */
+int ompi_continuation_fini(void);
+
+/**
+ * Register a continuation request with a local completion list so that it is
+ * progressed through the progress engine.
+ */
+int ompi_continue_register_request_progress(struct ompi_request_t *cont_req, struct ompi_wait_sync_t *sync);
+
+/**
+ * Deregister a continuation request with a local completion list from being
+ * progressed through the progress engine.
+ */
+int ompi_continue_deregister_request_progress(struct ompi_request_t *cont_req);
+
+/**
+ * Progress a continuation request that has local completions.
+ */
+int ompi_continue_progress_request(struct ompi_request_t *cont_req);
+
+/**
+ * Wake up all outstanding continuations and check their requests for errors.
+ * Only supported if ULFM is enabled.
+ */
+int ompi_continue_global_wakeup(int status);
+
+/**
+ * Get the callback data of failed continuations. On input, \c count is the
+ * capacity of \c cb_data and up to that many entries are stored. On return,
+ * \c count is set to the number of entries actually stored.
+ */
+int ompi_continue_get_failed(
+    MPI_Request cont_req,
+    int *count,
+    void **cb_data);
+
+/**
+ * Attach a continuation to a set of operations represented by \c requests.
+ * The \c statuses will be set before the \c cont_cb callback is invoked and
+ * passed together with \c cont_data to the callback. Passing \c MPI_STATUSES_IGNORE
+ * is valid, in which case statuses are ignored.
+ * The continuation is registered with the continuation request \c cont_req, which
+ * can be used to query for and progress outstanding continuations.
+ */
+int ompi_continue_attach(
+    struct ompi_request_t *cont_req,
+    int count,
+    struct ompi_request_t *requests[],
+    MPIX_Continue_cb_function *cont_cb,
+    void *cont_data,
+    int flags,
+    ompi_status_public_t statuses[]);
+
+
+/**
+ * Allocate a new continuation request.
+ */
+int ompi_continue_allocate_request(struct ompi_request_t **cont_req, int flags, int max_poll, ompi_info_t *info);
+
+/**
+ * Query the object and object type attached to a failed continuation request.
+ */ +void ompi_continue_get_error_info(struct ompi_request_t *cont_req, ompi_mpi_object_t *mpi_object, int *mpi_object_type); + +END_C_DECLS + +#endif // OMPI_CONTINUATION_H diff --git a/ompi/mpiext/continue/c/continue.c b/ompi/mpiext/continue/c/continue.c new file mode 100644 index 00000000000..f97f2d2ae48 --- /dev/null +++ b/ompi/mpiext/continue/c/continue.c @@ -0,0 +1,66 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2020 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2021 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include + +#include "ompi/mpi/c/bindings.h" +#include "ompi/runtime/params.h" +#include "ompi/communicator/communicator.h" +#include "ompi/errhandler/errhandler.h" +#include "ompi/mpiext/continue/c/continuation.h" +#include "ompi/memchecker.h" + +#include "ompi/mpiext/continue/c/mpiext_continue_c.h" + +#if OMPI_BUILD_MPI_PROFILING +#if OPAL_HAVE_WEAK_SYMBOLS +#pragma weak MPIX_Continue = PMPIX_Continue +#endif +#define MPIX_Continue PMPIX_Continue +#endif + +static const char FUNC_NAME[] = "MPIX_Continue"; + +int MPIX_Continue( + MPI_Request *request, + MPIX_Continue_cb_function *cont_cb, + void *cb_data, + int flags, + MPI_Status *status, + MPI_Request cont_req) +{ + int rc; + + MEMCHECKER( + memchecker_request(request); + ); + + if (MPI_PARAM_CHECK) { + rc = MPI_SUCCESS; + OMPI_ERR_INIT_FINALIZE(FUNC_NAME); + if (NULL == request) { + rc = MPI_ERR_REQUEST; + } + if (MPI_REQUEST_NULL == cont_req || OMPI_REQUEST_CONT != cont_req->req_type) { + rc = MPI_ERR_REQUEST; + } + OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME); + } + + rc = ompi_continue_attach(cont_req, 1, request, cont_cb, cb_data, flags, + MPI_STATUS_IGNORE == status ? MPI_STATUSES_IGNORE : status); + + OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME); +} diff --git a/ompi/mpiext/continue/c/continue_get_failed.c b/ompi/mpiext/continue/c/continue_get_failed.c new file mode 100644 index 00000000000..811adec30ca --- /dev/null +++ b/ompi/mpiext/continue/c/continue_get_failed.c @@ -0,0 +1,55 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2020 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2021 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. 
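For reference, a hypothetical use of the MPIX_Continue binding defined above, attaching a continuation to a single nonblocking receive (illustrative only; assumes an active continuation request):

    #include <mpi.h>
    #include <mpi-ext.h>
    #include <stdio.h>

    /* matches MPIX_Continue_cb_function */
    static int on_recv(int rc, void *cb_data)
    {
        printf("received %d (rc=%d)\n", *(int *)cb_data, rc);
        return MPI_SUCCESS;
    }

    void attach_one(MPI_Request cont_req, int *buf)
    {
        MPI_Request req;
        MPI_Irecv(buf, 1, MPI_INT, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &req);
        /* req is set to MPI_REQUEST_NULL; on_recv runs once the receive completes */
        MPIX_Continue(&req, &on_recv, buf, 0, MPI_STATUS_IGNORE, cont_req);
    }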
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+#include <stdio.h>
+
+#include "ompi/mpi/c/bindings.h"
+#include "ompi/runtime/params.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/errhandler/errhandler.h"
+#include "ompi/mpiext/continue/c/continuation.h"
+#include "ompi/memchecker.h"
+
+#include "ompi/mpiext/continue/c/mpiext_continue_c.h"
+
+#if OMPI_BUILD_MPI_PROFILING
+#if OPAL_HAVE_WEAK_SYMBOLS
+#pragma weak MPIX_Continue_get_failed = PMPIX_Continue_get_failed
+#endif
+#define MPIX_Continue_get_failed PMPIX_Continue_get_failed
+#endif
+
+static const char FUNC_NAME[] = "MPIX_Continue_get_failed";
+
+int MPIX_Continue_get_failed(
+    MPI_Request cont_req,
+    int *count,
+    void **cb_data)
+{
+    int rc = MPI_SUCCESS;
+
+    if (MPI_PARAM_CHECK) {
+        rc = MPI_SUCCESS;
+        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
+        if (MPI_REQUEST_NULL == cont_req || OMPI_REQUEST_CONT != cont_req->req_type) {
+            rc = MPI_ERR_REQUEST;
+        }
+        OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
+    }
+
+    rc = ompi_continue_get_failed(cont_req, count, cb_data);
+
+    OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
+}
diff --git a/ompi/mpiext/continue/c/continue_init.c b/ompi/mpiext/continue/c/continue_init.c
new file mode 100644
index 00000000000..1be127ca559
--- /dev/null
+++ b/ompi/mpiext/continue/c/continue_init.c
@@ -0,0 +1,62 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2020 High Performance Computing Center Stuttgart,
+ *                    University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2021 The University of Tennessee and The University
+ *                    of Tennessee Research Foundation.  All rights
+ *                    reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+#include <stdio.h>
+
+#include "ompi/mpi/c/bindings.h"
+#include "ompi/runtime/params.h"
+#include "ompi/communicator/communicator.h"
+#include "ompi/errhandler/errhandler.h"
+#include "ompi/mpiext/continue/c/continuation.h"
+#include "ompi/memchecker.h"
+
+#include "ompi/mpiext/continue/c/mpiext_continue_c.h"
+
+#if OMPI_BUILD_MPI_PROFILING
+#if OPAL_HAVE_WEAK_SYMBOLS
+#pragma weak MPIX_Continue_init = PMPIX_Continue_init
+#endif
+#define MPIX_Continue_init PMPIX_Continue_init
+#endif
+
+static const char FUNC_NAME[] = "MPIX_Continue_init";
+
+int MPIX_Continue_init(
+    int flags,
+    int max_poll,
+    MPI_Info info,
+    MPI_Request *cont_req)
+{
+    int rc = MPI_SUCCESS;
+
+    if (MPI_PARAM_CHECK) {
+        rc = MPI_SUCCESS;
+        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
+        if (NULL == cont_req) {
+            rc = MPI_ERR_ARG;
+        }
+        OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
+    }
+
+    ompi_request_t *res;
+    rc = ompi_continue_allocate_request(&res, flags, max_poll, info);
+
+    if (MPI_SUCCESS == rc) {
+        *cont_req = res;
+    }
+
+    OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
+}
diff --git a/ompi/mpiext/continue/c/continueall.c b/ompi/mpiext/continue/c/continueall.c
new file mode 100644
index 00000000000..5f43c34556b
--- /dev/null
+++ b/ompi/mpiext/continue/c/continueall.c
@@ -0,0 +1,77 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2020 High Performance Computing Center Stuttgart,
+ *                    University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2021 The University of Tennessee and The University
+ *                    of Tennessee Research Foundation.  All rights
+ *                    reserved.
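A hypothetical drain loop for the MPIX_Continue_get_failed binding above; per the header documentation, receiving a full batch means the query should be repeated (this requires non-aborting error handlers on the involved operations):

    #include <mpi.h>
    #include <mpi-ext.h>

    #define BATCH 16

    void drain_failed(MPI_Request cont_req)
    {
        void *cb_data[BATCH];
        int count;
        do {
            count = BATCH;
            MPIX_Continue_get_failed(cont_req, &count, cb_data);
            for (int i = 0; i < count; ++i) {
                /* reclaim or retry the work behind cb_data[i] */
            }
        } while (BATCH == count);  /* full batch: possibly more pending */
    }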
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" +#include + +#include "ompi/mpi/c/bindings.h" +#include "ompi/runtime/params.h" +#include "ompi/communicator/communicator.h" +#include "ompi/errhandler/errhandler.h" +#include "ompi/mpiext/continue/c/continuation.h" +#include "ompi/memchecker.h" + +#include "ompi/mpiext/continue/c/mpiext_continue_c.h" + +#if OMPI_BUILD_MPI_PROFILING +#if OPAL_HAVE_WEAK_SYMBOLS +#pragma weak MPIX_Continueall = PMPIX_Continueall +#endif +#define MPIX_Continueall PMPIX_Continueall +#endif + +static const char FUNC_NAME[] = "MPIX_Continueall"; + +int MPIX_Continueall( + int count, + MPI_Request requests[], + MPIX_Continue_cb_function *cont_cb, + void *cont_data, + int flags, + MPI_Status statuses[], + MPI_Request cont_req) +{ + int rc; + + MEMCHECKER( + for (int j = 0; j < count; j++){ + memchecker_request(&requests[j]); + } + ); + + + if (MPI_PARAM_CHECK) { + rc = MPI_SUCCESS; + OMPI_ERR_INIT_FINALIZE(FUNC_NAME); + if (MPI_REQUEST_NULL == cont_req || OMPI_REQUEST_CONT != cont_req->req_type) { + rc = MPI_ERR_REQUEST; + } + if( (NULL == requests) && (0 != count) ) { + rc = MPI_ERR_REQUEST; + } else { + for (int i = 0; i < count; i++) { + if (NULL == requests[i]) { + rc = MPI_ERR_REQUEST; + break; + } + } + } + OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME); + } + + rc = ompi_continue_attach(cont_req, count, requests, cont_cb, + cont_data, flags, statuses); + + OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME); +} diff --git a/ompi/mpiext/continue/c/help-mpi-continue.txt b/ompi/mpiext/continue/c/help-mpi-continue.txt new file mode 100644 index 00000000000..f7cf4598b4b --- /dev/null +++ b/ompi/mpiext/continue/c/help-mpi-continue.txt @@ -0,0 +1,16 @@ +# -*- text -*- +# +# Copyright (c) 2021 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# +# This is the US/English general help file for the OMPI Continuations extension. +# +[continue:incomplete_shutdown] +WARNING: Found %zu incomplete continuations during shutdown! +# diff --git a/ompi/mpiext/continue/c/mpiext_continue_c.h b/ompi/mpiext/continue/c/mpiext_continue_c.h new file mode 100644 index 00000000000..2b9e21f7e84 --- /dev/null +++ b/ompi/mpiext/continue/c/mpiext_continue_c.h @@ -0,0 +1,132 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2020 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2021 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +#ifndef MPIEXT_CONTINUE_C_H +#define MPIEXT_CONTINUE_C_H + +#include + +/** + * Mark the continuation request(s) as volatile. + * Generally, the request buffer should remain accessible until the continuation is invoked + * and will be set to MPI_REQUEST_NULL before the continuation is invoked. + * However, if this flag is specified the requests are not accessed after the call to + * MPIX_Continue[all] returns. + * + * NOTE: this flag is deprecated, use MPIX_CONT_REQUESTS_FREE + */ +#define MPIX_CONT_REQBUF_VOLATILE (1<<0) + +/** + * If passed to MPIX_Continue[all], MPI will assume that the memory holding these + * requests will be freed after the call and MPI will not attempt to access that memory. 
+ * All requests passed to MPIX_Continue[all] will be freed before the call returns.
+ */
+#define MPIX_CONT_REQUESTS_FREE (1<<1)
+
+/**
+ * If passed to MPIX_Continue_init, marks the continuation request as poll-only,
+ * i.e., continuations are only executed when testing/waiting for the continuation
+ * request to complete.
+ */
+#define MPIX_CONT_POLL_ONLY (1<<2)
+
+/**
+ * Whether the execution of continuations is deferred in MPIX_Continue or
+ * MPIX_Continueall even if all operations are complete.
+ * By default, continuations eligible for execution are invoked immediately
+ * if the continuation request is active.
+ */
+#define MPIX_CONT_DEFER_COMPLETE (1<<3)
+
+/**
+ * Whether failed continuations will be invoked and passed the error code.
+ * If passed to MPIX_Continue[all] and an error occurs on any of the
+ * associated operations, the callback will be invoked and the error code
+ * of the first failed request passed as the first argument. Error codes
+ * for all other operations are available in the associated status objects.
+ */
+#define MPIX_CONT_INVOKE_FAILED (1<<4)
+
+/**
+ * Completion callback signature:
+ * \param rc an error code (MPI_SUCCESS, unless MPIX_CONT_INVOKE_FAILED is provided)
+ * \param cb_data the pointer passed as cb_data to MPIX_Continue[all]
+ * \returns MPI_SUCCESS on success, an error code to mark the continuation as failed
+ */
+typedef int (MPIX_Continue_cb_function)(int rc, void *cb_data);
+
+/**
+ * Initialize a continuation request. The request can be used when attaching continuations to one or more
+ * operation requests (\sa MPIX_Continue and \sa MPIX_Continueall). The request must be active for
+ * continuation callbacks registered with it to be executed, i.e., the request must have been started
+ * (e.g., using MPI_Start) before callbacks are executed.
+ *
+ * \param flags 0 or \ref MPIX_CONT_POLL_ONLY
+ * \param max_poll the maximum number of continuations to execute when testing
+ *        the continuation request for completion, or zero for
+ *        unlimited execution of eligible continuations
+ * \param info info object used to further control the behavior of the continuation request.
+ *        Currently supported:
+ *        - mpi_continue_thread: either "all" (any thread in the process may execute callbacks)
+ *          or "application" (only application threads may execute callbacks; default)
+ *        - mpi_continue_async_signal_safe: whether the callbacks may be executed from within a signal handler
+ * \param[out] cont_req the newly created continuation request
+ */
+OMPI_DECLSPEC int MPIX_Continue_init(int flags, int max_poll, MPI_Info info, MPI_Request *cont_req);
+
+/**
+ * Attach a new continuation to the operation represented by \c request and register it with the continuation request \c cont_req.
+ * The callback will be executed once the operation has completed and will be passed the \c cb_data pointer.
+ *
+ * \param request the request representing the operation to attach a continuation to
+ * \param cb the callback to invoke upon completion, with signature \ref MPIX_Continue_cb_function
+ * \param cb_data the user-data to pass to the callback
+ * \param flags 0 or OR-combination of \ref MPIX_CONT_REQBUF_VOLATILE,
+ *        \ref MPIX_CONT_DEFER_COMPLETE, \ref MPIX_CONT_INVOKE_FAILED
+ * \param status MPI_STATUS_IGNORE or a pointer to a status object that will be filled before the callback is invoked
+ * \param cont_req a continuation request created through \ref MPIX_Continue_init
+ */
+OMPI_DECLSPEC int MPIX_Continue(MPI_Request *request, MPIX_Continue_cb_function *cb, void *cb_data,
+                                int flags, MPI_Status *status, MPI_Request cont_req);
+
+/**
+ * Attach a new continuation to the operations represented by the \c count \c requests and
+ * register it with the continuation request \c cont_req.
+ * The callback will be executed once all operations have completed and will be passed the \c cb_data pointer.
+ *
+ * \param count the number of requests in \c requests
+ * \param requests the requests representing the operations to attach a continuation to
+ * \param cb the callback to invoke upon completion of all operations, with signature \ref MPIX_Continue_cb_function
+ * \param cb_data the user-data to pass to the callback
+ * \param flags 0 or OR-combination of \ref MPIX_CONT_REQBUF_VOLATILE,
+ *        \ref MPIX_CONT_DEFER_COMPLETE, \ref MPIX_CONT_INVOKE_FAILED
+ * \param statuses MPI_STATUSES_IGNORE or an array of status objects that will be filled before the callback is invoked
+ * \param cont_req a continuation request created through \ref MPIX_Continue_init
+ */
+OMPI_DECLSPEC int MPIX_Continueall(int count, MPI_Request requests[], MPIX_Continue_cb_function *cb, void *cb_data,
+                                   int flags, MPI_Status statuses[], MPI_Request cont_req);
+
+/**
+ * Query the callback data for failed continuations, i.e., continuations that returned a value other than
+ * MPI_SUCCESS or whose operations experienced an error.
+ * The application passes in \c cb_data an array of size \c count. Upon return, \c count is set
+ * to the number of elements actually stored in \c cb_data. If the resulting \c count equals the input
+ * value of \c count, there may be more failed continuations to query and the call should be repeated.
+ * \note Handling of failed continuations requires an error handler for the involved operations that does not abort,
+ *       and is not supported if \ref MPIX_CONT_REQBUF_VOLATILE is used.
+ *
+ * \param cont_req The continuation request from which to query failed continuations
+ * \param[inout] count On input, the maximum number of elements to store in \c cb_data;
+ *        on output, the number of elements actually stored
+ * \param cb_data Buffer of size \c count elements to store the callback data of failed continuations into
+ */
+OMPI_DECLSPEC int MPIX_Continue_get_failed(MPI_Request cont_req, int *count, void **cb_data);
+
+#endif // MPIEXT_CONTINUE_C_H
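Taken together, the declarations above yield the following typical call sequence. This is an illustrative sketch distilled from the header documentation and the tests added later in this patch (self-send on MPI_COMM_WORLD, error handling omitted); it is not additional API surface:

/* Sketch: attach a continuation to a receive and wait for it via the
 * continuation request. */
#include <mpi.h>
#include <mpi-ext.h>

static int on_complete(int rc, void *cb_data)
{
    (void)rc;               /* MPI_SUCCESS unless MPIX_CONT_INVOKE_FAILED was set */
    *(int *)cb_data = 1;    /* signal the application */
    return MPI_SUCCESS;
}

/* inside some function, after MPI_Init: */
MPI_Request cont_req, req;
int done = 0, val = 0, rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPIX_Continue_init(0 /* flags */, 0 /* max_poll: unlimited */, MPI_INFO_NULL, &cont_req);
MPI_Start(&cont_req);       /* activate: attached callbacks may now run */
MPI_Irecv(&val, 1, MPI_INT, rank, 0, MPI_COMM_WORLD, &req);
MPIX_Continue(&req, &on_complete, &done, 0, MPI_STATUS_IGNORE, cont_req);
/* req is now MPI_REQUEST_NULL; the callback runs once the receive completes */
MPI_Send(&val, 1, MPI_INT, rank, 0, MPI_COMM_WORLD);
MPI_Wait(&cont_req, MPI_STATUS_IGNORE);   /* completes once all continuations ran */
MPI_Request_free(&cont_req);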
diff --git a/ompi/mpiext/continue/c/mpiext_continue_module.c b/ompi/mpiext/continue/c/mpiext_continue_module.c
new file mode 100644
index 00000000000..05db810d6de
--- /dev/null
+++ b/ompi/mpiext/continue/c/mpiext_continue_module.c
@@ -0,0 +1,26 @@
+/**
+ * Copyright (c) 2021      The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include "ompi_config.h"
+
+#include "ompi/mpiext/mpiext.h"
+#include "ompi/mpiext/continue/c/continuation.h"
+
+/*
+ * Similar to Open MPI components, a well-known struct provides
+ * function pointers to the extension's init/fini hooks. The struct
+ * must be a global symbol of the form ompi_mpiext_<extension name> and be
+ * of type ompi_mpiext_component_t.
+ */
+ompi_mpiext_component_t ompi_mpiext_continue = {
+    ompi_continuation_init,
+    ompi_continuation_fini
+};
diff --git a/ompi/mpiext/continue/c/profile/Makefile.am b/ompi/mpiext/continue/c/profile/Makefile.am
new file mode 100644
index 00000000000..1eab74e4be6
--- /dev/null
+++ b/ompi/mpiext/continue/c/profile/Makefile.am
@@ -0,0 +1,37 @@
+#
+# Copyright (c) 2021      The University of Tennessee and The University
+#                         of Tennessee Research Foundation.  All rights
+#                         reserved.
+# $COPYRIGHT$
+#
+# Additional copyrights may follow
+#
+# $HEADER$
+#
+
+# OMPI_BUILD_MPI_PROFILING is enabled when we want our generated MPI_* symbols
+# to be replaced by PMPI_*.
+# In this directory, we need it to be 1.
+
+AM_CPPFLAGS = -DOMPI_BUILD_MPI_PROFILING=1
+
+noinst_LTLIBRARIES = libpmpiext_continue_c.la
+
+nodist_libpmpiext_continue_c_la_SOURCES = \
+        pcontinue.c \
+        pcontinueall.c \
+        pcontinue_init.c \
+        pcontinue_get_failed.c
+
+#
+# Sym link in the sources from the real MPI directory
+#
+$(nodist_libpmpiext_continue_c_la_SOURCES):
+	$(OMPI_V_LN_S) if test ! -r $@ ; then \
+		pname=`echo $@ | cut -b '2-'` ; \
+		$(LN_S) $(top_srcdir)/ompi/mpiext/continue/c/$$pname $@ ; \
+	fi
+
+# These files were created by targets above
+
+MAINTAINERCLEANFILES = $(nodist_libpmpiext_continue_c_la_SOURCES)
diff --git a/ompi/mpiext/continue/configure.m4 b/ompi/mpiext/continue/configure.m4
new file mode 100644
index 00000000000..1907dfa8767
--- /dev/null
+++ b/ompi/mpiext/continue/configure.m4
@@ -0,0 +1,35 @@
+# -*- shell-script -*-
+#
+# Copyright (c) 2021      The University of Tennessee and The University
+#                         of Tennessee Research Foundation.  All rights
+#                         reserved.
+# $COPYRIGHT$
+#
+# Additional copyrights may follow
+#
+# $HEADER$
+#
+
+# OMPI_MPIEXT_continue_CONFIG([action-if-found], [action-if-not-found])
+# -----------------------------------------------------------
+AC_DEFUN([OMPI_MPIEXT_continue_CONFIG],[
+    AC_CONFIG_FILES([ompi/mpiext/continue/Makefile])
+    AC_CONFIG_FILES([ompi/mpiext/continue/c/Makefile])
+    AC_CONFIG_FILES([ompi/mpiext/continue/c/profile/Makefile])
+
+    # This extension is not stable yet, so it is only built
+    # if explicitly requested
+    AS_IF([test "$ENABLE_continue" = "1"],
+          [$1],
+          [$2])
+])dnl
+
+# we need init/finalize
+AC_DEFUN([OMPI_MPIEXT_continue_NEED_INIT], [1])
+
+AC_DEFUN([OMPI_MPIEXT_continue_POST_CONFIG], [
+    AS_IF([test "$ENABLE_continue" = "1"],
+          [AC_DEFINE_UNQUOTED([OMPI_HAVE_MPI_EXT_CONTINUE], [1],
+                              [Whether MPI Continuations are enabled])],
+          [])
+])
diff --git a/ompi/mpiext/continue/owner.txt b/ompi/mpiext/continue/owner.txt
new file mode 100644
index 00000000000..bc6c5ecb028
--- /dev/null
+++ b/ompi/mpiext/continue/owner.txt
@@ -0,0 +1,7 @@
+#
+# owner/status file
+# owner: institution that is responsible for this package
+# status: e.g.
active, maintenance, unmaintained +# +owner: UTK +status: active diff --git a/ompi/request/mpi_object.h b/ompi/request/mpi_object.h new file mode 100644 index 00000000000..e91d135e1e9 --- /dev/null +++ b/ompi/request/mpi_object.h @@ -0,0 +1,64 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2022 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2017 Cisco Systems, Inc. All rights reserved + * Copyright (c) 2009-2012 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2012 Oak Ridge National Labs. All rights reserved. + * Copyright (c) 2015-2017 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2018 FUJITSU LIMITED. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + * Union holding pointers to communicators, windows, and files + * + */ + +#ifndef OMPI_MPI_OBJECT_H +#define OMPI_MPI_OBJECT_H + +#include "ompi_config.h" + +BEGIN_C_DECLS + +/** + * Forward declaration + */ +struct ompi_communicator_t; + +/** + * Forward declaration + */ +struct ompi_win_t; + +/** + * Forward declaration + */ +struct ompi_file_t; + +/** + * Union for holding several different MPI pointer types on the request + */ +typedef union ompi_mpi_object_t { + struct ompi_communicator_t *comm; + struct ompi_file_t *file; + struct ompi_win_t *win; +} ompi_mpi_object_t; + +END_C_DECLS + +#endif // OMPI_MPI_OBJECT_H diff --git a/ompi/request/req_test.c b/ompi/request/req_test.c index b28ade8a67a..a2bb61114c5 100644 --- a/ompi/request/req_test.c +++ b/ompi/request/req_test.c @@ -26,6 +26,10 @@ #include "ompi/request/request_default.h" #include "ompi/request/grequest.h" +#if OMPI_HAVE_MPI_EXT_CONTINUE +#include "ompi/mpiext/continue/c/continuation.h" +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + int ompi_request_default_test(ompi_request_t ** rptr, int *completed, ompi_status_public_t * status ) @@ -58,6 +62,7 @@ int ompi_request_default_test(ompi_request_t ** rptr, opal_atomic_rmb(); OMPI_COPY_STATUS(status, request->req_status, false); } + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; return request->req_status.MPI_ERROR; @@ -88,7 +93,17 @@ int ompi_request_default_test(ompi_request_t ** rptr, * leaving. We will call the opal_progress only once per call. */ ++do_it_once; - if (0 != opal_progress()) { + int rc = 0; + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuations may elect to not participate in global progress + * so progress them separately. */ + rc = ompi_continue_progress_request(request); + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + + if (rc != 0 || 0 != opal_progress()) { goto recheck_request_status; } } @@ -157,6 +172,14 @@ int ompi_request_default_test_any( return MPI_ERR_PROC_FAILED_PENDING; } #endif /* OPAL_ENABLE_FT_MPI */ + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuations may elect to not participate in global progress + * so progress them separately. 
*/ + ompi_continue_progress_request(request); + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE } /* Only fall through here if we found nothing */ @@ -195,6 +218,15 @@ int ompi_request_default_test_all( num_completed++; continue; } + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuations may elect to not participate in global progress + * so progress them separately. */ + ompi_continue_progress_request(request); + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE + #if OPAL_ENABLE_FT_MPI /* Check for dead requests due to process failure */ /* Special case for MPI_ANY_SOURCE */ @@ -218,8 +250,11 @@ int ompi_request_default_test_all( } } #endif /* OPAL_ENABLE_PROGRESS_THREADS */ - /* short-circuit */ + +#if !OMPI_HAVE_MPI_EXT_CONTINUE + /* short-circuit, unless there may be continuation requests */ break; +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ } if (num_completed != count) { @@ -248,6 +283,7 @@ int ompi_request_default_test_all( ompi_grequest_invoke_query(request, &request->req_status); } OMPI_COPY_STATUS(&statuses[i], request->req_status, true); + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; continue; @@ -283,6 +319,7 @@ int ompi_request_default_test_all( if (OMPI_REQUEST_GEN == request->req_type) { ompi_grequest_invoke_query(request, &request->req_status); } + if( request->req_persistent ) { request->req_state = OMPI_REQUEST_INACTIVE; continue; @@ -332,6 +369,7 @@ int ompi_request_default_test_some( indices[num_requests_done++] = i; continue; } + #if OPAL_ENABLE_FT_MPI /* Check for dead requests due to process failure */ /* Special case for MPI_ANY_SOURCE - Error managed below */ @@ -340,6 +378,14 @@ int ompi_request_default_test_some( indices[num_requests_done++] = i; } #endif /* OPAL_ENABLE_FT_MPI */ + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + /* continuations may elect to not participate in global progress + * so progress them separately. 
*/ + ompi_continue_progress_request(request); + } +#endif // OMPI_HAVE_MPI_EXT_CONTINUE } /* diff --git a/ompi/request/req_wait.c b/ompi/request/req_wait.c index d66eccf9f79..4f341026ad5 100644 --- a/ompi/request/req_wait.c +++ b/ompi/request/req_wait.c @@ -31,6 +31,10 @@ #include "ompi/request/request_default.h" #include "ompi/request/grequest.h" +#if OMPI_HAVE_MPI_EXT_CONTINUE +#include "ompi/mpiext/continue/c/continuation.h" +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + int ompi_request_default_wait( ompi_request_t ** req_ptr, ompi_status_public_t * status) @@ -61,6 +65,7 @@ int ompi_request_default_wait( if( MPI_STATUS_IGNORE != status ) { OMPI_COPY_STATUS(status, req->req_status, false); } + if( req->req_persistent ) { if( req->req_state == OMPI_REQUEST_INACTIVE ) { if (MPI_STATUS_IGNORE != status) { @@ -68,6 +73,7 @@ int ompi_request_default_wait( } return OMPI_SUCCESS; } + req->req_state = OMPI_REQUEST_INACTIVE; return req->req_status.MPI_ERROR; } @@ -94,6 +100,9 @@ int ompi_request_default_wait_any(size_t count, int rc = OMPI_SUCCESS; ompi_request_t *request=NULL; ompi_wait_sync_t sync; +#if OMPI_HAVE_MPI_EXT_CONTINUE + bool have_cont_req = false; +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ if (OPAL_UNLIKELY(0 == count)) { *index = MPI_UNDEFINED; @@ -125,6 +134,13 @@ int ompi_request_default_wait_any(size_t count, } } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + have_cont_req = true; + ompi_continue_register_request_progress(request, &sync); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + #if OPAL_ENABLE_FT_MPI if(OPAL_UNLIKELY( ompi_request_is_failed(request) )) { completed = i; @@ -147,6 +163,19 @@ int ompi_request_default_wait_any(size_t count, rc = SYNC_WAIT(&sync); after_sync_wait: + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (have_cont_req) { + have_cont_req = false; + for (i = 0; i < count; i++) { + request = requests[i]; + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_deregister_request_progress(request); + } + } + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + /* recheck the complete status and clean up the sync primitives. * Do it backward to return the earliest complete request to the * user. @@ -181,7 +210,7 @@ int ompi_request_default_wait_any(size_t count, if( *index == (int)completed ) { /* Only one request has triggered. There was no in-flight * completions. 
Drop the signalled flag so we won't block - * in WAIT_SYNC_RELEASE + * in WAIT_SYNC_RELEASE */ WAIT_SYNC_SIGNALLED(&sync); } @@ -257,6 +286,12 @@ int ompi_request_default_wait_all( size_t count, } } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_register_request_progress(request, &sync); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + #if OPAL_ENABLE_FT_MPI if(OPAL_UNLIKELY( ompi_request_is_failed(request) )) { failed++; @@ -316,6 +351,12 @@ int ompi_request_default_wait_all( size_t count, request = *rptr; +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_deregister_request_progress(request); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + if( request->req_state == OMPI_REQUEST_INACTIVE ) { OMPI_COPY_STATUS(&statuses[i], ompi_status_empty, true); continue; @@ -380,6 +421,12 @@ int ompi_request_default_wait_all( size_t count, request = *rptr; +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_deregister_request_progress(request); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + if( request->req_state == OMPI_REQUEST_INACTIVE ) { rc = ompi_status_empty.MPI_ERROR; goto absorb_error_and_continue; @@ -498,6 +545,12 @@ int ompi_request_default_wait_some(size_t count, } } +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_register_request_progress(request, &sync); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + #if OPAL_ENABLE_FT_MPI if(OPAL_UNLIKELY( ompi_request_is_failed(request) )) { num_requests_done++; @@ -531,6 +584,12 @@ int ompi_request_default_wait_some(size_t count, request = *rptr; +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == request->req_type) { + ompi_continue_deregister_request_progress(request); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ + if( request->req_state == OMPI_REQUEST_INACTIVE ) { continue; } @@ -538,14 +597,14 @@ int ompi_request_default_wait_some(size_t count, * a) request was found completed in the first loop * => ( indices[i] == 0 ) * b) request was completed between first loop and this check - * => ( indices[i] == 1 ) and we can NOT atomically mark the + * => ( indices[i] == 1 ) and we can NOT atomically mark the * request as pending. * c) request wasn't finished yet - * => ( indices[i] == 1 ) and we CAN atomically mark the + * => ( indices[i] == 1 ) and we CAN atomically mark the * request as pending. 
* NOTE that in any case (i >= num_requests_done) as latter grows * either slowly (in case of partial completion) - * OR in parallel with `i` (in case of full set completion) + * OR in parallel with `i` (in case of full set completion) */ if( !indices[num_active_reqs] ) { indices[num_requests_done++] = i; diff --git a/ompi/request/request.h b/ompi/request/request.h index e2e62a2d7ad..055979e8598 100644 --- a/ompi/request/request.h +++ b/ompi/request/request.h @@ -42,6 +42,11 @@ #include "opal/mca/threads/wait_sync.h" #include "ompi/constants.h" #include "ompi/runtime/params.h" +#include "ompi/request/mpi_object.h" + +#if OMPI_HAVE_MPI_EXT_CONTINUE +#include "ompi/mpiext/continue/c/continuation.h" +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ BEGIN_C_DECLS @@ -99,30 +104,6 @@ typedef int (*ompi_request_cancel_fn_t)(struct ompi_request_t* request, int flag */ typedef int (*ompi_request_complete_fn_t)(struct ompi_request_t* request); -/** - * Forward declaration - */ -struct ompi_communicator_t; - -/** - * Forward declaration - */ -struct ompi_win_t; - -/** - * Forward declaration - */ -struct ompi_file_t; - -/** - * Union for holding several different MPI pointer types on the request - */ -typedef union ompi_mpi_object_t { - struct ompi_communicator_t *comm; - struct ompi_file_t *file; - struct ompi_win_t *win; -} ompi_mpi_object_t; - /** * Main top-level request struct definition */ @@ -158,6 +139,9 @@ typedef struct ompi_request_t ompi_request_t; #define REQUEST_PENDING (void *)0L #define REQUEST_COMPLETED (void *)1L +#define REQUEST_CB_PENDING (ompi_request_complete_fn_t)0L +#define REQUEST_CB_COMPLETED (ompi_request_complete_fn_t)1L + struct ompi_predefined_request_t { struct ompi_request_t request; char padding[PREDEFINED_REQUEST_PAD - sizeof(ompi_request_t)]; @@ -449,6 +433,11 @@ static inline bool ompi_request_tag_is_collective(int tag) { static inline void ompi_request_wait_completion(ompi_request_t *req) { +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + ompi_continue_progress_request(req); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ if (opal_using_threads ()) { if(!REQUEST_COMPLETE(req)) { void *_tmp_ptr; @@ -466,7 +455,20 @@ static inline void ompi_request_wait_completion(ompi_request_t *req) WAIT_SYNC_INIT(&sync, 1); if (OPAL_ATOMIC_COMPARE_EXCHANGE_STRONG_PTR(&req->req_complete, &_tmp_ptr, &sync)) { +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + /* let the continuations be processed as part of the global progress loop + * while we're waiting for their completion */ + ompi_continue_register_request_progress(req, &sync); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ SYNC_WAIT(&sync); + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + ompi_continue_deregister_request_progress(req); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ } else { /* completed before we had a chance to swap in the sync object */ WAIT_SYNC_SIGNALLED(&sync); @@ -488,6 +490,13 @@ static inline void ompi_request_wait_completion(ompi_request_t *req) } opal_atomic_rmb(); } else { +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + /* let the continuations be processed as part of the global progress loop + * while we're waiting for their completion */ + ompi_continue_register_request_progress(req, NULL); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ while(!REQUEST_COMPLETE(req)) { opal_progress(); #if OPAL_ENABLE_FT_MPI @@ -498,6 +507,12 @@ static inline void ompi_request_wait_completion(ompi_request_t *req) } 
#endif /* OPAL_ENABLE_FT_MPI */ } + +#if OMPI_HAVE_MPI_EXT_CONTINUE + if (OMPI_REQUEST_CONT == req->req_type) { + ompi_continue_deregister_request_progress(req); + } +#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */ } } @@ -517,12 +532,13 @@ static inline void ompi_request_wait_completion(ompi_request_t *req) static inline int ompi_request_complete(ompi_request_t* request, bool with_signal) { int rc = 0; - - if(NULL != request->req_complete_cb) { - /* Set the request cb to NULL to allow resetting in the callback */ - ompi_request_complete_fn_t fct = request->req_complete_cb; - request->req_complete_cb = NULL; - rc = fct( request ); + ompi_request_complete_fn_t cb; + cb = (ompi_request_complete_fn_t)OPAL_ATOMIC_SWAP_PTR((opal_atomic_intptr_t*)&request->req_complete_cb, + (intptr_t)REQUEST_CB_COMPLETED); + if (REQUEST_CB_PENDING != cb) { + request->req_complete_cb = REQUEST_CB_PENDING; + opal_atomic_wmb(); + rc = cb(request); } if (0 == rc) { @@ -548,12 +564,18 @@ static inline int ompi_request_set_callback(ompi_request_t* request, void* cb_data) { request->req_complete_cb_data = cb_data; - request->req_complete_cb = cb; - /* If request is completed and the callback is not called, need to call callback */ - if ((NULL != request->req_complete_cb) && (request->req_complete == REQUEST_COMPLETED)) { - ompi_request_complete_fn_t fct = request->req_complete_cb; - request->req_complete_cb = NULL; - return fct( request ); + opal_atomic_wmb(); + if ((REQUEST_CB_COMPLETED == (ompi_request_complete_fn_t)request->req_complete_cb) || + (REQUEST_CB_COMPLETED == (ompi_request_complete_fn_t)OPAL_ATOMIC_SWAP_PTR((opal_atomic_intptr_t*)&request->req_complete_cb, + (intptr_t)cb))) { + if (NULL != cb) { + /* the request was marked at least partially completed, make sure it's fully complete */ + while (!REQUEST_COMPLETE(request)) {} + /* Set the request cb to NULL to allow resetting in the callback */ + request->req_complete_cb = REQUEST_CB_PENDING; + opal_atomic_wmb(); + cb(request); + } } return OMPI_SUCCESS; } diff --git a/ompi/request/request_dbg.h b/ompi/request/request_dbg.h index 5929374ade4..88b583ddfb8 100644 --- a/ompi/request/request_dbg.h +++ b/ompi/request/request_dbg.h @@ -30,6 +30,7 @@ typedef enum { OMPI_REQUEST_NOOP, /**< A request that does nothing (e.g., to PROC_NULL) */ OMPI_REQUEST_COMM, /**< MPI-3 non-blocking communicator duplication */ OMPI_REQUEST_PART, /**< MPI-4 partitioned communication request */ + OMPI_REQUEST_CONT, /**< MPI-X continuation request */ OMPI_REQUEST_MAX /**< Maximum request type */ } ompi_request_type_t; diff --git a/opal/mca/threads/base/wait_sync.c b/opal/mca/threads/base/wait_sync.c index 2458a9614be..08026e56dad 100644 --- a/opal/mca/threads/base/wait_sync.c +++ b/opal/mca/threads/base/wait_sync.c @@ -98,7 +98,15 @@ int ompi_sync_wait_mt(ompi_wait_sync_t *sync) */ check_status: if (sync != opal_threads_base_wait_sync_list && num_thread_in_progress >= opal_max_thread_in_progress) { - opal_thread_internal_cond_wait(&sync->condition, &sync->lock); + if (0 < sync->num_req_need_progress) { + /* release the lock so that we can be signaled */ + opal_thread_internal_mutex_unlock(&sync->lock); + sync->progress_cb(); + /* retake the lock */ + opal_thread_internal_mutex_lock(&sync->lock); + } else { + opal_thread_internal_cond_wait(&sync->condition, &sync->lock); + } /** * At this point either the sync was completed in which case diff --git a/opal/mca/threads/wait_sync.h b/opal/mca/threads/wait_sync.h index c77724bacbb..5752bbede1b 100644 --- a/opal/mca/threads/wait_sync.h +++ 
b/opal/mca/threads/wait_sync.h
@@ -46,6 +46,8 @@ typedef struct ompi_wait_sync_t {
     opal_thread_internal_mutex_t lock;
     struct ompi_wait_sync_t *next;
     struct ompi_wait_sync_t *prev;
+    opal_progress_callback_t progress_cb;
+    opal_atomic_int32_t num_req_need_progress;
     volatile bool signaling;
 } ompi_wait_sync_t;
@@ -119,6 +121,8 @@ static inline int sync_wait_st(ompi_wait_sync_t *sync)
         opal_thread_internal_cond_init(&(sync)->condition);    \
         opal_thread_internal_mutex_init(&(sync)->lock, false); \
     }                                                          \
+    (sync)->progress_cb = NULL;                                \
+    (sync)->num_req_need_progress = 0;                         \
 } while (0)
@@ -141,9 +145,10 @@ OPAL_DECLSPEC void opal_threads_base_wait_sync_global_wakeup_mt(int status);
 static inline void wait_sync_update(ompi_wait_sync_t *sync, int updates, int status)
 {
     if (OPAL_LIKELY(OPAL_SUCCESS == status)) {
-        if (0 != (OPAL_THREAD_ADD_FETCH32(&sync->count, -updates))) {
+        if (1 != sync->count && 0 != (OPAL_THREAD_ADD_FETCH32(&sync->count, -updates))) {
             return;
         }
+        sync->count = 0;
     } else {
         /* this is an error path so just use the atomic */
         sync->status = status;
diff --git a/test/continuations/Makefile.am b/test/continuations/Makefile.am
new file mode 100644
index 00000000000..75a8ed59aa9
--- /dev/null
+++ b/test/continuations/Makefile.am
@@ -0,0 +1,33 @@
+# Copyright (c) 2018      Los Alamos National Security, LLC. All rights reserved.
+#
+# $COPYRIGHT$
+#
+# Additional copyrights may follow
+#
+# $HEADER$
+#
+
+if PROJECT_OMPI
+    noinst_PROGRAMS = continuations continuations-mt continuations-persistent
+    continuations_SOURCES = continuations.c
+    continuations_LDFLAGS = $(OMPI_PKG_CONFIG_LDFLAGS)
+    continuations_LDADD = \
+            $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \
+            $(top_builddir)/opal/lib@OPAL_LIB_NAME@.la
+
+    continuations_mt_SOURCES = continuations-mt.c
+    continuations_mt_LDFLAGS = $(OMPI_PKG_CONFIG_LDFLAGS)
+    continuations_mt_LDADD = \
+            $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \
+            $(top_builddir)/opal/lib@OPAL_LIB_NAME@.la
+
+    continuations_persistent_SOURCES = continuations-persistent.c
+    continuations_persistent_LDFLAGS = $(OMPI_PKG_CONFIG_LDFLAGS)
+    continuations_persistent_LDADD = \
+            $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \
+            $(top_builddir)/opal/lib@OPAL_LIB_NAME@.la
+endif # PROJECT_OMPI
+
+distclean-local:
+	rm -rf *.dSYM .deps .libs *.log *.o *.trs $(noinst_PROGRAMS) Makefile
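The tests below do not exercise MPIX_CONT_INVOKE_FAILED or MPIX_Continue_get_failed. Based solely on the header documentation above, draining the callback data of failed continuations would look roughly like the following sketch; it assumes an error handler such as MPI_ERRORS_RETURN on the communicators involved, as the header's note requires:

/* Sketch: retire the user data of failed continuations in batches.
 * Per the header docs, a full batch means more failures may remain. */
static void drain_failed(MPI_Request cont_req)
{
    void *cb_data[16];
    int count;
    do {
        count = 16;
        MPIX_Continue_get_failed(cont_req, &count, cb_data);
        for (int i = 0; i < count; i++) {
            /* inspect/release cb_data[i] as appropriate for the application */
        }
    } while (16 == count);
}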
diff --git a/test/continuations/continuations-mt.c b/test/continuations/continuations-mt.c
new file mode 100644
index 00000000000..b81422a1f74
--- /dev/null
+++ b/test/continuations/continuations-mt.c
@@ -0,0 +1,166 @@
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2005 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2016      Research Organization for Information Science
+ *                         and Technology (RIST). All rights reserved.
+ * Copyright (c) 2018      Los Alamos National Security, LLC. All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <assert.h>
+#include <stdatomic.h>
+
+#include "mpi.h"
+#include "mpi-ext.h"
+
+#ifdef OMPI_HAVE_MPI_EXT_CONTINUE
+
+/* Block a thread on a receive until we release it from the main thread */
+static void* thread_recv(void* data) {
+    MPI_Request req;
+    int val;
+    int rank;
+    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+    MPI_Irecv(&val, 1, MPI_INT, rank, 1002, MPI_COMM_WORLD, &req);
+    MPI_Wait(&req, MPI_STATUS_IGNORE);
+    return NULL;
+}
+
+static int complete_cnt_cb(int status, void *user_data) {
+    assert(user_data != NULL);
+    assert(MPI_SUCCESS == status);
+    _Atomic int *cb_cnt = (_Atomic int*)user_data;
+    ++(*cb_cnt);
+    return MPI_SUCCESS;
+}
+
+int main(int argc, char *argv[])
+{
+    MPI_Request cont_req, reqs[2];
+    _Atomic int cb_cnt;
+    int val;
+    int rank, size;
+    int provided;
+    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
+    assert(provided == MPI_THREAD_MULTIPLE);
+
+    MPI_Comm_size(MPI_COMM_WORLD, &size);
+    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+
+    pthread_t thread;
+
+    pthread_create(&thread, NULL, &thread_recv, NULL);
+
+    /* give enough slack to allow the thread to enter the wait;
+     * from now on the thread is stuck in MPI_Wait, owning progress
+     */
+    sleep(2);
+
+    /* initialize the continuation request */
+    MPIX_Continue_init(0, 0, MPI_INFO_NULL, &cont_req);
+
+    MPI_Start(&cont_req);
+
+    /**
+     * One send, one recv, one continuation
+     */
+    MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]);
+    MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]);
+
+    cb_cnt = 0;
+    MPIX_Continueall(2, reqs, &complete_cnt_cb, &cb_cnt, 0, MPI_STATUSES_IGNORE, cont_req);
+    MPI_Wait(&cont_req, MPI_STATUS_IGNORE);
+    assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL);
+    assert(cb_cnt == 1);
+
+    MPI_Start(&cont_req);
+
+    /**
+     * One send, one recv, two continuations
+     */
+    cb_cnt = 0;
+    MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]);
+    MPIX_Continue(&reqs[0], &complete_cnt_cb, &cb_cnt, 0, MPI_STATUS_IGNORE, cont_req);
+
+    MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]);
+    MPIX_Continue(&reqs[1], &complete_cnt_cb, &cb_cnt, 0, MPI_STATUS_IGNORE, cont_req);
+
+    MPI_Wait(&cont_req, MPI_STATUS_IGNORE);
+    assert(cb_cnt == 2);
+    assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL);
+
+    MPI_Request_free(&cont_req);
+
+    /****************************************************************
+     * Do the same thing, but with a poll-only continuation request
+     ****************************************************************/
+    /* initialize the continuation request */
+    MPIX_Continue_init(MPIX_CONT_POLL_ONLY, MPI_UNDEFINED, MPI_INFO_NULL, &cont_req);
+
+    MPI_Start(&cont_req);
+
+    /**
+     * One send, one recv, one continuation
+     */
+    MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]);
+    MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]);
+
+    cb_cnt = 0;
+    MPIX_Continueall(2, reqs, &complete_cnt_cb, &cb_cnt, MPIX_CONT_DEFER_COMPLETE, MPI_STATUSES_IGNORE, cont_req);
+    MPI_Wait(&cont_req, MPI_STATUS_IGNORE);
+    assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL);
+    assert(cb_cnt == 1);
+
+    MPI_Start(&cont_req);
+
+    /**
+     * One send, one recv, two continuations
+     */
+    cb_cnt = 0;
+    MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]);
+    MPIX_Continue(&reqs[0], &complete_cnt_cb, &cb_cnt, MPIX_CONT_DEFER_COMPLETE, MPI_STATUS_IGNORE, cont_req);
+
+    MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]);
+    MPIX_Continue(&reqs[1], &complete_cnt_cb, &cb_cnt, MPIX_CONT_DEFER_COMPLETE, MPI_STATUS_IGNORE, cont_req);
+
+    MPI_Wait(&cont_req, MPI_STATUS_IGNORE);
+    assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL);
+    assert(cb_cnt == 2);
+
+    MPI_Request_free(&cont_req);
+
+    /* release the blocked thread */
+    MPI_Send(&val, 1, MPI_INT, rank, 1002, MPI_COMM_WORLD);
+    pthread_join(thread, NULL);
+
+    MPI_Finalize();
+
+    return 0;
+}
+#else
+int main(int argc, char *argv[])
+{
+    return 77;
+}
+#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
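Neither test passes a real info object to MPIX_Continue_init. Based on the keys documented in mpiext_continue_c.h, restricting callback execution to application threads would look like this sketch (assuming the usual MPI info semantics, i.e., the object can be freed after the call):

/* Sketch: create a continuation request whose callbacks only run on
 * application threads (the documented default, made explicit here). */
MPI_Info info;
MPI_Info_create(&info);
MPI_Info_set(info, "mpi_continue_thread", "application");
MPIX_Continue_init(0, 0, info, &cont_req);
MPI_Info_free(&info);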
diff --git a/test/continuations/continuations.c b/test/continuations/continuations.c
new file mode 100644
index 00000000000..b1779bbda31
--- /dev/null
+++ b/test/continuations/continuations.c
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
+ *                         University Research and Technology
+ *                         Corporation.  All rights reserved.
+ * Copyright (c) 2004-2005 The University of Tennessee and The University
+ *                         of Tennessee Research Foundation.  All rights
+ *                         reserved.
+ * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
+ *                         University of Stuttgart.  All rights reserved.
+ * Copyright (c) 2004-2005 The Regents of the University of California.
+ *                         All rights reserved.
+ * Copyright (c) 2016      Research Organization for Information Science
+ *                         and Technology (RIST). All rights reserved.
+ * Copyright (c) 2018      Los Alamos National Security, LLC. All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+#include "mpi.h"
+#include "mpi-ext.h"
+
+#ifdef OMPI_HAVE_MPI_EXT_CONTINUE
+
+static int complete_cnt_cb(int status, void *user_data) {
+    assert(user_data != NULL);
+    assert(status == MPI_SUCCESS);
+    printf("complete_cnt_cb \n");
+    int *cb_cnt = (int*)user_data;
+    *cb_cnt = *cb_cnt + 1;
+    return MPI_SUCCESS;
+}
+
+int main(int argc, char *argv[])
+{
+    MPI_Request cont_req, cont_req2, reqs[2];
+    int cb_cnt;
+    int val;
+    int rank, size;
+    MPI_Init(&argc, &argv);
+
+    MPI_Comm_size(MPI_COMM_WORLD, &size);
+    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+
+    /* initialize the continuation request */
+    MPIX_Continue_init(0, 0, MPI_INFO_NULL, &cont_req);
+
+    MPI_Start(&cont_req);
+
+    /**
+     * One send, one recv, one continuation
+     */
+    MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]);
+    MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]);
+
+    cb_cnt = 0;
+    MPIX_Continueall(2, reqs, &complete_cnt_cb, &cb_cnt, 0, MPI_STATUSES_IGNORE, cont_req);
+    assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL);
+    MPI_Wait(&cont_req, MPI_STATUS_IGNORE);
+    assert(cb_cnt == 1);
+
+    /**
+     * One send, one recv, two continuations
+     */
+    cb_cnt = 0;
+    MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]);
+    MPIX_Continue(&reqs[0], &complete_cnt_cb, &cb_cnt, 0, MPI_STATUS_IGNORE, cont_req);
+
+    MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]);
+    MPIX_Continue(&reqs[1], &complete_cnt_cb, &cb_cnt, 0, MPI_STATUS_IGNORE, cont_req);
+
+    /* deferred start */
+    MPI_Start(&cont_req);
+
+    MPI_Wait(&cont_req, MPI_STATUS_IGNORE);
+    assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL);
+    assert(cb_cnt == 2);
+
+    MPI_Start(&cont_req);
+
+    /**
+     * One send, one recv, two continuations in two continuation requests
+     */
+    /* initialize a poll-only continuation request */
+    MPIX_Continue_init(MPIX_CONT_POLL_ONLY, MPI_UNDEFINED, MPI_INFO_NULL, &cont_req2);
+
+    MPI_Start(&cont_req2);
+
+    cb_cnt = 0;
+    MPI_Irecv(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]);
+    MPIX_Continue(&reqs[0], &complete_cnt_cb, &cb_cnt, 0, MPI_STATUS_IGNORE, cont_req);
+
+    MPI_Isend(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]);
+    MPIX_Continue(&reqs[1], &complete_cnt_cb, &cb_cnt, MPIX_CONT_DEFER_COMPLETE, MPI_STATUS_IGNORE, cont_req2);
+
+    MPI_Wait(&cont_req, MPI_STATUS_IGNORE);
+    assert(reqs[0] == MPI_REQUEST_NULL && reqs[1] == MPI_REQUEST_NULL);
+    assert(cb_cnt == 1);
+
+    printf("Waiting for poll-only cont request %p to complete\n", (void*)cont_req2);
+    MPI_Wait(&cont_req2, MPI_STATUS_IGNORE);
+    assert(cb_cnt == 2);
+
+    MPI_Request_free(&cont_req);
+    MPI_Request_free(&cont_req2);
+    MPI_Finalize();
+
+    return 0;
+}
+#else
+int main(int argc, char *argv[])
+{
+    return 77;
+}
+#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */
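Note: test/continuations/Makefile.am builds a third program from continuations-persistent.c, but that file is not part of this diff. A minimal sketch of what such a test might look like is given below. It is hypothetical, not the author's file, and it assumes that persistent requests remain valid for restart after their continuation fires (i.e., they are not reset to MPI_REQUEST_NULL), as in the MPI Continuations proposal:

/* Hypothetical continuations-persistent.c: continuations on persistent
 * point-to-point requests, restarted across iterations. */
#include <stdio.h>
#include <assert.h>
#include "mpi.h"
#include "mpi-ext.h"

#ifdef OMPI_HAVE_MPI_EXT_CONTINUE
static int done_cb(int rc, void *cb_data) {
    (void)rc;
    *(int *)cb_cnt_ptr_unused; /* see note below */
    return MPI_SUCCESS;
}
/* Correction of the callback body for clarity: */
static int count_cb(int rc, void *cb_data) {
    (void)rc;
    *(int *)cb_data += 1;
    return MPI_SUCCESS;
}

int main(int argc, char *argv[]) {
    MPI_Request cont_req, reqs[2];
    int val = 0, rank, cb_cnt = 0;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPIX_Continue_init(0, 0, MPI_INFO_NULL, &cont_req);
    MPI_Start(&cont_req);
    MPI_Recv_init(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[0]);
    MPI_Send_init(&val, 1, MPI_INT, rank, 1001, MPI_COMM_WORLD, &reqs[1]);
    for (int iter = 0; iter < 2; iter++) {
        if (iter > 0) MPI_Start(&cont_req);     /* reactivate for the next round */
        MPI_Startall(2, reqs);
        /* assumed: persistent requests stay valid for the next MPI_Startall */
        MPIX_Continueall(2, reqs, &count_cb, &cb_cnt, 0, MPI_STATUSES_IGNORE, cont_req);
        MPI_Wait(&cont_req, MPI_STATUS_IGNORE);
    }
    assert(cb_cnt == 2);
    MPI_Request_free(&reqs[0]);
    MPI_Request_free(&reqs[1]);
    MPI_Request_free(&cont_req);
    MPI_Finalize();
    return 0;
}
#else
int main(void) { return 77; }
#endif /* OMPI_HAVE_MPI_EXT_CONTINUE */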