From 9411a27f724253643b80f5446a85ca3ee437b2e2 Mon Sep 17 00:00:00 2001 From: Ralph Castain Date: Wed, 12 Sep 2018 14:59:59 -0700 Subject: [PATCH] Update to PMIx v2.1.4 Signed-off-by: Ralph Castain --- opal/mca/pmix/pmix2x/pmix/NEWS | 15 +- opal/mca/pmix/pmix2x/pmix/VERSION | 10 +- .../pmix/pmix2x/pmix/config/pmix_setup_cc.m4 | 72 ++-- opal/mca/pmix/pmix2x/pmix/contrib/pmix.spec | 2 +- .../pmix/pmix2x/pmix/include/pmix_common.h.in | 27 +- .../pmix/pmix2x/pmix/src/client/pmix_client.c | 9 +- .../pmix2x/pmix/src/common/pmix_control.c | 51 ++- .../pmix/pmix2x/pmix/src/event/pmix_event.h | 23 +- .../pmix/src/event/pmix_event_notification.c | 384 ++++++++---------- .../pmix2x/pmix/src/mca/gds/ds12/gds_dstore.c | 8 +- .../src/mca/psensor/base/psensor_base_stubs.c | 21 +- .../pmix/src/mca/psensor/file/psensor_file.c | 6 +- .../mca/psensor/heartbeat/psensor_heartbeat.c | 38 +- .../mca/psensor/heartbeat/psensor_heartbeat.h | 3 +- .../heartbeat/psensor_heartbeat_component.c | 16 +- .../pmix/src/mca/ptl/base/ptl_base_sendrecv.c | 4 + .../pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c | 2 +- .../pmix2x/pmix/src/mca/ptl/usock/ptl_usock.c | 2 +- .../pmix/pmix2x/pmix/src/server/pmix_server.c | 22 + .../pmix2x/pmix/src/server/pmix_server_ops.c | 117 ++++-- .../mca/pmix/pmix2x/pmix/src/tool/pmix_tool.c | 17 +- opal/mca/pmix/pmix2x/pmix/src/util/path.c | 5 +- .../pmix/pmix2x/pmix/test/simple/Makefile.am | 10 +- .../pmix/pmix2x/pmix/test/simple/simpjctrl.c | 231 +++++++++++ .../pmix/pmix2x/pmix/test/simple/simptest.c | 85 +++- 25 files changed, 813 insertions(+), 367 deletions(-) create mode 100644 opal/mca/pmix/pmix2x/pmix/test/simple/simpjctrl.c diff --git a/opal/mca/pmix/pmix2x/pmix/NEWS b/opal/mca/pmix/pmix2x/pmix/NEWS index 8c1add89935..12f61cccfd7 100644 --- a/opal/mca/pmix/pmix2x/pmix/NEWS +++ b/opal/mca/pmix/pmix2x/pmix/NEWS @@ -21,11 +21,24 @@ example, a bug might be fixed in the master, and then moved to the current release as well as the "stable" bug fix release branch. -2.1.3 -- TBD +2.1.4 -- 18 Sep 2018 +---------------------- +- Updated configury to silence warnings on older compilers +- Implement job control and sensor APIs +- Update sensor support +- Fix a few bugs in the event notification system and provide some + missing implementation (support for specifying target procs to + receive the event). +- Add PMIX_PROC_TERMINATED constant +- Properly deal with EOPNOTSUPP from getsockopt() on ARM + + +2.1.3 -- 23 Aug 2018 ---------------------- - Fixed memory corruption bug in event notification system due to uninitialized variable - Add numeric version definition +- Transfer all cached data to client dstore upon first connect 2.1.2 -- 6 July 2018 diff --git a/opal/mca/pmix/pmix2x/pmix/VERSION b/opal/mca/pmix/pmix2x/pmix/VERSION index 27af07e5aa5..bc222742467 100644 --- a/opal/mca/pmix/pmix2x/pmix/VERSION +++ b/opal/mca/pmix/pmix2x/pmix/VERSION @@ -15,7 +15,7 @@ major=2 minor=1 -release=3 +release=4 # greek is used for alpha or beta release tags. If it is non-empty, # it will be appended to the version number. It does not have to be @@ -23,14 +23,14 @@ release=3 # The only requirement is that it must be entirely printable ASCII # characters and have no white space. -greek=rc1 +greek= # If repo_rev is empty, then the repository version number will be # obtained during "make dist" via the "git describe --tags --always" # command, or with the date (if "git describe" fails) in the form of # "date". -repo_rev=git1b0b577 +repo_rev=git1d605654 # If tarball_version is not empty, it is used as the version string in # the tarball filename, regardless of all other versions listed in @@ -44,7 +44,7 @@ tarball_version= # The date when this release was created -date="Jul 19, 2018" +date="Sep 18, 2018" # The shared library version of each of PMIx's public libraries. # These versions are maintained in accordance with the "Library @@ -75,6 +75,6 @@ date="Jul 19, 2018" # Version numbers are described in the Libtool current:revision:age # format. -libpmix_so_version=3:13:1 +libpmix_so_version=3:14:1 libpmi_so_version=1:0:0 libpmi2_so_version=1:0:0 diff --git a/opal/mca/pmix/pmix2x/pmix/config/pmix_setup_cc.m4 b/opal/mca/pmix/pmix2x/pmix/config/pmix_setup_cc.m4 index a2151c3a7ce..f35c16197e7 100644 --- a/opal/mca/pmix/pmix2x/pmix/config/pmix_setup_cc.m4 +++ b/opal/mca/pmix/pmix2x/pmix/config/pmix_setup_cc.m4 @@ -25,52 +25,44 @@ dnl $HEADER$ dnl AC_DEFUN([PMIX_CC_HELPER],[ - PMIX_VAR_SCOPE_PUSH([pmix_prog_cc_c11_helper_tmp]) + PMIX_VAR_SCOPE_PUSH([pmix_cc_helper_result]) AC_MSG_CHECKING([$1]) - pmix_prog_cc_c11_helper_tmp=0 + AC_LINK_IFELSE([AC_LANG_PROGRAM([$3],[$4])], + [$2=1 + pmix_cc_helper_result=yes], + [$2=0 + pmix_cc_helper_result=no]) - AC_LINK_IFELSE([AC_LANG_PROGRAM([$3],[$4])],[ - $2=yes - pmix_prog_cc_c11_helper_tmp=1], [$2=no]) - - AC_DEFINE_UNQUOTED([$5], [$pmix_prog_cc_c11_helper_tmp], [$6]) - - AC_MSG_RESULT([$$2]) + AC_MSG_RESULT([$pmix_cc_helper_result]) PMIX_VAR_SCOPE_POP ]) AC_DEFUN([PMIX_PROG_CC_C11_HELPER],[ - PMIX_VAR_SCOPE_PUSH([pmix_prog_cc_c11_helper_CFLAGS_save pmix_prog_cc_c11_helper__Thread_local_available pmix_prog_cc_c11_helper_atomic_var_available pmix_prog_cc_c11_helper__Atomic_available pmix_prog_cc_c11_helper__static_assert_available pmix_prog_cc_c11_helper__Generic_available]) + PMIX_VAR_SCOPE_PUSH([pmix_prog_cc_c11_helper_CFLAGS_save]) pmix_prog_cc_c11_helper_CFLAGS_save=$CFLAGS CFLAGS="$CFLAGS $1" PMIX_CC_HELPER([if $CC $1 supports C11 _Thread_local], [pmix_prog_cc_c11_helper__Thread_local_available], - [],[[static _Thread_local int foo = 1;++foo;]], [PMIX_C_HAVE__THREAD_LOCAL], - [Whether C compiler supports __Thread_local]) + [],[[static _Thread_local int foo = 1;++foo;]]) PMIX_CC_HELPER([if $CC $1 supports C11 atomic variables], [pmix_prog_cc_c11_helper_atomic_var_available], - [[#include ]], [[static atomic_long foo = 1;++foo;]], [PMIX_C_HAVE_ATOMIC_CONV_VAR], - [Whether C compiler support atomic convenience variables in stdatomic.h]) + [[#include ]], [[static atomic_long foo = 1;++foo;]]) PMIX_CC_HELPER([if $CC $1 supports C11 _Atomic keyword], [pmix_prog_cc_c11_helper__Atomic_available], - [[#include ]],[[static _Atomic long foo = 1;++foo;]], [PMIX_C_HAVE__ATOMIC], - [Whether C compiler supports __Atomic keyword]) + [[#include ]],[[static _Atomic long foo = 1;++foo;]]) PMIX_CC_HELPER([if $CC $1 supports C11 _Generic keyword], [pmix_prog_cc_c11_helper__Generic_available], - [[#define FOO(x) (_Generic (x, int: 1))]], [[static int x, y; y = FOO(x);]], [PMIX_C_HAVE__GENERIC], - [Whether C compiler supports __Generic keyword]) + [[#define FOO(x) (_Generic (x, int: 1))]], [[static int x, y; y = FOO(x);]]) PMIX_CC_HELPER([if $CC $1 supports C11 _Static_assert], [pmix_prog_cc_c11_helper__static_assert_available], - [[#include ]],[[_Static_assert(sizeof(int64_t) == 8, "WTH");]], [PMIX_C_HAVE__STATIC_ASSERT], - [Whether C compiler support _Static_assert keyword]) + [[#include ]],[[_Static_assert(sizeof(int64_t) == 8, "WTH");]]) - dnl At this time Open MPI only needs thread local and the atomic convenience types for C11 support. These - dnl will likely be required in the future. - AS_IF([test "x$pmix_prog_cc_c11_helper__Thread_local_available" = "xyes" && test "x$pmix_prog_cc_c11_helper_atomic_var_available" = "xyes"], - [$2], [$3]) + AS_IF([test $pmix_prog_cc_c11_helper__Thread_local_available -eq 1 && test $pmix_prog_cc_c11_helper_atomic_var_available -eq 1], + [$2], + [$3]) CFLAGS=$pmix_prog_cc_c11_helper_CFLAGS_save @@ -136,6 +128,8 @@ AC_DEFUN([PMIX_SETUP_CC],[ AC_REQUIRE([_PMIX_PROG_CC]) AC_REQUIRE([AM_PROG_CC_C_O]) + PMIX_VAR_SCOPE_PUSH([pmix_prog_cc_c11_helper__Thread_local_available pmix_prog_cc_c11_helper_atomic_var_available pmix_prog_cc_c11_helper__Atomic_available pmix_prog_cc_c11_helper__static_assert_available pmix_prog_cc_c11_helper__Generic_available pmix_prog_cc__thread_available]) + PMIX_PROG_CC_C11 if test $pmix_cv_c11_supported = no ; then @@ -157,11 +151,32 @@ AC_DEFUN([PMIX_SETUP_CC],[ fi # Check if compiler support __thread - PMIX_VAR_SCOPE_PUSH([pmix_prog_cc__thread_available]) PMIX_CC_HELPER([if $CC $1 supports __thread], [pmix_prog_cc__thread_available], - [],[[static __thread int foo = 1;++foo;]], [PMIX_C_HAVE___THREAD], - [Whether C compiler supports __thread]) - PMIX_VAR_SCOPE_POP + [],[[static __thread int foo = 1;++foo;]]) + + + PMIX_CC_HELPER([if $CC $1 supports C11 _Thread_local], [pmix_prog_cc_c11_helper__Thread_local_available], + [],[[static _Thread_local int foo = 1;++foo;]]) + + dnl At this time, PMIx only needs thread local and the atomic convenience tyes for C11 suport. These + dnl will likely be required in the future. + AC_DEFINE_UNQUOTED([PMIX_C_HAVE__THREAD_LOCAL], [$pmix_prog_cc_c11_helper__Thread_local_available], + [Whether C compiler supports __Thread_local]) + + AC_DEFINE_UNQUOTED([PMIX_C_HAVE_ATOMIC_CONV_VAR], [$pmix_prog_cc_c11_helper_atomic_var_available], + [Whether C compiler supports atomic convenience variables in stdatomic.h]) + + AC_DEFINE_UNQUOTED([PMIX_C_HAVE__ATOMIC], [$pmix_prog_cc_c11_helper__Atomic_available], + [Whether C compiler supports __Atomic keyword]) + + AC_DEFINE_UNQUOTED([PMIX_C_HAVE__GENERIC], [$pmix_prog_cc_c11_helper__Generic_available], + [Whether C compiler supports __Generic keyword]) + + AC_DEFINE_UNQUOTED([PMIX_C_HAVE__STATIC_ASSERT], [$pmix_prog_cc_c11_helper__static_assert_available], + [Whether C compiler supports _Static_assert keyword]) + + AC_DEFINE_UNQUOTED([PMIX_C_HAVE___THREAD], [$pmix_prog_cc__thread_available], + [Whether C compiler supports __thread]) PMIX_C_COMPILER_VENDOR([pmix_c_vendor]) @@ -456,6 +471,7 @@ AC_DEFUN([PMIX_SETUP_CC],[ PMIX_ENSURE_CONTAINS_OPTFLAGS(["$CFLAGS"]) AC_MSG_RESULT([$co_result]) CFLAGS="$co_result" + PMIX_VAR_SCOPE_POP ]) diff --git a/opal/mca/pmix/pmix2x/pmix/contrib/pmix.spec b/opal/mca/pmix/pmix2x/pmix/contrib/pmix.spec index 853d15a7427..6204686d5b9 100644 --- a/opal/mca/pmix/pmix2x/pmix/contrib/pmix.spec +++ b/opal/mca/pmix/pmix2x/pmix/contrib/pmix.spec @@ -192,7 +192,7 @@ Summary: An extended/exascale implementation of PMI Name: %{?_name:%{_name}}%{!?_name:pmix} -Version: 2.1.3rc1 +Version: 2.1.4 Release: 1%{?dist} License: BSD Group: Development/Libraries diff --git a/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h.in b/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h.in index e975f6e82d3..b4f4325e5e1 100644 --- a/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h.in +++ b/opal/mca/pmix/pmix2x/pmix/include/pmix_common.h.in @@ -591,6 +591,7 @@ typedef int pmix_status_t; /* monitoring */ #define PMIX_MONITOR_HEARTBEAT_ALERT (PMIX_ERR_V2X_BASE - 9) #define PMIX_MONITOR_FILE_ALERT (PMIX_ERR_V2X_BASE - 10) +#define PMIX_PROC_TERMINATED (PMIX_ERR_V2X_BASE - 11) /* define a starting point for operational error constants so * we avoid renumbering when making additions */ @@ -602,6 +603,9 @@ typedef int pmix_status_t; #define PMIX_ERR_UPDATE_ENDPOINTS (PMIX_ERR_OP_BASE - 16) #define PMIX_MODEL_DECLARED (PMIX_ERR_OP_BASE - 17) #define PMIX_GDS_ACTION_COMPLETE (PMIX_ERR_OP_BASE - 18) +/* gap created by v3 definitions */ +#define PMIX_OPERATION_SUCCEEDED (PMIX_ERR_OP_BASE - 27) +/* gap for group codes */ /* define a starting point for system error constants so * we avoid renumbering when making additions */ @@ -750,6 +754,19 @@ typedef uint8_t pmix_alloc_directive_t; #define PMIX_ALLOC_EXTERNAL 128 +/* declare a convenience macro for checking keys */ +#define PMIX_CHECK_KEY(a, b) \ + (0 == strncmp((a)->key, (b), PMIX_MAX_KEYLEN)) + +/* define a convenience macro for checking nspaces */ +#define PMIX_CHECK_NSPACE(a, b) \ + (0 == strncmp((a), (b), PMIX_MAX_NSLEN)) + +/* define a convenience macro for checking names */ +#define PMIX_CHECK_PROCID(a, b) \ + (PMIX_CHECK_NSPACE((a)->nspace, (b)->nspace) && ((a)->rank == (b)->rank || (PMIX_RANK_WILDCARD == (a)->rank || PMIX_RANK_WILDCARD == (b)->rank))) + + /**** PMIX BYTE OBJECT ****/ typedef struct pmix_byte_object { char *bytes; @@ -1086,11 +1103,11 @@ struct pmix_info_t { (void)strncpy((m)->key, (k), PMIX_MAX_KEYLEN); \ pmix_value_load(&((m)->value), (v), (t)); \ } while (0) -#define PMIX_INFO_XFER(d, s) \ - do { \ - (void)strncpy((d)->key, (s)->key, PMIX_MAX_KEYLEN); \ - (d)->flags = (s)->flags; \ - pmix_value_xfer(&(d)->value, &(s)->value); \ +#define PMIX_INFO_XFER(d, s) \ + do { \ + (void)strncpy((d)->key, (s)->key, PMIX_MAX_KEYLEN); \ + (d)->flags = (s)->flags; \ + pmix_value_xfer(&(d)->value, (pmix_value_t*)&(s)->value); \ } while(0) #define PMIX_INFO_REQUIRED(m) \ diff --git a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c index 668f3b23e9f..2d951b14f74 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c +++ b/opal/mca/pmix/pmix2x/pmix/src/client/pmix_client.c @@ -167,14 +167,9 @@ static void pmix_client_notify_recv(struct pmix_peer_t *peer, PMIX_RELEASE(chain); goto error; } - /* check for non-default flag */ - for (cnt=0; cnt < (int)ninfo; cnt++) { - if (0 == strncmp(chain->info[cnt].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) { - chain->nondefault = PMIX_INFO_TRUE(&chain->info[cnt]); - break; - } - } } + /* prep the chain for processing */ + pmix_prep_event_chain(chain, chain->info, ninfo, false); pmix_output_verbose(2, pmix_globals.debug_output, "[%s:%d] pmix:client_notify_recv - processing event %d, calling errhandler", diff --git a/opal/mca/pmix/pmix2x/pmix/src/common/pmix_control.c b/opal/mca/pmix/pmix2x/pmix/src/common/pmix_control.c index 44803eff7ae..7dd8c7f4e8b 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/common/pmix_control.c +++ b/opal/mca/pmix/pmix2x/pmix/src/common/pmix_control.c @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2018 Intel, Inc. All rights reserved. * Copyright (c) 2016 Mellanox Technologies, Inc. * All rights reserved. * Copyright (c) 2016 IBM Corporation. All rights reserved. @@ -84,7 +84,7 @@ static void query_cbfunc(struct pmix_peer_t *peer, /* unpack any returned data */ cnt = 1; PMIX_BFROPS_UNPACK(rc, peer, buf, &results->ninfo, &cnt, PMIX_SIZE); - if (PMIX_SUCCESS != rc) { + if (PMIX_SUCCESS != rc && PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { PMIX_ERROR_LOG(rc); goto complete; } @@ -127,16 +127,10 @@ PMIX_EXPORT pmix_status_t PMIx_Job_control_nb(const pmix_proc_t targets[], size_ return PMIX_ERR_INIT; } - /* if we aren't connected, don't attempt to send */ - if (!PMIX_PROC_IS_SERVER(pmix_globals.mypeer) && !pmix_globals.connected) { - PMIX_RELEASE_THREAD(&pmix_global_lock); - return PMIX_ERR_UNREACH; - } - PMIX_RELEASE_THREAD(&pmix_global_lock); - /* if we are the server, then we just issue the request and * return the response */ if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) { + PMIX_RELEASE_THREAD(&pmix_global_lock); if (NULL == pmix_host_server.job_control) { /* nothing we can do */ return PMIX_ERR_NOT_SUPPORTED; @@ -150,6 +144,13 @@ PMIX_EXPORT pmix_status_t PMIx_Job_control_nb(const pmix_proc_t targets[], size_ return rc; } + /* we need to send, so check for connection */ + if (!pmix_globals.connected) { + PMIX_RELEASE_THREAD(&pmix_global_lock); + return PMIX_ERR_UNREACH; + } + PMIX_RELEASE_THREAD(&pmix_global_lock); + /* if we are a client, then relay this request to the server */ msg = PMIX_NEW(pmix_buffer_t); /* pack the cmd */ @@ -171,7 +172,7 @@ PMIX_EXPORT pmix_status_t PMIx_Job_control_nb(const pmix_proc_t targets[], size_ } /* remember, the targets can be NULL to indicate that the operation * is to be done against all members of our nspace */ - if (0 < ntargets) { + if (NULL != targets && 0 < ntargets) { /* pack the targets */ PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, targets, ntargets, PMIX_PROC); @@ -190,7 +191,7 @@ PMIX_EXPORT pmix_status_t PMIx_Job_control_nb(const pmix_proc_t targets[], size_ PMIX_RELEASE(msg); return rc; } - if (0 < ndirs) { + if (NULL != directives && 0 < ndirs) { PMIX_BFROPS_PACK(rc, pmix_client_globals.myserver, msg, directives, ndirs, PMIX_INFO); if (PMIX_SUCCESS != rc) { @@ -237,16 +238,16 @@ PMIX_EXPORT pmix_status_t PMIx_Process_monitor_nb(const pmix_info_t *monitor, pm return PMIX_ERR_INIT; } - /* if we aren't connected, don't attempt to send */ - if (!PMIX_PROC_IS_SERVER(pmix_globals.mypeer) && !pmix_globals.connected) { + /* sanity check */ + if (NULL == monitor) { PMIX_RELEASE_THREAD(&pmix_global_lock); - return PMIX_ERR_UNREACH; + return PMIX_ERR_BAD_PARAM; } - PMIX_RELEASE_THREAD(&pmix_global_lock); /* if we are the server, then we just issue the request and * return the response */ if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) { + PMIX_RELEASE_THREAD(&pmix_global_lock); if (NULL == pmix_host_server.monitor) { /* nothing we can do */ return PMIX_ERR_NOT_SUPPORTED; @@ -258,6 +259,26 @@ PMIX_EXPORT pmix_status_t PMIx_Process_monitor_nb(const pmix_info_t *monitor, pm return rc; } + /* we need to send, so check for connection */ + if (!pmix_globals.connected) { + PMIX_RELEASE_THREAD(&pmix_global_lock); + return PMIX_ERR_UNREACH; + } + PMIX_RELEASE_THREAD(&pmix_global_lock); + + /* if the monitor is PMIX_SEND_HEARTBEAT, then send it */ + if (0 == strncmp(monitor->key, PMIX_SEND_HEARTBEAT, PMIX_MAX_KEYLEN)) { + msg = PMIX_NEW(pmix_buffer_t); + if (NULL == msg) { + return PMIX_ERR_NOMEM; + } + PMIX_PTL_SEND_ONEWAY(rc, pmix_client_globals.myserver, msg, PMIX_PTL_TAG_HEARTBEAT); + if (PMIX_SUCCESS != rc) { + PMIX_RELEASE(msg); + } + return rc; + } + /* if we are a client, then relay this request to the server */ msg = PMIX_NEW(pmix_buffer_t); /* pack the cmd */ diff --git a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event.h b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event.h index b4ee30b0c0e..1cd7d3fe719 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event.h +++ b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event.h @@ -39,6 +39,13 @@ #define PMIX_EVENT_ORDER_PREPEND 0x10 #define PMIX_EVENT_ORDER_APPEND 0x20 +/* define an internal attribute for marking that the + * server processed an event before passing it up + * to its host in case it comes back down - avoids + * infinite loop */ +#define PMIX_SERVER_INTERNAL_NOTIFY "pmix.srvr.internal.notify" + + /* define a struct for tracking registration ranges */ typedef struct { pmix_data_range_t range; @@ -117,8 +124,15 @@ typedef struct pmix_event_chain_t { bool endchain; pmix_proc_t source; pmix_data_range_t range; + /* When generating events, callers can specify + * the range of targets to receive notifications. + */ + pmix_proc_t *targets; + size_t ntargets; + /* the processes that we affected by the event */ pmix_proc_t *affected; size_t naffected; + /* any info provided by the event generator */ pmix_info_t *info; size_t ninfo; size_t nallocated; @@ -130,6 +144,13 @@ typedef struct pmix_event_chain_t { } pmix_event_chain_t; PMIX_CLASS_DECLARATION(pmix_event_chain_t); +/* prepare a chain for processing by cycling across provided + * info structs and translating those supported by the event + * system into the chain object*/ +pmix_status_t pmix_prep_event_chain(pmix_event_chain_t *chain, + const pmix_info_t *info, size_t ninfo, + bool xfer); + /* invoke the error handler that is registered against the given * status, passing it the provided info on the procs that were * affected, plus any additional info provided by the server */ @@ -146,7 +167,7 @@ bool pmix_notify_check_affected(pmix_proc_t *interested, size_t ninterested, pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status, const pmix_proc_t *source, pmix_data_range_t range, - pmix_info_t info[], size_t ninfo, + const pmix_info_t info[], size_t ninfo, pmix_op_cbfunc_t cbfunc, void *cbdata); void pmix_event_timeout_cb(int fd, short flags, void *arg); diff --git a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c index 1065ad0c533..554bb333faa 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c +++ b/opal/mca/pmix/pmix2x/pmix/src/event/pmix_event_notification.c @@ -30,7 +30,7 @@ static pmix_status_t notify_server_of_event(pmix_status_t status, const pmix_proc_t *source, pmix_data_range_t range, - pmix_info_t info[], size_t ninfo, + const pmix_info_t info[], size_t ninfo, pmix_op_cbfunc_t cbfunc, void *cbdata); /* if we are a client, we call this function to notify the server of @@ -54,13 +54,16 @@ PMIX_EXPORT pmix_status_t PMIx_Notify_event(pmix_status_t status, if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) { PMIX_RELEASE_THREAD(&pmix_global_lock); + pmix_output_verbose(2, pmix_globals.debug_output, + "pmix_server_notify_event source = %s:%d event_status = %d", + (NULL == source) ? "UNKNOWN" : source->nspace, + (NULL == source) ? PMIX_RANK_WILDCARD : source->rank, status); rc = pmix_server_notify_client_of_event(status, source, range, info, ninfo, cbfunc, cbdata); - pmix_output_verbose(2, pmix_globals.debug_output, - "pmix_server_notify_event source = %s:%d event_status = %d, rc= %d", - (NULL == source) ? "UNKNOWN" : source->nspace, - (NULL == source) ? PMIX_RANK_WILDCARD : source->rank, status, rc); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + } return rc; } @@ -70,14 +73,17 @@ PMIX_EXPORT pmix_status_t PMIx_Notify_event(pmix_status_t status, return PMIX_ERR_UNREACH; } PMIX_RELEASE_THREAD(&pmix_global_lock); + pmix_output_verbose(2, pmix_globals.debug_output, + "pmix_client_notify_event source = %s:%d event_status =%d", + (NULL == source) ? pmix_globals.myid.nspace : source->nspace, + (NULL == source) ? pmix_globals.myid.rank : source->rank, status); rc = notify_server_of_event(status, source, range, info, ninfo, cbfunc, cbdata); - pmix_output_verbose(2, pmix_globals.debug_output, - "pmix_client_notify_event source = %s:%d event_status =%d, rc=%d", - (NULL == source) ? pmix_globals.myid.nspace : source->nspace, - (NULL == source) ? pmix_globals.myid.rank : source->rank, status, rc); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + } return rc; } @@ -105,7 +111,7 @@ static void notify_event_cbfunc(struct pmix_peer_t *pr, pmix_ptl_hdr_t *hdr, static pmix_status_t notify_server_of_event(pmix_status_t status, const pmix_proc_t *source, pmix_data_range_t range, - pmix_info_t info[], size_t ninfo, + const pmix_info_t info[], size_t ninfo, pmix_op_cbfunc_t cbfunc, void *cbdata) { pmix_status_t rc; @@ -170,14 +176,8 @@ static pmix_status_t notify_server_of_event(pmix_status_t status, /* we always leave space for event hdlr name and a callback object */ chain->nallocated = ninfo + 2; PMIX_INFO_CREATE(chain->info, chain->nallocated); - - if (0 < ninfo) { - chain->ninfo = ninfo; - /* need to copy the info */ - for (n=0; n < ninfo; n++) { - PMIX_INFO_XFER(&chain->info[n], &info[n]); - } - } + /* prep the chain for processing */ + pmix_prep_event_chain(chain, info, ninfo, true); /* we need to cache this event so we can pass it into * ourselves should someone later register for it */ @@ -194,65 +194,25 @@ static pmix_status_t notify_server_of_event(pmix_status_t status, if (0 < chain->ninfo) { cd->ninfo = chain->ninfo; PMIX_INFO_CREATE(cd->info, cd->ninfo); + cd->nondefault = chain->nondefault; /* need to copy the info */ for (n=0; n < cd->ninfo; n++) { PMIX_INFO_XFER(&cd->info[n], &chain->info[n]); - if (0 == strncmp(cd->info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) { - cd->nondefault = PMIX_INFO_TRUE(&info[n]); - chain->nondefault = cd->nondefault; - } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN)) { - /* provides an array of pmix_proc_t identifying the procs - * that are to receive this notification, or a single pmix_proc_t */ - if (PMIX_DATA_ARRAY == cd->info[n].value.type && - NULL != cd->info[n].value.data.darray && - NULL != cd->info[n].value.data.darray->array) { - cd->ntargets = cd->info[n].value.data.darray->size; - PMIX_PROC_CREATE(cd->targets, cd->ntargets); - memcpy(cd->targets, cd->info[n].value.data.darray->array, cd->ntargets * sizeof(pmix_proc_t)); - } else if (PMIX_PROC == cd->info[n].value.type) { - cd->ntargets = 1; - PMIX_PROC_CREATE(cd->targets, cd->ntargets); - memcpy(cd->targets, cd->info[n].value.data.proc, sizeof(pmix_proc_t)); - } else { - /* this is an error */ - PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); - return PMIX_ERR_BAD_PARAM; - } - } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_AFFECTED_PROC, PMIX_MAX_KEYLEN)) { - PMIX_PROC_CREATE(cd->affected, 1); - if (NULL == cd->affected) { - rc = PMIX_ERR_NOMEM; - goto cleanup; - } - cd->naffected = 1; - memcpy(cd->affected, cd->info[n].value.data.proc, sizeof(pmix_proc_t)); - /* need to do the same for chain so it can be correctly processed */ - PMIX_PROC_CREATE(chain->affected, 1); - if (NULL == chain->affected) { - rc = PMIX_ERR_NOMEM; - goto cleanup; - } - chain->naffected = 1; - memcpy(chain->affected, cd->info[n].value.data.proc, sizeof(pmix_proc_t)); - } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_AFFECTED_PROCS, PMIX_MAX_KEYLEN)) { - cd->naffected = cd->info[n].value.data.darray->size; - PMIX_PROC_CREATE(cd->affected, cd->naffected); - if (NULL == cd->affected) { - cd->naffected = 0; - rc = PMIX_ERR_NOMEM; - goto cleanup; - } - memcpy(cd->affected, cd->info[n].value.data.darray->array, cd->naffected * sizeof(pmix_proc_t)); - /* need to do the same for chain so it can be correctly processed */ - chain->naffected = cd->info[n].value.data.darray->size; - PMIX_PROC_CREATE(chain->affected, chain->naffected); - if (NULL == chain->affected) { - chain->naffected = 0; - rc = PMIX_ERR_NOMEM; - goto cleanup; - } - memcpy(chain->affected, cd->info[n].value.data.darray->array, chain->naffected * sizeof(pmix_proc_t)); + } + if (NULL != chain->targets) { + cd->ntargets = chain->ntargets; + PMIX_PROC_CREATE(cd->targets, cd->ntargets); + memcpy(cd->targets, chain->targets, cd->ntargets * sizeof(pmix_proc_t)); + } + if (NULL != chain->affected) { + cd->naffected = chain->naffected; + PMIX_PROC_CREATE(cd->affected, cd->naffected); + if (NULL == cd->affected) { + cd->naffected = 0; + rc = PMIX_ERR_NOMEM; + goto cleanup; } + memcpy(cd->affected, chain->affected, cd->naffected * sizeof(pmix_proc_t)); } } @@ -424,7 +384,7 @@ static void progress_local_event_hdlr(pmix_status_t status, } while (pmix_list_get_end(&pmix_globals.events.multi_events) != (item = pmix_list_get_next(item))) { nxt = (pmix_event_hdlr_t*)item; - if (!pmix_notify_check_range(&nxt->rng, &chain->source) && + if (!pmix_notify_check_range(&nxt->rng, &chain->source) || !pmix_notify_check_affected(nxt->affected, nxt->naffected, chain->affected, chain->naffected)) { continue; @@ -622,10 +582,17 @@ void pmix_invoke_local_event_hdlr(pmix_event_chain_t *chain) goto complete; } - /* check for directives */ - for (i=0; i < chain->ninfo; i++) { - if (0 == strncmp(chain->info[i].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) { - chain->nondefault = true; + /* if we are not a target, then we can simply ignore this event */ + if (NULL != chain->targets) { + found = false; + for (i=0; i < chain->ntargets; i++) { + if (PMIX_CHECK_PROCID(&chain->targets[i], &pmix_globals.myid)) { + found = true; + break; + } + } + if (!found) { + goto complete; } } @@ -814,7 +781,7 @@ static void _notify_client_event(int sd, short args, void *cbdata) if (0 < cd->ninfo) { /* check for caching instructions */ for (n=0; n < cd->ninfo; n++) { - if (0 == strncmp(cd->info[n].key, PMIX_EVENT_DO_NOT_CACHE, PMIX_MAX_KEYLEN)) { + if (PMIX_CHECK_KEY(&cd->info[n], PMIX_EVENT_DO_NOT_CACHE)) { if (PMIX_INFO_TRUE(&cd->info[n])) { holdcd = false; } @@ -836,6 +803,59 @@ static void _notify_client_event(int sd, short args, void *cbdata) } } + /* we may also have registered for events, so setup to check this + * against our registrations */ + chain = PMIX_NEW(pmix_event_chain_t); + chain->status = cd->status; + (void)strncpy(chain->source.nspace, cd->source.nspace, PMIX_MAX_NSLEN); + chain->source.rank = cd->source.rank; + /* we always leave space for a callback object and + * the evhandler name. */ + chain->nallocated = cd->ninfo + 2; + PMIX_INFO_CREATE(chain->info, chain->nallocated); + /* prep the chain for processing */ + pmix_prep_event_chain(chain, cd->info, cd->ninfo, true); + + if (0 < cd->ninfo) { + /* copy setup to the cd object */ + cd->nondefault = chain->nondefault; + if (NULL != chain->targets) { + cd->ntargets = chain->ntargets; + PMIX_PROC_CREATE(cd->targets, cd->ntargets); + memcpy(cd->targets, chain->targets, cd->ntargets * sizeof(pmix_proc_t)); + } + if (NULL != chain->affected) { + cd->naffected = chain->naffected; + PMIX_PROC_CREATE(cd->affected, cd->naffected); + if (NULL == cd->affected) { + cd->naffected = 0; + /* notify the caller */ + if (NULL != cd->cbfunc) { + cd->cbfunc(PMIX_ERR_NOMEM, cd->cbdata); + } + PMIX_RELEASE(cd); + PMIX_RELEASE(chain); + return; + } + memcpy(cd->affected, chain->affected, cd->naffected * sizeof(pmix_proc_t)); + } + } + + /* if they provided a PMIX_EVENT_CUSTOM_RANGE info object but + * specified a range other than PMIX_RANGE_CUSTOM, then this + * is an error */ + if (PMIX_RANGE_CUSTOM != cd->range && NULL != cd->targets) { + PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); + /* notify the caller */ + if (NULL != cd->cbfunc) { + cd->cbfunc(PMIX_ERR_BAD_PARAM, cd->cbdata); + } + PMIX_RELEASE(cd); + PMIX_RELEASE(chain); + return; + } + + holdcd = false; if (PMIX_RANGE_PROC_LOCAL != cd->range) { PMIX_CONSTRUCT(&trk, pmix_list_t); @@ -848,8 +868,7 @@ static void _notify_client_event(int sd, short args, void *cbdata) /* if this client was the source of the event, then * don't send it back as they will have processed it * when they generated it */ - if (0 == strncmp(cd->source.nspace, pr->peer->info->pname.nspace, PMIX_MAX_NSLEN) && - cd->source.rank == pr->peer->info->pname.rank) { + if (PMIX_CHECK_PROCID(&cd->source, &pr->peer->info->pname)) { continue; } /* if we have already notified this client, then don't do it again */ @@ -867,11 +886,7 @@ static void _notify_client_event(int sd, short args, void *cbdata) if (NULL != cd->targets) { matched = false; for (n=0; n < cd->ntargets; n++) { - if (0 != strncmp(pr->peer->info->pname.nspace, cd->targets[n].nspace, PMIX_MAX_NSLEN)) { - continue; - } - if (PMIX_RANK_WILDCARD == cd->targets[n].rank || - pr->peer->info->pname.rank == cd->targets[n].rank) { + if (PMIX_CHECK_PROCID(&pr->peer->info->pname, &cd->targets[n])) { matched = true; break; } @@ -939,9 +954,7 @@ static void _notify_client_event(int sd, short args, void *cbdata) } } PMIX_LIST_DESTRUCT(&trk); - if (PMIX_RANGE_LOCAL != cd->range && - 0 == strncmp(cd->source.nspace, pmix_globals.myid.nspace, PMIX_MAX_NSLEN) && - cd->source.rank == pmix_globals.myid.rank) { + if (PMIX_RANGE_LOCAL != cd->range && PMIX_CHECK_PROCID(&cd->source, &pmix_globals.myid)) { /* if we are the source, then we need to post this upwards as * well so the host RM can broadcast it as necessary - we rely * on the host RM to _not_ deliver this back to us! */ @@ -952,85 +965,10 @@ static void _notify_client_event(int sd, short args, void *cbdata) pmix_host_server.notify_event(cd->status, &cd->source, cd->range, cd->info, cd->ninfo, local_cbfunc, cd); } - } } - /* we may also have registered for events, so be sure to check this - * against our registrations */ - chain = PMIX_NEW(pmix_event_chain_t); - chain->status = cd->status; - (void)strncpy(chain->source.nspace, cd->source.nspace, PMIX_MAX_NSLEN); - chain->source.rank = cd->source.rank; - /* we always leave space for a callback object and - * the evhandler name. */ - chain->nallocated = cd->ninfo + 2; - PMIX_INFO_CREATE(chain->info, chain->nallocated); - if (0 < cd->ninfo) { - chain->ninfo = cd->ninfo; - /* need to copy the info */ - for (n=0; n < cd->ninfo; n++) { - PMIX_INFO_XFER(&chain->info[n], &cd->info[n]); - if (0 == strncmp(cd->info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) { - cd->nondefault = PMIX_INFO_TRUE(&cd->info[n]); - chain->nondefault = cd->nondefault; - } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN)) { - /* provides an array of pmix_proc_t identifying the procs - * that are to receive this notification, or a single pmix_proc_t */ - if (PMIX_DATA_ARRAY == cd->info[n].value.type && - NULL != cd->info[n].value.data.darray && - NULL != cd->info[n].value.data.darray->array) { - cd->ntargets = cd->info[n].value.data.darray->size; - PMIX_PROC_CREATE(cd->targets, cd->ntargets); - memcpy(cd->targets, cd->info[n].value.data.darray->array, cd->ntargets * sizeof(pmix_proc_t)); - } else if (PMIX_PROC == cd->info[n].value.type) { - cd->ntargets = 1; - PMIX_PROC_CREATE(cd->targets, cd->ntargets); - memcpy(cd->targets, cd->info[n].value.data.proc, sizeof(pmix_proc_t)); - } else { - /* this is an error */ - PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); - PMIX_RELEASE(chain); - return; - } - } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_AFFECTED_PROC, PMIX_MAX_KEYLEN)) { - PMIX_PROC_CREATE(cd->affected, 1); - if (NULL == cd->affected) { - PMIX_RELEASE(chain); - return; - } - cd->naffected = 1; - memcpy(cd->affected, cd->info[n].value.data.proc, sizeof(pmix_proc_t)); - /* need to do the same for chain so it can be correctly processed */ - PMIX_PROC_CREATE(chain->affected, 1); - if (NULL == chain->affected) { - PMIX_RELEASE(chain); - return; - } - chain->naffected = 1; - memcpy(chain->affected, cd->info[n].value.data.proc, sizeof(pmix_proc_t)); - } else if (0 == strncmp(cd->info[n].key, PMIX_EVENT_AFFECTED_PROCS, PMIX_MAX_KEYLEN)) { - cd->naffected = cd->info[n].value.data.darray->size; - PMIX_PROC_CREATE(cd->affected, cd->naffected); - if (NULL == cd->affected) { - cd->naffected = 0; - PMIX_RELEASE(chain); - return; - } - memcpy(cd->affected, cd->info[n].value.data.darray->array, cd->naffected * sizeof(pmix_proc_t)); - /* need to do the same for chain so it can be correctly processed */ - chain->naffected = cd->info[n].value.data.darray->size; - PMIX_PROC_CREATE(chain->affected, chain->naffected); - if (NULL == chain->affected) { - chain->naffected = 0; - PMIX_RELEASE(chain); - return; - } - memcpy(chain->affected, cd->info[n].value.data.darray->array, chain->naffected * sizeof(pmix_proc_t)); - } - } - } - /* process it */ + /* process it ourselves */ pmix_invoke_local_event_hdlr(chain); if (!holdcd) { @@ -1053,7 +991,7 @@ static void _notify_client_event(int sd, short args, void *cbdata) pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status, const pmix_proc_t *source, pmix_data_range_t range, - pmix_info_t info[], size_t ninfo, + const pmix_info_t info[], size_t ninfo, pmix_op_cbfunc_t cbfunc, void *cbdata) { pmix_notify_caddy_t *cd; @@ -1063,6 +1001,11 @@ pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status, "pmix_server: notify client of event %s", PMIx_Error_string(status)); + /* check for prior processing */ + if (NULL != info && PMIX_CHECK_KEY(&info[ninfo], PMIX_SERVER_INTERNAL_NOTIFY)) { + return PMIX_OPERATION_SUCCEEDED; + } + cd = PMIX_NEW(pmix_notify_caddy_t); cd->status = status; if (NULL == source) { @@ -1083,51 +1026,6 @@ pmix_status_t pmix_server_notify_client_of_event(pmix_status_t status, } } - /* check for directives */ - if (NULL != info) { - for (n=0; n < ninfo; n++) { - if (0 == strncmp(info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) { - cd->nondefault = PMIX_INFO_TRUE(&info[n]); - } else if (0 == strncmp(info[n].key, PMIX_EVENT_CUSTOM_RANGE, PMIX_MAX_KEYLEN)) { - /* provides an array of pmix_proc_t identifying the procs - * that are to receive this notification, or a single pmix_proc_t */ - if (PMIX_DATA_ARRAY == info[n].value.type && - NULL != info[n].value.data.darray && - NULL != info[n].value.data.darray->array) { - cd->ntargets = info[n].value.data.darray->size; - PMIX_PROC_CREATE(cd->targets, cd->ntargets); - memcpy(cd->targets, info[n].value.data.darray->array, cd->ntargets * sizeof(pmix_proc_t)); - } else if (PMIX_PROC == info[n].value.type) { - cd->ntargets = 1; - PMIX_PROC_CREATE(cd->targets, cd->ntargets); - memcpy(cd->targets, info[n].value.data.proc, sizeof(pmix_proc_t)); - } else { - /* this is an error */ - PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); - return PMIX_ERR_BAD_PARAM; - } - } - } - } - - /* - * If the range is PMIX_RANGE_NAMESPACE, then they should not have set a - * PMIX_EVENT_CUSTOM_RANGE info object or at least we should ignore it - */ - if (PMIX_RANGE_NAMESPACE == cd->range) { - if (cd->targets) { - PMIX_PROC_FREE(cd->targets, cd->ntargets); - } - PMIX_PROC_CREATE(cd->targets, 1); - cd->ntargets = 1; - cd->targets[0].rank = PMIX_RANK_WILDCARD; - if (NULL == source) { - strncpy(cd->targets[0].nspace, "UNDEF", PMIX_MAX_NSLEN); - } else { - strncpy(cd->targets[0].nspace, source->nspace, PMIX_MAX_NSLEN); - } - } - /* track the eventual callback info */ cd->cbfunc = cbfunc; cd->cbdata = cbdata; @@ -1244,6 +1142,65 @@ void pmix_event_timeout_cb(int fd, short flags, void *arg) } } +pmix_status_t pmix_prep_event_chain(pmix_event_chain_t *chain, + const pmix_info_t *info, size_t ninfo, + bool xfer) +{ + size_t n; + + if (NULL != info && 0 < ninfo) { + chain->ninfo = ninfo; + if (NULL == chain->info) { + PMIX_INFO_CREATE(chain->info, chain->ninfo); + } + /* need to copy the info */ + for (n=0; n < ninfo; n++) { + if (xfer) { + /* chain doesn't already have a copy of the info */ + PMIX_INFO_XFER(&chain->info[n], &info[n]); + } + /* look for specific directives */ + if (0 == strncmp(info[n].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) { + chain->nondefault = PMIX_INFO_TRUE(&info[n]); + } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_CUSTOM_RANGE)) { + /* provides an array of pmix_proc_t identifying the procs + * that are to receive this notification, or a single pmix_proc_t */ + if (PMIX_DATA_ARRAY == info[n].value.type && + NULL != info[n].value.data.darray && + NULL != info[n].value.data.darray->array) { + chain->ntargets = info[n].value.data.darray->size; + PMIX_PROC_CREATE(chain->targets, chain->ntargets); + memcpy(chain->targets, info[n].value.data.darray->array, chain->ntargets * sizeof(pmix_proc_t)); + } else if (PMIX_PROC == info[n].value.type) { + chain->ntargets = 1; + PMIX_PROC_CREATE(chain->targets, chain->ntargets); + memcpy(chain->targets, info[n].value.data.proc, sizeof(pmix_proc_t)); + } else { + /* this is an error */ + PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); + return PMIX_ERR_BAD_PARAM; + } + } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROC)) { + PMIX_PROC_CREATE(chain->affected, 1); + if (NULL == chain->affected) { + return PMIX_ERR_NOMEM; + } + chain->naffected = 1; + memcpy(chain->affected, info[n].value.data.proc, sizeof(pmix_proc_t)); + } else if (PMIX_CHECK_KEY(&info[n], PMIX_EVENT_AFFECTED_PROCS)) { + chain->naffected = info[n].value.data.darray->size; + PMIX_PROC_CREATE(chain->affected, chain->naffected); + if (NULL == chain->affected) { + chain->naffected = 0; + return PMIX_ERR_NOMEM; + } + memcpy(chain->affected, info[n].value.data.darray->array, chain->naffected * sizeof(pmix_proc_t)); + } + } + } + return PMIX_SUCCESS; +} + /**** CLASS INSTANTIATIONS ****/ static void sevcon(pmix_event_hdlr_t *p) @@ -1326,6 +1283,8 @@ static void chcon(pmix_event_chain_t *p) p->source.rank = PMIX_RANK_UNDEF; p->nondefault = false; p->endchain = false; + p->targets = NULL; + p->ntargets = 0; p->range = PMIX_RANGE_UNDEF; p->affected = NULL; p->naffected = 0; @@ -1343,6 +1302,9 @@ static void chdes(pmix_event_chain_t *p) if (p->timer_active) { pmix_event_del(&p->ev); } + if (NULL != p->targets) { + PMIX_PROC_FREE(p->targets, p->ntargets); + } if (NULL != p->affected) { PMIX_PROC_FREE(p->affected, p->naffected); } diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/gds/ds12/gds_dstore.c b/opal/mca/pmix/pmix2x/pmix/src/mca/gds/ds12/gds_dstore.c index 039a16765e5..065a9bb4bad 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/gds/ds12/gds_dstore.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/gds/ds12/gds_dstore.c @@ -1,6 +1,6 @@ /* * Copyright (c) 2015-2018 Intel, Inc. All rights reserved. - * Copyright (c) 2016 IBM Corporation. All rights reserved. + * Copyright (c) 2016-2018 IBM Corporation. All rights reserved. * Copyright (c) 2016-2017 Mellanox Technologies, Inc. * All rights reserved. * @@ -3164,7 +3164,7 @@ static pmix_status_t dstore_register_job_info(struct pmix_peer_t *pr, char *msg; pmix_status_t rc; pmix_proc_t proc; - pmix_rank_info_t *rinfo; + pmix_rank_t rank; pmix_output_verbose(2, pmix_gds_base_framework.framework_output, "[%s:%d] gds:dstore:register_job_info for peer [%s:%d]", @@ -3181,8 +3181,8 @@ static pmix_status_t dstore_register_job_info(struct pmix_peer_t *pr, return rc; } - PMIX_LIST_FOREACH(rinfo, &ns->ranks, pmix_rank_info_t) { - proc.rank = rinfo->pname.rank; + for (rank=0; rank < ns->nprocs; rank++) { + proc.rank = rank; rc = _store_job_info(&proc); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/base/psensor_base_stubs.c b/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/base/psensor_base_stubs.c index c24b57d6986..b959372fe02 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/base/psensor_base_stubs.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/base/psensor_base_stubs.c @@ -1,7 +1,7 @@ /* * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. - * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2018 Intel, Inc. All rights reserved. * * $COPYRIGHT$ * @@ -24,6 +24,7 @@ pmix_status_t pmix_psensor_base_start(pmix_peer_t *requestor, pmix_status_t erro { pmix_psensor_active_module_t *mod; pmix_status_t rc; + bool didit = false; pmix_output_verbose(5, pmix_psensor_base_framework.framework_output, "%s:%d sensor:base: starting sensors", @@ -36,9 +37,17 @@ pmix_status_t pmix_psensor_base_start(pmix_peer_t *requestor, pmix_status_t erro if (PMIX_SUCCESS != rc && PMIX_ERR_TAKE_NEXT_OPTION != rc) { return rc; } + didit = true; } } + /* if none of the components could do it, then report + * not supported upwards so the server knows to ask + * the host to try */ + if (!didit) { + return PMIX_ERR_NOT_SUPPORTED; + } + return PMIX_SUCCESS; } @@ -46,7 +55,7 @@ pmix_status_t pmix_psensor_base_stop(pmix_peer_t *requestor, char *id) { pmix_psensor_active_module_t *mod; - pmix_status_t rc; + pmix_status_t rc, ret = PMIX_SUCCESS; pmix_output_verbose(5, pmix_psensor_base_framework.framework_output, "%s:%d sensor:base: stopping sensors", @@ -57,10 +66,14 @@ pmix_status_t pmix_psensor_base_stop(pmix_peer_t *requestor, if (NULL != mod->module->stop) { rc = mod->module->stop(requestor, id); if (PMIX_SUCCESS != rc && PMIX_ERR_TAKE_NEXT_OPTION != rc) { - return rc; + if (PMIX_SUCCESS == ret) { + ret = rc; + } + /* need to continue to ensure that all + * sensors have been stopped */ } } } - return PMIX_SUCCESS; + return ret; } diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/file/psensor_file.c b/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/file/psensor_file.c index ab4f9ce3f02..914e895a40a 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/file/psensor_file.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/file/psensor_file.c @@ -6,7 +6,7 @@ * Copyright (c) 2011-2012 Los Alamos National Security, LLC. * All rights reserved. * - * Copyright (c) 2017 Intel, Inc. All rights reserved. + * Copyright (c) 2017-2018 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -258,7 +258,9 @@ static pmix_status_t stop(pmix_peer_t *requestor, char *id) cd = PMIX_NEW(file_caddy_t); PMIX_RETAIN(requestor); cd->requestor = requestor; - cd->id = strdup(id); + if (NULL != id) { + cd->id = strdup(id); + } /* need to push into our event base to add this to our trackers */ pmix_event_assign(&cd->ev, pmix_psensor_base.evbase, -1, diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/heartbeat/psensor_heartbeat.c b/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/heartbeat/psensor_heartbeat.c index 7d363c030b4..f88ef0cdb69 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/heartbeat/psensor_heartbeat.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/heartbeat/psensor_heartbeat.c @@ -3,7 +3,7 @@ * Copyright (c) 2011-2012 Los Alamos National Security, LLC. All rights * reserved. * - * Copyright (c) 2017 Intel, Inc. All rights reserved. + * Copyright (c) 2017-2018 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -30,7 +30,7 @@ #include "src/util/output.h" #include "src/util/show_help.h" #include "src/include/pmix_globals.h" -#include "src/mca/ptl/ptl.h" +#include "src/mca/ptl/base/base.h" #include "src/mca/psensor/base/base.h" #include "psensor_heartbeat.h" @@ -63,6 +63,7 @@ typedef struct { pmix_data_range_t range; pmix_info_t *info; size_t ninfo; + bool stopped; } pmix_heartbeat_trkr_t; static void ft_constructor(pmix_heartbeat_trkr_t *ft) @@ -79,6 +80,7 @@ static void ft_constructor(pmix_heartbeat_trkr_t *ft) ft->range = PMIX_RANGE_NAMESPACE; ft->info = NULL; ft->ninfo = 0; + ft->stopped = false; } static void ft_destructor(pmix_heartbeat_trkr_t *ft) { @@ -168,6 +170,7 @@ static pmix_status_t heartbeat_start(pmix_peer_t *requestor, pmix_status_t error { pmix_heartbeat_trkr_t *ft; size_t n; + pmix_ptl_posted_recv_t *rcv; PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output, "[%s:%d] checking heartbeat monitoring for requestor %s:%d", @@ -202,6 +205,17 @@ static pmix_status_t heartbeat_start(pmix_peer_t *requestor, pmix_status_t error return PMIX_ERR_BAD_PARAM; } + /* if the recv hasn't been posted, so so now */ + if (!mca_psensor_heartbeat_component.recv_active) { + /* setup to receive heartbeats */ + rcv = PMIX_NEW(pmix_ptl_posted_recv_t); + rcv->tag = PMIX_PTL_TAG_HEARTBEAT; + rcv->cbfunc = pmix_psensor_heartbeat_recv_beats; + /* add it to the beginning of the list of recvs */ + pmix_list_prepend(&pmix_ptl_globals.posted_recvs, &rcv->super); + mca_psensor_heartbeat_component.recv_active = true; + } + /* need to push into our event base to add this to our trackers */ pmix_event_assign(&ft->cdev, pmix_psensor_base.evbase, -1, EV_WRITE, add_tracker, ft); @@ -239,9 +253,11 @@ static pmix_status_t heartbeat_stop(pmix_peer_t *requestor, char *id) cd = PMIX_NEW(heartbeat_caddy_t); PMIX_RETAIN(requestor); cd->requestor = requestor; - cd->id = strdup(id); + if (NULL != id) { + cd->id = strdup(id); + } - /* need to push into our event base to add this to our trackers */ + /* need to push into our event base to remove this from our trackers */ pmix_event_assign(&cd->ev, pmix_psensor_base.evbase, -1, EV_WRITE, del_tracker, cd); PMIX_POST_OBJECT(cd); @@ -254,7 +270,7 @@ static void opcbfunc(pmix_status_t status, void *cbdata) { pmix_heartbeat_trkr_t *ft = (pmix_heartbeat_trkr_t*)cbdata; - PMIX_RELEASE(ft); + PMIX_RELEASE(ft); // maintain accounting } /* this function automatically gets periodically called @@ -274,23 +290,25 @@ static void check_heartbeat(int fd, short dummy, void *cbdata) pmix_globals.myid.nspace, pmix_globals.myid.rank, ft->requestor->info->pname.nspace, ft->requestor->info->pname.rank)); - if (0 == ft->nbeats) { + if (0 == ft->nbeats && !ft->stopped) { /* no heartbeat recvd in last window */ PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output, "[%s:%d] sensor:check_heartbeat failed for proc %s:%d", pmix_globals.myid.nspace, pmix_globals.myid.rank, ft->requestor->info->pname.nspace, ft->requestor->info->pname.rank)); - /* stop monitoring this client */ - pmix_list_remove_item(&mca_psensor_heartbeat_component.trackers, &ft->super); /* generate an event */ (void)strncpy(source.nspace, ft->requestor->info->pname.nspace, PMIX_MAX_NSLEN); source.rank = ft->requestor->info->pname.rank; + /* ensure the tracker remains throughout the process */ + PMIX_RETAIN(ft); + /* mark that the process appears stopped so we don't + * continue to report it */ + ft->stopped = true; rc = PMIx_Notify_event(PMIX_MONITOR_HEARTBEAT_ALERT, &source, ft->range, ft->info, ft->ninfo, opcbfunc, ft); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); } - return; } else { PMIX_OUTPUT_VERBOSE((1, pmix_psensor_base_framework.framework_output, "[%s:%d] sensor:check_heartbeat detected %d beats for proc %s:%d", @@ -316,6 +334,8 @@ static void add_beat(int sd, short args, void *cbdata) if (ft->requestor == b->peer) { /* increment the beat count */ ++ft->nbeats; + /* ensure we know that the proc is alive */ + ft->stopped = false; break; } } diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/heartbeat/psensor_heartbeat.h b/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/heartbeat/psensor_heartbeat.h index 2f904b60359..2052b0d9c66 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/heartbeat/psensor_heartbeat.h +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/heartbeat/psensor_heartbeat.h @@ -2,7 +2,7 @@ * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. * - * Copyright (c) 2017 Intel, Inc. All rights reserved. + * Copyright (c) 2017-2018 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -28,6 +28,7 @@ BEGIN_C_DECLS typedef struct { pmix_psensor_base_component_t super; + bool recv_active; pmix_list_t trackers; } pmix_psensor_heartbeat_component_t; diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/heartbeat/psensor_heartbeat_component.c b/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/heartbeat/psensor_heartbeat_component.c index 7f6f18f2ff7..1f56177dee2 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/heartbeat/psensor_heartbeat_component.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/psensor/heartbeat/psensor_heartbeat_component.c @@ -1,7 +1,7 @@ /* * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. - * Copyright (c) 2017 Intel, Inc. All rights reserved. + * Copyright (c) 2017-2018 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -50,14 +50,9 @@ pmix_psensor_heartbeat_component_t mca_psensor_heartbeat_component = { */ static int heartbeat_open(void) { - pmix_status_t rc; - PMIX_CONSTRUCT(&mca_psensor_heartbeat_component.trackers, pmix_list_t); - /* setup to receive heartbeats */ - PMIX_PTL_RECV(rc, pmix_globals.mypeer, pmix_psensor_heartbeat_recv_beats, PMIX_PTL_TAG_HEARTBEAT); - - return rc; + return PMIX_SUCCESS; } @@ -74,12 +69,7 @@ static int heartbeat_query(pmix_mca_base_module_t **module, int *priority) static int heartbeat_close(void) { - pmix_status_t rc; - - /* cancel our persistent recv */ - PMIX_PTL_CANCEL(rc, pmix_globals.mypeer, PMIX_PTL_TAG_HEARTBEAT); - PMIX_LIST_DESTRUCT(&mca_psensor_heartbeat_component.trackers); - return rc; + return PMIX_SUCCESS; } diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c index 57cc89d9fb6..d1f7971e5bb 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c @@ -45,6 +45,7 @@ #include "src/server/pmix_server_ops.h" #include "src/util/error.h" #include "src/util/show_help.h" +#include "src/mca/psensor/psensor.h" #include "src/mca/ptl/base/base.h" @@ -140,6 +141,9 @@ void pmix_ptl_base_lost_connection(pmix_peer_t *peer, pmix_status_t err) } } } + /* cleanup any sensors that are monitoring them */ + pmix_psensor.stop(peer, NULL); + if (!peer->finalized && !PMIX_PROC_IS_TOOL(peer)) { /* if this peer already called finalize, then * we are just seeing their connection go away diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c index c07d8202e3d..5d317861825 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/tcp/ptl_tcp.c @@ -867,7 +867,7 @@ static pmix_status_t recv_connect_ack(int sd) /* get the current timeout value so we can reset to it */ sz = sizeof(save); if (0 != getsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, (void*)&save, &sz)) { - if (ENOPROTOOPT == errno) { + if (ENOPROTOOPT == errno || EOPNOTSUPP == errno) { sockopt = false; } else { return PMIX_ERR_UNREACH; diff --git a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/usock/ptl_usock.c b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/usock/ptl_usock.c index daf795cf054..515744ddde0 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/usock/ptl_usock.c +++ b/opal/mca/pmix/pmix2x/pmix/src/mca/ptl/usock/ptl_usock.c @@ -398,7 +398,7 @@ static pmix_status_t recv_connect_ack(int sd) /* get the current timeout value so we can reset to it */ sz = sizeof(save); if (0 != getsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, (void*)&save, &sz)) { - if (ENOPROTOOPT == errno) { + if (ENOPROTOOPT == errno || EOPNOTSUPP == errno) { sockopt = false; } else { return PMIX_ERR_UNREACH; diff --git a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server.c b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server.c index f31d7cb9657..153bc9d66a4 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server.c +++ b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server.c @@ -65,6 +65,7 @@ #include "src/mca/bfrops/base/base.h" #include "src/mca/gds/base/base.h" #include "src/mca/preg/preg.h" +#include "src/mca/psensor/base/base.h" #include "src/mca/ptl/base/base.h" /* the server also needs access to client operations @@ -276,6 +277,16 @@ PMIX_EXPORT pmix_status_t PMIx_server_init(pmix_server_module_t *module, PMIX_RETAIN(pmix_globals.mypeer->info); pmix_client_globals.myserver->info = pmix_globals.mypeer->info; + /* open the psensor framework */ + if (PMIX_SUCCESS != (rc = pmix_mca_base_framework_open(&pmix_psensor_base_framework, 0))) { + PMIX_RELEASE_THREAD(&pmix_global_lock); + return rc; + } + if (PMIX_SUCCESS != (rc = pmix_psensor_base_select())) { + PMIX_RELEASE_THREAD(&pmix_global_lock); + return rc; + } + /* setup the wildcard recv for inbound messages from clients */ req = PMIX_NEW(pmix_ptl_posted_recv_t); req->tag = UINT32_MAX; @@ -357,6 +368,10 @@ PMIX_EXPORT pmix_status_t PMIx_server_finalize(void) if (NULL != gds_mode) { free(gds_mode); } + + /* close the psensor framework */ + (void)pmix_mca_base_framework_close(&pmix_psensor_base_framework); + pmix_rte_finalize(); pmix_output_verbose(2, pmix_globals.debug_output, @@ -841,6 +856,7 @@ static void _deregister_client(int sd, short args, void *cbdata) pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata; pmix_rank_info_t *info; pmix_nspace_t *nptr, *tmp; + pmix_peer_t *peer; PMIX_ACQUIRE_OBJECT(cd); @@ -863,6 +879,9 @@ static void _deregister_client(int sd, short args, void *cbdata) /* find and remove this client */ PMIX_LIST_FOREACH(info, &nptr->ranks, pmix_rank_info_t) { if (info->pname.rank == cd->proc.rank) { + if (NULL != (peer = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, info->peerid))) { + pmix_psensor.stop(peer, NULL); + } pmix_list_remove_item(&nptr->ranks, &info->super); PMIX_RELEASE(info); break; @@ -2363,6 +2382,9 @@ static void server_message_handler(struct pmix_peer_t *pr, PMIX_ERROR_LOG(PMIX_ERR_NOMEM); return; } + if (PMIX_OPERATION_SUCCEEDED == ret) { + ret = PMIX_SUCCESS; + } PMIX_BFROPS_PACK(rc, pr, reply, &ret, 1, PMIX_STATUS); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); diff --git a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c index 35965f57677..3205b761a4e 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c +++ b/opal/mca/pmix/pmix2x/pmix/src/server/pmix_server_ops.c @@ -50,6 +50,7 @@ #include "src/class/pmix_list.h" #include "src/mca/bfrops/bfrops.h" +#include "src/mca/psensor/psensor.h" #include "src/util/argv.h" #include "src/util/error.h" #include "src/util/output.h" @@ -1645,6 +1646,47 @@ static void local_cbfunc(pmix_status_t status, void *cbdata) PMIX_RELEASE(cd); } +static void intermed_step(pmix_status_t status, void *cbdata) +{ + pmix_notify_caddy_t *cd = (pmix_notify_caddy_t*)cbdata; + pmix_status_t rc; + + if (PMIX_SUCCESS != status) { + rc = status; + goto complete; + } + + /* check the range directive - if it is LOCAL, then we are + * done. Otherwise, it needs to go up to our + * host for dissemination */ + if (PMIX_RANGE_LOCAL == cd->range) { + rc = PMIX_SUCCESS; + goto complete; + } + + if (NULL == pmix_host_server.notify_event) { + rc = PMIX_ERR_NOT_SUPPORTED; + goto complete; + } + + /* pass it to our host RM for distribution */ + rc = pmix_host_server.notify_event(cd->status, &cd->source, cd->range, + cd->info, cd->ninfo, local_cbfunc, cd); + if (PMIX_SUCCESS == rc) { + /* let the callback function respond for us */ + return; + } + if (PMIX_OPERATION_SUCCEEDED == rc) { + rc = PMIX_SUCCESS; // local_cbfunc will not be called + } + + complete: + if (NULL != cd->cbfunc) { + cd->cbfunc(rc, cd->cbdata); + } + PMIX_RELEASE(cd); +} + pmix_status_t pmix_server_event_recvd_from_client(pmix_peer_t *peer, pmix_buffer_t *buf, pmix_op_cbfunc_t cbfunc, @@ -1653,13 +1695,11 @@ pmix_status_t pmix_server_event_recvd_from_client(pmix_peer_t *peer, int32_t cnt; pmix_status_t rc; pmix_notify_caddy_t *cd; + size_t ninfo; pmix_output_verbose(2, pmix_globals.debug_output, - "recvd event notification from client"); - - if (NULL == pmix_host_server.notify_event) { - return PMIX_ERR_NOT_SUPPORTED; - } + "%s:%d recvd event notification from client", + pmix_globals.myid.nspace, pmix_globals.myid.rank); cd = PMIX_NEW(pmix_notify_caddy_t); if (NULL == cd) { @@ -1689,44 +1729,36 @@ pmix_status_t pmix_server_event_recvd_from_client(pmix_peer_t *peer, /* unpack the info keys */ cnt = 1; - PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->ninfo, &cnt, PMIX_SIZE); + PMIX_BFROPS_UNPACK(rc, peer, buf, &ninfo, &cnt, PMIX_SIZE); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); goto exit; } - if (0 < cd->ninfo) { - PMIX_INFO_CREATE(cd->info, cd->ninfo); - if (NULL == cd->info) { - rc = PMIX_ERR_NOMEM; - goto exit; - } - cnt = cd->ninfo; + cd->ninfo = ninfo + 1; + PMIX_INFO_CREATE(cd->info, cd->ninfo); + if (NULL == cd->info) { + rc = PMIX_ERR_NOMEM; + goto exit; + } + if (0 < ninfo) { + cnt = ninfo; PMIX_BFROPS_UNPACK(rc, peer, buf, cd->info, &cnt, PMIX_INFO); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); goto exit; } } - - /* check the range directive - if it is LOCAL, then we just - * process it ourselves. Otherwise, it needs to go up to our - * host for dissemination */ - if (PMIX_RANGE_LOCAL == cd->range) { - if (PMIX_SUCCESS != (rc = pmix_server_notify_client_of_event(cd->status, - &cd->source, - cd->range, - cd->info, cd->ninfo, - local_cbfunc, cd))) { - goto exit; - } - return PMIX_SUCCESS; + /* add an info object to mark that we recvd this internally */ + PMIX_INFO_LOAD(&cd->info[ninfo], PMIX_SERVER_INTERNAL_NOTIFY, NULL, PMIX_BOOL); + /* process it */ + if (PMIX_SUCCESS != (rc = pmix_server_notify_client_of_event(cd->status, + &cd->source, + cd->range, + cd->info, cd->ninfo, + intermed_step, cd))) { + goto exit; } - - /* when we receive an event from a client, we just pass it to - * our host RM for distribution - if any targeted recipients - * are local to us, the host RM will let us know */ - pmix_host_server.notify_event(cd->status, &cd->source, cd->range, - cd->info, cd->ninfo, local_cbfunc, cd); + /* tell the switchyard we will handle it from here */ return PMIX_SUCCESS; exit: @@ -2032,9 +2064,6 @@ pmix_status_t pmix_server_monitor(pmix_peer_t *peer, pmix_output_verbose(2, pmix_globals.debug_output, "recvd monitor request from client"); - if (NULL == pmix_host_server.monitor) { - return PMIX_ERR_NOT_SUPPORTED; - } cd = PMIX_NEW(pmix_query_caddy_t); if (NULL == cd) { @@ -2077,6 +2106,24 @@ pmix_status_t pmix_server_monitor(pmix_peer_t *peer, } } + /* see if they are requesting one of the monitoring + * methods we internally support */ + rc = pmix_psensor.start(peer, error, &monitor, cd->info, cd->ninfo); + if (PMIX_SUCCESS == rc) { + rc = PMIX_OPERATION_SUCCEEDED; + goto exit; + } + if (PMIX_ERR_NOT_SUPPORTED != rc) { + goto exit; + } + + /* if we don't internally support it, see if + * our host does */ + if (NULL == pmix_host_server.monitor) { + rc = PMIX_ERR_NOT_SUPPORTED; + goto exit; + } + /* setup the requesting peer name */ (void)strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN); proc.rank = peer->info->pname.rank; diff --git a/opal/mca/pmix/pmix2x/pmix/src/tool/pmix_tool.c b/opal/mca/pmix/pmix2x/pmix/src/tool/pmix_tool.c index dd858eb80a3..9afbebb09d0 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/tool/pmix_tool.c +++ b/opal/mca/pmix/pmix2x/pmix/src/tool/pmix_tool.c @@ -137,9 +137,9 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer, goto error; } - /* we always leave space for a callback object */ - chain->ninfo = ninfo + 1; - PMIX_INFO_CREATE(chain->info, chain->ninfo); + /* we always leave space for event hdlr name and a callback object */ + chain->nallocated = ninfo + 2; + PMIX_INFO_CREATE(chain->info, chain->nallocated); if (NULL == chain->info) { PMIX_ERROR_LOG(PMIX_ERR_NOMEM); PMIX_RELEASE(chain); @@ -155,16 +155,9 @@ static void pmix_tool_notify_recv(struct pmix_peer_t *peer, PMIX_RELEASE(chain); goto error; } - /* check for non-default flag */ - for (cnt=0; cnt < (int)ninfo; cnt++) { - if (0 == strncmp(chain->info[cnt].key, PMIX_EVENT_NON_DEFAULT, PMIX_MAX_KEYLEN)) { - chain->nondefault = PMIX_INFO_TRUE(&chain->info[cnt]); - break; - } - } } - /* now put the callback object tag in the last element */ - PMIX_INFO_LOAD(&chain->info[ninfo], PMIX_EVENT_RETURN_OBJECT, NULL, PMIX_POINTER); + /* prep the chain for processing */ + pmix_prep_event_chain(chain, chain->info, ninfo, false); pmix_output_verbose(2, pmix_globals.debug_output, "[%s:%d] pmix:tool_notify_recv - processing event %d, calling errhandler", diff --git a/opal/mca/pmix/pmix2x/pmix/src/util/path.c b/opal/mca/pmix/pmix2x/pmix/src/util/path.c index 0de2fafef6d..6903d10df61 100644 --- a/opal/mca/pmix/pmix2x/pmix/src/util/path.c +++ b/opal/mca/pmix/pmix2x/pmix/src/util/path.c @@ -15,6 +15,8 @@ * All rights reserved. * Copyright (c) 2014-2017 Intel, Inc. All rights reserved. * Copyright (c) 2016 University of Houston. All rights reserved. + * Copyright (c) 2018 Research Organization for Information Science + * and Technology (RIST). All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -54,9 +56,6 @@ #ifdef HAVE_SYS_STATVFS_H #include #endif -#ifdef HAVE_SYS_MOUNT_H -#include -#endif #ifdef HAVE_MNTENT_H #include #endif diff --git a/opal/mca/pmix/pmix2x/pmix/test/simple/Makefile.am b/opal/mca/pmix/pmix2x/pmix/test/simple/Makefile.am index 11b535d125c..198e8f4891d 100644 --- a/opal/mca/pmix/pmix2x/pmix/test/simple/Makefile.am +++ b/opal/mca/pmix/pmix2x/pmix/test/simple/Makefile.am @@ -24,7 +24,9 @@ AM_CPPFLAGS = -I$(top_builddir)/src -I$(top_builddir)/src/include -I$(top_buildd headers = simptest.h noinst_PROGRAMS = simptest simpclient simppub simpdyn simpft simpdmodex \ - test_pmix simptool simpdie simplegacy stability quietclient + test_pmix simptool simpdie simplegacy stability quietclient \ + test_pmix simptool simpdie simplegacy \ + stability quietclient simpjctrl simptest_SOURCES = $(headers) \ simptest.c @@ -97,3 +99,9 @@ quietclient_SOURCES = $(headers) \ quietclient_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) quietclient_LDADD = \ $(top_builddir)/src/libpmix.la + +simpjctrl_SOURCES = \ + simpjctrl.c +simpjctrl_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) +simpjctrl_LDADD = \ + $(top_builddir)/src/libpmix.la diff --git a/opal/mca/pmix/pmix2x/pmix/test/simple/simpjctrl.c b/opal/mca/pmix/pmix2x/pmix/test/simple/simpjctrl.c new file mode 100644 index 00000000000..c9ac506520a --- /dev/null +++ b/opal/mca/pmix/pmix2x/pmix/test/simple/simpjctrl.c @@ -0,0 +1,231 @@ +/* + * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2011 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. + * Copyright (c) 2013-2018 Intel, Inc. All rights reserved. + * Copyright (c) 2015 Mellanox Technologies, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include + +#include +#include "simptest.h" + +static pmix_proc_t myproc; + +/* this is the event notification function we pass down below + * when registering for general events - i.e.,, the default + * handler. We don't technically need to register one, but it + * is usually good practice to catch any events that occur */ +static void notification_fn(size_t evhdlr_registration_id, + pmix_status_t status, + const pmix_proc_t *source, + pmix_info_t info[], size_t ninfo, + pmix_info_t results[], size_t nresults, + pmix_event_notification_cbfunc_fn_t cbfunc, + void *cbdata) +{ + if (NULL != cbfunc) { + cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata); + } +} + +/* event handler registration is done asynchronously because it + * may involve the PMIx server registering with the host RM for + * external events. So we provide a callback function that returns + * the status of the request (success or an error), plus a numerical index + * to the registered event. The index is used later on to deregister + * an event handler - if we don't explicitly deregister it, then the + * PMIx server will do so when it see us exit */ +static void evhandler_reg_callbk(pmix_status_t status, + size_t evhandler_ref, + void *cbdata) +{ + mylock_t *lk = (mylock_t*)cbdata; + + if (PMIX_SUCCESS != status) { + fprintf(stderr, "Client %s:%d EVENT HANDLER REGISTRATION FAILED WITH STATUS %d, ref=%lu\n", + myproc.nspace, myproc.rank, status, (unsigned long)evhandler_ref); + } + lk->status = status; + DEBUG_WAKEUP_THREAD(lk); +} + +static void infocbfunc(pmix_status_t status, + pmix_info_t *info, size_t ninfo, + void *cbdata, + pmix_release_cbfunc_t release_fn, + void *release_cbdata) +{ + mylock_t *lk = (mylock_t*)cbdata; + + fprintf(stderr, "Callback recvd with status %d\n", status); + + /* release the caller */ + if (NULL != release_fn) { + release_fn(release_cbdata); + } + + lk->status = status; + DEBUG_WAKEUP_THREAD(lk); +} + +int main(int argc, char **argv) +{ + int rc; + pmix_value_t value; + pmix_value_t *val = &value; + pmix_proc_t proc; + uint32_t nprocs, n; + pmix_info_t *info, *iptr; + bool flag; + mylock_t mylock; + pmix_data_array_t *dptr; + + /* init us - note that the call to "init" includes the return of + * any job-related info provided by the RM. */ + if (PMIX_SUCCESS != (rc = PMIx_Init(&myproc, NULL, 0))) { + fprintf(stderr, "Client ns %s rank %d: PMIx_Init failed: %d\n", myproc.nspace, myproc.rank, rc); + exit(0); + } + fprintf(stderr, "Client ns %s rank %d: Running\n", myproc.nspace, myproc.rank); + + + /* register our default event handler - again, this isn't strictly + * required, but is generally good practice */ + DEBUG_CONSTRUCT_LOCK(&mylock); + PMIx_Register_event_handler(NULL, 0, NULL, 0, + notification_fn, evhandler_reg_callbk, (void*)&mylock); + DEBUG_WAIT_THREAD(&mylock); + if (0 != mylock.status) { + fprintf(stderr, "[%s:%d] Default handler registration failed\n", myproc.nspace, myproc.rank); + exit(mylock.status); + } + DEBUG_DESTRUCT_LOCK(&mylock); + + /* job-related info is found in our nspace, assigned to the + * wildcard rank as it doesn't relate to a specific rank. Setup + * a name to retrieve such values */ + PMIX_PROC_CONSTRUCT(&proc); + (void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN); + proc.rank = PMIX_RANK_WILDCARD; + + /* get our universe size */ + if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, PMIX_UNIV_SIZE, NULL, 0, &val))) { + fprintf(stderr, "Client ns %s rank %d: PMIx_Get universe size failed: %d\n", myproc.nspace, myproc.rank, rc); + goto done; + } + nprocs = val->data.uint32; + PMIX_VALUE_RELEASE(val); + fprintf(stderr, "Client %s:%d universe size %d\n", myproc.nspace, myproc.rank, nprocs); + + /* inform the RM that we are preemptible, and that our checkpoint methods are + * "signal" on SIGUSR2 and event on PMIX_JCTRL_CHECKPOINT */ + PMIX_INFO_CREATE(info, 2); + flag = true; + PMIX_INFO_LOAD(&info[0], PMIX_JOB_CTRL_PREEMPTIBLE, (void*)&flag, PMIX_BOOL); + /* can't use "load" to load a pmix_data_array_t */ + (void)strncpy(info[1].key, PMIX_JOB_CTRL_CHECKPOINT_METHOD, PMIX_MAX_KEYLEN); + info[1].value.type = PMIX_DATA_ARRAY; + dptr = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t)); + info[1].value.data.darray = dptr; + dptr->type = PMIX_INFO; + dptr->size = 2; + PMIX_INFO_CREATE(dptr->array, dptr->size); + rc = SIGUSR2; + iptr = (pmix_info_t*)dptr->array; + PMIX_INFO_LOAD(&iptr[0], PMIX_JOB_CTRL_CHECKPOINT_SIGNAL, &rc, PMIX_INT); + rc = PMIX_JCTRL_CHECKPOINT; + PMIX_INFO_LOAD(&iptr[1], PMIX_JOB_CTRL_CHECKPOINT_EVENT, &rc, PMIX_STATUS); + + /* since this is informational and not a requested operation, the target parameter + * doesn't mean anything and can be ignored */ + DEBUG_CONSTRUCT_LOCK(&mylock); + if (PMIX_SUCCESS != (rc = PMIx_Job_control_nb(NULL, 0, info, 2, infocbfunc, (void*)&mylock))) { + fprintf(stderr, "Client ns %s rank %d: PMIx_Job_control_nb failed: %d\n", myproc.nspace, myproc.rank, rc); + goto done; + } + DEBUG_WAIT_THREAD(&mylock); + PMIX_INFO_FREE(info, 2); + if (0 != mylock.status) { + fprintf(stderr, "Client ns %s rank %d: PMIx_Job_control_nb failed: %d\n", myproc.nspace, myproc.rank, mylock.status); + exit(mylock.status); + } + DEBUG_DESTRUCT_LOCK(&mylock); + + /* now request that this process be monitored using heartbeats */ + PMIX_INFO_CREATE(iptr, 1); + PMIX_INFO_LOAD(&iptr[0], PMIX_MONITOR_HEARTBEAT, NULL, PMIX_POINTER); + + PMIX_INFO_CREATE(info, 3); + PMIX_INFO_LOAD(&info[0], PMIX_MONITOR_ID, "MONITOR1", PMIX_STRING); + n = 5; // require a heartbeat every 5 seconds + PMIX_INFO_LOAD(&info[1], PMIX_MONITOR_HEARTBEAT_TIME, &n, PMIX_UINT32); + n = 2; // two heartbeats can be missed before declaring us "stalled" + PMIX_INFO_LOAD(&info[2], PMIX_MONITOR_HEARTBEAT_DROPS, &n, PMIX_UINT32); + + /* make the request */ + DEBUG_CONSTRUCT_LOCK(&mylock); + if (PMIX_SUCCESS != (rc = PMIx_Process_monitor_nb(iptr, PMIX_MONITOR_HEARTBEAT_ALERT, + info, 3, infocbfunc, (void*)&mylock))) { + fprintf(stderr, "Client ns %s rank %d: PMIx_Process_monitor_nb failed: %d\n", myproc.nspace, myproc.rank, rc); + goto done; + } + DEBUG_WAIT_THREAD(&mylock); + PMIX_INFO_FREE(iptr, 1); + PMIX_INFO_FREE(info, 3); + if (0 != mylock.status) { + fprintf(stderr, "Client ns %s rank %d: PMIx_Process_monitor_nb failed: %d\n", myproc.nspace, myproc.rank, mylock.status); + exit(mylock.status); + } + DEBUG_DESTRUCT_LOCK(&mylock); + + /* send a heartbeat */ + PMIx_Heartbeat(); + + /* call fence to synchronize with our peers - no need to + * collect any info as we didn't "put" anything */ + PMIX_INFO_CREATE(info, 1); + flag = false; + PMIX_INFO_LOAD(info, PMIX_COLLECT_DATA, &flag, PMIX_BOOL); + if (PMIX_SUCCESS != (rc = PMIx_Fence(&proc, 1, info, 1))) { + fprintf(stderr, "Client ns %s rank %d: PMIx_Fence failed: %d\n", myproc.nspace, myproc.rank, rc); + goto done; + } + PMIX_INFO_FREE(info, 1); + + + done: + /* finalize us */ + fprintf(stderr, "Client ns %s rank %d: Finalizing\n", myproc.nspace, myproc.rank); + if (PMIX_SUCCESS != (rc = PMIx_Finalize(NULL, 0))) { + fprintf(stderr, "Client ns %s rank %d:PMIx_Finalize failed: %d\n", myproc.nspace, myproc.rank, rc); + } else { + fprintf(stderr, "Client ns %s rank %d:PMIx_Finalize successfully completed\n", myproc.nspace, myproc.rank); + } + fflush(stderr); + return(0); +} diff --git a/opal/mca/pmix/pmix2x/pmix/test/simple/simptest.c b/opal/mca/pmix/pmix2x/pmix/test/simple/simptest.c index fcba809a886..fe6472e61fd 100644 --- a/opal/mca/pmix/pmix2x/pmix/test/simple/simptest.c +++ b/opal/mca/pmix/pmix2x/pmix/test/simple/simptest.c @@ -101,6 +101,18 @@ static void log_fn(const pmix_proc_t *client, const pmix_info_t data[], size_t ndata, const pmix_info_t directives[], size_t ndirs, pmix_op_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t alloc_fn(const pmix_proc_t *client, + pmix_alloc_directive_t directive, + const pmix_info_t data[], size_t ndata, + pmix_info_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t jctrl_fn(const pmix_proc_t *requestor, + const pmix_proc_t targets[], size_t ntargets, + const pmix_info_t directives[], size_t ndirs, + pmix_info_cbfunc_t cbfunc, void *cbdata); +static pmix_status_t mon_fn(const pmix_proc_t *requestor, + const pmix_info_t *monitor, pmix_status_t error, + const pmix_info_t directives[], size_t ndirs, + pmix_info_cbfunc_t cbfunc, void *cbdata); static pmix_server_module_t mymodule = { .client_connected = connected, @@ -119,7 +131,10 @@ static pmix_server_module_t mymodule = { .notify_event = notify_event, .query = query_fn, .tool_connected = tool_connect_fn, - .log = log_fn + .log = log_fn, + .allocate = alloc_fn, + .job_control = jctrl_fn, + .monitor = mon_fn }; typedef struct { @@ -161,6 +176,7 @@ PMIX_CLASS_INSTANCE(myxfer_t, typedef struct { pmix_list_item_t super; + int exit_code; pid_t pid; } wait_tracker_t; PMIX_CLASS_INSTANCE(wait_tracker_t, @@ -246,6 +262,17 @@ static void model_registration_callback(pmix_status_t status, *active = status; } +static void set_handler_default(int sig) +{ + struct sigaction act; + + act.sa_handler = SIG_DFL; + act.sa_flags = 0; + sigemptyset(&act.sa_mask); + + sigaction(sig, &act, (struct sigaction *)0); +} + int main(int argc, char **argv) { char **client_env=NULL; @@ -425,14 +452,22 @@ int main(int argc, char **argv) PMIx_server_finalize(); return -1; } - child = PMIX_NEW(wait_tracker_t); - child->pid = pid; - pmix_list_append(&children, &child->super); - if (pid == 0) { + sigset_t sigs; + set_handler_default(SIGTERM); + set_handler_default(SIGINT); + set_handler_default(SIGHUP); + set_handler_default(SIGPIPE); + set_handler_default(SIGCHLD); + sigprocmask(0, 0, &sigs); + sigprocmask(SIG_UNBLOCK, &sigs, 0); execve(executable, client_argv, client_env); /* Does not return */ exit(0); + } else { + child = PMIX_NEW(wait_tracker_t); + child->pid = pid; + pmix_list_append(&children, &child->super); } } free(executable); @@ -447,6 +482,15 @@ int main(int argc, char **argv) nanosleep(&ts, NULL); } + /* see if anyone exited with non-zero status */ + n=0; + PMIX_LIST_FOREACH(child, &children, wait_tracker_t) { + if (0 != child->exit_code) { + fprintf(stderr, "Child %d [%d] exited with status %d - test FAILED\n", n, child->pid, child->exit_code); + } + ++n; + } + /* try notifying ourselves */ ninfo = 3; PMIX_INFO_CREATE(info, ninfo); @@ -853,7 +897,8 @@ static pmix_status_t notify_event(pmix_status_t code, pmix_info_t info[], size_t ninfo, pmix_op_cbfunc_t cbfunc, void *cbdata) { - return PMIX_SUCCESS; + pmix_output(0, "SERVER: NOTIFY EVENT"); + return PMIX_OPERATION_SUCCEEDED; } typedef struct query_data_t { @@ -917,6 +962,31 @@ static void log_fn(const pmix_proc_t *client, } } +static pmix_status_t alloc_fn(const pmix_proc_t *client, + pmix_alloc_directive_t directive, + const pmix_info_t data[], size_t ndata, + pmix_info_cbfunc_t cbfunc, void *cbdata) +{ + return PMIX_SUCCESS; +} + +static pmix_status_t jctrl_fn(const pmix_proc_t *requestor, + const pmix_proc_t targets[], size_t ntargets, + const pmix_info_t directives[], size_t ndirs, + pmix_info_cbfunc_t cbfunc, void *cbdata) +{ + return PMIX_OPERATION_SUCCEEDED; +} + +static pmix_status_t mon_fn(const pmix_proc_t *requestor, + const pmix_info_t *monitor, pmix_status_t error, + const pmix_info_t directives[], size_t ndirs, + pmix_info_cbfunc_t cbfunc, void *cbdata) +{ + return PMIX_ERR_NOT_SUPPORTED; +} + + static void wait_signal_callback(int fd, short event, void *arg) { pmix_event_t *sig = (pmix_event_t*) arg; @@ -947,8 +1017,9 @@ static void wait_signal_callback(int fd, short event, void *arg) if (pid == t2->pid) { /* found it! */ --wakeup; - break; + return; } } } + fprintf(stderr, "ENDLOOP\n"); }