diff --git a/CMakeLists.txt b/CMakeLists.txt index 844a4499..06a686e0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -175,7 +175,7 @@ if ( LIB_MATH AND LIB_DL AND MPI_FOUND ) endif() if (ENABLE_IBVERBS) - list(APPEND ENGINES "ibverbs") + list(APPEND ENGINES "ibverbs" "zero") endif() endif() diff --git a/NOTICE b/NOTICE index 1f386452..3992b64c 100644 --- a/NOTICE +++ b/NOTICE @@ -33,6 +33,8 @@ Implementation 1) BSMP 2) Collectives 3) Pthread implementation + - 2022 - 2024, Kiril Dichev + 1) Develop zero engine for LPF - 2018, Pierre Leca 1) Usability improvements of compiler frontends and CMake integration @@ -50,6 +52,8 @@ Quality Assurance - 2015 - 2017, Albert-Jan Yzelman 1) Performance test suite + - 2022 - 2024, Kiril Dichev + 1) Rewrite all functional tests to use CTest/Gtest Miscellaneous / Acknowledgments diff --git a/bootstrap.sh b/bootstrap.sh index 1bc1835c..4c3d4e68 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -278,13 +278,13 @@ echo "--------------------------------------------------" echo ${CMAKE_EXE} -Wno-dev \ -DCMAKE_INSTALL_PREFIX="$installdir" \ - -DCMAKE_BUILD_TYPE=$config \ - -DLPFLIB_MAKE_DOC=$doc \ - -DLPFLIB_MAKE_TEST_DOC=$doc \ + -DCMAKE_BUILD_TYPE=$config \ + -DLPFLIB_MAKE_DOC=$doc \ + -DLPFLIB_MAKE_TEST_DOC=$doc \ -DLPF_ENABLE_TESTS=$functests \ -DGTEST_AGREE_TO_LICENSE=$googletest_license_agreement \ - -DLPFLIB_PERFTESTS=$perftests \ - -DLPFLIB_CONFIG_NAME=${config_name:-${config}}\ + -DLPFLIB_PERFTESTS=$perftests \ + -DLPFLIB_CONFIG_NAME=${config_name:-${config}} \ -DLPF_HWLOC="${hwloc}" \ $hwloc_found_flag \ $mpi_cmake_flags \ diff --git a/include/debug/lpf/core.h b/include/debug/lpf/core.h index ff2306c6..4de8881b 100644 --- a/include/debug/lpf/core.h +++ b/include/debug/lpf/core.h @@ -64,6 +64,12 @@ extern "C" { #define lpf_sync( ctx, attrs ) \ lpf_debug_sync( __FILE__, __LINE__, (ctx), (attrs) ) +#define lpf_counting_sync_per_tag( ctx, attrs, slot, expected_sends, expected_rcvs ) \ + lpf_debug_counting_sync_per_tag( __FILE__, __LINE__, (ctx), (attrs), (slot), (expected_sends), (expected_rcvs) ) + +#define lpf_sync_per_tag( ctx, attrs, slot) \ + lpf_debug_sync_per_tag( __FILE__, __LINE__, (ctx), (attrs), (slot)) + #define lpf_resize_memory_register( ctx, size ) \ lpf_debug_resize_memory_register( __FILE__, __LINE__, (ctx), (size) ) @@ -128,6 +134,9 @@ extern _LPFLIB_API lpf_err_t lpf_debug_sync( const char * file, int line, lpf_t ctx, lpf_sync_attr_t attr ); +lpf_err_t lpf_debug_counting_sync_per_tag( const char * file, int line, + lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sends, size_t expected_rcvs); + extern _LPFLIB_API lpf_err_t lpf_debug_resize_memory_register( const char * file, int line, lpf_t ctx, size_t max_regs ); diff --git a/include/lpf/collectives.h b/include/lpf/collectives.h index 4304c5f0..871b7f27 100644 --- a/include/lpf/collectives.h +++ b/include/lpf/collectives.h @@ -116,6 +116,16 @@ typedef void (*lpf_combiner_t) (size_t n, const void * combine, void * into ); */ extern _LPFLIB_API const lpf_coll_t LPF_INVALID_COLL; +/** + * ToDo: document allgatherv + */ +lpf_err_t lpf_allgatherv( + lpf_coll_t coll, + lpf_memslot_t src, + lpf_memslot_t dst, + size_t *sizes, + bool exclude_myself + ); /** * Initialises a collectives struct, which allows the scheduling of collective * calls. The initialised struct is only valid after a next call to lpf_sync(). diff --git a/include/lpf/core.h b/include/lpf/core.h index 9c0d1da8..772fa92e 100644 --- a/include/lpf/core.h +++ b/include/lpf/core.h @@ -705,7 +705,7 @@ extern "C" { * released, and NN the number of the specifications released before this one in * the same year. */ -#define _LPF_VERSION 202000L +#define _LPF_VERSION 202400L /** * An implementation that has defined this macro may never define the @@ -984,7 +984,7 @@ typedef struct lpf_machine { * byte. This value may depend on the actual number of processes \a p used, * the minimum message size \a min_msg_size the user aims to send and * receive, and the type of synchronisation requested via \a attr. The - * value is bitwise equivalent across all processes. + * value is bitwise equivalent across all processes. * * \param[in] p A value between 1 and #lpf_machine_t.p, where * both bounds are inclusive. @@ -2060,6 +2060,25 @@ lpf_err_t lpf_get( extern _LPFLIB_API lpf_err_t lpf_sync( lpf_t ctx, lpf_sync_attr_t attr ); +/** + * This synchronisation waits on memory slot #slot to complete sending + * and receiving @expected_sent and @expected_rcvd messages. The counts are + * checked in the ibv_poll_cq calls and associated to certain LPF slots. + * This call is only implemented for IB verbs at the moment. + */ +extern _LPFLIB_API +lpf_err_t lpf_counting_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd); + +/** + * This synchronisation waits on memory slot #slot to complete sending + * or receiving all outstanding messages. For the current implementation + * in IB verbs, this means all scheduled sends via ibv_post_send are + * checked for completion via ibv_poll_cq. Currently, there is no logic + * scheduling receives, but only sends -- for either get or put. + */ +extern _LPFLIB_API +lpf_err_t lpf_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot); + /** * This primitive allows a user to inspect the machine that this LPF program * has been assigned. All resources reported in the #lpf_machine_t struct are @@ -2317,6 +2336,122 @@ lpf_err_t lpf_resize_memory_register( lpf_t ctx, size_t max_regs ); extern _LPFLIB_API lpf_err_t lpf_resize_message_queue( lpf_t ctx, size_t max_msgs ); +/** + * This call blockingly locks a destination slot #dst_slot, relying + * on IBVerbs Compare-and-Swap atomics. + * For an example, check tests/functional/func_lpf_compare_and_swap.ibverbs.c + * It is only implemented for the zero backend (on Infiniband) + * \param[in] ctx The LPF context + * \param[in] src_slot Local slot used as source for the + * operation to lock the destination slot, registered via lpf_register_local() + * \param[in] src_offset Source offset to use (0 in most cases) + * \param[in] dst_pid The process ID of the destination process + * \param[in] dst_slot The memory slot of the remote destination memory area + * registered via lpf_register_global(). + * \param[in] dst_offset Destinaton offset (0 in most cases) + * \param[in] size The number of bytes to copy from the source memory area to + * the destination memory area (#lpf_memslot_t in most cases) + * \param[in] attr A #lpf_sync_attr_t value (use #LPF_MSG_DEFAULT) + * \returns #LPF_SUCCESS + * When this process successfully locks the slot + */ +extern _LPFLIB_API +lpf_err_t lpf_lock_slot( + lpf_t ctx, + lpf_memslot_t src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +); + +/** + * This call blockingly unlocks a destination slot #dst_slot, relying + * on IBVerbs Compare-and-Swap atomics. + * For an example, check tests/functional/func_lpf_compare_and_swap.ibverbs.c + * It is only implemented for the zero backend (on Infiniband) + * \param[in] ctx The LPF context + * \param[in] src_slot Local slot used as source for the + * operation to lock the destination slot, registered via lpf_register_local() + * \param[in] src_offset Source offset to use (0 in most cases) + * \param[in] dst_pid The process ID of the destination process + * \param[in] dst_slot The memory slot of the remote destination memory area + * registered via lpf_register_global(). + * \param[in] dst_offset Destinaton offset (0 in most cases) + * \param[in] size The number of bytes to copy from the source memory area to + * the destination memory area (#lpf_memslot_t in most cases) + * \param[in] attr A #lpf_sync_attr_t value (use #LPF_MSG_DEFAULT) + * \returns #LPF_SUCCESS + * When this process successfully locks the slot + */ +extern _LPFLIB_API +lpf_err_t lpf_unlock_slot( + lpf_t ctx, + lpf_memslot_t src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +); + +/** + * This function returns in @rcvd_msgs the received message count on + * LPF slot #slot. It is only implemented for the zero backend (on Infiniband) + * \param[in] ctx The LPF context + * \param[out] rcvd_msgs Received message count + * \param[in] slot LPF slot to check received messages for + */ +extern _LPFLIB_API +lpf_err_t lpf_get_rcvd_msg_count_per_slot( lpf_t ctx, size_t *rcvd_msgs, lpf_memslot_t slot); + +/** + * This function returns in @rcvd_msgs the total received message count. + * It is only implemented for the zero backend (on Infiniband) + * \param[in] ctx The LPF context + * \param[out] rcvd_msgs Received message count + */ +extern _LPFLIB_API +lpf_err_t lpf_get_rcvd_msg_count( lpf_t ctx, size_t *rcvd_msgs); + +/** + * This function returns in @sent_msgs the sent message count on LPF + * slot #slot. It is only implemented for the zero backend (on Infiniband) + * \param[in] ctx The LPF context + * \param[out] sent_msgs Total messages sent on #slot + * \param[in] slot + */ +extern _LPFLIB_API +lpf_err_t lpf_get_sent_msg_count_per_slot( lpf_t ctx, size_t *sent_msgs, lpf_memslot_t slot); + +/** + * This function blocks until all the scheduled messages via + * ibv_post_send are completed (via ibv_poll_cq). This includes + * both put and get calls on the local process. + * No concept of slots is used here. + * This allows to reuse the send buffers e.g. in higher-level channel + * libraries. + * It is only implemented for the zero backend (on Infiniband) + * \param[in] ctx The LPF context + */ +extern _LPFLIB_API +lpf_err_t lpf_flush_sent( lpf_t ctx); + +/** + * This function blocks until all the incoming received messages + * waiting on the receive completion queue are handled (via ibv_poll_cq). + * No concept of slots is used here. + * This allows to reuse the send buffers e.g. in higher-level channel + * libraries. + * It is only implemented for the zero backend (on Infiniband) + * \param[in] ctx The LPF context + */ +extern _LPFLIB_API +lpf_err_t lpf_flush_received( lpf_t ctx); + #ifdef __cplusplus } #endif diff --git a/include/lpf/static_dispatch.h b/include/lpf/static_dispatch.h index e9eea40b..f28f07f1 100644 --- a/include/lpf/static_dispatch.h +++ b/include/lpf/static_dispatch.h @@ -40,8 +40,15 @@ #undef lpf_get #undef lpf_put #undef lpf_sync +#undef lpf_counting_sync_per_slot +#undef lpf_sync_per_slot #undef lpf_register_local +#undef lpf_get_rcvd_msg_count +#undef lpf_get_rcvd_msg_count_per_slot +#undef lpf_get_sent_msg_count_per_slot #undef lpf_register_global +#undef lpf_flush_sent +#undef lpf_flush_received #undef lpf_deregister #undef lpf_probe #undef lpf_resize_memory_register @@ -85,7 +92,14 @@ #define lpf_get LPF_FUNC(get) #define lpf_put LPF_FUNC(put) #define lpf_sync LPF_FUNC(sync) +#define lpf_counting_sync_per_slot LPF_FUNC(counting_sync_per_slot) +#define lpf_sync_per_slot LPF_FUNC(sync_per_slot) #define lpf_register_local LPF_FUNC(register_local) +#define lpf_get_rcvd_msg_count LPF_FUNC(get_rcvd_msg_count) +#define lpf_get_rcvd_msg_count_per_slot LPF_FUNC(get_rcvd_msg_count_per_slot) +#define lpf_get_sent_msg_count_per_slot LPF_FUNC(get_sent_msg_count_per_slot) +#define lpf_flush_sent LPF_FUNC(flush_sent) +#define lpf_flush_received LPF_FUNC(flush_received) #define lpf_register_global LPF_FUNC(register_global) #define lpf_deregister LPF_FUNC(deregister) #define lpf_probe LPF_FUNC(probe) diff --git a/lpfrun.in b/lpfrun.in index 640fdc00..558a96d5 100644 --- a/lpfrun.in +++ b/lpfrun.in @@ -57,7 +57,7 @@ function printhelp() echo echo " -engine " echo " Allow you to choose the engine. Currently supported" - echo " are: pthread, mpirma, mpimsg, ibverbs, hybrid" + echo " are: pthread, mpirma, mpimsg, ibverbs, zero, hybrid" echo echo " -probe " echo " Set the number of seconds to probe the system for BSP" @@ -846,7 +846,7 @@ case $engine in exit_status=$? ;; - mpirma|mpimsg|ibverbs) + mpirma|mpimsg|ibverbs|zero) mpi_impl=$(mpi_detect) proc_args= @@ -1128,8 +1128,8 @@ case $engine in ;; *) - echo "Engine '$engine' is not supported. Please choose 'pthread'," - echo "'mpirma', or 'hybrid'" + echo "Engine '$engine' is not supported. Please choose " + echo "'pthread', 'mpirma', 'mpimsg', 'ibverbs, 'zero', 'hybrid'" exit_status=1 ;; esac diff --git a/post-install/post-install-test.cmake.in b/post-install/post-install-test.cmake.in index edd06922..05786d26 100644 --- a/post-install/post-install-test.cmake.in +++ b/post-install/post-install-test.cmake.in @@ -353,6 +353,9 @@ endif() ###### CMake integration using generated CMake module file ############ foreach(engine @ENGINES@) + if ("${engine}" STREQUAL "zero") + continue() + endif() message("Testing generated CMake module files for engine ${engine}") set(test_dir @builddir@/cmake-module-test-${engine}) diff --git a/post-install/test-lpf-nprocs.c b/post-install/test-lpf-nprocs.c index cf274b3f..554b5775 100644 --- a/post-install/test-lpf-nprocs.c +++ b/post-install/test-lpf-nprocs.c @@ -53,6 +53,8 @@ void spmd( lpf_t lpf, lpf_pid_t pid, lpf_pid_t nprocs, lpf_args_t args ) lpf_memslot_t mem_slot = LPF_INVALID_MEMSLOT; lpf_register_global( lpf, mem, nprocs, &mem_slot ); + lpf_sync(lpf, LPF_SYNC_DEFAULT); + if (pid != 0) lpf_get( lpf, 0, params_slot, 0, params_slot, 0, sizeof(params), LPF_MSG_DEFAULT ); diff --git a/src/MPI/CMakeLists.txt b/src/MPI/CMakeLists.txt index 757b9004..d837edb7 100644 --- a/src/MPI/CMakeLists.txt +++ b/src/MPI/CMakeLists.txt @@ -23,7 +23,7 @@ if (MPI_FOUND) endif() if (ENABLE_IBVERBS) - list(APPEND MPI_ENGINES ibverbs) + list(APPEND MPI_ENGINES ibverbs zero) endif() if (MPI_IBARRIER) @@ -52,6 +52,10 @@ if (MPI_FOUND) set(ibverbs_sources ibverbs.cpp) endif() + if (LPF_IMPL_ID STREQUAL zero) + set(ibverbs_sources ibverbsZero.cpp) + endif() + add_library(raw_${libname} OBJECT memorytable.cpp mesgqueue.cpp @@ -127,9 +131,9 @@ if (MPI_FOUND) ${LIB_POSIX_THREADS} ) - if (engine STREQUAL ibverbs) - target_link_libraries(${target} ${LIB_IBVERBS}) - endif() + if (engine STREQUAL ibverbs OR engine STREQUAL zero) + target_link_libraries(${target} ${LIB_IBVERBS}) + endif() endfunction() @@ -176,6 +180,10 @@ if (MPI_FOUND) add_gtest( ibverbs_test "ibverbs" ON ${CMAKE_CURRENT_SOURCE_DIR}/ibverbs.t.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ibverbs.cpp ${CMAKE_CURRENT_SOURCE_DIR}/mpilib.cpp) + + add_gtest( zero_test "zero" ON ${CMAKE_CURRENT_SOURCE_DIR}/ibverbs.t.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/ibverbsZero.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/mpilib.cpp) endif() foreach (engine ${MPI_ENGINES}) diff --git a/src/MPI/core.cpp b/src/MPI/core.cpp index 94a9658f..a61e6376 100644 --- a/src/MPI/core.cpp +++ b/src/MPI/core.cpp @@ -267,6 +267,111 @@ lpf_err_t lpf_sync( lpf_t ctx, lpf_sync_attr_t attr ) return realContext(ctx)->sync(); } + +lpf_err_t lpf_lock_slot( lpf_t ctx, + lpf_memslot_t src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +) +{ + (void) attr; // ignore parameter 'msg' since this implementation only + // implements core functionality + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) + i->lockSlot( src_slot, src_offset, dst_pid, dst_slot, dst_offset, size ); + return LPF_SUCCESS; +} + +lpf_err_t lpf_unlock_slot( lpf_t ctx, + lpf_memslot_t src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +) +{ + (void) attr; // ignore parameter 'msg' since this implementation only + // implements core functionality + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) + i->unlockSlot( src_slot, src_offset, dst_pid, dst_slot, dst_offset, size ); + return LPF_SUCCESS; +} + +lpf_err_t lpf_counting_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) +{ + (void) attr; // ignore attr parameter since this implementation only + // implements core functionality + return realContext(ctx)->countingSyncPerSlot(slot, expected_sent, expected_rcvd); +} + +lpf_err_t lpf_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot) +{ + (void) attr; // ignore attr parameter since this implementation only + // implements core functionality + return realContext(ctx)->syncPerSlot(slot); +} + +lpf_err_t lpf_get_rcvd_msg_count_per_slot( lpf_t ctx, size_t * rcvd_msgs, lpf_memslot_t slot) +{ + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) { + i->getRcvdMsgCountPerSlot(rcvd_msgs, slot); + } + return LPF_SUCCESS; +} + +lpf_err_t lpf_get_rcvd_msg_count( lpf_t ctx, size_t * rcvd_msgs) +{ + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) { + i->getRcvdMsgCount(rcvd_msgs); + } + return LPF_SUCCESS; +} + +lpf_err_t lpf_get_sent_msg_count( lpf_t ctx, size_t * sent_msgs) +{ + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) { + i->getSentMsgCount(sent_msgs); + } + return LPF_SUCCESS; +} + +lpf_err_t lpf_get_sent_msg_count_per_slot( lpf_t ctx, size_t * sent_msgs, lpf_memslot_t slot) +{ + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) { + i->getSentMsgCountPerSlot(sent_msgs, slot); + } + return LPF_SUCCESS; +} + +lpf_err_t lpf_flush_sent( lpf_t ctx) +{ + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) { + i->flushSent(); + } + return LPF_SUCCESS; +} + +lpf_err_t lpf_flush_received( lpf_t ctx) +{ + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) { + i->flushReceived(); + } + return LPF_SUCCESS; +} + lpf_err_t lpf_probe( lpf_t ctx, lpf_machine_t * params ) { lpf::Interface * i = realContext(ctx); diff --git a/src/MPI/ibverbs.cpp b/src/MPI/ibverbs.cpp index 44852caa..20c431a8 100644 --- a/src/MPI/ibverbs.cpp +++ b/src/MPI/ibverbs.cpp @@ -45,9 +45,9 @@ namespace { } } - IBVerbs :: IBVerbs( Communication & comm ) - : m_pid( comm.pid() ) + : m_comm( comm ) + , m_pid( comm.pid() ) , m_nprocs( comm.nprocs() ) , m_devName() , m_ibPort( Config::instance().getIBPort() ) @@ -72,7 +72,6 @@ IBVerbs :: IBVerbs( Communication & comm ) , m_memreg() , m_dummyMemReg() , m_dummyBuffer() - , m_comm( comm ) { m_peerList.reserve( m_nprocs ); @@ -97,7 +96,6 @@ IBVerbs :: IBVerbs( Communication & comm ) throw Exception( "No Infiniband devices available" ); } - std::string wantDevName = Config::instance().getIBDeviceName(); LOG( 3, "Searching for device '"<< wantDevName << "'" ); struct ibv_device * dev = NULL; @@ -144,7 +142,8 @@ IBVerbs :: IBVerbs( Communication & comm ) // maximum number of work requests per Queue Pair m_maxSrs = std::min( m_deviceAttr.max_qp_wr, // maximum work requests per QP m_deviceAttr.max_cqe ); // maximum entries per CQ - LOG(3, "Maximum number of send requests is the minimum of " + + LOG(3, "Initial maximum number of send requests is the minimum of " << m_deviceAttr.max_qp_wr << " (the maximum of work requests per QP)" << " and " << m_deviceAttr.max_cqe << " (the maximum of completion " << " queue entries per QP), nameley " << m_maxSrs ); @@ -196,6 +195,58 @@ IBVerbs :: IBVerbs( Communication & comm ) LOG(3, "Allocated completion queue with " << m_nprocs << " entries."); + /* + * Unfortunately, some RDMA devices advertise max_qp_wr but + * support a much smaller number. We can probe that. + * Note that the inofficial documentation on rdmamojo.com states: + * + * There may be RDMA devices that for specific transport types may support less outstanding Work Requests than the maximum reported value." + * + * Therefore, we here do binary search to find the actual value + */ + struct ibv_qp_init_attr testAttr; + std::memset(&testAttr, 0, sizeof(testAttr)); + + // We only care about the attr.cap.max_send_wr + testAttr.qp_type = IBV_QPT_RC; + + struct ibv_qp * ibv_new_qp_p; + testAttr.cap.max_send_wr = m_maxSrs; + testAttr.send_cq = m_cq.get(); + testAttr.recv_cq = m_cq.get(); + ibv_new_qp_p = ibv_create_qp(m_pd.get(), &testAttr); + if (ibv_new_qp_p == NULL) { + size_t left = 1; + size_t right = m_maxSrs; + size_t largestOkaySize = 0; + while (left <= right) + { + size_t mid = (left + right) / 2; + testAttr.cap.max_send_wr = mid; + // test if call succeeds + ibv_new_qp_p = ibv_create_qp(m_pd.get(), &testAttr); + if (ibv_new_qp_p == NULL) { + if (errno != EINVAL) { // error points to unsupported max_send_wr by device + throw Exception("Unexpected error code during binary search for maximum send WR."); + } + else { + right = mid - 1; + } + } + else { + // clean up dummy QP + ibv_destroy_qp(ibv_new_qp_p); + left = mid + 1; + // record that we still succeed + largestOkaySize = mid; + } + } + ASSERT(largestOkaySize > 0); + m_maxSrs = largestOkaySize; + LOG(3, "Revised maximum number of send requests is " << m_maxSrs ); + } + + // allocate dummy buffer m_dummyBuffer.resize( 8 ); struct ibv_mr * const ibv_reg_mr_new_p = ibv_reg_mr( @@ -237,11 +288,8 @@ void IBVerbs :: stageQPs( size_t maxMsgs ) attr.cap.max_recv_sge = 1; struct ibv_qp * const ibv_new_qp_p = ibv_create_qp( m_pd.get(), &attr ); - if( ibv_new_qp_p == NULL ) { - m_stagedQps[i].reset(); - } else { - m_stagedQps[i].reset( ibv_new_qp_p, ibv_destroy_qp ); - } + + m_stagedQps[i].reset( ibv_new_qp_p, ibv_destroy_qp ); if (!m_stagedQps[i]) { LOG( 1, "Could not create Infiniband Queue pair number " << i ); throw std::bad_alloc(); @@ -413,8 +461,8 @@ void IBVerbs :: resizeMemreg( size_t size ) throw std::bad_alloc() ; } - MemoryRegistration null = { 0, 0, 0, 0 }; - MemorySlot dflt; dflt.glob.resize( m_nprocs, null ); + MemoryRegistration newMR = { nullptr, 0, 0, 0, m_pid}; + MemorySlot dflt; dflt.glob.resize( m_nprocs, newMR ); m_memreg.reserve( size, dflt ); } @@ -457,11 +505,7 @@ IBVerbs :: SlotID IBVerbs :: regLocal( void * addr, size_t size ) throw Exception("Could not register memory area"); } } - MemoryRegistration local; - local.addr = addr; - local.size = size; - local.lkey = size?slot.mr->lkey:0; - local.rkey = size?slot.mr->rkey:0; + MemoryRegistration local((char *) addr, size, size?slot.mr->lkey:0, size?slot.mr->rkey:0, m_pid); SlotID id = m_memreg.addLocalReg( slot ); @@ -501,11 +545,7 @@ IBVerbs :: SlotID IBVerbs :: regGlobal( void * addr, size_t size ) // exchange memory registration info globally ref.glob.resize(m_nprocs); - MemoryRegistration local; - local.addr = addr; - local.size = size; - local.lkey = size?slot.mr->lkey:0; - local.rkey = size?slot.mr->rkey:0; + MemoryRegistration local((char *) addr, size, size?slot.mr->lkey:0, size?slot.mr->rkey:0, m_pid); LOG(4, "All-gathering memory register data" ); @@ -533,13 +573,13 @@ void IBVerbs :: put( SlotID srcSlot, size_t srcOffset, struct ibv_send_wr sr; std::memset(&sr, 0, sizeof(sr)); const char * localAddr - = static_cast(src.glob[m_pid].addr) + srcOffset; + = static_cast(src.glob[m_pid]._addr) + srcOffset; const char * remoteAddr - = static_cast(dst.glob[dstPid].addr) + dstOffset; + = static_cast(dst.glob[dstPid]._addr) + dstOffset; sge.addr = reinterpret_cast( localAddr ); sge.length = std::min(size, m_maxMsgSize ); - sge.lkey = src.mr->lkey; + sge.lkey = src.mr->lkey; m_sges.push_back( sge ); bool lastMsg = ! m_activePeers.contains( dstPid ); @@ -553,7 +593,7 @@ void IBVerbs :: put( SlotID srcSlot, size_t srcOffset, sr.num_sge = 1; sr.opcode = IBV_WR_RDMA_WRITE; sr.wr.rdma.remote_addr = reinterpret_cast( remoteAddr ); - sr.wr.rdma.rkey = dst.glob[dstPid].rkey; + sr.wr.rdma.rkey = dst.glob[dstPid]._rkey; m_srsHeads[ dstPid ] = m_srs.size(); m_srs.push_back( sr ); @@ -582,9 +622,9 @@ void IBVerbs :: get( int srcPid, SlotID srcSlot, size_t srcOffset, struct ibv_send_wr sr; std::memset(&sr, 0, sizeof(sr)); const char * localAddr - = static_cast(dst.glob[m_pid].addr) + dstOffset; + = static_cast(dst.glob[m_pid]._addr) + dstOffset; const char * remoteAddr - = static_cast(src.glob[srcPid].addr) + srcOffset; + = static_cast(src.glob[srcPid]._addr) + srcOffset; sge.addr = reinterpret_cast( localAddr ); sge.length = std::min(size, m_maxMsgSize ); @@ -602,7 +642,7 @@ void IBVerbs :: get( int srcPid, SlotID srcSlot, size_t srcOffset, sr.num_sge = 1; sr.opcode = IBV_WR_RDMA_READ; sr.wr.rdma.remote_addr = reinterpret_cast( remoteAddr ); - sr.wr.rdma.rkey = src.glob[srcPid].rkey; + sr.wr.rdma.rkey = src.glob[srcPid]._rkey; m_srsHeads[ srcPid ] = m_srs.size(); m_srs.push_back( sr ); diff --git a/src/MPI/ibverbs.hpp b/src/MPI/ibverbs.hpp index a96030a2..b9f7d6aa 100644 --- a/src/MPI/ibverbs.hpp +++ b/src/MPI/ibverbs.hpp @@ -19,6 +19,7 @@ #define LPF_CORE_MPI_IBVERBS_HPP #include +#include #include #if __cplusplus >= 201103L #include @@ -33,6 +34,18 @@ #include "sparseset.hpp" #include "memreg.hpp" +typedef enum Op { + SEND, + RECV, + GET +} Op; + +typedef enum Phase { + INIT, + PRE, + POST +} Phase; + namespace lpf { class Communication; @@ -45,6 +58,23 @@ using std::shared_ptr; using std::tr1::shared_ptr; #endif +class MemoryRegistration { + public: + char * _addr; + size_t _size; + uint32_t _lkey; + uint32_t _rkey; + int _pid; + MemoryRegistration(char * addr, size_t size, uint32_t lkey, uint32_t rkey, int pid) : _addr(addr), + _size(size), _lkey(lkey), _rkey(rkey), _pid(pid) + { } + MemoryRegistration() : _addr(nullptr), _size(0), _lkey(0), _rkey(0), _pid(-1) {} + size_t serialize(char ** buf); + static MemoryRegistration * deserialize(char * buf); + +}; + + class _LPFLIB_LOCAL IBVerbs { public: @@ -66,39 +96,68 @@ class _LPFLIB_LOCAL IBVerbs return m_maxMsgSize; } + void blockingCompareAndSwap(SlotID srSlot, size_t srcOffset, int dstPid, SlotID dstSlot, size_t dstOffset, size_t size, uint64_t compare_add, uint64_t swap); + void put( SlotID srcSlot, size_t srcOffset, int dstPid, SlotID dstSlot, size_t dstOffset, size_t size ); void get( int srcPid, SlotID srcSlot, size_t srcOffset, SlotID dstSlot, size_t dstOffset, size_t size ); + void flushSent(); + + void flushReceived(); + + void doRemoteProgress(); + + void countingSyncPerSlot(SlotID tag, size_t sent, size_t recvd); + /** + * @syncPerSlot only guarantees that all already scheduled sends (via put), + * or receives (via get) associated with a slot are completed. It does + * not guarantee that not scheduled operations will be scheduled (e.g. + * no guarantee that a remote process will wait til data is put into its + * memory, as it does schedule the operation (one-sided). + */ + void syncPerSlot(SlotID slot); // Do the communication and synchronize // 'Reconnect' must be a globally replicated value void sync( bool reconnect); -private: + void get_rcvd_msg_count(size_t * rcvd_msgs); + void get_sent_msg_count(size_t * sent_msgs); + void get_rcvd_msg_count_per_slot(size_t * rcvd_msgs, SlotID slot); + void get_sent_msg_count_per_slot(size_t * sent_msgs, SlotID slot); + +protected: IBVerbs & operator=(const IBVerbs & ); // assignment prohibited IBVerbs( const IBVerbs & ); // copying prohibited void stageQPs(size_t maxMsgs ); void reconnectQPs(); + void tryLock(SlotID id, int dstPid); + void tryUnlock(SlotID id, int dstPid); - - struct MemoryRegistration { - void * addr; - size_t size; - uint32_t lkey; - uint32_t rkey; - }; + std::vector wait_completion(int& error); + void doProgress(); + void tryIncrement(Op op, Phase phase, SlotID slot); struct MemorySlot { shared_ptr< struct ibv_mr > mr; // verbs structure std::vector< MemoryRegistration > glob; // array for global registrations }; + + Communication & m_comm; int m_pid; // local process ID int m_nprocs; // number of processes + std::atomic_size_t m_numMsgs; + std::atomic_size_t m_recvTotalInitMsgCount; + std::atomic_size_t m_sentMsgs; + std::atomic_size_t m_recvdMsgs; + std::vector m_recvInitMsgCount; + std::vector m_getInitMsgCount; + std::vector m_sendInitMsgCount; std::string m_devName; // IB device name int m_ibPort; // local IB port to work with @@ -108,18 +167,23 @@ class _LPFLIB_LOCAL IBVerbs struct ibv_device_attr m_deviceAttr; size_t m_maxRegSize; size_t m_maxMsgSize; + size_t m_cqSize; size_t m_minNrMsgs; size_t m_maxSrs; // maximum number of sends requests per QP shared_ptr< struct ibv_context > m_device; // device handle shared_ptr< struct ibv_pd > m_pd; // protection domain shared_ptr< struct ibv_cq > m_cq; // complation queue + shared_ptr< struct ibv_cq > m_cqLocal; // completion queue + shared_ptr< struct ibv_cq > m_cqRemote; // completion queue + shared_ptr< struct ibv_srq > m_srq; // shared receive queue // Disconnected queue pairs - std::vector< shared_ptr< struct ibv_qp > > m_stagedQps; + std::vector< shared_ptr > m_stagedQps; // Connected queue pairs - std::vector< shared_ptr< struct ibv_qp > > m_connectedQps; + std::vector< shared_ptr > m_connectedQps; + std::vector< struct ibv_send_wr > m_srs; // array of send requests @@ -136,8 +200,13 @@ class _LPFLIB_LOCAL IBVerbs shared_ptr< struct ibv_mr > m_dummyMemReg; // registration of dummy buffer std::vector< char > m_dummyBuffer; // dummy receive buffer - - Communication & m_comm; + // + std::vector rcvdMsgCount; + std::vector sentMsgCount; + std::vector getMsgCount; + std::vector slotActive; + size_t m_postCount; + size_t m_recvCount; }; diff --git a/src/MPI/ibverbsZero.cpp b/src/MPI/ibverbsZero.cpp new file mode 100644 index 00000000..7cec923a --- /dev/null +++ b/src/MPI/ibverbsZero.cpp @@ -0,0 +1,1060 @@ + +/* + * Copyright 2021 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ibverbs.hpp" +#include "log.hpp" +#include "communication.hpp" +#include "config.hpp" + +#include +#include +#include +#include + +#define POLL_BATCH 64 +#define MAX_POLLING 128 +#define ARRAY_SIZE 1000 + + +namespace lpf { namespace mpi { + + +struct IBVerbs::Exception : std::runtime_error { + Exception(const char * what) : std::runtime_error( what ) {} +}; + +namespace { + ibv_mtu getMTU( unsigned size ) { + switch (size) { + case 256: return IBV_MTU_256; + case 512: return IBV_MTU_512; + case 1024: return IBV_MTU_1024; + case 2048: return IBV_MTU_2048; + case 4096: return IBV_MTU_4096; + default: throw IBVerbs::Exception("Illegal MTU size"); + } + return IBV_MTU_4096; + } +} + + +IBVerbs :: IBVerbs( Communication & comm ) + : m_comm( comm ) + , m_pid( comm.pid() ) + , m_nprocs( comm.nprocs() ) + , m_numMsgs(0) + , m_recvTotalInitMsgCount(0) + , m_sentMsgs(0) + , m_recvdMsgs(0) + , m_devName() + , m_ibPort( Config::instance().getIBPort() ) + , m_gidIdx( Config::instance().getIBGidIndex() ) + , m_mtu( getMTU( Config::instance().getIBMTU() )) + , m_maxRegSize(0) + , m_maxMsgSize(0) + , m_cqSize(1) + , m_minNrMsgs(0) + , m_maxSrs(0) + , m_device() + , m_pd() + , m_cqLocal() + , m_cqRemote() + , m_stagedQps( m_nprocs ) + , m_connectedQps( m_nprocs ) + , m_srs() + , m_srsHeads( m_nprocs, 0u ) + , m_nMsgsPerPeer( m_nprocs, 0u ) + , m_activePeers(0, m_nprocs) + , m_peerList() + , m_sges() + , m_memreg() + , m_dummyMemReg() + , m_dummyBuffer() + , m_postCount(0) + , m_recvCount(0) +{ + + // arrays instead of hashmap for counters + m_recvInitMsgCount.resize(ARRAY_SIZE, 0); + m_getInitMsgCount.resize(ARRAY_SIZE, 0); + m_sendInitMsgCount.resize(ARRAY_SIZE, 0); + rcvdMsgCount.resize(ARRAY_SIZE, 0); + getMsgCount.resize(ARRAY_SIZE, 0); + sentMsgCount.resize(ARRAY_SIZE, 0); + slotActive.resize(ARRAY_SIZE, 0); + + + m_peerList.reserve( m_nprocs ); + + int numDevices = -1; + struct ibv_device * * const try_get_device_list = ibv_get_device_list( &numDevices ); + + if (!try_get_device_list) { + LOG(1, "Cannot get list of Infiniband devices" ); + throw Exception( "failed to get IB devices list"); + } + + shared_ptr< struct ibv_device * > devList( + try_get_device_list, + ibv_free_device_list ); + + LOG(3, "Retrieved Infiniband device list, which has " << numDevices + << " devices" ); + + if (numDevices < 1) { + LOG(1, "There are " << numDevices << " Infiniband devices" + " available, which is not enough" ); + throw Exception( "No Infiniband devices available" ); + } + + + std::string wantDevName = Config::instance().getIBDeviceName(); + LOG( 3, "Searching for device '"<< wantDevName << "'" ); + struct ibv_device * dev = NULL; + for (int i = 0; i < numDevices; i ++) + { + std::string name = ibv_get_device_name( (&*devList)[i]); + LOG(3, "Device " << i << " has name '" << name << "'" ); + if ( wantDevName.empty() || name == wantDevName ) { + LOG(3, "Found device '" << name << "'" ); + m_devName = name; + dev = (&*devList)[i]; + break; + } + } + + if (dev == NULL) { + LOG(1, "Could not find device '" << wantDevName << "'" ); + throw Exception("Infiniband device not found"); + } + + struct ibv_context * const ibv_context_new_p = ibv_open_device(dev); + if( ibv_context_new_p == NULL ) + m_device.reset(); + else + m_device.reset( ibv_context_new_p, ibv_close_device ); + if (!m_device) { + LOG(1, "Failed to open Infiniband device '" << m_devName << "'"); + throw Exception("Cannot open IB device"); + } + LOG(3, "Opened Infiniband device '" << m_devName << "'" ); + + devList.reset(); + LOG(3, "Closed Infiniband device list" ); + + std::memset(&m_deviceAttr, 0, sizeof(m_deviceAttr)); + if (ibv_query_device( m_device.get(), &m_deviceAttr )) + throw Exception("Cannot query device"); + + LOG(3, "Queried IB device capabilities" ); + + m_maxRegSize = m_deviceAttr.max_mr_size; + LOG(3, "Maximum size for memory registration = " << m_maxRegSize ); + + // maximum number of work requests per Queue Pair + m_maxSrs = std::min( m_deviceAttr.max_qp_wr, // maximum work requests per QP + m_deviceAttr.max_cqe ); // maximum entries per CQ + LOG(3, "Maximum number of send requests is the minimum of " + << m_deviceAttr.max_qp_wr << " (the maximum of work requests per QP)" + << " and " << m_deviceAttr.max_cqe << " (the maximum of completion " + << " queue entries per QP), nameley " << m_maxSrs ); + + if ( m_deviceAttr.max_cqe < m_nprocs ) + throw Exception("Completion queue has insufficient completion queue capabilities"); + + struct ibv_port_attr port_attr; std::memset( &port_attr, 0, sizeof(port_attr)); + if (ibv_query_port( m_device.get(), m_ibPort, & port_attr )) + throw Exception("Cannot query IB port"); + + LOG(3, "Queried IB port " << m_ibPort << " capabilities" ); + + // store Maximum message size + m_maxMsgSize = port_attr.max_msg_sz; + LOG(3, "Maximum IB message size is " << m_maxMsgSize ); + + size_t sysRam = Config::instance().getLocalRamSize(); + m_minNrMsgs = sysRam / m_maxMsgSize; + LOG(3, "Minimum number of messages to allocate = " + "total system RAM / maximum message size = " + << sysRam << " / " << m_maxMsgSize << " = " << m_minNrMsgs ); + + // store LID + m_lid = port_attr.lid; + LOG(3, "LID is " << m_lid ); + + struct ibv_pd * const pd_new_p = ibv_alloc_pd( m_device.get() ); + if( pd_new_p == NULL ) + m_pd.reset(); + else + m_pd.reset( pd_new_p, ibv_dealloc_pd ); + if (!m_pd) { + LOG(1, "Could not allocate protection domain "); + throw Exception("Could not allocate protection domain"); + } + LOG(3, "Opened protection domain"); + + m_cqLocal.reset(ibv_create_cq( m_device.get(), 1, NULL, NULL, 0 ), ibv_destroy_cq); + m_cqRemote.reset(ibv_create_cq( m_device.get(), m_nprocs, NULL, NULL, 0 ), ibv_destroy_cq); + /** + * New notification functionality for HiCR + */ + struct ibv_srq_init_attr srq_init_attr; + srq_init_attr.srq_context = NULL; + srq_init_attr.attr.max_wr = m_deviceAttr.max_srq_wr; + srq_init_attr.attr.max_sge = m_deviceAttr.max_srq_sge; + srq_init_attr.attr.srq_limit = 0; + m_srq.reset(ibv_create_srq(m_pd.get(), &srq_init_attr ), + ibv_destroy_srq); + + + m_cqLocal.reset(ibv_create_cq( m_device.get(), m_cqSize, NULL, NULL, 0), ibv_destroy_cq); + if (!m_cqLocal) { + LOG(1, "Could not allocate completion queue with '" + << m_nprocs << " entries" ); + throw Exception("Could not allocate completion queue"); + } + m_cqRemote.reset(ibv_create_cq( m_device.get(), m_cqSize * m_nprocs, NULL, NULL, 0), ibv_destroy_cq); + if (!m_cqLocal) { + LOG(1, "Could not allocate completion queue with '" + << m_nprocs << " entries" ); + throw Exception("Could not allocate completion queue"); + } + + LOG(3, "Allocated completion queue with " << m_nprocs << " entries."); + + // allocate dummy buffer + m_dummyBuffer.resize( 8 ); + struct ibv_mr * const ibv_reg_mr_new_p = ibv_reg_mr( + m_pd.get(), m_dummyBuffer.data(), m_dummyBuffer.size(), + IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE + ); + if( ibv_reg_mr_new_p == NULL ) + m_dummyMemReg.reset(); + else + m_dummyMemReg.reset( ibv_reg_mr_new_p, ibv_dereg_mr ); + if (!m_dummyMemReg) { + LOG(1, "Could not register memory region"); + throw Exception("Could not register memory region"); + } + + LOG(3, "Queue pairs have been successfully initialized"); + +} + +IBVerbs :: ~IBVerbs() +{ } + + +inline void IBVerbs :: tryIncrement(Op op, Phase phase, SlotID slot) { + + switch (phase) { + case Phase::INIT: + rcvdMsgCount[slot] = 0; + getMsgCount[slot] = 0; + m_recvInitMsgCount[slot] = 0; + m_getInitMsgCount[slot] = 0; + sentMsgCount[slot] = 0; + m_sendInitMsgCount[slot] = 0; + slotActive[slot] = true; + break; + case Phase::PRE: + if (op == Op::SEND) { + m_numMsgs++; + //m_sendTotalInitMsgCount++; + m_sendInitMsgCount[slot]++; + } + if (op == Op::RECV) { + m_recvTotalInitMsgCount++; + m_recvInitMsgCount[slot]++; + } + if (op == Op::GET) { + m_recvTotalInitMsgCount++; + m_getInitMsgCount[slot]++; + } + break; + case Phase::POST: + if (op == Op::RECV) { + m_recvdMsgs ++; + rcvdMsgCount[slot]++; + } + if (op == Op::GET) { + m_recvdMsgs++; + getMsgCount[slot]++; + } + if (op == Op::SEND) { + m_sentMsgs++; + sentMsgCount[slot]++; + } + break; + } +} + +void IBVerbs :: stageQPs( size_t maxMsgs ) +{ + // create the queue pairs + for ( size_t i = 0; i < static_cast(m_nprocs); ++i) { + struct ibv_qp_init_attr attr; + std::memset(&attr, 0, sizeof(attr)); + + attr.qp_type = IBV_QPT_RC; // we want reliable connection + attr.sq_sig_all = 0; // only wait for selected messages + attr.send_cq = m_cqLocal.get(); + attr.recv_cq = m_cqRemote.get(); + attr.srq = m_srq.get(); + attr.cap.max_send_wr = std::min(maxMsgs + m_minNrMsgs,m_maxSrs/4); + attr.cap.max_recv_wr = std::min(maxMsgs + m_minNrMsgs,m_maxSrs/4); + attr.cap.max_send_sge = 1; + attr.cap.max_recv_sge = 1; + + struct ibv_qp * const ibv_new_qp_p = ibv_create_qp( m_pd.get(), &attr ); + ASSERT(m_stagedQps.size() > i); + if( ibv_new_qp_p == NULL ) { + m_stagedQps[i].reset(); + } else { + m_stagedQps[i].reset( ibv_new_qp_p, ibv_destroy_qp ); + } + if (!m_stagedQps[i]) { + LOG( 1, "Could not create Infiniband Queue pair number " << i ); + throw std::bad_alloc(); + } + + LOG(3, "Created new Queue pair for " << m_pid << " -> " << i << " with qp_num = " << ibv_new_qp_p->qp_num); + } +} + +void IBVerbs :: doRemoteProgress() { + struct ibv_wc wcs[POLL_BATCH]; + struct ibv_recv_wr wr; + struct ibv_sge sg; + struct ibv_recv_wr *bad_wr; + sg.addr = (uint64_t) NULL; + sg.length = 0; + sg.lkey = 0; + wr.next = NULL; + wr.sg_list = &sg; + wr.num_sge = 0; + wr.wr_id = 66; + int pollResult, totalResults = 0; + do { + pollResult = ibv_poll_cq(m_cqRemote.get(), POLL_BATCH, wcs); + if (pollResult > 0) { + LOG(3, "Process " << m_pid << " signals: I received " << pollResult << " remote messages in doRemoteProgress"); + } + else if (pollResult < 0) + { + LOG( 1, "Failed to poll IB completion queue" ); + throw Exception("Poll CQ failure"); + } + + for(int i = 0; i < pollResult; i++) { + if (wcs[i].status != IBV_WC_SUCCESS) { + LOG( 2, "Got bad completion status from IB message." + " status = 0x" << std::hex << wcs[i].status + << ", vendor syndrome = 0x" << std::hex + << wcs[i].vendor_err ); + } + else { + LOG(3, "Process " << m_pid << " Recv wcs[" << i << "].src_qp = "<< wcs[i].src_qp); + LOG(3, "Process " << m_pid << " Recv wcs[" << i << "].slid = "<< wcs[i].slid); + LOG(3, "Process " << m_pid << " Recv wcs[" << i << "].wr_id = "<< wcs[i].wr_id); + LOG(3, "Process " << m_pid << " Recv wcs[" << i << "].imm_data = "<< wcs[i].imm_data); + + /** + * Here is a trick: + * The sender sends relatively generic LPF memslot ID. + * But for IB Verbs, we need to translate that into + * an IB Verbs slot via @getVerbID -- or there will be + * a mismatch when IB Verbs looks up the slot ID + */ + + // Note: Ignore compare-and-swap atomics! + if (wcs[i].opcode != IBV_WC_COMP_SWAP) { + SlotID slot; + // This receive is from a PUT call + if (wcs[i].opcode == IBV_WC_RECV_RDMA_WITH_IMM) { + slot = wcs[i].imm_data; + tryIncrement(Op::RECV, Phase::POST, slot); + LOG(3, "Rank " << m_pid << " increments received message count to " << rcvdMsgCount[slot] << " for LPF slot " << slot); + } + } + ibv_post_srq_recv(m_srq.get(), &wr, &bad_wr); + } + } + if(pollResult > 0) totalResults += pollResult; + } while (pollResult == POLL_BATCH && totalResults < MAX_POLLING); +} + +void IBVerbs :: reconnectQPs() +{ + ASSERT( m_stagedQps[0] ); + m_comm.barrier(); + + union ibv_gid myGid; + std::vector< uint32_t> localQpNums, remoteQpNums; + std::vector< uint16_t> lids; + std::vector< union ibv_gid > gids; + try { + // Exchange info about the queue pairs + if (m_gidIdx >= 0) { + if (ibv_query_gid(m_device.get(), m_ibPort, m_gidIdx, &myGid)) { + LOG(1, "Could not get GID of Infiniband device port " << m_ibPort); + throw Exception( "Could not get gid for IB port"); + } + LOG(3, "GID of Infiniband device was retrieved" ); + } + else { + std::memset( &myGid, 0, sizeof(myGid) ); + LOG(3, "GID of Infiniband device will not be used" ); + } + + localQpNums.resize(m_nprocs); + remoteQpNums.resize(m_nprocs); + lids.resize(m_nprocs); + gids.resize(m_nprocs); + + for ( int i = 0; i < m_nprocs; ++i) + localQpNums[i] = m_stagedQps[i]->qp_num; + } + catch(...) + { + m_comm.allreduceOr( true ); + throw; + } + if (m_comm.allreduceOr( false) ) + throw Exception("Peer failed to allocate memory or query device while setting-up QP"); + + m_comm.allToAll( localQpNums.data(), remoteQpNums.data() ); + m_comm.allgather( m_lid, lids.data() ); + m_comm.allgather( myGid, gids.data() ); + + LOG(3, "Connection initialisation data has been exchanged"); + + try { + // Bring QPs to INIT + for (int i = 0; i < m_nprocs; ++i ) { + struct ibv_qp_attr attr; + int flags; + + std::memset(&attr, 0, sizeof(attr)); + attr.qp_state = IBV_QPS_INIT; + attr.port_num = m_ibPort; + attr.pkey_index = 0; + attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_ATOMIC; + flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS; + if ( ibv_modify_qp(m_stagedQps[i].get(), &attr, flags) ) { + LOG(1, "Cannot bring state of QP " << i << " to INIT"); + throw Exception("Failed to bring QP's state to Init" ); + } + + // post a dummy receive + + struct ibv_recv_wr rr; std::memset(&rr, 0, sizeof(rr)); + struct ibv_sge sge; std::memset(&sge, 0, sizeof(sge)); + sge.addr = reinterpret_cast(m_dummyBuffer.data()); + sge.length = m_dummyBuffer.size(); + sge.lkey = m_dummyMemReg->lkey; + rr.next = NULL; + rr.wr_id = 46; + rr.sg_list = &sge; + rr.num_sge = 1; + + // Bring QP to RTR + std::memset(&attr, 0, sizeof(attr)); + attr.qp_state = IBV_QPS_RTR; + attr.path_mtu = m_mtu; + attr.dest_qp_num = remoteQpNums[i]; + attr.rq_psn = 0; + attr.max_dest_rd_atomic = 1; + attr.min_rnr_timer = 0x12; + attr.ah_attr.is_global = 0; + attr.ah_attr.dlid = lids[i]; + attr.ah_attr.sl = 0; + attr.ah_attr.src_path_bits = 0; + attr.ah_attr.port_num = m_ibPort; + if (m_gidIdx >= 0) + { + attr.ah_attr.is_global = 1; + attr.ah_attr.port_num = 1; + memcpy(&attr.ah_attr.grh.dgid, &gids[i], 16); + attr.ah_attr.grh.flow_label = 0; + attr.ah_attr.grh.hop_limit = 1; + attr.ah_attr.grh.sgid_index = m_gidIdx; + attr.ah_attr.grh.traffic_class = 0; + } + flags = IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER; + + if (ibv_modify_qp(m_stagedQps[i].get(), &attr, flags)) { + LOG(1, "Cannot bring state of QP " << i << " to RTR" ); + throw Exception("Failed to bring QP's state to RTR" ); + } + + // Bring QP to RTS + std::memset(&attr, 0, sizeof(attr)); + attr.qp_state = IBV_QPS_RTS; + attr.timeout = 0x12; + attr.retry_cnt = 0;//7; + attr.rnr_retry = 0;//7; + attr.sq_psn = 0; + attr.max_rd_atomic = 1; + flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | + IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC; + if( ibv_modify_qp(m_stagedQps[i].get(), &attr, flags)) { + LOG(1, "Cannot bring state of QP " << i << " to RTS" ); + throw Exception("Failed to bring QP's state to RTS" ); + } + + LOG(3, "Connected Queue pair for " << m_pid << " -> " << i ); + + } // for each peer + } + catch(...) { + m_comm.allreduceOr( true ); + throw; + } + + if (m_comm.allreduceOr( false )) + throw Exception("Another peer failed to set-up Infiniband queue pairs"); + + LOG(3, "All staged queue pairs have been connected" ); + + m_connectedQps.swap( m_stagedQps ); + + LOG(3, "All old queue pairs have been removed"); + + m_comm.barrier(); + } + + +void IBVerbs :: resizeMemreg( size_t size ) +{ + if ( size > size_t(std::numeric_limits::max()) ) + { + LOG(2, "Could not expand memory register, because integer will overflow"); + throw Exception("Could not increase memory register"); + } + if ( int(size) > m_deviceAttr.max_mr ) { + LOG(2, "IB device only supports " << m_deviceAttr.max_mr + << " memory registrations, while " << size + << " are being requested" ); + throw std::bad_alloc() ; + } + + MemoryRegistration newMR = { nullptr, 0, 0, 0, m_pid}; + MemorySlot dflt; dflt.glob.resize( m_nprocs, newMR); + + m_memreg.reserve( size, dflt ); +} + +void IBVerbs :: resizeMesgq( size_t size ) +{ + + m_cqSize = std::min(size,m_maxSrs/4); + size_t remote_size = std::min(m_cqSize*m_nprocs,m_maxSrs/4); + if (m_cqLocal) { + ibv_resize_cq(m_cqLocal.get(), m_cqSize); + } + if(remote_size >= m_postCount){ + if (m_cqRemote) { + ibv_resize_cq(m_cqRemote.get(), remote_size); + } + } + stageQPs(m_cqSize); + reconnectQPs(); + if(remote_size >= m_postCount){ + if (m_srq) { + struct ibv_recv_wr wr; + struct ibv_sge sg; + struct ibv_recv_wr *bad_wr; + sg.addr = (uint64_t) NULL; + sg.length = 0; + sg.lkey = 0; + wr.next = NULL; + wr.sg_list = &sg; + wr.num_sge = 0; + wr.wr_id = m_pid; + for(int i = m_postCount; i < (int)remote_size; ++i){ + ibv_post_srq_recv(m_srq.get(), &wr, &bad_wr); + m_postCount++; + } + } + } + LOG(4, "Message queue has been reallocated to size " << size ); +} + +IBVerbs :: SlotID IBVerbs :: regLocal( void * addr, size_t size ) +{ + ASSERT( size <= m_maxRegSize ); + + MemorySlot slot; + if ( size > 0) { + LOG(4, "Registering locally memory area at " << addr << " of size " << size ); + struct ibv_mr * const ibv_mr_new_p = ibv_reg_mr( + m_pd.get(), addr, size, + IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_ATOMIC + ); + if( ibv_mr_new_p == NULL ) + slot.mr.reset(); + else + slot.mr.reset( ibv_mr_new_p, ibv_dereg_mr ); + if (!slot.mr) { + LOG(1, "Could not register memory area at " + << addr << " of size " << size << " with IB device"); + throw Exception("Could not register memory area"); + } + } + MemoryRegistration local((char *) addr, size, size?slot.mr->lkey:0, size?slot.mr->rkey:0, m_pid); + + SlotID id = m_memreg.addLocalReg( slot ); + tryIncrement(Op::SEND, Phase::INIT, id); + + m_memreg.update( id ).glob.resize( m_nprocs ); + m_memreg.update( id ).glob[m_pid] = local; + LOG(4, "Memory area " << addr << " of size " << size << " has been locally registered. Slot = " << id ); + return id; +} + +IBVerbs :: SlotID IBVerbs :: regGlobal( void * addr, size_t size ) +{ + ASSERT( size <= m_maxRegSize ); + + MemorySlot slot; + if ( size > 0 ) { + LOG(4, "Registering globally memory area at " << addr << " of size " << size ); + struct ibv_mr * const ibv_mr_new_p = ibv_reg_mr( + m_pd.get(), addr, size, + IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_ATOMIC + ); + if( ibv_mr_new_p == NULL ) + slot.mr.reset(); + else + slot.mr.reset( ibv_mr_new_p, ibv_dereg_mr ); + if (!slot.mr) { + LOG(1, "Could not register memory area at " + << addr << " of size " << size << " with IB device"); + m_comm.allreduceAnd(true); + throw Exception("Could not register memory area"); + } + } + if (m_comm.allreduceOr(false)) + throw Exception("Another process could not register memory area"); + + SlotID id = m_memreg.addGlobalReg( slot ); + tryIncrement(Op::SEND/* <- dummy for init */, Phase::INIT, id); + MemorySlot & ref = m_memreg.update(id); + // exchange memory registration info globally + ref.glob.resize(m_nprocs); + + MemoryRegistration local((char *) addr, size, size?slot.mr->lkey:0, size?slot.mr->rkey:0, m_pid); + LOG(4, "All-gathering memory register data" ); + + m_comm.allgather( local, ref.glob.data() ); + LOG(4, "Memory area " << addr << " of size " << size << " has been globally registered. Slot = " << id ); + return id; +} + +void IBVerbs :: dereg( SlotID id ) +{ + slotActive[id] = false; + m_recvInitMsgCount[id] = 0; + m_getInitMsgCount[id] = 0; + m_sendInitMsgCount[id] = 0; + rcvdMsgCount[id] = 0; + sentMsgCount[id] = 0; + m_memreg.removeReg( id ); + LOG(4, "Memory area of slot " << id << " has been deregistered"); +} + + +void IBVerbs :: blockingCompareAndSwap(SlotID srcSlot, size_t srcOffset, int dstPid, SlotID dstSlot, size_t dstOffset, size_t size, uint64_t compare_add, uint64_t swap) +{ + const MemorySlot & src = m_memreg.lookup( srcSlot ); + const MemorySlot & dst = m_memreg.lookup( dstSlot); + + char * localAddr + = static_cast(src.glob[m_pid]._addr) + srcOffset; + const char * remoteAddr + = static_cast(dst.glob[dstPid]._addr) + dstOffset; + + struct ibv_sge sge; + memset(&sge, 0, sizeof(sge)); + sge.addr = reinterpret_cast( localAddr ); + sge.length = std::min(size, m_maxMsgSize ); + sge.lkey = src.mr->lkey; + + struct ibv_send_wr wr; + memset(&wr, 0, sizeof(wr)); + wr.wr_id = srcSlot; + wr.sg_list = &sge; + wr.next = NULL; // this needs to be set, otherwise EINVAL return error in ibv_post_send + wr.num_sge = 1; + wr.opcode = IBV_WR_ATOMIC_CMP_AND_SWP; + wr.send_flags = IBV_SEND_SIGNALED; + wr.wr.atomic.remote_addr = reinterpret_cast(remoteAddr); + wr.wr.atomic.compare_add = compare_add; + wr.wr.atomic.swap = swap; + wr.wr.atomic.rkey = dst.glob[dstPid]._rkey; + struct ibv_send_wr *bad_wr; + int error; + std::vector opcodes; + +blockingCompareAndSwap: + if (int err = ibv_post_send(m_connectedQps[dstPid].get(), &wr, &bad_wr )) + { + LOG(1, "Error while posting RDMA requests: " << std::strerror(err) ); + throw Exception("Error while posting RDMA requests"); + } + + /** + * Keep waiting on a completion of events until you + * register a completed atomic compare-and-swap + */ + do { + opcodes = wait_completion(error); + if (error) { + LOG(1, "Error in wait_completion"); + std::abort(); + } + } while (std::find(opcodes.begin(), opcodes.end(), IBV_WC_COMP_SWAP) == opcodes.end()); + + uint64_t * remoteValueFound = reinterpret_cast(localAddr); + /* + * if we fetched the value we expected, then + * we are holding the lock now (that is, we swapped successfully!) + * else, re-post your request for the lock + */ + if (remoteValueFound[0] != compare_add) { + LOG(4, "Process " << m_pid << " couldn't get the lock. remoteValue = " << remoteValueFound[0] << " compare_add = " << compare_add << " go on, iterate\n"); + goto blockingCompareAndSwap; + } + else { + LOG(4, "Process " << m_pid << " reads value " << remoteValueFound[0] << " and expected = " << compare_add <<" gets the lock, done\n"); + } + // else we hold the lock and swap value into the remote slot ... +} + +void IBVerbs :: put( SlotID srcSlot, size_t srcOffset, + int dstPid, SlotID dstSlot, size_t dstOffset, size_t size) +{ + const MemorySlot & src = m_memreg.lookup( srcSlot ); + const MemorySlot & dst = m_memreg.lookup( dstSlot ); + + ASSERT( src.mr ); + + int numMsgs = size/m_maxMsgSize + (size % m_maxMsgSize > 0); //+1 if last msg size < m_maxMsgSize + if (size == 0) numMsgs = 1; + + struct ibv_sge sges[numMsgs]; + struct ibv_send_wr srs[numMsgs]; + struct ibv_sge *sge; + struct ibv_send_wr *sr; + for (int i=0; i < numMsgs; i++) { + sge = &sges[i]; std::memset(sge, 0, sizeof(ibv_sge)); + sr = &srs[i]; std::memset(sr, 0, sizeof(ibv_send_wr)); + const char * localAddr + = static_cast(src.glob[m_pid]._addr) + srcOffset; + const char * remoteAddr + = static_cast(dst.glob[dstPid]._addr) + dstOffset; + + sge->addr = reinterpret_cast( localAddr ); + sge->length = std::min(size, m_maxMsgSize ); + sge->lkey = src.mr->lkey; + sges[i] = *sge; + + bool lastMsg = (i == numMsgs-1); + sr->next = lastMsg ? NULL : &srs[ i+1]; + // since reliable connection guarantees keeps packets in order, + // we only need a signal from the last message in the queue + sr->send_flags = lastMsg ? IBV_SEND_SIGNALED : 0; + sr->opcode = lastMsg? IBV_WR_RDMA_WRITE_WITH_IMM : IBV_WR_RDMA_WRITE; + /* use wr_id to later demultiplex srcSlot */ + sr->wr_id = srcSlot; + /* + * In HiCR, we need to know at receiver end which slot + * has received the message. But here is a trick: + */ + sr->imm_data = dstSlot; + + sr->sg_list = &sges[i]; + sr->num_sge = 1; + sr->wr.rdma.remote_addr = reinterpret_cast( remoteAddr ); + sr->wr.rdma.rkey = dst.glob[dstPid]._rkey; + + srs[i] = *sr; + size -= sge->length; + srcOffset += sge->length; + dstOffset += sge->length; + + LOG(4, "PID " << m_pid << ": Enqueued put message of " << sge->length << " bytes to " << dstPid << " on slot" << dstSlot ); + + } + struct ibv_send_wr *bad_wr = NULL; + // srs[0] should be sufficient because the rest of srs are on a chain + if (int err = ibv_post_send(m_connectedQps[dstPid].get(), &srs[0], &bad_wr )) + { + LOG(1, "Error while posting RDMA requests: " << std::strerror(err) ); + throw Exception("Error while posting RDMA requests"); + } + + tryIncrement(Op::SEND, Phase::PRE, srcSlot); +} + +void IBVerbs :: get( int srcPid, SlotID srcSlot, size_t srcOffset, + SlotID dstSlot, size_t dstOffset, size_t size ) +{ + const MemorySlot & src = m_memreg.lookup( srcSlot ); + const MemorySlot & dst = m_memreg.lookup( dstSlot ); + + ASSERT( dst.mr ); + + int numMsgs = size/m_maxMsgSize + (size % m_maxMsgSize > 0); //+1 if last msg size < m_maxMsgSize + + struct ibv_sge sges[numMsgs+1]; + struct ibv_send_wr srs[numMsgs+1]; + struct ibv_sge *sge; + struct ibv_send_wr *sr; + + + for(int i = 0; i< numMsgs; i++){ + sge = &sges[i]; std::memset(sge, 0, sizeof(ibv_sge)); + sr = &srs[i]; std::memset(sr, 0, sizeof(ibv_send_wr)); + + const char * localAddr + = static_cast(dst.glob[m_pid]._addr) + dstOffset; + const char * remoteAddr + = static_cast(src.glob[srcPid]._addr) + srcOffset; + + sge->addr = reinterpret_cast( localAddr ); + sge->length = std::min(size, m_maxMsgSize ); + sge->lkey = dst.mr->lkey; + sges[i] = *sge; + LOG(4, "PID " << m_pid << ": Enqueued get message of " << sge->length << " bytes from " << srcPid << " on slot" << srcSlot ); + + bool lastMsg = (i == numMsgs-1); + sr->next = lastMsg ? NULL : &srs[ i+1]; + sr->send_flags = lastMsg ? IBV_SEND_SIGNALED : 0; + + sr->sg_list = &sges[i]; + sr->num_sge = 1; + sr->opcode = IBV_WR_RDMA_READ; + sr->wr.rdma.remote_addr = reinterpret_cast( remoteAddr ); + sr->wr.rdma.rkey = src.glob[srcPid]._rkey; + // This logic is reversed compared to ::put + // (not srcSlot, as this slot is remote) + sr->wr_id = dstSlot; // <= DO NOT CHANGE THIS !!! + sr->imm_data = srcSlot; // This is irrelevant as we don't send _WITH_IMM + srs[i] = *sr; + size -= sge->length; + srcOffset += sge->length; + dstOffset += sge->length; + } + + struct ibv_send_wr *bad_wr = NULL; + if (int err = ibv_post_send(m_connectedQps[srcPid].get(), &srs[0], &bad_wr )) + { + + LOG(1, "Error while posting RDMA requests: " << std::strerror(err) ); + if (err == ENOMEM) { + LOG(1, "Specific error code: ENOMEM (send queue is full or no resources)"); + } + throw Exception("Error while posting RDMA requests"); + } + tryIncrement(Op::GET, Phase::PRE, dstSlot); + +} + +void IBVerbs :: get_rcvd_msg_count(size_t * rcvd_msgs) { + *rcvd_msgs = m_recvdMsgs; +} + +void IBVerbs :: get_sent_msg_count(size_t * sent_msgs) { + *sent_msgs = m_sentMsgs; +} + +void IBVerbs :: get_rcvd_msg_count_per_slot(size_t * rcvd_msgs, SlotID slot) +{ + *rcvd_msgs = rcvdMsgCount[slot] + getMsgCount[slot]; +} + +void IBVerbs :: get_sent_msg_count_per_slot(size_t * sent_msgs, SlotID slot) +{ + *sent_msgs = sentMsgCount[slot]; +} + +std::vector IBVerbs :: wait_completion(int& error) { + + error = 0; + LOG(1, "Polling for messages" ); + struct ibv_wc wcs[POLL_BATCH]; + int pollResult = ibv_poll_cq(m_cqLocal.get(), POLL_BATCH, wcs); + std::vector opcodes; + if ( pollResult > 0) { + LOG(4, "Process " << m_pid << ": Received " << pollResult << " acknowledgements"); + + for (int i = 0; i < pollResult ; ++i) { + if (wcs[i].status != IBV_WC_SUCCESS) + { + LOG( 2, "Got bad completion status from IB message." + " status = 0x" << std::hex << wcs[i].status + << ", vendor syndrome = 0x" << std::hex + << wcs[i].vendor_err ); + const char * status_descr; + status_descr = ibv_wc_status_str(wcs[i].status); + LOG( 2, "The work completion status string: " << status_descr); + error = 1; + } + else { + LOG(4, "Process " << m_pid << " Send wcs[" << i << "].src_qp = "<< wcs[i].src_qp); + LOG(4, "Process " << m_pid << " Send wcs[" << i << "].slid = "<< wcs[i].slid); + LOG(4, "Process " << m_pid << " Send wcs[" << i << "].wr_id = "<< wcs[i].wr_id); + LOG(4, "Process " << m_pid << " Send wcs[" << i << "].imm_data = "<< wcs[i].imm_data); + } + + SlotID slot = wcs[i].wr_id; + opcodes.push_back(wcs[i].opcode); + // Ignore compare-and-swap atomics! + if (wcs[i].opcode != IBV_WC_COMP_SWAP) { + // This is a get call completing + if (wcs[i].opcode == IBV_WC_RDMA_READ) { + tryIncrement(Op::GET, Phase::POST, slot); + LOG(4, "Rank " << m_pid << " with GET, increments getMsgCount to " << getMsgCount[slot] << " for LPF slot " << slot); + } + // This is a put call completing + if (wcs[i].opcode == IBV_WC_RDMA_WRITE) { + tryIncrement(Op::SEND, Phase::POST, slot); + LOG(4, "Rank " << m_pid << " with SEND, increments getMsgCount to " << sentMsgCount[slot] << " for LPF slot " << slot); + } + + } + } + } + else if (pollResult < 0) + { + LOG( 1, "Failed to poll IB completion queue" ); + throw Exception("Poll CQ failure"); + } + return opcodes; +} + +void IBVerbs :: flushReceived() { + doRemoteProgress(); +} + +void IBVerbs :: flushSent() +{ + int isError = 0; + + bool sendsComplete; + do { + sendsComplete = true; + for (size_t i = 0; i sentMsgCount[i] || m_getInitMsgCount[i] > getMsgCount[i]) { + sendsComplete = false; + wait_completion(isError); + if (isError) { + LOG(1, "Error in wait_completion. Most likely issue is that receiver is not calling ibv_post_srq!\n"); + std::abort(); + } + } + } + } + } while (!sendsComplete); + +} + +void IBVerbs :: countingSyncPerSlot(SlotID slot, size_t expectedSent, size_t expectedRecvd) { + + bool sentOK = false; + bool recvdOK = false; + if (expectedSent == 0) sentOK = true; + if (expectedRecvd == 0) recvdOK = true; + int error; + if (slotActive[slot]) { + do { + wait_completion(error); + if (error) { + LOG(1, "Error in wait_completion"); + std::abort(); + } + // this call triggers doRemoteProgress + doRemoteProgress(); + + /* + * 1) Are we expecting nothing here (sentOK/recvdOK = true) + * 2) do the sent and received messages match our expectations? + */ + sentOK = (sentOK || sentMsgCount[slot] >= expectedSent); + // We can receive messages passively (from remote puts) and actively (from our gets) + recvdOK = (recvdOK || (rcvdMsgCount[slot] + getMsgCount[slot]) >= expectedRecvd); + LOG(4, "PID: " << m_pid << " rcvdMsgCount[" << slot << "] = " << rcvdMsgCount[slot] + << " expectedRecvd = " << expectedRecvd + << " sentMsgCount[" << slot << "] = " << sentMsgCount[slot] + << " expectedSent = " << expectedSent + << " m_recvInitMsgCount[" << slot << "] = " << m_recvInitMsgCount[slot] + << " m_sendInitMsgCount[" << slot << "] = " << m_sendInitMsgCount[slot]); + + } while (!(sentOK && recvdOK)); + } +} + +void IBVerbs :: syncPerSlot(SlotID slot) { + int error; + + do { + wait_completion(error); + if (error) { + LOG(1, "Error in wait_completion"); + std::abort(); + } + doRemoteProgress(); + } + while ((rcvdMsgCount.at(slot) < m_recvInitMsgCount.at(slot)) || (sentMsgCount.at(slot) < m_sendInitMsgCount.at(slot))); + + /** + * A subsequent barrier is a controversial decision: + * - if we use it, the sync guarantees that + * receiver has received all that it is supposed to + * receive. However, it loses all performance advantages + * of waiting "only on certain tags" + * - if we do not barrier, we only make sure the slot + * completes all sends and receives that HAVE ALREADY + * BEEN ISSUED. However, a receiver of an RMA put + * cannot know if it is supposed to receive more messages. + * It can only know if it is receiving via an RMA get. + * Therefore, now this operation is commented + */ + //m_comm.barrier(); + +} + +void IBVerbs :: sync(bool resized) +{ + (void) resized; + + // flush send queues + flushSent(); + // flush receive queues + flushReceived(); + + LOG(4, "Process " << m_pid << " will call barrier at end of sync\n"); + m_comm.barrier(); + + +} + + +} } diff --git a/src/MPI/init.cpp b/src/MPI/init.cpp index 68d16866..97768de1 100644 --- a/src/MPI/init.cpp +++ b/src/MPI/init.cpp @@ -54,9 +54,10 @@ namespace lpf { (engine.compare( "mpirma" ) == 0) || (engine.compare( "mpimsg" ) == 0) || (engine.compare( "ibverbs" ) == 0) || + (engine.compare( "zero" ) == 0) || (engine.compare( "hybrid" ) == 0); if( !engine_is_MPI ) { - (void) std::fprintf( stderr, "Warning: program was compiled for the mpirma, mpimsg, ibverbs, or hybrid engine but run-time requests the %s engine instead. For stable results please compile the program into a universal LPF program (by omitting the -engine flag to the lpfcc/lpfcxx utilities).\n", engine.c_str() ); + (void) std::fprintf( stderr, "Warning: program was compiled for the mpirma, mpimsg, ibverbs, zero, or hybrid engine but run-time requests the %s engine instead. For stable results please compile the program into a universal LPF program (by omitting the -engine flag to the lpfcc/lpfcxx utilities).\n", engine.c_str() ); } if( mpi_initializer_ran || !engine_is_MPI ) { diff --git a/src/MPI/interface.cpp b/src/MPI/interface.cpp index 30ece40d..2e969957 100644 --- a/src/MPI/interface.cpp +++ b/src/MPI/interface.cpp @@ -100,6 +100,81 @@ void Interface :: put( memslot_t srcSlot, size_t srcOffset, size ); } +// only for HiCR +//#ifdef + +void Interface :: lockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, + size_t size ) +{ + m_mesgQueue.lockSlot( srcSlot, srcOffset, + dstPid, dstSlot, dstOffset, + size ); +} + +void Interface :: unlockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, + size_t size ) +{ + m_mesgQueue.unlockSlot( srcSlot, srcOffset, + dstPid, dstSlot, dstOffset, + size ); +} + +void Interface :: getRcvdMsgCountPerSlot(size_t * msgs, SlotID slot) { + m_mesgQueue.getRcvdMsgCountPerSlot(msgs, slot); +} + +void Interface :: getSentMsgCountPerSlot(size_t * msgs, SlotID slot) { + m_mesgQueue.getSentMsgCountPerSlot(msgs, slot); +} + + +void Interface :: getRcvdMsgCount(size_t * msgs) { + m_mesgQueue.getRcvdMsgCount(msgs); +} + +void Interface :: getSentMsgCount(size_t * msgs) { + m_mesgQueue.getSentMsgCount(msgs); +} + +void Interface :: flushSent() { + m_mesgQueue.flushSent(); +} + +void Interface :: flushReceived() { + m_mesgQueue.flushReceived(); +} + +err_t Interface :: countingSyncPerSlot(memslot_t slot, size_t expected_sent, size_t expected_rcvd) +{ + if ( 0 == m_aborted ) + { + m_aborted = m_mesgQueue.countingSyncPerSlot(slot, expected_sent, expected_rcvd); + return LPF_SUCCESS; + } + else + { + return LPF_ERR_FATAL; + } +} + +err_t Interface :: syncPerSlot(memslot_t slot) +{ + if ( 0 == m_aborted ) + { + m_aborted = m_mesgQueue.syncPerSlot(slot); + return LPF_SUCCESS; + } + else + { + return LPF_ERR_FATAL; + } +} + +// only for HiCR +//#endif + void Interface :: get( pid_t srcPid, memslot_t srcSlot, size_t srcOffset, memslot_t dstSlot, size_t dstOffset, size_t size ) @@ -137,9 +212,16 @@ err_t Interface :: resizeMesgQueue( size_t nMsgs ) void Interface :: abort() { ASSERT( 0 == m_aborted ); +#ifdef LPF_CORE_MPI_USES_zero + int vote = 1; + int voted; + m_comm.allreduceSum(&vote, &voted, 1); + m_aborted = voted; +#else // signal all other processes at the start of the next 'sync' that // this process aborted. m_aborted = m_mesgQueue.sync( true ); +#endif } pid_t Interface :: isAborted() const diff --git a/src/MPI/interface.hpp b/src/MPI/interface.hpp index 732f0a9b..9a10b8e5 100644 --- a/src/MPI/interface.hpp +++ b/src/MPI/interface.hpp @@ -70,6 +70,36 @@ class _LPFLIB_LOCAL Interface static err_t hook( const mpi::Comm & comm , spmd_t spmd, args_t args ); + // only for HiCR + // #if + err_t countingSyncPerSlot(memslot_t slot, size_t expected_sent, size_t expected_rcvd); + + err_t syncPerSlot(memslot_t slot); + + typedef size_t SlotID; + + void getRcvdMsgCountPerSlot(size_t * msgs, SlotID slot); + + void getSentMsgCountPerSlot(size_t * msgs, SlotID slot); + + void getSentMsgCount(size_t * msgs); + + void getRcvdMsgCount(size_t * msgs); + + void flushSent(); + + void flushReceived(); + + void lockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, + size_t size ); + + void unlockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, + size_t size ); + + // only for HiCR +//#endif err_t rehook( spmd_t spmd, args_t args); void probe( machine_t & machine ) ; diff --git a/src/MPI/memorytable.cpp b/src/MPI/memorytable.cpp index 3bb7a792..51947985 100644 --- a/src/MPI/memorytable.cpp +++ b/src/MPI/memorytable.cpp @@ -23,7 +23,7 @@ namespace lpf { MemoryTable :: MemoryTable( Communication & comm -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero , mpi::IBVerbs & ibverbs #endif ) @@ -34,7 +34,7 @@ MemoryTable :: MemoryTable( Communication & comm , m_removed( 0, 0 ) , m_comm( comm ) #endif -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero , m_added( 0, 0 ) , m_ibverbs( ibverbs ) , m_comm( comm ) @@ -45,7 +45,7 @@ MemoryTable :: MemoryTable( Communication & comm MemoryTable :: Slot MemoryTable :: addLocal( void * mem, std::size_t size ) // nothrow { -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero Memory rec( mem, size, m_ibverbs.regLocal( mem, size)); #else Memory rec( mem, size); @@ -56,13 +56,13 @@ MemoryTable :: addLocal( void * mem, std::size_t size ) // nothrow MemoryTable :: Slot MemoryTable :: addGlobal( void * mem, std::size_t size ) // nothrow { -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero Memory rec(mem, size, -1); #else Memory rec(mem, size); #endif Slot slot = m_memreg.addGlobalReg(rec) ; -#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero m_added.insert( slot ); #endif return slot; @@ -92,7 +92,7 @@ void MemoryTable :: remove( Slot slot ) // nothrow m_memreg.removeReg( slot ); #endif -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero if (m_added.contains(slot)) { m_added.erase(slot); } @@ -123,7 +123,7 @@ void MemoryTable :: reserve( size_t size ) // throws bad_alloc, strong safe m_memreg.reserve( size ); #endif -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero m_memreg.reserve( size ); size_t range = m_memreg.range(); m_added.resize( range ); @@ -151,7 +151,7 @@ bool MemoryTable :: needsSync() const #ifdef LPF_CORE_MPI_USES_mpimsg return false; #endif -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero return !m_added.empty(); #endif } @@ -194,7 +194,7 @@ void MemoryTable :: sync( ) } // if #endif -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero if ( !m_added.empty() ) { // Register the global with IBverbs diff --git a/src/MPI/memorytable.hpp b/src/MPI/memorytable.hpp index 18dd5038..05c01eee 100644 --- a/src/MPI/memorytable.hpp +++ b/src/MPI/memorytable.hpp @@ -1,4 +1,3 @@ - /* * Copyright 2021 Huawei Technologies Co., Ltd. * @@ -24,7 +23,7 @@ #include "assert.hpp" #include "linkage.hpp" -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero #include "ibverbs.hpp" #endif @@ -44,7 +43,7 @@ class _LPFLIB_LOCAL MemoryTable struct Memory { char *addr; size_t size; -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero mpi::IBVerbs::SlotID slot; Memory( void * a, size_t s, mpi::IBVerbs::SlotID sl) : addr(static_cast(a)) @@ -65,7 +64,7 @@ class _LPFLIB_LOCAL MemoryTable static Slot invalidSlot() { return Register::invalidSlot(); } -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero explicit MemoryTable( Communication & comm, mpi::IBVerbs & verbs ); #else explicit MemoryTable( Communication & comm ); @@ -90,7 +89,7 @@ class _LPFLIB_LOCAL MemoryTable { return m_windows[ slot ]; } #endif -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero mpi::IBVerbs::SlotID getVerbID( Slot slot ) const { return m_memreg.lookup( slot ).slot; } #endif @@ -118,7 +117,7 @@ class _LPFLIB_LOCAL MemoryTable Communication & m_comm; #endif -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero DirtyList m_added; mpi::IBVerbs & m_ibverbs; Communication & m_comm; diff --git a/src/MPI/mesgqueue.cpp b/src/MPI/mesgqueue.cpp index 0f610a52..78d2c4db 100644 --- a/src/MPI/mesgqueue.cpp +++ b/src/MPI/mesgqueue.cpp @@ -16,6 +16,7 @@ */ #include "mesgqueue.hpp" +#include "ibverbs.hpp" #include "mpilib.hpp" #include "log.hpp" #include "assert.hpp" @@ -97,19 +98,19 @@ MessageQueue :: MessageQueue( Communication & comm ) , m_edgeRecv() , m_edgeSend() , m_edgeBuffer() -#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero , m_edgeBufferSlot( m_memreg.invalidSlot() ) #endif , m_bodySends() , m_bodyRecvs() , m_comm( dynamic_cast(comm) ) -#ifdef LPF_CORE_MPI_USES_ibverbs - , m_ibverbs( m_comm ) + , m_tinyMsgBuf( m_tinyMsgSize + largestHeader(m_nprocs, m_memRange, 0, 0)) +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero + , m_ibverbs(m_comm) , m_memreg( m_comm, m_ibverbs ) #else , m_memreg( m_comm ) #endif - , m_tinyMsgBuf( m_tinyMsgSize + largestHeader(m_nprocs, m_memRange, 0, 0)) { m_memreg.reserve(1); // reserve slot for edgeBuffer } @@ -179,7 +180,7 @@ err_t MessageQueue :: resizeMesgQueue( size_t nMsgs ) #ifdef LPF_CORE_MPI_USES_mpimsg m_comm.reserveMsgs( 6* nMsgs ); //another factor three stems from sending edges separately . #endif -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero m_ibverbs.resizeMesgq( 6*nMsgs); #endif @@ -270,6 +271,14 @@ void MessageQueue :: removeReg( memslot_t slot ) void MessageQueue :: get( pid_t srcPid, memslot_t srcSlot, size_t srcOffset, memslot_t dstSlot, size_t dstOffset, size_t size ) { +#ifdef LPF_CORE_MPI_USES_zero + m_ibverbs.get(srcPid, + m_memreg.getVerbID( srcSlot), + srcOffset, + m_memreg.getVerbID( dstSlot), + dstOffset, + size ); +#else if (size > 0) { ASSERT( ! m_memreg.isLocalSlot( srcSlot ) ); @@ -310,11 +319,48 @@ void MessageQueue :: get( pid_t srcPid, memslot_t srcSlot, size_t srcOffset, } } } +#endif +} + +void MessageQueue :: lockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size ) +{ + ASSERT(srcSlot != LPF_INVALID_MEMSLOT); + ASSERT(dstSlot != LPF_INVALID_MEMSLOT); + (void) srcOffset; + (void) dstOffset; + (void) dstPid; + (void) size; +#ifdef LPF_CORE_MPI_USES_zero +m_ibverbs.blockingCompareAndSwap(m_memreg.getVerbID(srcSlot), srcOffset, dstPid, m_memreg.getVerbID(dstSlot), dstOffset, size, 0ULL, 1ULL); +#endif +} + +void MessageQueue :: unlockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size ) +{ + ASSERT(srcSlot != LPF_INVALID_MEMSLOT); + ASSERT(dstSlot != LPF_INVALID_MEMSLOT); + (void) srcOffset; + (void) dstOffset; + (void) dstPid; + (void) size; +#ifdef LPF_CORE_MPI_USES_zero +m_ibverbs.blockingCompareAndSwap(m_memreg.getVerbID(srcSlot), srcOffset, dstPid, m_memreg.getVerbID(dstSlot), dstOffset, size, 1ULL, 0ULL); +#endif } void MessageQueue :: put( memslot_t srcSlot, size_t srcOffset, pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size ) { +#ifdef LPF_CORE_MPI_USES_zero + m_ibverbs.put( m_memreg.getVerbID( srcSlot), + srcOffset, + dstPid, + m_memreg.getVerbID( dstSlot), + dstOffset, + size); +#else if (size > 0) { ASSERT( ! m_memreg.isLocalSlot( dstSlot ) ); @@ -348,10 +394,20 @@ void MessageQueue :: put( memslot_t srcSlot, size_t srcOffset, } } } +#endif + } int MessageQueue :: sync( bool abort ) { +#ifdef LPF_CORE_MPI_USES_zero + // if not, deal with normal sync + (void) abort; + m_memreg.sync(); + m_ibverbs.sync(m_resized); + m_resized = false; +#else + LOG(4, "mpi :: MessageQueue :: sync( abort " << (abort?"true":"false") << " )"); using mpi::ipc::newMsg; @@ -971,9 +1027,96 @@ int MessageQueue :: sync( bool abort ) ASSERT( m_bodyRecvs.empty() ); LOG(4, "End of synchronisation"); +#endif return 0; + } +int MessageQueue :: countingSyncPerSlot(memslot_t slot, size_t expected_sent, size_t expected_rcvd) +{ + + ASSERT(slot != LPF_INVALID_MEMSLOT); + (void) expected_sent; + (void) expected_rcvd; +#ifdef LPF_CORE_MPI_USES_zero + + // if not, deal with normal sync + m_memreg.sync(); + m_ibverbs.countingSyncPerSlot(m_memreg.getVerbID(slot), expected_sent, expected_rcvd); + m_resized = false; + + +#endif + return 0; +} + +int MessageQueue :: syncPerSlot(memslot_t slot) +{ + + ASSERT(slot != LPF_INVALID_MEMSLOT); +#ifdef LPF_CORE_MPI_USES_zero + + // if not, deal with normal sync + m_memreg.sync(); + m_ibverbs.syncPerSlot(m_memreg.getVerbID(slot)); + m_resized = false; + +#endif + return 0; +} + + +void MessageQueue :: getRcvdMsgCountPerSlot(size_t * msgs, memslot_t slot) +{ + + ASSERT(msgs != nullptr); + ASSERT(slot != LPF_INVALID_MEMSLOT); +#ifdef LPF_CORE_MPI_USES_zero + *msgs = 0; + m_ibverbs.get_rcvd_msg_count_per_slot(msgs, m_memreg.getVerbID(slot)); +#endif +} + +void MessageQueue :: getRcvdMsgCount(size_t * msgs) +{ + ASSERT(msgs != nullptr); +#ifdef LPF_CORE_MPI_USES_zero + *msgs = 0; + m_ibverbs.get_rcvd_msg_count(msgs); +#endif +} + +void MessageQueue :: getSentMsgCount(size_t * msgs) +{ + ASSERT(msgs != nullptr); +#ifdef LPF_CORE_MPI_USES_zero + *msgs = 0; + m_ibverbs.get_sent_msg_count(msgs); +#endif +} +void MessageQueue :: getSentMsgCountPerSlot(size_t * msgs, memslot_t slot) +{ + ASSERT(msgs != nullptr); + ASSERT(slot != LPF_INVALID_MEMSLOT); +#ifdef LPF_CORE_MPI_USES_zero + *msgs = 0; + m_ibverbs.get_sent_msg_count_per_slot(msgs, m_memreg.getVerbID(slot)); +#endif +} + +void MessageQueue :: flushSent() +{ +#ifdef LPF_CORE_MPI_USES_zero + m_ibverbs.flushSent(); +#endif +} + +void MessageQueue :: flushReceived() +{ +#ifdef LPF_CORE_MPI_USES_zero + m_ibverbs.flushReceived(); +#endif +} } // namespace lpf diff --git a/src/MPI/mesgqueue.hpp b/src/MPI/mesgqueue.hpp index 27e7beb5..9bb704d0 100644 --- a/src/MPI/mesgqueue.hpp +++ b/src/MPI/mesgqueue.hpp @@ -33,14 +33,16 @@ #include #endif -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero #include "ibverbs.hpp" #endif + namespace lpf { class _LPFLIB_LOCAL MessageQueue { + public: explicit MessageQueue( Communication & comm ); @@ -49,7 +51,9 @@ class _LPFLIB_LOCAL MessageQueue memslot_t addLocalReg( void * mem, std::size_t size ); + memslot_t addGlobalReg( void * mem, std::size_t size ); + void removeReg( memslot_t slot ); void get( pid_t srcPid, memslot_t srcSlot, size_t srcOffset, @@ -62,8 +66,32 @@ class _LPFLIB_LOCAL MessageQueue // returns how many processes have entered in an aborted state int sync( bool abort ); +//only for HiCR + void lockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size ); + + void unlockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size ); + + void getRcvdMsgCountPerSlot(size_t * msgs, memslot_t slot); + + void getRcvdMsgCount(size_t * msgs); + + void getSentMsgCountPerSlot(size_t * msgs, memslot_t slot); + + void getSentMsgCount(size_t * msgs); + + void flushSent(); + + void flushReceived(); + + int countingSyncPerSlot(memslot_t slot, size_t expected_sent, size_t expected_rcvd); + + int syncPerSlot(memslot_t slot); +// end only for HiCR + private: - enum Msgs { BufPut , + enum Msgs { BufPut , BufGet, BufGetReply, HpPut, HpGet , HpBodyReply , HpEdges, HpEdgesReply }; @@ -72,7 +100,7 @@ class _LPFLIB_LOCAL MessageQueue SrcPid, DstPid, SrcOffset, DstOffset, BufOffset, SrcSlot, DstSlot, Size, - RoundedDstOffset, RoundedSize, + RoundedDstOffset, RoundedSize, Payload, Head, Tail}; struct Edge { @@ -126,13 +154,13 @@ class _LPFLIB_LOCAL MessageQueue std::vector< Edge > m_edgeRecv; std::vector< Edge > m_edgeSend; std::vector< char > m_edgeBuffer; -#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero memslot_t m_edgeBufferSlot; #endif std::vector< Body > m_bodySends; std::vector< Body > m_bodyRecvs; mpi::Comm m_comm; -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_zero mpi::IBVerbs m_ibverbs; #endif MemoryTable m_memreg; diff --git a/src/core-libraries/collectives.c b/src/core-libraries/collectives.c index ff952e1f..cc80a69b 100644 --- a/src/core-libraries/collectives.c +++ b/src/core-libraries/collectives.c @@ -390,6 +390,41 @@ lpf_err_t lpf_allgather( return LPF_SUCCESS; } + +lpf_err_t lpf_allgatherv( + lpf_coll_t coll, + lpf_memslot_t src, + lpf_memslot_t dst, + size_t *sizes, + bool exclude_myself + ) { + + ASSERT( coll.P > 0 ); + ASSERT( coll.s < coll.P ); + + size_t allgatherv_start_addresses[coll.P]; + + for (size_t i=0; i 0) { + for (size_t i=0; i ) -target_link_libraries(${libname} ${LIB_POSIX_THREADS}) -target_include_directories(${libname} PRIVATE ${MPI_C_INCLUDE_PATH}) +target_link_libraries( ${libname} ${LIB_POSIX_THREADS}) +target_include_directories( ${libname} PRIVATE ${MPI_C_INCLUDE_PATH}) set_target_properties(${libname} PROPERTIES SOVERSION ${SOVERSION} ) @@ -37,5 +37,4 @@ install(TARGETS ${libname} EXPORT lpf ARCHIVE DESTINATION ${INSTALL_LIB} ) -add_gtest(rwconflict_test "pthread" rwconflict.t.cpp rwconflict.cpp) - #$ ) +add_gtest(rwconflict_test "pthread" ON rwconflict.t.cpp rwconflict.cpp) diff --git a/src/debug/core.cpp b/src/debug/core.cpp index c3d0adec..f77ee715 100644 --- a/src/debug/core.cpp +++ b/src/debug/core.cpp @@ -30,6 +30,12 @@ #undef lpf_exec #undef lpf_hook #undef lpf_rehook +#undef lpf_abort +#undef lpf_get_rcvd_msg_count +#undef lpf_get_rcvd_msg_count_per_slot +#undef lpf_get_sent_msg_count_per_slot +#undef lpf_flush +#undef lpf_abort #undef lpf_init_t #undef lpf_pid_t @@ -718,6 +724,18 @@ class _LPFLIB_LOCAL Interface { return LPF_SUCCESS; } + lpf_err_t get_rcvd_msg_count_per_slot(size_t *msgs, lpf_memslot_t slot) { + return LPF_SUCCESS; + } + + lpf_err_t get_sent_msg_count_per_slot(size_t *msgs, lpf_memslot_t slot) { + return LPF_SUCCESS; + } + + lpf_err_t get_rcvd_msg_count(size_t *msgs) { + return LPF_SUCCESS; + } + lpf_err_t register_local( const char * file, int line, void * pointer, size_t size, lpf_memslot_t * memslot ) { diff --git a/src/hybrid/core.cpp b/src/hybrid/core.cpp index 404edda8..791a1f68 100644 --- a/src/hybrid/core.cpp +++ b/src/hybrid/core.cpp @@ -343,6 +343,23 @@ _LPFLIB_API lpf_err_t lpf_sync( lpf_t ctx, lpf_sync_attr_t attr ) return realContext(ctx)->sync(); } +_LPFLIB_API lpf_err_t lpf_counting_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) +{ + (void) attr; + using namespace lpf::hybrid; + if (ctx == LPF_SINGLE_PROCESS) + return LPF_SUCCESS; + return realContext(ctx)->countingSyncPerSlot(slot, expected_sent, expected_rcvd); +} + +_LPFLIB_API lpf_err_t lpf_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot) +{ + (void) attr; + using namespace lpf::hybrid; + if (ctx == LPF_SINGLE_PROCESS) + return LPF_SUCCESS; + return realContext(ctx)->syncPerSlot(slot); +} _LPFLIB_API lpf_err_t lpf_probe( lpf_t ctx, lpf_machine_t * params ) { @@ -387,10 +404,43 @@ _LPFLIB_API lpf_err_t lpf_resize_memory_register( lpf_t ctx, size_t max_regs ) return LPF_SUCCESS; } +_LPFLIB_API lpf_err_t lpf_get_rcvd_msg_count( lpf_t ctx, size_t * rcvd_msgs) +{ + using namespace lpf::hybrid; + if (ctx == LPF_SINGLE_PROCESS) + return LPF_SUCCESS; + ThreadState * t = realContext(ctx); + if (!t->error()) + return t->getRcvdMsgCount(rcvd_msgs); + else + return LPF_SUCCESS; +} + +_LPFLIB_API lpf_err_t lpf_get_rcvd_msg_count_per_slot( lpf_t ctx, size_t * rcvd_msgs, lpf_memslot_t slot ) +{ + using namespace lpf::hybrid; + ThreadState * t = realContext(ctx); + MPI mpi = t->nodeState().mpi(); + mpi.abort(); + return LPF_SUCCESS; +} + +_LPFLIB_API lpf_err_t lpf_get_sent_msg_count_per_slot( lpf_t ctx, size_t * sent_msgs, lpf_memslot_t slot ) +{ + using namespace lpf::hybrid; + if (ctx == LPF_SINGLE_PROCESS) + return LPF_SUCCESS; + ThreadState * t = realContext(ctx); + if (!t->error()) + return t->getSentMsgCount(sent_msgs, slot); + else + return LPF_SUCCESS; +} + _LPFLIB_API lpf_err_t lpf_abort(lpf_t ctx) { using namespace lpf::hybrid; - ThreadState * const t = realContext(ctx); + ThreadState * t = realContext(ctx); MPI mpi = t->nodeState().mpi(); mpi.abort(); return LPF_SUCCESS; diff --git a/src/hybrid/dispatch.hpp b/src/hybrid/dispatch.hpp index c131c412..15b35393 100644 --- a/src/hybrid/dispatch.hpp +++ b/src/hybrid/dispatch.hpp @@ -118,6 +118,21 @@ namespace lpf { namespace hybrid { err_t deregister( memslot_t memslot) { return USE_THREAD( deregister)(m_ctx, memslot); } + err_t get_rcvd_msg_count_per_slot( size_t * rcvd_msgs, lpf_memslot_t slot) + { return USE_THREAD( get_rcvd_msg_count_per_slot)(m_ctx, rcvd_msgs, slot); } + + err_t get_sent_msg_count_per_slot( size_t * sent_msgs, lpf_memslot_t slot) + { return USE_THREAD( get_sent_msg_count_per_slot)(m_ctx, sent_msgs, slot); } + + err_t get_rcvd_msg_count( size_t * rcvd_msgs) + { return USE_THREAD( get_rcvd_msg_count)(m_ctx, rcvd_msgs); } + + err_t flush_sent() + { return USE_THREAD(flush_sent)(m_ctx); } + + err_t flush_received() + { return USE_THREAD(flush_received)(m_ctx); } + err_t put( memslot_t src_slot, size_t src_offset, pid_t dst_pid, memslot_t dst_slot, size_t dst_offset, size_t size, msg_attr_t attr = MSG_DEFAULT ) @@ -133,6 +148,12 @@ namespace lpf { namespace hybrid { err_t sync( sync_attr_t attr = SYNC_DEFAULT ) { return USE_THREAD(sync)( m_ctx, attr ); } + err_t sync_per_slot( sync_attr_t attr = SYNC_DEFAULT, memslot_t slot = LPF_INVALID_MEMSLOT) + { return USE_THREAD(sync_per_slot)( m_ctx, attr, slot); } + + err_t counting_sync_per_slot( sync_attr_t attr = SYNC_DEFAULT, lpf_memslot_t slot = LPF_INVALID_MEMSLOT, size_t expected_sent = 0, size_t expected_recvd = 0) + { return USE_THREAD(counting_sync_per_slot)(m_ctx, attr, slot, expected_sent, expected_recvd); } + err_t probe( machine_t * params ) { return USE_THREAD(probe)(m_ctx, params ); } @@ -208,6 +229,21 @@ namespace lpf { namespace hybrid { err_t deregister( memslot_t memslot) { return USE_MPI( deregister)(m_ctx, memslot); } + err_t get_rcvd_msg_count_per_slot(size_t *rcvd_msgs, lpf_memslot_t slot) + { return USE_MPI( get_rcvd_msg_count_per_slot)( m_ctx, rcvd_msgs, slot); } + + err_t get_sent_msg_count_per_slot(size_t *sent_msgs, lpf_memslot_t slot) + { return USE_MPI( get_sent_msg_count_per_slot)( m_ctx, sent_msgs, slot); } + + err_t get_rcvd_msg_count( size_t * rcvd_msgs) + { return USE_MPI( get_rcvd_msg_count)(m_ctx, rcvd_msgs); } + + err_t flush_sent() + {return USE_MPI( flush_sent)(m_ctx);} + + err_t flush_received() + {return USE_MPI( flush_received)(m_ctx);} + err_t put( memslot_t src_slot, size_t src_offset, pid_t dst_pid, memslot_t dst_slot, size_t dst_offset, size_t size, msg_attr_t attr = MSG_DEFAULT ) @@ -223,6 +259,12 @@ namespace lpf { namespace hybrid { err_t sync( sync_attr_t attr = SYNC_DEFAULT ) { return USE_MPI(sync)( m_ctx, attr ); } + err_t sync_per_slot( sync_attr_t attr = SYNC_DEFAULT, lpf_memslot_t slot = LPF_INVALID_MEMSLOT ) + { return USE_MPI(sync_per_slot)( m_ctx, attr, slot); } + + err_t counting_sync_per_slot( sync_attr_t attr = SYNC_DEFAULT, lpf_memslot_t slot = LPF_INVALID_MEMSLOT, size_t expected_sent = 0, size_t expected_recvd = 0) + { return USE_MPI(counting_sync_per_slot)(m_ctx, attr, slot, expected_sent, expected_recvd); } + err_t probe( machine_t * params ) { return USE_MPI(probe)(m_ctx, params ); } diff --git a/src/hybrid/state.hpp b/src/hybrid/state.hpp index 6ae1dd3a..81466106 100644 --- a/src/hybrid/state.hpp +++ b/src/hybrid/state.hpp @@ -367,6 +367,16 @@ class _LPFLIB_LOCAL ThreadState { return LPF_SUCCESS; } + lpf_err_t countingSyncPerSlot(lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) + { + return m_nodeState.mpi().counting_sync_per_slot(slot, expected_sent, expected_rcvd); + } + + lpf_err_t syncPerSlot(lpf_memslot_t slot) + { + return m_nodeState.mpi().sync_per_slot(slot); + } + ThreadState( NodeState * nodeState, Thread thread ) : m_error(false) , m_threadId( thread.pid() ) @@ -405,6 +415,25 @@ class _LPFLIB_LOCAL ThreadState { bool error() const { return m_error; } + lpf_pid_t getRcvdMsgCount(size_t * rcvd_msgs, lpf_memslot_t slot) { + + return m_nodeState.mpi().get_rcvd_msg_count_per_slot(rcvd_msgs, slot); + } + + lpf_pid_t getSentMsgCount(size_t * sent_msgs, lpf_memslot_t slot) { + + return m_nodeState.mpi().get_sent_msg_count_per_slot(sent_msgs, slot); + } + + lpf_pid_t getRcvdMsgCount(size_t * rcvd_msgs) { + + return m_nodeState.mpi().get_rcvd_msg_count(rcvd_msgs); + } + + lpf_pid_t flush() { + return (m_nodeState.mpi().flush_sent() && m_nodeState.mpi().flush_received()); + } + private: bool m_error; diff --git a/src/imp/core.c b/src/imp/core.c index e076b811..2d846ece 100644 --- a/src/imp/core.c +++ b/src/imp/core.c @@ -139,6 +139,39 @@ lpf_err_t lpf_sync( lpf_t lpf, lpf_sync_attr_t attr ) return LPF_SUCCESS; } +lpf_err_t lpf_counting_sync_per_slot( lpf_t lpf, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) +{ + (void) lpf; + (void) attr; + return LPF_SUCCESS; +} + +lpf_err_t lpf_lock_slot( + lpf_t ctx, + lpf_memslot_t src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +) { + return LPF_SUCCESS; +} + +lpf_err_t lpf_unlock_slot( + lpf_t ctx, + lpf_memslot_t src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +) { + return LPF_SUCCESS; +} + static double messageGap( lpf_pid_t p, size_t min_msg_size, lpf_sync_attr_t attr) { (void) p; @@ -181,7 +214,29 @@ lpf_err_t lpf_resize_memory_register( lpf_t lpf, size_t max_regs ) return LPF_SUCCESS; } -lpf_err_t lpf_abort( lpf_t lpf ) +lpf_err_t lpf_get_rcvd_msg_count_per_slot( lpf_t lpf, size_t * rcvd_msgs, lpf_memslot_t slot) { + (void) lpf; + *rcvd_msgs = 0; + return LPF_SUCCESS; +} + +lpf_err_t lpf_get_rcvd_msg_count( lpf_t lpf, size_t * rcvd_msgs) { + (void) lpf; + return LPF_SUCCESS; +} + +lpf_err_t lpf_get_sent_msg_count_per_slot( lpf_t lpf, size_t * sent_msgs, lpf_memslot_t slot) { + (void) lpf; + *sent_msgs = 0; + return LPF_SUCCESS; +} + +lpf_err_t lpf_flush( lpf_t lpf) { + (void) lpf; + return LPF_SUCCESS; +} + +lpf_err_t lpf_abort( lpf_t lpf) { (void) lpf; return LPF_SUCCESS; diff --git a/src/pthreads/core.cpp b/src/pthreads/core.cpp index 080b6a1d..df455986 100644 --- a/src/pthreads/core.cpp +++ b/src/pthreads/core.cpp @@ -335,6 +335,13 @@ lpf_err_t lpf_sync( lpf_t ctx, lpf_sync_attr_t attr ) return realCtx(ctx)->sync(); } +lpf_err_t lpf_counting_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) +{ + (void) attr; // ignore attr parameter since this implementation only + // implements core functionality + return realCtx(ctx)->countingSyncPerSlot(slot, expected_sent, expected_rcvd); +} + namespace { double messageGap( lpf_pid_t p, size_t min_msg_size, @@ -383,6 +390,31 @@ lpf_err_t lpf_resize_memory_register( lpf_t ctx, size_t max_regs ) return t->resizeMemreg(max_regs); } +lpf_err_t lpf_get_rcvd_msg_count_per_slot(lpf_t ctx, size_t * msgs, lpf_memslot_t slot) { + *msgs = 0; + lpf::ThreadLocalData * t = realCtx(ctx); + if (t->isAborted()) + return LPF_SUCCESS; + return LPF_SUCCESS; +} + + +lpf_err_t lpf_get_rcvd_msg_count(lpf_t ctx, size_t * msgs) { + *msgs = 0; + lpf::ThreadLocalData * t = realCtx(ctx); + if (t->isAborted()) + return LPF_SUCCESS; + return LPF_SUCCESS; +} + +lpf_err_t lpf_get_sent_msg_count_per_slot(lpf_t ctx, size_t * msgs, lpf_memslot_t slot) { + *msgs = 0; + lpf::ThreadLocalData * t = realCtx(ctx); + if (t->isAborted()) + return LPF_SUCCESS; + return LPF_SUCCESS; +} + lpf_err_t lpf_abort(lpf_t ctx) { (void) ctx; // Using std::abort is not portable diff --git a/src/pthreads/threadlocaldata.cpp b/src/pthreads/threadlocaldata.cpp index 6bb358f1..6a62e4d3 100644 --- a/src/pthreads/threadlocaldata.cpp +++ b/src/pthreads/threadlocaldata.cpp @@ -423,7 +423,7 @@ err_t ThreadLocalData :: resizeMemreg( size_t nRegs ) // nothrow } } -err_t ThreadLocalData :: sync( bool expectExit ) +err_t ThreadLocalData :: sync( bool expectExit) { if ( m_state->sync(m_pid) ) { @@ -441,6 +441,10 @@ err_t ThreadLocalData :: sync( bool expectExit ) return LPF_SUCCESS; } +err_t ThreadLocalData :: countingSyncPerSlot(bool expectExit, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) { + return LPF_SUCCESS; +} + namespace { int getNumberOfProcs() { diff --git a/src/pthreads/threadlocaldata.hpp b/src/pthreads/threadlocaldata.hpp index 66d56160..c1a83706 100644 --- a/src/pthreads/threadlocaldata.hpp +++ b/src/pthreads/threadlocaldata.hpp @@ -105,6 +105,8 @@ class _LPFLIB_LOCAL ThreadLocalData { return m_atExit[0]; } err_t sync( bool expectExit = false ); // nothrow + err_t countingSyncPerSlot( bool expectExit = false, lpf_memslot_t slot = LPF_INVALID_MEMSLOT, size_t expected_sent = 0, size_t expected_rcvd = 0); // nothrow + err_t syncPerSlot( bool expectExit = false, lpf_memslot_t slot = LPF_INVALID_MEMSLOT); // nothrow private: ThreadLocalData( const ThreadLocalData & ) ; // prohibit copying diff --git a/test_launcher.py b/test_launcher.py new file mode 100644 index 00000000..2f043e92 --- /dev/null +++ b/test_launcher.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python +import argparse +import subprocess +import sys + +parser = argparse.ArgumentParser( description='Death test launcher' ) +parser.add_argument("-e", "--engine", type=str) +parser.add_argument("-L", "--parallel_launcher", type=str) +parser.add_argument("-p", "--min_process_count", type=int) +parser.add_argument("-P", "--max_process_count", type=int) +parser.add_argument("-t", "--lpf_probe_timer", type=float) +parser.add_argument("-R", "--expected_return_code", type=int) +parser.add_argument( 'cmd', nargs=argparse.REMAINDER ) +args = parser.parse_args() +# This is only for passing Gtest info to CMake +# The parallel launcher is still needed as Open MPI +# binaries terminate without the launcher on our cluster, +# even for single process runs +if args.cmd[-1] == '--gtest_list_tests': + run_cmd = [args.parallel_launcher, '-engine', args.engine, '-n', '1'] + args.cmd + cmd = subprocess.run( run_cmd) + sys.exit(cmd.returncode) +# Actual use of our launcher +else: + for i in range(args.min_process_count, args.max_process_count+1): + if args.lpf_probe_timer > 0.0: + run_cmd = [args.parallel_launcher, '-engine', args.engine, '-probe', str(args.lpf_probe_timer), '-n', str(i)] + args.cmd + else: + run_cmd = [args.parallel_launcher, '-engine', args.engine, '-n', str(i)] + args.cmd + print("Run command: ") + print(run_cmd) + cmd = subprocess.run( run_cmd) + print("Test returned code = " + str(cmd.returncode)) + retcode = cmd.returncode + if (retcode != args.expected_return_code): + print("Test " + args.cmd[0] + args.cmd[1] + "\nreturned\t" + str(retcode) + "\nexpected return code was: " + str(args.expected_return_code)) + sys.exit(1) + print("Test " + args.cmd[0] + args.cmd[1] + " passed") diff --git a/tests/functional/CMakeLists.txt b/tests/functional/CMakeLists.txt index 0eb7eea6..049fc8f2 100644 --- a/tests/functional/CMakeLists.txt +++ b/tests/functional/CMakeLists.txt @@ -125,8 +125,20 @@ foreach (LPF_IMPL_ID ${ENGINES}) string(REGEX REPLACE "(.${LPF_IMPL_ID})?.cpp$" "" baseName ${testSource}) get_filename_component(baseName ${testSource} NAME_WE ) set(exeName "${baseName}_${LPF_IMPL_ID}_${LPF_IMPL_CONFIG}${mode}") - add_gtest(${exeName} ${LPF_IMPL_ID} ${debug} "${CMAKE_CURRENT_SOURCE_DIR}/${testSource}") + # Following tests are NOT run with zero engine: + # - bsplib tests, as their semantics is not clear + # - simple early_exit tests fail, as only a subset of + # processes call lpf_sync - zero engine does not support that, + # and lpf_sync has an implicit barrier in zero engine + # - overlapping tests perform conflict resolution, but + # zero engine does not support that + string(REGEX MATCH "overlapping|early|bsplib" foundTest ${testSource}) + if (NOT ${LPF_IMPL_ID} STREQUAL "zero") + add_gtest(${exeName} ${LPF_IMPL_ID} ${debug} "${CMAKE_CURRENT_SOURCE_DIR}/${testSource}") + elseif ("${foundTest}" STREQUAL "") + add_gtest(${exeName} ${LPF_IMPL_ID} ${debug} "${CMAKE_CURRENT_SOURCE_DIR}/${testSource}") + endif() endforeach(testSource) endforeach(LPF_IMPL_ID) diff --git a/tests/functional/collectives/CMakeLists.txt b/tests/functional/collectives/CMakeLists.txt index 7047e3e6..ad2b1a9f 100644 --- a/tests/functional/collectives/CMakeLists.txt +++ b/tests/functional/collectives/CMakeLists.txt @@ -47,8 +47,6 @@ foreach (LPF_IMPL_ID ${ENGINES}) string(REGEX REPLACE "(.${LPF_IMPL_ID})?.cpp$" "" baseName ${testSource}) get_filename_component(baseName ${testSource} NAME_WE ) set(exeName "${baseName}_${LPF_IMPL_ID}_${LPF_IMPL_CONFIG}${mode}") - add_gtest(${exeName} ${LPF_IMPL_ID} ${debug} "${CMAKE_CURRENT_SOURCE_DIR}/${testSource}") - endforeach(testSource) endforeach(LPF_IMPL_ID) diff --git a/tests/functional/func_lpf_compare_and_swap.ibverbs.c b/tests/functional/func_lpf_compare_and_swap.ibverbs.c new file mode 100644 index 00000000..b4d84773 --- /dev/null +++ b/tests/functional/func_lpf_compare_and_swap.ibverbs.c @@ -0,0 +1,86 @@ + +/* + * Copyright 2021 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "Test.h" + +void spmd( lpf_t lpf, lpf_pid_t pid, lpf_pid_t nprocs, lpf_args_t args) +{ + (void) args; // ignore args parameter + lpf_err_t rc = LPF_SUCCESS; + + // local x is the compare-and-swap value and is important at non-root + uint64_t localSwap = 0ULL; + // global y is the global slot at 0, and should be initialized to 0ULL + uint64_t globalSwap = 0ULL; + int x = 0; + int y = 0; + lpf_memslot_t localSwapSlot = LPF_INVALID_MEMSLOT; + lpf_memslot_t globalSwapSlot = LPF_INVALID_MEMSLOT; + size_t maxmsgs = 2 , maxregs = 2; + rc = lpf_resize_message_queue( lpf, maxmsgs); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_resize_memory_register( lpf, maxregs ); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_sync( lpf, LPF_SYNC_DEFAULT ); + lpf_memslot_t xslot = LPF_INVALID_MEMSLOT; + lpf_memslot_t yslot = LPF_INVALID_MEMSLOT; + rc = lpf_register_local( lpf, &localSwap, sizeof(localSwap), &localSwapSlot ); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_register_local( lpf, &x, sizeof(x), &xslot ); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_register_global( lpf, &globalSwap, sizeof(globalSwap), &globalSwapSlot ); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_register_global( lpf, &y, sizeof(y), &yslot ); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_sync( lpf, LPF_SYNC_DEFAULT); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + + + // BLOCKING + rc = lpf_lock_slot(lpf, localSwapSlot, 0, 0 /* rank where global slot to lock resides*/, globalSwapSlot, 0, sizeof(globalSwapSlot), LPF_MSG_DEFAULT); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_get( lpf, 0, yslot, 0, xslot, 0, sizeof(x), LPF_MSG_DEFAULT ); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_sync_per_slot( lpf, LPF_SYNC_DEFAULT, xslot); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + x = x + 1; + rc = lpf_put( lpf, xslot, 0, 0, yslot, 0, sizeof(x), LPF_MSG_DEFAULT ); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_sync_per_slot( lpf, LPF_SYNC_DEFAULT, xslot); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + // BLOCKING + lpf_unlock_slot(lpf, localSwapSlot, 0, 0 /* rank where global slot to lock resides*/, globalSwapSlot, 0, sizeof(globalSwapSlot), LPF_MSG_DEFAULT); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + lpf_sync(lpf, LPF_MSG_DEFAULT); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + if (pid == 0) + printf("Rank %d: y = %d\n", pid, y); +} + +/** + * \test Test atomic compare-and-swap on a global slot + * \pre P >= 1 + * \return Exit code: 0 + */ +TEST( func_lpf_compare_and_swap ) +{ + lpf_err_t rc = lpf_exec( LPF_ROOT, LPF_MAX_P, spmd, LPF_NO_ARGS); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + return 0; +} diff --git a/tests/functional/func_lpf_probe_parallel_nested.cpp b/tests/functional/func_lpf_probe_parallel_nested.cpp index f594b7b8..5381bffe 100644 --- a/tests/functional/func_lpf_probe_parallel_nested.cpp +++ b/tests/functional/func_lpf_probe_parallel_nested.cpp @@ -117,8 +117,8 @@ void spmd1( lpf_t lpf, lpf_pid_t pid, lpf_pid_t nprocs, lpf_args_t args) EXPECT_LT( 0.0, (*(subMachine.g))(machine.p, (size_t)(-1), LPF_SYNC_DEFAULT) ); EXPECT_LT( 0.0, (*(subMachine.l))(machine.p, (size_t)(-1), LPF_SYNC_DEFAULT) ); - const int pthread = 1, mpirma = 1, mpimsg = 1, hybrid = 0, ibverbs=1; - (void) pthread; (void) mpirma; (void) mpimsg; (void) hybrid; (void) ibverbs; + const int pthread = 1, mpirma = 1, mpimsg = 1, hybrid = 0, ibverbs=1, zero = 1; + (void) pthread; (void) mpirma; (void) mpimsg; (void) hybrid; (void) ibverbs; (void) zero; if (LPF_CORE_IMPL_ID) // this part is disabled for the hybrid implementation, because { // that one doesn't do generic nesting of lpf_exec's EXPECT_EQ( 1, subMachine.free_p == 2 || subMachine.free_p == 3 ); @@ -203,5 +203,4 @@ TEST( API, func_lpf_probe_parallel_nested ) rc = lpf_exec( LPF_ROOT, machine.p / 2, &spmd1, args ); EXPECT_EQ( LPF_SUCCESS, rc ); - } diff --git a/tests/functional/macro_LPF_VERSION.cpp b/tests/functional/macro_LPF_VERSION.cpp index 7588aeea..008ccfa2 100644 --- a/tests/functional/macro_LPF_VERSION.cpp +++ b/tests/functional/macro_LPF_VERSION.cpp @@ -19,10 +19,10 @@ #include "gtest/gtest.h" #ifdef _LPF_VERSION - #if _LPF_VERSION == 202000L + #if _LPF_VERSION == 202400L // everything is OK #else - #error Macro _LPF_VERSION has not been defined as 202000L + #error Macro _LPF_VERSION has not been defined as 202400L #endif #else #error Macro _LPF_VERSION has not been defined @@ -35,5 +35,5 @@ */ TEST( API, macro_LPF_VERSION ) { - EXPECT_EQ( 202000L, _LPF_VERSION ); + EXPECT_EQ( 202400L, _LPF_VERSION ); }