diff --git a/CMakeLists.txt b/CMakeLists.txt
index 00b78dde..f1c8b1e4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -112,13 +112,13 @@ if (TR1_ARRAY)
     set(CMAKE_CXX_STANDARD 98)
     set(CMAKE_CXX_STANDARD_REQUIRED YES)
 else()
-    message(STATUS "Governing C++ standard is C++11")
-    set(CMAKE_CXX_STANDARD 11)
+    message(STATUS "Governing C++ standard is C++14")
+    set(CMAKE_CXX_STANDARD 14)
     set(CMAKE_CXX_STANDARD_REQUIRED YES)
 endif()

 # Dependencies
-set(ENGINES)
+set(ENGINES "")
 find_library(
     LIB_POSIX_THREADS NAMES "pthread"
     DOC "Posix Threads"
@@ -191,6 +191,7 @@ if ( LIB_MATH AND LIB_DL AND MPI_FOUND )

     if (LIB_IBVERBS)
         list(APPEND ENGINES "ibverbs")
+        list(APPEND ENGINES "hicr")
     endif()
 endif()

@@ -462,5 +463,7 @@ install(DIRECTORY "include/bsp" DESTINATION ${INSTALL_HEADERS})
 install(DIRECTORY "include/debug" DESTINATION ${INSTALL_HEADERS}/lpf )

 # Post install actions
-add_subdirectory(post-install)
+# The post-install runs are disabled for now because they consistently fail;
+# they should be fixed and re-enabled at some point.
+# add_subdirectory(post-install)
diff --git a/bootstrap.sh b/bootstrap.sh
index e641e56e..93cb5bbd 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -84,7 +84,7 @@ builddir=`pwd`

 # Parse command line parameters
 installdir="$builddir"
-config=Release
+config=Debug #Release
 doc=OFF
 functests=OFF
 googletest_license_agreement=NO
diff --git a/examples/rc_pingpong.c b/examples/rc_pingpong.c
new file mode 100644
index 00000000..b38a3de7
--- /dev/null
+++ b/examples/rc_pingpong.c
@@ -0,0 +1,888 @@
+/*
+ * Copyright (c) 2005 Topspin Communications.  All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+#define _GNU_SOURCE
+#include <config.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <strings.h>
+#include <unistd.h>
+#include <getopt.h>
+#include <sys/time.h>
+#include <malloc.h>
+#include <stdint.h>
+#include <arpa/inet.h>
+#include <time.h>
+#include <inttypes.h>
+
+#include "pingpong.h"
+
+#include <infiniband/verbs.h>
+
+#include "mpi.h"
+
+enum {
+	PINGPONG_RECV_WRID = 1,
+	PINGPONG_SEND_WRID = 2,
+	SWAP_WRID = 3,
+};
+
+static int page_size;
+static int implicit_odp;
+static int prefetch_mr;
+static int validate_buf;
+
+struct pingpong_context {
+	struct ibv_context	*context;
+	struct ibv_comp_channel *channel;
+	struct ibv_pd		*pd;
+	struct ibv_mr		*mr;
+	struct ibv_dm		*dm;
+	union {
+		struct ibv_cq		*cq;
+		struct ibv_cq_ex	*cq_ex;
+	} cq_s;
+	struct ibv_qp		*qp;
+	struct ibv_qp_ex	*qpx;
+	char			*buf;
+	int			 size;
+	int			 send_flags;
+	int			 rx_depth;
+	int			 pending;
+	struct ibv_port_attr	 portinfo;
+	uint64_t		 completion_timestamp_mask;
+};
+
+static struct ibv_cq *pp_cq(struct pingpong_context *ctx)
+{
+	return ctx->cq_s.cq;
+}
+
+struct pingpong_dest {
+	int lid;
+	int qpn;
+	int psn;
+	uint32_t key;
+	uint64_t addr;
+	union ibv_gid gid;
+};
+
+static int pp_connect_ctx(struct pingpong_context *ctx, int port, int my_psn,
+			  enum ibv_mtu mtu, int sl,
+			  struct pingpong_dest *dest, int sgid_idx)
+{
+	struct ibv_qp_attr attr = {
+		.qp_state		= IBV_QPS_RTR,
+		.path_mtu		= mtu,
+		.dest_qp_num		= dest->qpn,
+		.rq_psn			= dest->psn,
+		.max_dest_rd_atomic	= 1,
+		.min_rnr_timer		= 12,
+		.ah_attr		= {
+			.is_global	= 0,
+			.dlid		= dest->lid,
+			.sl		= sl,
+			.src_path_bits	= 0,
+			.port_num	= port
+		}
+	};
+
+	if (dest->gid.global.interface_id) {
+		attr.ah_attr.is_global = 1;
+		attr.ah_attr.grh.hop_limit = 1;
+		attr.ah_attr.grh.dgid = dest->gid;
+		attr.ah_attr.grh.sgid_index = sgid_idx;
+	}
+	if (ibv_modify_qp(ctx->qp, &attr,
+			  IBV_QP_STATE              |
+			  IBV_QP_AV                 |
+			  IBV_QP_PATH_MTU           |
+			  IBV_QP_DEST_QPN           |
+			  IBV_QP_RQ_PSN             |
+			  IBV_QP_MAX_DEST_RD_ATOMIC |
+			  IBV_QP_MIN_RNR_TIMER)) {
+		fprintf(stderr, "Failed to modify QP to RTR\n");
+		return 1;
+	}
+
+	attr.qp_state	    = IBV_QPS_RTS;
+	attr.timeout	    = 14;
+	attr.retry_cnt	    = 7;
+	attr.rnr_retry	    = 7;
+	attr.sq_psn	    = my_psn;
+	attr.max_rd_atomic  = 1;
+	if (ibv_modify_qp(ctx->qp, &attr,
+			  IBV_QP_STATE              |
+			  IBV_QP_TIMEOUT            |
+			  IBV_QP_RETRY_CNT          |
+			  IBV_QP_RNR_RETRY          |
+			  IBV_QP_SQ_PSN             |
+			  IBV_QP_MAX_QP_RD_ATOMIC)) {
+		fprintf(stderr, "Failed to modify QP to RTS\n");
+		return 1;
+	}
+
+	return 0;
+}
+
+static struct pingpong_dest *pp_client_exch_dest(const char *servername, int port,
+						 const struct pingpong_dest *my_dest)
+{
+	struct pingpong_dest *rem_dest = NULL;
+	char gid[33];
+
+	gid_to_wire_gid(&my_dest->gid, gid);
+
+	MPI_Send(&(my_dest->lid), 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
+	MPI_Send(&(my_dest->qpn), 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
+	MPI_Send(&(my_dest->psn), 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
+	MPI_Send(gid, 33, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
+
+	rem_dest = malloc(sizeof *rem_dest);
+
+	MPI_Recv(&(rem_dest->lid), 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+	MPI_Recv(&(rem_dest->qpn), 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+	MPI_Recv(&(rem_dest->psn), 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+	MPI_Recv(gid, 33, MPI_CHAR, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+	MPI_Recv(&(rem_dest->key), 1, MPI_UINT32_T, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+	MPI_Recv(&(rem_dest->addr), 1, MPI_UINT64_T, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+
+	wire_gid_to_gid(gid, &rem_dest->gid);
+
+	return rem_dest;
+}
+
+static struct pingpong_dest *pp_server_exch_dest(struct pingpong_context *ctx,
+						 int ib_port, enum ibv_mtu mtu,
+						 int port, int sl,
+						 const struct pingpong_dest
*my_dest, + int sgid_idx) +{ + + printf("Server process\n"); + struct pingpong_dest *rem_dest = NULL; + char gid[33]; + + + rem_dest = malloc(sizeof *rem_dest); + + MPI_Recv(&(rem_dest->lid), 1, MPI_INT, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&(rem_dest->qpn), 1, MPI_INT, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&(rem_dest->psn), 1, MPI_INT, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(gid, 33, MPI_CHAR, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + wire_gid_to_gid(gid, &rem_dest->gid); + + gid_to_wire_gid(&my_dest->gid, gid); + + MPI_Send(&(my_dest->lid), 1, MPI_INT, 1, 0, MPI_COMM_WORLD); + MPI_Send(&(my_dest->qpn), 1, MPI_INT, 1, 0, MPI_COMM_WORLD); + MPI_Send(&(my_dest->psn), 1, MPI_INT, 1, 0, MPI_COMM_WORLD); + MPI_Send(gid, 33, MPI_CHAR, 1, 0, MPI_COMM_WORLD); + uint32_t lkey = ctx->mr->lkey; + MPI_Send(&lkey, 1, MPI_UINT32_T, 1, 0, MPI_COMM_WORLD); + uint64_t addr = (uint64_t) ctx->buf; + MPI_Send(&addr, 1, MPI_UINT64_T, 1, 0, MPI_COMM_WORLD); + + if (pp_connect_ctx(ctx, ib_port, my_dest->psn, mtu, sl, rem_dest, + sgid_idx)) { + fprintf(stderr, "Couldn't connect to remote QP\n"); + free(rem_dest); + rem_dest = NULL; + return rem_dest; + } + + return rem_dest; +} + +static struct pingpong_context *pp_init_ctx(struct ibv_device *ib_dev, int size, + int rx_depth, int port) +{ + struct pingpong_context *ctx; + int access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_ATOMIC; + + ctx = calloc(1, sizeof *ctx); + if (!ctx) + return NULL; + + ctx->size = size; + ctx->send_flags = IBV_SEND_SIGNALED; + ctx->rx_depth = rx_depth; + + ctx->buf = memalign(page_size, size); + if (!ctx->buf) { + fprintf(stderr, "Couldn't allocate work buf.\n"); + goto clean_ctx; + } + + /* FIXME memset(ctx->buf, 0, size); */ + memset(ctx->buf, 0x7b, size); + + ctx->context = ibv_open_device(ib_dev); + if (!ctx->context) { + fprintf(stderr, "Couldn't get context for %s\n", + ibv_get_device_name(ib_dev)); + goto clean_buffer; + } + + ctx->channel = NULL; + + ctx->pd = ibv_alloc_pd(ctx->context); + if (!ctx->pd) { + fprintf(stderr, "Couldn't allocate PD\n"); + goto clean_comp_channel; + } + + + if (implicit_odp) { + ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, access_flags); + } else { + ctx->mr = ibv_reg_mr(ctx->pd, ctx->buf, size, access_flags); + } + + if (!ctx->mr) { + fprintf(stderr, "Couldn't register MR\n"); + goto clean_dm; + } + + if (prefetch_mr) { + struct ibv_sge sg_list; + int ret; + + sg_list.lkey = ctx->mr->lkey; + sg_list.addr = (uintptr_t)ctx->buf; + sg_list.length = size; + + ret = ibv_advise_mr(ctx->pd, IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE, + IB_UVERBS_ADVISE_MR_FLAG_FLUSH, + &sg_list, 1); + + if (ret) + fprintf(stderr, "Couldn't prefetch MR(%d). 
Continue anyway\n", ret); + } + + ctx->cq_s.cq = ibv_create_cq(ctx->context, rx_depth + 1, NULL, + ctx->channel, 0); + + if (!pp_cq(ctx)) { + fprintf(stderr, "Couldn't create CQ\n"); + goto clean_mr; + } + + { + struct ibv_qp_attr attr; + struct ibv_qp_init_attr init_attr = { + .send_cq = pp_cq(ctx), + .recv_cq = pp_cq(ctx), + .cap = { + .max_send_wr = 1, + .max_recv_wr = rx_depth, + .max_send_sge = 1, + .max_recv_sge = 1 + }, + .qp_type = IBV_QPT_RC + }; + + ctx->qp = ibv_create_qp(ctx->pd, &init_attr); + + if (!ctx->qp) { + fprintf(stderr, "Couldn't create QP\n"); + goto clean_cq; + } + + ibv_query_qp(ctx->qp, &attr, IBV_QP_CAP, &init_attr); + if (init_attr.cap.max_inline_data >= size ) + ctx->send_flags |= IBV_SEND_INLINE; + } + + { + struct ibv_qp_attr attr = { + .qp_state = IBV_QPS_INIT, + .pkey_index = 0, + .port_num = port, + .qp_access_flags = IBV_ACCESS_REMOTE_ATOMIC, + }; + + if (ibv_modify_qp(ctx->qp, &attr, + IBV_QP_STATE | + IBV_QP_PKEY_INDEX | + IBV_QP_PORT | + IBV_QP_ACCESS_FLAGS)) { + fprintf(stderr, "Failed to modify QP to INIT\n"); + goto clean_qp; + } + } + + return ctx; + +clean_qp: + ibv_destroy_qp(ctx->qp); + +clean_cq: + ibv_destroy_cq(pp_cq(ctx)); + +clean_mr: + ibv_dereg_mr(ctx->mr); + +clean_dm: + if (ctx->dm) + ibv_free_dm(ctx->dm); + +clean_pd: + ibv_dealloc_pd(ctx->pd); + +clean_comp_channel: + if (ctx->channel) + ibv_destroy_comp_channel(ctx->channel); + +clean_device: + ibv_close_device(ctx->context); + +clean_buffer: + free(ctx->buf); + +clean_ctx: + free(ctx); + + return NULL; +} + +static int pp_close_ctx(struct pingpong_context *ctx) +{ + if (ibv_destroy_qp(ctx->qp)) { + fprintf(stderr, "Couldn't destroy QP\n"); + return 1; + } + + if (ibv_destroy_cq(pp_cq(ctx))) { + fprintf(stderr, "Couldn't destroy CQ\n"); + return 1; + } + + if (ibv_dereg_mr(ctx->mr)) { + fprintf(stderr, "Couldn't deregister MR\n"); + return 1; + } + + if (ctx->dm) { + if (ibv_free_dm(ctx->dm)) { + fprintf(stderr, "Couldn't free DM\n"); + return 1; + } + } + + if (ibv_dealloc_pd(ctx->pd)) { + fprintf(stderr, "Couldn't deallocate PD\n"); + return 1; + } + + if (ctx->channel) { + if (ibv_destroy_comp_channel(ctx->channel)) { + fprintf(stderr, "Couldn't destroy completion channel\n"); + return 1; + } + } + + if (ibv_close_device(ctx->context)) { + fprintf(stderr, "Couldn't release context\n"); + return 1; + } + + free(ctx->buf); + free(ctx); + + return 0; +} + +static int pp_post_recv(struct pingpong_context *ctx, int n) +{ + struct ibv_sge list = { + .addr = (uintptr_t) ctx->buf, + .length = ctx->size, + .lkey = ctx->mr->lkey + }; + struct ibv_recv_wr wr = { + .wr_id = PINGPONG_RECV_WRID, + .sg_list = &list, + .num_sge = 1, + }; + struct ibv_recv_wr *bad_wr; + int i; + + for (i = 0; i < n; ++i) + if (ibv_post_recv(ctx->qp, &wr, &bad_wr)) + break; + + return i; +} + +static int pp_post_send(struct pingpong_context *ctx) +{ + struct ibv_sge list = { + .addr = (uintptr_t) ctx->buf, + .length = ctx->size, + .lkey = ctx->mr->lkey + }; + struct ibv_send_wr wr = { + .wr_id = PINGPONG_SEND_WRID, + .sg_list = &list, + .num_sge = 1, + .opcode = IBV_WR_SEND, + .send_flags = ctx->send_flags, + }; + struct ibv_send_wr *bad_wr; + + return ibv_post_send(ctx->qp, &wr, &bad_wr); +} + +static int pp_post_swap(struct pingpong_context *ctx, struct pingpong_dest *rem_dest, uint64_t compare_add, uint64_t swap) +{ + struct ibv_sge list = { + .addr = (uintptr_t) ctx->buf, + .length = ctx->size, + .lkey = ctx->mr->lkey + }; + struct ibv_send_wr wr = { + .wr_id = SWAP_WRID, + .sg_list = &list, + .num_sge = 1, + 
.opcode     = IBV_WR_ATOMIC_CMP_AND_SWP,
+		.send_flags = IBV_SEND_SIGNALED,
+		.wr.atomic.remote_addr = rem_dest->addr,
+		.wr.atomic.compare_add = compare_add,
+		.wr.atomic.swap = swap,
+		.wr.atomic.rkey = rem_dest->key,
+	};
+	struct ibv_send_wr *bad_wr;
+
+	return ibv_post_send(ctx->qp, &wr, &bad_wr);
+}
+
+struct ts_params {
+	uint64_t	 comp_recv_max_time_delta;
+	uint64_t	 comp_recv_min_time_delta;
+	uint64_t	 comp_recv_total_time_delta;
+	uint64_t	 comp_recv_prev_time;
+	int		 last_comp_with_ts;
+	unsigned int	 comp_with_time_iters;
+};
+
+static inline int parse_single_wc(struct pingpong_context *ctx, int *scnt,
+				  int *rcnt, int *routs, int iters,
+				  uint64_t wr_id, enum ibv_wc_status status,
+				  uint64_t completion_timestamp,
+				  struct ts_params *ts,
+				  struct pingpong_dest *rem_dest
+				  )
+{
+	if (status != IBV_WC_SUCCESS) {
+		fprintf(stderr, "Failed status %s (%d) for wr_id %d\n",
+			ibv_wc_status_str(status),
+			status, (int)wr_id);
+		return 1;
+	}
+
+	int rank;
+	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+	printf("Rank %d will process single wc = %"PRIu64"\n", rank, wr_id);
+	switch ((int)wr_id) {
+	case SWAP_WRID:
+		/* a completed atomic counts as a completed send; fall through */
+	case PINGPONG_SEND_WRID:
+		++(*scnt);
+		break;
+
+	case PINGPONG_RECV_WRID:
+		if (--(*routs) <= 1) {
+			//printf("Calling pp_post_recv\n");
+			*routs += pp_post_recv(ctx, ctx->rx_depth - *routs);
+			if (*routs < ctx->rx_depth) {
+				fprintf(stderr,
+					"Couldn't post receive (%d)\n",
+					*routs);
+				return 1;
+			}
+		}
+
+		++(*rcnt);
+		ts->last_comp_with_ts = 0;
+
+		break;
+
+	default:
+		fprintf(stderr, "Completion for unknown wr_id %d\n",
+			(int)wr_id);
+		return 1;
+	}
+
+	ctx->pending &= ~(int)wr_id;
+	if (*scnt < iters && !ctx->pending) {
+		if (pp_post_swap(ctx, rem_dest, 0ULL, 1ULL)) {
+			fprintf(stderr, "Couldn't post send\n");
+			return 1;
+		}
+		printf("After pp_post_swap\n");
+		ctx->pending = PINGPONG_RECV_WRID |
+			       PINGPONG_SEND_WRID;
+	}
+
+	return 0;
+}
+
+static void usage(const char *argv0)
+{
+	printf("Usage:\n");
+	printf("  %s            start a server and wait for connection\n", argv0);
+	printf("  %s <host>     connect to server at <host>\n", argv0);
+	printf("\n");
+	printf("Options:\n");
+	printf("  -p, --port=<port>      listen on/connect to port <port> (default 18515)\n");
+	printf("  -d, --ib-dev=<dev>     use IB device <dev> (default first device found)\n");
+	printf("  -i, --ib-port=<port>   use port <port> of IB device (default 1)\n");
+	printf("  -s, --size=<size>      size of message to exchange (default 4096)\n");
+	printf("  -m, --mtu=<size>       path MTU (default 1024)\n");
+	printf("  -r, --rx-depth=<dep>   number of receives to post at a time (default 500)\n");
+	printf("  -n, --iters=<iters>    number of exchanges (default 1000)\n");
+	printf("  -l, --sl=<sl>          service level value\n");
+	printf("  -g, --gid-idx=<gid index> local port gid index\n");
+	printf("  -o, --odp              use on demand paging\n");
+	printf("  -O, --iodp             use implicit on demand paging\n");
+	printf("  -P, --prefetch         prefetch an ODP MR\n");
+	printf("  -t, --ts               get CQE with timestamp\n");
+	printf("  -c, --chk              validate received buffer\n");
+	printf("  -j, --dm               use device memory\n");
+}
+
+int main(int argc, char *argv[])
+{
+	struct ibv_device      **dev_list;
+	struct ibv_device	*ib_dev;
+	struct pingpong_context *ctx;
+	struct pingpong_dest	 my_dest;
+	struct pingpong_dest	*rem_dest;
+	struct timeval		 start, end;
+	char			*ib_devname = NULL;
+	char			*servername = NULL;
+	unsigned int		 port = 18515;
+	int			 ib_port = 1;
+	unsigned int		 size = 4096;
+	enum ibv_mtu		 mtu = IBV_MTU_1024;
+	unsigned int		 rx_depth = 500;
+	unsigned int		 iters = 1000;
+	int			 routs;
+	int			 rcnt, scnt;
+	int			 num_cq_events = 0;
+	int			 sl = 0;
+	int			 gidx = -1;
+	char			 gid[33];
+	struct ts_params	 ts;
+	int comm_rank,
comm_size; + + srand48(getpid() * time(NULL)); + + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank); + MPI_Comm_size(MPI_COMM_WORLD, &comm_size); + + while (1) { + int c; + + static struct option long_options[] = { + { .name = "port", .has_arg = 1, .val = 'p' }, + { .name = "ib-dev", .has_arg = 1, .val = 'd' }, + { .name = "ib-port", .has_arg = 1, .val = 'i' }, + { .name = "size", .has_arg = 1, .val = 's' }, + { .name = "mtu", .has_arg = 1, .val = 'm' }, + { .name = "rx-depth", .has_arg = 1, .val = 'r' }, + { .name = "iters", .has_arg = 1, .val = 'n' }, + { .name = "sl", .has_arg = 1, .val = 'l' }, + { .name = "events", .has_arg = 0, .val = 'e' }, + { .name = "gid-idx", .has_arg = 1, .val = 'g' }, + { .name = "odp", .has_arg = 0, .val = 'o' }, + { .name = "iodp", .has_arg = 0, .val = 'O' }, + { .name = "prefetch", .has_arg = 0, .val = 'P' }, + { .name = "ts", .has_arg = 0, .val = 't' }, + { .name = "chk", .has_arg = 0, .val = 'c' }, + { .name = "dm", .has_arg = 0, .val = 'j' }, + { .name = "new_send", .has_arg = 0, .val = 'N' }, + {} + }; + + c = getopt_long(argc, argv, "p:d:i:s:m:r:n:l:eg:oOPtcjN", + long_options, NULL); + + if (c == -1) + break; + + switch (c) { + case 'p': + port = strtoul(optarg, NULL, 0); + if (port > 65535) { + usage(argv[0]); + return 1; + } + break; + + case 'd': + ib_devname = strdupa(optarg); + break; + + case 'i': + ib_port = strtol(optarg, NULL, 0); + if (ib_port < 1) { + usage(argv[0]); + return 1; + } + break; + + case 's': + size = strtoul(optarg, NULL, 0); + break; + + case 'm': + mtu = pp_mtu_to_enum(strtol(optarg, NULL, 0)); + if (mtu == 0) { + usage(argv[0]); + return 1; + } + break; + + case 'r': + rx_depth = strtoul(optarg, NULL, 0); + break; + + case 'n': + iters = strtoul(optarg, NULL, 0); + break; + + case 'l': + sl = strtol(optarg, NULL, 0); + break; + + case 'g': + gidx = strtol(optarg, NULL, 0); + break; + + case 'P': + prefetch_mr = 1; + break; + case 'c': + validate_buf = 1; + break; + + default: + usage(argv[0]); + return 1; + } + } + + if (optind == argc - 1) + servername = strdupa(argv[optind]); + else if (optind < argc) { + usage(argv[0]); + return 1; + } + + if ( prefetch_mr) { + fprintf(stderr, "prefetch is valid only with on-demand memory region\n"); + return 1; + } + + page_size = sysconf(_SC_PAGESIZE); + + dev_list = ibv_get_device_list(NULL); + if (!dev_list) { + perror("Failed to get IB devices list"); + return 1; + } + + if (!ib_devname) { + ib_dev = *dev_list; + if (!ib_dev) { + fprintf(stderr, "No IB devices found\n"); + return 1; + } + } else { + int i; + for (i = 0; dev_list[i]; ++i) + if (!strcmp(ibv_get_device_name(dev_list[i]), ib_devname)) + break; + ib_dev = dev_list[i]; + if (!ib_dev) { + fprintf(stderr, "IB device %s not found\n", ib_devname); + return 1; + } + } + + ctx = pp_init_ctx(ib_dev, size, rx_depth, ib_port); + if (!ctx) + return 1; + + routs = pp_post_recv(ctx, ctx->rx_depth); + if (routs < ctx->rx_depth) { + fprintf(stderr, "Couldn't post receive (%d)\n", routs); + return 1; + } + + + if (pp_get_port_info(ctx->context, ib_port, &ctx->portinfo)) { + fprintf(stderr, "Couldn't get port info\n"); + return 1; + } + + my_dest.lid = ctx->portinfo.lid; + if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && + !my_dest.lid) { + fprintf(stderr, "Couldn't get local LID\n"); + return 1; + } + + if (gidx >= 0) { + if (ibv_query_gid(ctx->context, ib_port, gidx, &my_dest.gid)) { + fprintf(stderr, "can't read sgid of index %d\n", gidx); + return 1; + } + } else + memset(&my_dest.gid, 0, sizeof 
my_dest.gid); + + my_dest.qpn = ctx->qp->qp_num; + my_dest.psn = lrand48() & 0xffffff; + inet_ntop(AF_INET6, &my_dest.gid, gid, sizeof gid); + printf(" local address: LID 0x%04x, QPN 0x%06x, PSN 0x%06x, GID %s\n", + my_dest.lid, my_dest.qpn, my_dest.psn, gid); + + + if (comm_rank == 0) + rem_dest = pp_server_exch_dest(ctx, ib_port, mtu, port, sl, &my_dest, gidx); + else + rem_dest = pp_client_exch_dest(servername, port, &my_dest); + + if (!rem_dest) + return 1; + + inet_ntop(AF_INET6, &rem_dest->gid, gid, sizeof gid); + printf(" remote address: LID 0x%04x, QPN 0x%06x, PSN 0x%06x, GID %s\n", + rem_dest->lid, rem_dest->qpn, rem_dest->psn, gid); + + if (comm_rank != 0) + if (pp_connect_ctx(ctx, ib_port, my_dest.psn, mtu, sl, rem_dest, + gidx)) + return 1; + + ctx->pending = PINGPONG_RECV_WRID; + + if (comm_rank != 0) { + if (validate_buf) + for (int i = 0; i < size; i += page_size) + ctx->buf[i] = i / page_size % sizeof(char); + + if (pp_post_swap(ctx, rem_dest, 0ULL, 1ULL)) { + //if (pp_post_send(ctx)) { + fprintf(stderr, "Couldn't post send\n"); + return 1; + } + printf("After pp_post_swap\n"); + ctx->pending |= PINGPONG_SEND_WRID; + } + + if (gettimeofday(&start, NULL)) { + perror("gettimeofday"); + return 1; + } + + rcnt = scnt = 0; + if (comm_rank == 0) { + + } + while (rcnt < iters || scnt < iters) { + int ret; + + + int ne, i; + struct ibv_wc wc[2]; + + do { + ne = ibv_poll_cq(pp_cq(ctx), 2, wc); + if (ne < 0) { + fprintf(stderr, "poll CQ failed %d\n", ne); + return 1; + } + } while (ne < 1); + + for (i = 0; i < ne; ++i) { + ret = parse_single_wc(ctx, &scnt, &rcnt, &routs, + iters, + wc[i].wr_id, + wc[i].status, + 0, &ts, rem_dest); + if (ret) { + fprintf(stderr, "parse WC failed %d\n", ne); + return 1; + } + } + } + + if (gettimeofday(&end, NULL)) { + perror("gettimeofday"); + return 1; + } + + { + float usec = (end.tv_sec - start.tv_sec) * 1000000 + + (end.tv_usec - start.tv_usec); + long long bytes = (long long) size * iters * 2; + + printf("%lld bytes in %.2f seconds = %.2f Mbit/sec\n", + bytes, usec / 1000000., bytes * 8. 
/ usec); + printf("%d iters in %.2f seconds = %.2f usec/iter\n", + iters, usec / 1000000., usec / iters); + + if ((comm_rank == 0) && (validate_buf)) { + for (int i = 0; i < size; i += page_size) + if (ctx->buf[i] != i / page_size % sizeof(char)) + printf("invalid data in page %d\n", + i / page_size); + } + } + + ibv_ack_cq_events(pp_cq(ctx), num_cq_events); + + if (pp_close_ctx(ctx)) + return 1; + + ibv_free_device_list(dev_list); + free(rem_dest); + + MPI_Finalize(); + + return 0; +} diff --git a/include/debug/lpf/core.h b/include/debug/lpf/core.h index 028e015f..03e5f6aa 100644 --- a/include/debug/lpf/core.h +++ b/include/debug/lpf/core.h @@ -64,6 +64,12 @@ extern "C" { #define lpf_sync( ctx, attrs ) \ lpf_debug_sync( __FILE__, __LINE__, (ctx), (attrs) ) +#define lpf_counting_sync_per_tag( ctx, attrs, slot, expected_sends, expected_rcvs ) \ + lpf_debug_counting_sync_per_tag( __FILE__, __LINE__, (ctx), (attrs), (slot), (expected_sends), (expected_rcvs) ) + +#define lpf_sync_per_tag( ctx, attrs, slot) \ + lpf_debug_sync_per_tag( __FILE__, __LINE__, (ctx), (attrs), (slot)) + #define lpf_resize_memory_register( ctx, size ) \ lpf_debug_resize_memory_register( __FILE__, __LINE__, (ctx), (size) ) @@ -125,6 +131,9 @@ extern _LPFLIB_API lpf_err_t lpf_debug_sync( const char * file, int line, lpf_t ctx, lpf_sync_attr_t attr ); +lpf_err_t lpf_debug_counting_sync_per_tag( const char * file, int line, + lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sends, size_t expected_rcvs); + extern _LPFLIB_API lpf_err_t lpf_debug_resize_memory_register( const char * file, int line, lpf_t ctx, size_t max_regs ); diff --git a/include/lpf/collectives.h b/include/lpf/collectives.h index 4304c5f0..871b7f27 100644 --- a/include/lpf/collectives.h +++ b/include/lpf/collectives.h @@ -116,6 +116,16 @@ typedef void (*lpf_combiner_t) (size_t n, const void * combine, void * into ); */ extern _LPFLIB_API const lpf_coll_t LPF_INVALID_COLL; +/** + * ToDo: document allgatherv + */ +lpf_err_t lpf_allgatherv( + lpf_coll_t coll, + lpf_memslot_t src, + lpf_memslot_t dst, + size_t *sizes, + bool exclude_myself + ); /** * Initialises a collectives struct, which allows the scheduling of collective * calls. The initialised struct is only valid after a next call to lpf_sync(). diff --git a/include/lpf/core.h b/include/lpf/core.h index 42872f15..90a26b9e 100644 --- a/include/lpf/core.h +++ b/include/lpf/core.h @@ -2058,6 +2058,25 @@ lpf_err_t lpf_get( extern _LPFLIB_API lpf_err_t lpf_sync( lpf_t ctx, lpf_sync_attr_t attr ); +/** + * This synchronisation waits on memory slot @slot to complete sending + * and receiving @expected_sent and @expected_rcvd messages. The counts are + * checked in the ibv_poll_cq calls and associated to certain LPF slots. + * This call is only implemented for IB verbs at the moment. + */ +extern _LPFLIB_API +lpf_err_t lpf_counting_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd); + +/** + * This synchronisation waits on memory slot @slot to complete sending + * or receiving all outstanding messages. For the current implementation + * in IB verbs, this means all scheduled sends via ibv_post_send are + * checked for completion via ibv_poll_cq. Currently, there is no logic + * scheduling receives, but only sends -- for either get or put. 
+ */ +extern _LPFLIB_API +lpf_err_t lpf_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot); + /** * This primitive allows a user to inspect the machine that this LPF program * has been assigned. All resources reported in the #lpf_machine_t struct are @@ -2315,6 +2334,68 @@ lpf_err_t lpf_resize_memory_register( lpf_t ctx, size_t max_regs ); extern _LPFLIB_API lpf_err_t lpf_resize_message_queue( lpf_t ctx, size_t max_msgs ); +extern _LPFLIB_API +lpf_err_t lpf_lock_slot( + lpf_t ctx, + lpf_memslot_t src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +); + +extern _LPFLIB_API +lpf_err_t lpf_unlock_slot( + lpf_t ctx, + lpf_memslot_t src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +); + +/** + * This function returns in @rcvd_msgs the received message count on LPF slot @slot + */ +extern _LPFLIB_API +lpf_err_t lpf_get_rcvd_msg_count_per_slot( lpf_t ctx, size_t *rcvd_msgs, lpf_memslot_t slot); + +/** + * This function returns in @rcvd_msgs the total received message count + */ +extern _LPFLIB_API +lpf_err_t lpf_get_rcvd_msg_count( lpf_t ctx, size_t *rcvd_msgs); + +/** + * This function returns in @sent_msgs the sent message count on LPF slot @slot + */ +extern _LPFLIB_API +lpf_err_t lpf_get_sent_msg_count_per_slot( lpf_t ctx, size_t *sent_msgs, lpf_memslot_t slot); + +/** + * This function blocks until all the scheduled send messages + * (via ibv_post_send) are actually registered as sent (via ibv_poll_cq). + * No concept of slots is used here. + * This allows to reuse the send buffers e.g. in higher-level channel + * libraries. + */ +extern _LPFLIB_API +lpf_err_t lpf_flush_sent( lpf_t ctx); + +/** + * This function blocks until all the incoming received messages + * waiting on the receive completion queue are handled (via ibv_poll_cq). + * No concept of slots is used here. + * This allows to reuse the send buffers e.g. in higher-level channel + * libraries. 
+ */ +extern _LPFLIB_API +lpf_err_t lpf_flush_received( lpf_t ctx); + #ifdef __cplusplus } #endif diff --git a/include/lpf/static_dispatch.h b/include/lpf/static_dispatch.h index 8df6a092..7cd24263 100644 --- a/include/lpf/static_dispatch.h +++ b/include/lpf/static_dispatch.h @@ -40,8 +40,15 @@ #undef lpf_get #undef lpf_put #undef lpf_sync +#undef lpf_counting_sync_per_slot +#undef lpf_sync_per_slot #undef lpf_register_local +#undef lpf_get_rcvd_msg_count +#undef lpf_get_rcvd_msg_count_per_slot +#undef lpf_get_sent_msg_count_per_slot #undef lpf_register_global +#undef lpf_flush_sent +#undef lpf_flush_received #undef lpf_deregister #undef lpf_probe #undef lpf_resize_memory_register @@ -83,7 +90,14 @@ #define lpf_get LPF_FUNC(get) #define lpf_put LPF_FUNC(put) #define lpf_sync LPF_FUNC(sync) +#define lpf_counting_sync_per_slot LPF_FUNC(counting_sync_per_slot) +#define lpf_sync_per_slot LPF_FUNC(sync_per_slot) #define lpf_register_local LPF_FUNC(register_local) +#define lpf_get_rcvd_msg_count LPF_FUNC(get_rcvd_msg_count) +#define lpf_get_rcvd_msg_count_per_slot LPF_FUNC(get_rcvd_msg_count_per_slot) +#define lpf_get_sent_msg_count_per_slot LPF_FUNC(get_sent_msg_count_per_slot) +#define lpf_flush_sent LPF_FUNC(flush_sent) +#define lpf_flush_received LPF_FUNC(flush_received) #define lpf_register_global LPF_FUNC(register_global) #define lpf_deregister LPF_FUNC(deregister) #define lpf_probe LPF_FUNC(probe) diff --git a/lpfrun.in b/lpfrun.in index 640fdc00..ce9c6ff9 100644 --- a/lpfrun.in +++ b/lpfrun.in @@ -57,7 +57,7 @@ function printhelp() echo echo " -engine " echo " Allow you to choose the engine. Currently supported" - echo " are: pthread, mpirma, mpimsg, ibverbs, hybrid" + echo " are: pthread, mpirma, mpimsg, ibverbs, hicr, hybrid" echo echo " -probe " echo " Set the number of seconds to probe the system for BSP" @@ -846,7 +846,7 @@ case $engine in exit_status=$? 
;; - mpirma|mpimsg|ibverbs) + mpirma|mpimsg|ibverbs|hicr) mpi_impl=$(mpi_detect) proc_args= diff --git a/src/MPI/CMakeLists.txt b/src/MPI/CMakeLists.txt index beca3129..10a41fc6 100644 --- a/src/MPI/CMakeLists.txt +++ b/src/MPI/CMakeLists.txt @@ -24,6 +24,7 @@ if (MPI_FOUND) if (LIB_IBVERBS) list(APPEND MPI_ENGINES ibverbs) + list(APPEND MPI_ENGINES hicr) endif() if (MPI_OPEN_PORT) @@ -53,7 +54,7 @@ if (MPI_FOUND) set(comlib "lpf_common_${LPFLIB_CONFIG_NAME}") set(ibverbs_sources) - if (LPF_IMPL_ID STREQUAL ibverbs) + if (LPF_IMPL_ID STREQUAL ibverbs OR LPF_IMPL_ID STREQUAL hicr) set(ibverbs_sources ibverbs.cpp) endif() @@ -70,7 +71,7 @@ if (MPI_FOUND) spall2all.c messagesort.cpp spall2all.cpp - init.cpp + init.cpp ${ibverbs_sources} ) @@ -136,7 +137,7 @@ if (MPI_FOUND) ${LIB_POSIX_THREADS} ) - if (engine STREQUAL ibverbs) + if (engine STREQUAL ibverbs OR engine STREQUAL hicr) target_link_libraries(${target} ${LIB_IBVERBS}) endif() endfunction() @@ -201,6 +202,11 @@ if (MPI_FOUND) if (LIB_IBVERBS AND LPF_ENABLE_TESTS) add_gtest_mpi( ibverbs_test "1;2;5;10" ibverbs.t.cpp ibverbs.cpp $ mpilib.cpp) + target_compile_definitions(ibverbs_test + PRIVATE "LPF_CORE_MPI_USES_ibverbs=1" + "LPF_CORE_WARM_UP_PROBE=1" + "LPF_CORE_IMPL_ID=ibverbs" + "LPF_CORE_IMPL_CONFIG=${LPF_IMPL_CONFIG}") target_link_libraries( ibverbs_test ${LIB_IBVERBS}) endif() diff --git a/src/MPI/core.cpp b/src/MPI/core.cpp index 112403e6..1049c4d2 100644 --- a/src/MPI/core.cpp +++ b/src/MPI/core.cpp @@ -262,6 +262,102 @@ lpf_err_t lpf_sync( lpf_t ctx, lpf_sync_attr_t attr ) return realContext(ctx)->sync(); } + +lpf_err_t lpf_lock_slot( lpf_t ctx, + lpf_memslot_t src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +) +{ + (void) attr; // ignore parameter 'msg' since this implementation only + // implements core functionality + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) + i->lockSlot( src_slot, src_offset, dst_pid, dst_slot, dst_offset, size ); + return LPF_SUCCESS; +} + +lpf_err_t lpf_unlock_slot( lpf_t ctx, + lpf_memslot_t src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +) +{ + (void) attr; // ignore parameter 'msg' since this implementation only + // implements core functionality + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) + i->unlockSlot( src_slot, src_offset, dst_pid, dst_slot, dst_offset, size ); + return LPF_SUCCESS; +} + +lpf_err_t lpf_counting_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) +{ + (void) attr; // ignore attr parameter since this implementation only + // implements core functionality + return realContext(ctx)->countingSyncPerSlot(slot, expected_sent, expected_rcvd); +} + +lpf_err_t lpf_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot) +{ + (void) attr; // ignore attr parameter since this implementation only + // implements core functionality + return realContext(ctx)->syncPerSlot(slot); +} + +lpf_err_t lpf_get_rcvd_msg_count_per_slot( lpf_t ctx, size_t * rcvd_msgs, lpf_memslot_t slot) +{ + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) { + i->getRcvdMsgCountPerSlot(rcvd_msgs, slot); + } + return LPF_SUCCESS; +} + +lpf_err_t lpf_get_rcvd_msg_count( lpf_t ctx, size_t * rcvd_msgs) +{ + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) { + i->getRcvdMsgCount(rcvd_msgs); + } + return LPF_SUCCESS; +} 
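The wrappers above forward straight to the engine; how the new primitives are meant to be used from application code is easier to see in a small example. The sketch below is hypothetical and not part of this change set: every process puts one word to its right neighbour and then waits only on the two slots involved, relying on the documented semantics of lpf_counting_sync_per_slot instead of a full lpf_sync barrier. All other symbols (lpf_register_global, LPF_MSG_DEFAULT, LPF_SYNC_DEFAULT, LPF_INVALID_MEMSLOT) are the standard LPF core API.

/* Hypothetical usage sketch of the per-slot primitives (assumes <lpf/core.h>). */
void spmd( lpf_t ctx, lpf_pid_t pid, lpf_pid_t nprocs, lpf_args_t args )
{
    (void) args;
    size_t value = (size_t) pid, inbox = 0;
    lpf_memslot_t src = LPF_INVALID_MEMSLOT, dst = LPF_INVALID_MEMSLOT;

    /* reserve resources; registrations take effect after the next sync */
    lpf_resize_memory_register( ctx, 2 );
    lpf_resize_message_queue( ctx, 1 );
    lpf_sync( ctx, LPF_SYNC_DEFAULT );

    lpf_register_global( ctx, &value, sizeof(value), &src );
    lpf_register_global( ctx, &inbox, sizeof(inbox), &dst );
    lpf_sync( ctx, LPF_SYNC_DEFAULT );

    /* one put to the right neighbour */
    lpf_put( ctx, src, 0, (pid + 1) % nprocs, dst, 0, sizeof(value), LPF_MSG_DEFAULT );

    /* wait only on the slots involved: the source slot completes one send,
     * the destination slot completes one receive */
    lpf_counting_sync_per_slot( ctx, LPF_SYNC_DEFAULT, src, 1, 0 );
    lpf_counting_sync_per_slot( ctx, LPF_SYNC_DEFAULT, dst, 0, 1 );

    size_t rcvd = 0;
    lpf_get_rcvd_msg_count_per_slot( ctx, &rcvd, dst ); /* expect rcvd == 1 */

    lpf_deregister( ctx, src );
    lpf_deregister( ctx, dst );
}

Note that the expected counts are per slot and per direction: a slot used only as a put source never accumulates receives, and vice versa.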
+
+lpf_err_t lpf_get_sent_msg_count_per_slot( lpf_t ctx, size_t * sent_msgs, lpf_memslot_t slot)
+{
+    lpf::Interface * i = realContext(ctx);
+    if (!i->isAborted()) {
+        i->getSentMsgCountPerSlot(sent_msgs, slot);
+    }
+    return LPF_SUCCESS;
+}
+
+lpf_err_t lpf_flush_sent( lpf_t ctx)
+{
+    lpf::Interface * i = realContext(ctx);
+    if (!i->isAborted()) {
+        i->flushSent();
+    }
+    return LPF_SUCCESS;
+}
+
+lpf_err_t lpf_flush_received( lpf_t ctx)
+{
+    lpf::Interface * i = realContext(ctx);
+    if (!i->isAborted()) {
+        i->flushReceived();
+    }
+    return LPF_SUCCESS;
+}
+
 lpf_err_t lpf_probe( lpf_t ctx, lpf_machine_t * params )
 {
     lpf::Interface * i = realContext(ctx);
diff --git a/src/MPI/ibverbs.cpp b/src/MPI/ibverbs.cpp
index 44852caa..30d8519c 100644
--- a/src/MPI/ibverbs.cpp
+++ b/src/MPI/ibverbs.cpp
@@ -22,10 +22,17 @@

 #include <stdexcept>
 #include <memory>
+#include <algorithm>
+#include <cstdlib>

+#define POLL_BATCH 64
+#define MAX_POLLING 128
+#define ARRAY_SIZE 1000

-namespace lpf { namespace mpi {
+namespace lpf {
+
+namespace mpi {

 struct IBVerbs::Exception : std::runtime_error {
     Exception(const char * what) : std::runtime_error( what ) {}
@@ -59,7 +66,6 @@ IBVerbs :: IBVerbs( Communication & comm )
     , m_maxSrs(0)
     , m_device()
     , m_pd()
-    , m_cq()
     , m_stagedQps( m_nprocs )
     , m_connectedQps( m_nprocs )
     , m_srs()
@@ -68,12 +74,38 @@ IBVerbs :: IBVerbs( Communication & comm )
     , m_activePeers(0, m_nprocs)
     , m_peerList()
     , m_sges()
+#ifdef LPF_CORE_MPI_USES_hicr
+    , m_cqLocal()
+    , m_cqRemote()
+    , m_cqSize(1)
+    , m_postCount(0)
+    , m_recvCount(0)
+    , m_numMsgs(0)
+    //, m_sendTotalInitMsgCount(0)
+    , m_recvTotalInitMsgCount(0)
+    , m_sentMsgs(0)
+    , m_recvdMsgs(0)
+#endif
+#ifdef LPF_CORE_MPI_USES_ibverbs
     , m_wcs(m_nprocs)
+    , m_cq()
+#endif
     , m_memreg()
     , m_dummyMemReg()
     , m_dummyBuffer()
     , m_comm( comm )
 {
+
+    // arrays instead of a hashmap for the counters
+#ifdef LPF_CORE_MPI_USES_hicr
+    m_recvInitMsgCount.resize(ARRAY_SIZE, 0);
+    m_getInitMsgCount.resize(ARRAY_SIZE, 0);
+    m_sendInitMsgCount.resize(ARRAY_SIZE, 0);
+    rcvdMsgCount.resize(ARRAY_SIZE, 0);
+    sentMsgCount.resize(ARRAY_SIZE, 0);
+    slotActive.resize(ARRAY_SIZE, 0);
+#endif
+
     m_peerList.reserve( m_nprocs );

     int numDevices = -1;
@@ -183,6 +215,35 @@ IBVerbs :: IBVerbs( Communication & comm )
     }

     LOG(3, "Opened protection domain");

+#ifdef LPF_CORE_MPI_USES_hicr
+    /**
+     * New notification functionality for HiCR
+     */
+    struct ibv_srq_init_attr srq_init_attr;
+    srq_init_attr.srq_context = NULL;
+    srq_init_attr.attr.max_wr = m_deviceAttr.max_srq_wr;
+    srq_init_attr.attr.max_sge = m_deviceAttr.max_srq_sge;
+    srq_init_attr.attr.srq_limit = 0;
+    m_srq.reset(ibv_create_srq(m_pd.get(), &srq_init_attr ),
+            ibv_destroy_srq);
+
+    m_cqLocal.reset(ibv_create_cq( m_device.get(), m_cqSize, NULL, NULL, 0), ibv_destroy_cq);
+    if (!m_cqLocal) {
+        LOG(1, "Could not allocate local completion queue with "
+                << m_cqSize << " entries" );
+        throw Exception("Could not allocate completion queue");
+    }
+    m_cqRemote.reset(ibv_create_cq( m_device.get(), m_cqSize * m_nprocs, NULL, NULL, 0), ibv_destroy_cq);
+    if (!m_cqRemote) {
+        LOG(1, "Could not allocate remote completion queue with "
+                << (m_cqSize * m_nprocs) << " entries" );
+        throw Exception("Could not allocate completion queue");
+    }
+#endif
+#ifdef LPF_CORE_MPI_USES_ibverbs
     struct ibv_cq * const ibv_cq_new_p = ibv_create_cq(
             m_device.get(), m_nprocs, NULL, NULL, 0 );
     if( ibv_cq_new_p == NULL )
         m_cq.reset();
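A note on the verbs pattern introduced in this hunk: the hicr engine splits completions over two queues. m_cqLocal sees this process's own sends and RDMA reads, while m_cqRemote, fed through the shared receive queue, delivers notifications for incoming RDMA writes with immediate data. The helper below is a stripped-down, hypothetical sketch of the receive side of that pattern, in plain C and independent of the LPF classes; cq_remote and srq stand in for the handles created above.

#include <infiniband/verbs.h>
#include <stdint.h>

/* Poll one remote-notification completion from cq_remote; returns the
 * tag carried in imm_data (here: the LPF slot ID chosen by the sender),
 * or -1 if nothing usable was completed. */
static int64_t poll_one_notification(struct ibv_cq *cq_remote, struct ibv_srq *srq)
{
    /* a zero-length receive request: it carries no payload, it only
     * serves as a credit that an incoming RDMA-write-with-immediate
     * consumes to produce a completion */
    struct ibv_sge sg = { .addr = 0, .length = 0, .lkey = 0 };
    struct ibv_recv_wr wr = { .wr_id = 0, .next = NULL, .sg_list = &sg, .num_sge = 0 };
    struct ibv_recv_wr *bad_wr;
    struct ibv_wc wc;

    int n = ibv_poll_cq(cq_remote, 1, &wc);
    if (n != 1 || wc.status != IBV_WC_SUCCESS)
        return -1;
    if (wc.opcode != IBV_WC_RECV_RDMA_WITH_IMM)
        return -1; /* e.g. an atomic; not a put notification */

    /* the notification consumed one SRQ receive; replenish it */
    ibv_post_srq_recv(srq, &wr, &bad_wr);
    return (int64_t) wc.imm_data;
}

This is exactly what doRemoteProgress below does in batches: drain m_cqRemote, count one received message per IBV_WC_RECV_RDMA_WITH_IMM completion against the slot named in imm_data, and re-post a zero-length receive for each credit consumed.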
@@ -193,6 +254,7 @@ IBVerbs :: IBVerbs( Communication & comm ) << m_nprocs << " entries" ); throw Exception("Could not allocate completion queue"); } +#endif LOG(3, "Allocated completion queue with " << m_nprocs << " entries."); @@ -220,8 +282,45 @@ IBVerbs :: ~IBVerbs() } +inline void IBVerbs :: tryIncrement(Op op, Phase phase, SlotID slot) { + + switch (phase) { + case Phase::INIT: + rcvdMsgCount[slot] = 0; + m_recvInitMsgCount[slot] = 0; + m_getInitMsgCount[slot] = 0; + sentMsgCount[slot] = 0; + m_sendInitMsgCount[slot] = 0; + slotActive[slot] = true; + break; + case Phase::PRE: + if (op == Op::SEND) { + m_numMsgs++; + //m_sendTotalInitMsgCount++; + m_sendInitMsgCount[slot]++; + } + if (op == Op::RECV || op == Op::GET) { + m_recvTotalInitMsgCount++; + m_recvInitMsgCount[slot]++; + } + break; + case Phase::POST: + if (op == Op::RECV || op == Op::GET) { + m_recvTotalInitMsgCount++; + m_recvdMsgs ++; + rcvdMsgCount[slot]++; + } + if (op == Op::SEND) { + m_sentMsgs++; + sentMsgCount[slot]++; + } + break; + } +} + void IBVerbs :: stageQPs( size_t maxMsgs ) { + LOG(1, "Enter stageQPs"); // create the queue pairs for ( int i = 0; i < m_nprocs; ++i) { struct ibv_qp_init_attr attr; @@ -229,10 +328,17 @@ void IBVerbs :: stageQPs( size_t maxMsgs ) attr.qp_type = IBV_QPT_RC; // we want reliable connection attr.sq_sig_all = 0; // only wait for selected messages - attr.send_cq = m_cq.get(); - attr.recv_cq = m_cq.get(); +#ifdef LPF_CORE_MPI_USES_hicr + attr.send_cq = m_cqLocal.get(); + attr.recv_cq = m_cqRemote.get(); + attr.srq = m_srq.get(); attr.cap.max_send_wr = std::min(maxMsgs + m_minNrMsgs,m_maxSrs); attr.cap.max_recv_wr = 1; // one for the dummy +#endif +#ifdef LPF_CORE_MPI_USES_ibverbs + attr.send_cq = m_cq.get(); + attr.recv_cq = m_cq.get(); +#endif attr.cap.max_send_sge = 1; attr.cap.max_recv_sge = 1; @@ -247,10 +353,72 @@ void IBVerbs :: stageQPs( size_t maxMsgs ) throw std::bad_alloc(); } - LOG(3, "Created new Queue pair for " << m_pid << " -> " << i ); + LOG(3, "Created new Queue pair for " << m_pid << " -> " << i << " with qp_num = " << ibv_new_qp_p->qp_num); } } +void IBVerbs :: doRemoteProgress() { + struct ibv_wc wcs[POLL_BATCH]; + struct ibv_recv_wr wr; + struct ibv_sge sg; + struct ibv_recv_wr *bad_wr; + sg.addr = (uint64_t) NULL; + sg.length = 0; + sg.lkey = 0; + wr.next = NULL; + wr.sg_list = &sg; + wr.num_sge = 0; + wr.wr_id = 66; + int pollResult, totalResults = 0; + do { + pollResult = ibv_poll_cq(m_cqRemote.get(), POLL_BATCH, wcs); + if (pollResult > 0) { + LOG(3, "Process " << m_pid << " signals: I received " << pollResult << " remote messages in doRemoteProgress"); + } + else if (pollResult < 0) + { + LOG( 1, "Failed to poll IB completion queue" ); + throw Exception("Poll CQ failure"); + } + + for(int i = 0; i < pollResult; i++) { + if (wcs[i].status != IBV_WC_SUCCESS) { + LOG( 2, "Got bad completion status from IB message." + " status = 0x" << std::hex << wcs[i].status + << ", vendor syndrome = 0x" << std::hex + << wcs[i].vendor_err ); + } + else { + LOG(2, "Process " << m_pid << " Recv wcs[" << i << "].src_qp = "<< wcs[i].src_qp); + LOG(2, "Process " << m_pid << " Recv wcs[" << i << "].slid = "<< wcs[i].slid); + LOG(2, "Process " << m_pid << " Recv wcs[" << i << "].wr_id = "<< wcs[i].wr_id); + LOG(2, "Process " << m_pid << " Recv wcs[" << i << "].imm_data = "<< wcs[i].imm_data); + + /** + * Here is a trick: + * The sender sends relatively generic LPF memslot ID. 
+ * But for IB Verbs, we need to translate that into + * an IB Verbs slot via @getVerbID -- or there will be + * a mismatch when IB Verbs looks up the slot ID + */ + + // Note: Ignore compare-and-swap atomics! + if (wcs[i].opcode != IBV_WC_COMP_SWAP) { + SlotID slot; + // This receive is from a PUT call + if (wcs[i].opcode == IBV_WC_RECV_RDMA_WITH_IMM) { + slot = wcs[i].imm_data; + tryIncrement(Op::RECV, Phase::POST, slot); + LOG(3, "Rank " << m_pid << " increments received message count to " << rcvdMsgCount[slot] << " for LPF slot " << slot); + } + } + ibv_post_srq_recv(m_srq.get(), &wr, &bad_wr); + } + } + if(pollResult > 0) totalResults += pollResult; + } while (pollResult == POLL_BATCH && totalResults < MAX_POLLING); +} + void IBVerbs :: reconnectQPs() { ASSERT( m_stagedQps[0] ); @@ -306,7 +474,12 @@ void IBVerbs :: reconnectQPs() attr.qp_state = IBV_QPS_INIT; attr.port_num = m_ibPort; attr.pkey_index = 0; +#ifdef LPF_CORE_MPI_USES_hicr + attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_ATOMIC; +#endif +#ifdef LPF_CORE_MPI_USES_ibverbs attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE; +#endif flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS; if ( ibv_modify_qp(m_stagedQps[i].get(), &attr, flags) ) { LOG(1, "Cannot bring state of QP " << i << " to INIT"); @@ -326,10 +499,12 @@ void IBVerbs :: reconnectQPs() rr.sg_list = &sge; rr.num_sge = 1; +#ifdef LPF_CORE_MPI_USES_ibverbs if (ibv_post_recv(m_stagedQps[i].get(), &rr, &bad_wr)) { LOG(1, "Cannot post a single receive request to QP " << i ); throw Exception("Could not post dummy receive request"); } +#endif // Bring QP to RTR std::memset(&attr, 0, sizeof(attr)); @@ -421,6 +596,8 @@ void IBVerbs :: resizeMemreg( size_t size ) void IBVerbs :: resizeMesgq( size_t size ) { + +#ifdef LPF_CORE_MPI_USES_ibverbs ASSERT( m_srs.max_size() > m_minNrMsgs ); if ( size > m_srs.max_size() - m_minNrMsgs ) @@ -433,6 +610,41 @@ void IBVerbs :: resizeMesgq( size_t size ) m_sges.reserve( size + m_minNrMsgs ); stageQPs(size); +#endif + +#ifdef LPF_CORE_MPI_USES_hicr + + m_cqSize = std::min(size,m_maxSrs/4); + size_t remote_size = std::min(m_cqSize*m_nprocs,m_maxSrs/4); + if (m_cqLocal) { + ibv_resize_cq(m_cqLocal.get(), m_cqSize); + } + if(remote_size >= m_postCount){ + if (m_cqRemote) { + ibv_resize_cq(m_cqRemote.get(), remote_size); + } + } + stageQPs(m_cqSize); + if(remote_size >= m_postCount){ + if (m_srq) { + struct ibv_recv_wr wr; + struct ibv_sge sg; + struct ibv_recv_wr *bad_wr; + sg.addr = (uint64_t) NULL; + sg.length = 0; + sg.lkey = 0; + wr.next = NULL; + wr.sg_list = &sg; + wr.num_sge = 0; + wr.wr_id = m_pid; + for(int i = m_postCount; i < (int)remote_size; ++i){ + ibv_post_srq_recv(m_srq.get(), &wr, &bad_wr); + m_postCount++; + } + } + } +#endif + LOG(4, "Message queue has been reallocated to size " << size ); } @@ -445,7 +657,12 @@ IBVerbs :: SlotID IBVerbs :: regLocal( void * addr, size_t size ) LOG(4, "Registering locally memory area at " << addr << " of size " << size ); struct ibv_mr * const ibv_mr_new_p = ibv_reg_mr( m_pd.get(), addr, size, +#ifdef LPF_CORE_MPI_USES_hicr + IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_ATOMIC +#endif +#ifdef LPF_CORE_MPI_USES_ibverbs IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE +#endif ); if( ibv_mr_new_p == NULL ) slot.mr.reset(); @@ -464,6 +681,9 @@ IBVerbs :: SlotID IBVerbs :: regLocal( 
void * addr, size_t size ) local.rkey = size?slot.mr->rkey:0; SlotID id = m_memreg.addLocalReg( slot ); +#ifdef LPF_CORE_MPI_USES_hicr + tryIncrement(Op::SEND/* <- dummy for init */, Phase::INIT, id); +#endif m_memreg.update( id ).glob.resize( m_nprocs ); m_memreg.update( id ).glob[m_pid] = local; @@ -480,7 +700,12 @@ IBVerbs :: SlotID IBVerbs :: regGlobal( void * addr, size_t size ) LOG(4, "Registering globally memory area at " << addr << " of size " << size ); struct ibv_mr * const ibv_mr_new_p = ibv_reg_mr( m_pd.get(), addr, size, +#ifdef LPF_CORE_MPI_USES_hicr + IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_ATOMIC +#endif +#ifdef LPF_CORE_MPI_USES_ibverbs IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE +#endif ); if( ibv_mr_new_p == NULL ) slot.mr.reset(); @@ -497,6 +722,9 @@ IBVerbs :: SlotID IBVerbs :: regGlobal( void * addr, size_t size ) throw Exception("Another process could not register memory area"); SlotID id = m_memreg.addGlobalReg( slot ); +#ifdef LPF_CORE_MPI_USES_hicr + tryIncrement(Op::SEND/* <- dummy for init */, Phase::INIT, id); +#endif MemorySlot & ref = m_memreg.update(id); // exchange memory registration info globally ref.glob.resize(m_nprocs); @@ -516,13 +744,150 @@ IBVerbs :: SlotID IBVerbs :: regGlobal( void * addr, size_t size ) void IBVerbs :: dereg( SlotID id ) { +#ifdef LPF_CORE_MPI_USES_hicr + slotActive[id] = false; + m_recvInitMsgCount[id] = 0; + m_getInitMsgCount[id] = 0; + m_sendInitMsgCount[id] = 0; + rcvdMsgCount[id] = 0; + sentMsgCount[id] = 0; +#endif m_memreg.removeReg( id ); LOG(4, "Memory area of slot " << id << " has been deregistered"); } + +void IBVerbs :: blockingCompareAndSwap(SlotID srcSlot, size_t srcOffset, int dstPid, SlotID dstSlot, size_t dstOffset, size_t size, uint64_t compare_add, uint64_t swap) +{ + const MemorySlot & src = m_memreg.lookup( srcSlot ); + const MemorySlot & dst = m_memreg.lookup( dstSlot); + + char * localAddr + = static_cast(src.glob[m_pid].addr) + srcOffset; + const char * remoteAddr + = static_cast(dst.glob[dstPid].addr) + dstOffset; + + struct ibv_sge sge; + memset(&sge, 0, sizeof(sge)); + sge.addr = reinterpret_cast( localAddr ); + sge.length = std::min(size, m_maxMsgSize ); + sge.lkey = src.mr->lkey; + + struct ibv_wc wcs[POLL_BATCH]; + struct ibv_send_wr wr; + memset(&wr, 0, sizeof(wr)); + wr.wr_id = srcSlot; + wr.sg_list = &sge; + wr.next = NULL; // this needs to be set, otherwise EINVAL return error in ibv_post_send + wr.num_sge = 1; + wr.opcode = IBV_WR_ATOMIC_CMP_AND_SWP; + wr.send_flags = IBV_SEND_SIGNALED; + wr.wr.atomic.remote_addr = reinterpret_cast(remoteAddr); + wr.wr.atomic.compare_add = compare_add; + wr.wr.atomic.swap = swap; + wr.wr.atomic.rkey = dst.glob[dstPid].rkey; + struct ibv_send_wr *bad_wr; + int error; + std::vector opcodes; + +blockingCompareAndSwap: + if (int err = ibv_post_send(m_connectedQps[dstPid].get(), &wr, &bad_wr )) + { + LOG(1, "Error while posting RDMA requests: " << std::strerror(err) ); + throw Exception("Error while posting RDMA requests"); + } + + /** + * Keep waiting on a completion of events until you + * register a completed atomic compare-and-swap + */ + do { + opcodes = wait_completion(error); + if (error) { + LOG(1, "Error in wait_completion"); + std::abort(); + } + } while (std::find(opcodes.begin(), opcodes.end(), IBV_WC_COMP_SWAP) == opcodes.end()); + + uint64_t * remoteValueFound = reinterpret_cast(localAddr); + /* + * if we fetched the value we expected, then + * we are holding the lock now 
(that is, we swapped successfully!)
+     * else, re-post your request for the lock
+     */
+    if (remoteValueFound[0] != compare_add) {
+        LOG(4, "Process " << m_pid << " couldn't get the lock. remoteValue = " << remoteValueFound[0] << " compare_add = " << compare_add << " go on, iterate\n");
+        goto blockingCompareAndSwap;
+    }
+    else {
+        LOG(4, "Process " << m_pid << " reads value " << remoteValueFound[0] << " and expected = " << compare_add << " gets the lock, done\n");
+    }
+    // else we hold the lock and swap the value into the remote slot ...
+}
+
 void IBVerbs :: put( SlotID srcSlot, size_t srcOffset,
               int dstPid, SlotID dstSlot, size_t dstOffset, size_t size )
 {
+#ifdef LPF_CORE_MPI_USES_hicr
+    const MemorySlot & src = m_memreg.lookup( srcSlot );
+    const MemorySlot & dst = m_memreg.lookup( dstSlot );
+
+    ASSERT( src.mr );
+
+    int numMsgs = size/m_maxMsgSize + (size % m_maxMsgSize > 0); // +1 if the last message is shorter than m_maxMsgSize
+    if (size == 0) numMsgs = 1;
+
+    struct ibv_sge sges[numMsgs];
+    struct ibv_send_wr srs[numMsgs];
+    struct ibv_sge *sge;
+    struct ibv_send_wr *sr;
+    for (int i=0; i < numMsgs; i++) {
+        sge = &sges[i]; std::memset(sge, 0, sizeof(ibv_sge));
+        sr = &srs[i]; std::memset(sr, 0, sizeof(ibv_send_wr));
+        const char * localAddr
+            = static_cast<char *>(src.glob[m_pid].addr) + srcOffset;
+        const char * remoteAddr
+            = static_cast<char *>(dst.glob[dstPid].addr) + dstOffset;
+
+        sge->addr = reinterpret_cast<uintptr_t>( localAddr );
+        sge->length = std::min(size, m_maxMsgSize );
+        sge->lkey = src.mr->lkey;
+
+        bool lastMsg = (i == numMsgs-1);
+        sr->next = lastMsg ? NULL : &srs[i+1];
+        // since a reliable connection keeps packets in order,
+        // we only need a signal from the last message in the queue
+        sr->send_flags = lastMsg ? IBV_SEND_SIGNALED : 0;
+        sr->opcode = lastMsg ? IBV_WR_RDMA_WRITE_WITH_IMM : IBV_WR_RDMA_WRITE;
+        /* use wr_id to later demultiplex srcSlot */
+        sr->wr_id = srcSlot;
+        /*
+         * In HiCR, we need to know at the receiver end which slot
+         * has received the message.
But here is a trick: + */ + sr->imm_data = dstSlot; + + sr->sg_list = sge; + sr->num_sge = 1; + sr->wr.rdma.remote_addr = reinterpret_cast( remoteAddr ); + sr->wr.rdma.rkey = dst.glob[dstPid].rkey; + + size -= sge->length; + srcOffset += sge->length; + dstOffset += sge->length; + + LOG(4, "PID " << m_pid << ": Enqueued put message of " << sge->length << " bytes to " << dstPid << " on slot" << dstSlot ); + + } + struct ibv_send_wr *bad_wr = NULL; + if (int err = ibv_post_send(m_connectedQps[dstPid].get(), &srs[0], &bad_wr )) + { + LOG(1, "Error while posting RDMA requests: " << std::strerror(err) ); + throw Exception("Error while posting RDMA requests"); + } + tryIncrement(Op::SEND, Phase::PRE, srcSlot); +#endif +#ifdef LPF_CORE_MPI_USES_ibverbs const MemorySlot & src = m_memreg.lookup( srcSlot ); const MemorySlot & dst = m_memreg.lookup( dstSlot ); @@ -566,11 +931,70 @@ void IBVerbs :: put( SlotID srcSlot, size_t srcOffset, LOG(4, "Enqueued put message of " << sge.length << " bytes to " << dstPid ); } +#endif } void IBVerbs :: get( int srcPid, SlotID srcSlot, size_t srcOffset, SlotID dstSlot, size_t dstOffset, size_t size ) { + +#ifdef LPF_CORE_MPI_USES_hicr + const MemorySlot & src = m_memreg.lookup( srcSlot ); + const MemorySlot & dst = m_memreg.lookup( dstSlot ); + + ASSERT( dst.mr ); + + int numMsgs = size/m_maxMsgSize + (size % m_maxMsgSize > 0); //+1 if last msg size < m_maxMsgSize + + struct ibv_sge sges[numMsgs+1]; + struct ibv_send_wr srs[numMsgs+1]; + struct ibv_sge *sge; + struct ibv_send_wr *sr; + + + for(int i = 0; i< numMsgs; i++){ + sge = &sges[i]; std::memset(sge, 0, sizeof(ibv_sge)); + sr = &srs[i]; std::memset(sr, 0, sizeof(ibv_send_wr)); + + const char * localAddr + = static_cast(dst.glob[m_pid].addr) + dstOffset; + const char * remoteAddr + = static_cast(src.glob[srcPid].addr) + srcOffset; + + sge->addr = reinterpret_cast( localAddr ); + sge->length = std::min(size, m_maxMsgSize ); + sge->lkey = dst.mr->lkey; + + sr->next = NULL; // &srs[i+1]; + sr->send_flags = IBV_SEND_SIGNALED; //0; + + sr->sg_list = sge; + sr->num_sge = 1; + sr->opcode = IBV_WR_RDMA_READ; + sr->wr.rdma.remote_addr = reinterpret_cast( remoteAddr ); + sr->wr.rdma.rkey = src.glob[srcPid].rkey; + // This logic is reversed compared to ::put + // (not srcSlot, as this slot is remote) + sr->wr_id = dstSlot; // <= DO NOT CHANGE THIS !!! 
+        sr->imm_data = srcSlot; // This is irrelevant as we don't send _WITH_IMM
+
+        size -= sge->length;
+        srcOffset += sge->length;
+        dstOffset += sge->length;
+    }
+    struct ibv_send_wr *bad_wr = NULL;
+    if (int err = ibv_post_send(m_connectedQps[srcPid].get(), &srs[0], &bad_wr ))
+    {
+        LOG(1, "Error while posting RDMA requests: " << std::strerror(err) );
+        if (err == ENOMEM) {
+            LOG(1, "Specific error code: ENOMEM (send queue is full or no resources)");
+        }
+        throw Exception("Error while posting RDMA requests");
+    }
+    tryIncrement(Op::GET, Phase::PRE, dstSlot);
+#endif
+#ifdef LPF_CORE_MPI_USES_ibverbs
     const MemorySlot & src = m_memreg.lookup( srcSlot );
     const MemorySlot & dst = m_memreg.lookup( dstSlot );
@@ -614,10 +1038,173 @@ void IBVerbs :: get( int srcPid, SlotID srcSlot, size_t srcOffset,
         dstOffset += sge.length;
         LOG(4, "Enqueued get message of " << sge.length << " bytes from " << srcPid );
     }
+#endif
+
+}
+
+void IBVerbs :: get_rcvd_msg_count(size_t * rcvd_msgs) {
+    *rcvd_msgs = m_recvdMsgs;
+}
+
+void IBVerbs :: get_rcvd_msg_count_per_slot(size_t * rcvd_msgs, SlotID slot)
+{
+    *rcvd_msgs = rcvdMsgCount[slot];
+}
+
+void IBVerbs :: get_sent_msg_count_per_slot(size_t * sent_msgs, SlotID slot)
+{
+    *sent_msgs = sentMsgCount.at(slot);
+}
+
+std::vector<ibv_wc_opcode> IBVerbs :: wait_completion(int& error) {
+
+    error = 0;
+    LOG(5, "Polling for messages" );
+    struct ibv_wc wcs[POLL_BATCH];
+    int pollResult = ibv_poll_cq(m_cqLocal.get(), POLL_BATCH, wcs);
+    std::vector<ibv_wc_opcode> opcodes;
+    if ( pollResult > 0) {
+        LOG(3, "Process " << m_pid << ": Received " << pollResult << " acknowledgements");
+
+        for (int i = 0; i < pollResult ; ++i) {
+            if (wcs[i].status != IBV_WC_SUCCESS)
+            {
+                LOG( 2, "Got bad completion status from IB message."
+                        " status = 0x" << std::hex << wcs[i].status
+                        << ", vendor syndrome = 0x" << std::hex
+                        << wcs[i].vendor_err );
+                const char * status_descr = ibv_wc_status_str(wcs[i].status);
+                LOG( 2, "The work completion status string: " << status_descr);
+                error = 1;
+            }
+            else {
+                LOG(3, "Process " << m_pid << " Send wcs[" << i << "].src_qp = " << wcs[i].src_qp);
+                LOG(3, "Process " << m_pid << " Send wcs[" << i << "].slid = " << wcs[i].slid);
+                LOG(3, "Process " << m_pid << " Send wcs[" << i << "].wr_id = " << wcs[i].wr_id);
+                LOG(3, "Process " << m_pid << " Send wcs[" << i << "].imm_data = " << wcs[i].imm_data);
+            }
+
+            SlotID slot = wcs[i].wr_id;
+            opcodes.push_back(wcs[i].opcode);
+            // Ignore compare-and-swap atomics!
+            if (wcs[i].opcode != IBV_WC_COMP_SWAP) {
+                // This completion is from a GET call!
+                if (wcs[i].opcode == IBV_WC_RDMA_READ) {
+                    tryIncrement(Op::GET, Phase::POST, slot);
+                }
+                if (wcs[i].opcode == IBV_WC_RDMA_WRITE)
+                    tryIncrement(Op::SEND, Phase::POST, slot);
+
+                LOG(3, "Rank " << m_pid << " increments sent message count to " << sentMsgCount[slot] << " for LPF slot " << slot);
+            }
+        }
+    }
+    else if (pollResult < 0)
+    {
+        LOG( 1, "Failed to poll IB completion queue" );
+        throw Exception("Poll CQ failure");
+    }
+    return opcodes;
+}
+
+void IBVerbs :: flushReceived() {
+    doRemoteProgress();
+}
+
+void IBVerbs :: flushSent()
+{
+    int error = 0;
+
+    bool sendsComplete;
+    do {
+        sendsComplete = true;
+        for (size_t i = 0; i < sentMsgCount.size(); ++i) {
+            if (slotActive[i]) {
+                if (m_sendInitMsgCount[i] > sentMsgCount[i]) {
+                    sendsComplete = false;
+                    wait_completion(error);
+                    if (error) {
+                        LOG(1, "Error in wait_completion.
Most likely issue is that receiver is not calling ibv_post_srq!\n"); + std::abort(); + } + } + } + } + } while (!sendsComplete); + +} + +void IBVerbs :: countingSyncPerSlot(bool resized, SlotID slot, size_t expectedSent, size_t expectedRecvd) { + + if (resized) reconnectQPs(); + size_t actualRecvd; + size_t actualSent; + int error; + if (slotActive[slot]) { + do { + wait_completion(error); + if (error) { + LOG(1, "Error in wait_completion"); + std::abort(); + } + // this call triggers doRemoteProgress + doRemoteProgress(); + + } while ( + (rcvdMsgCount[slot] < m_recvInitMsgCount[slot]) || + (sentMsgCount[slot] < m_sendInitMsgCount[slot]) + ); + } +} + +void IBVerbs :: syncPerSlot(bool resized, SlotID slot) { + if (resized) reconnectQPs(); + int error; + + do { + wait_completion(error); + if (error) { + LOG(1, "Error in wait_completion"); + std::abort(); + } + doRemoteProgress(); + } + while ((rcvdMsgCount.at(slot) < m_recvInitMsgCount.at(slot)) || (sentMsgCount.at(slot) < m_sendInitMsgCount.at(slot))); + + /** + * A subsequent barrier is a controversial decision: + * - if we use it, the sync guarantees that + * receiver has received all that it is supposed to + * receive. However, it loses all performance advantages + * of waiting "only on certain tags" + * - if we do not barrier, we only make sure the slot + * completes all sends and receives that HAVE ALREADY + * BEEN ISSUED. However, a receiver of an RMA put + * cannot know if it is supposed to receive more messages. + * It can only know if it is receiving via an RMA get. + * Therefore, now this operation is commented + */ + //m_comm.barrier(); + } void IBVerbs :: sync( bool reconnect ) { + +#ifdef LPF_CORE_MPI_USES_hicr + if (reconnect) reconnectQPs(); + + int error = 0; + + // flush send queues + flushSent(); + // flush receive queues + flushReceived(); + + LOG(1, "Process " << m_pid << " will call barrier\n"); + m_comm.barrier(); +#else if (reconnect) reconnectQPs(); while ( !m_activePeers.empty() ) { @@ -717,7 +1304,10 @@ void IBVerbs :: sync( bool reconnect ) // synchronize m_comm.barrier(); +#endif + } +} // mpi -} } +} // lpf diff --git a/src/MPI/ibverbs.hpp b/src/MPI/ibverbs.hpp index b79ec53a..af3ca1b6 100644 --- a/src/MPI/ibverbs.hpp +++ b/src/MPI/ibverbs.hpp @@ -19,6 +19,7 @@ #define LPF_CORE_MPI_IBVERBS_HPP #include +#include #include #if __cplusplus >= 201103L #include @@ -33,6 +34,18 @@ #include "sparseset.hpp" #include "memreg.hpp" +typedef enum Op { + SEND, + RECV, + GET +} Op; + +typedef enum Phase { + INIT, + PRE, + POST +} Phase; + namespace lpf { class Communication; @@ -62,24 +75,49 @@ class _LPFLIB_LOCAL IBVerbs SlotID regGlobal( void * addr, size_t size ); void dereg( SlotID id ); + void blockingCompareAndSwap(SlotID srSlot, size_t srcOffset, int dstPid, SlotID dstSlot, size_t dstOffset, size_t size, uint64_t compare_add, uint64_t swap); + void put( SlotID srcSlot, size_t srcOffset, int dstPid, SlotID dstSlot, size_t dstOffset, size_t size ); void get( int srcPid, SlotID srcSlot, size_t srcOffset, SlotID dstSlot, size_t dstOffset, size_t size ); + void flushSent(); + + void flushReceived(); + + void doRemoteProgress(); + + void countingSyncPerSlot(bool resized, SlotID tag, size_t sent, size_t recvd); + /** + * @syncPerSlot only guarantees that all already scheduled sends (via put), + * or receives (via get) associated with a slot are completed. It does + * not guarantee that not scheduled operations will be scheduled (e.g. 
 // Do the communication and synchronize
 // 'Reconnect' must be a globally replicated value
 void sync( bool reconnect);
+ void get_rcvd_msg_count(size_t * rcvd_msgs);
+ void get_rcvd_msg_count_per_slot(size_t * rcvd_msgs, SlotID slot);
+ void get_sent_msg_count_per_slot(size_t * sent_msgs, SlotID slot);
 private:
 IBVerbs & operator=(const IBVerbs & ); // assignment prohibited
 IBVerbs( const IBVerbs & ); // copying prohibited
 void stageQPs(size_t maxMsgs );
 void reconnectQPs();
+ void tryLock(SlotID id, int dstPid);
+ void tryUnlock(SlotID id, int dstPid);
+ std::vector<ibv_wc_opcode> wait_completion(int& error);
+ void doProgress();
+ void tryIncrement(Op op, Phase phase, SlotID slot);
 struct MemoryRegistration {
 void * addr;
@@ -95,6 +133,14 @@ class _LPFLIB_LOCAL IBVerbs
 int m_pid; // local process ID
 int m_nprocs; // number of processes
+ std::atomic_size_t m_numMsgs;
+ //std::atomic_size_t m_sendTotalInitMsgCount;
+ std::atomic_size_t m_recvTotalInitMsgCount;
+ std::atomic_size_t m_sentMsgs;
+ std::atomic_size_t m_recvdMsgs;
+ std::vector<size_t> m_recvInitMsgCount;
+ std::vector<size_t> m_getInitMsgCount;
+ std::vector<size_t> m_sendInitMsgCount;
 std::string m_devName; // IB device name
 int m_ibPort; // local IB port to work with
@@ -104,18 +150,28 @@ class _LPFLIB_LOCAL IBVerbs
 struct ibv_device_attr m_deviceAttr;
 size_t m_maxRegSize;
 size_t m_maxMsgSize;
+ size_t m_cqSize;
 size_t m_minNrMsgs;
 size_t m_maxSrs; // maximum number of send requests per QP
+ size_t m_postCount;
+ size_t m_recvCount;
 shared_ptr< struct ibv_context > m_device; // device handle
 shared_ptr< struct ibv_pd > m_pd; // protection domain
 shared_ptr< struct ibv_cq > m_cq; // completion queue
+ shared_ptr< struct ibv_cq > m_cqLocal; // completion queue
+ shared_ptr< struct ibv_cq > m_cqRemote; // completion queue
+ shared_ptr< struct ibv_srq > m_srq; // shared receive queue
 // Disconnected queue pairs
- std::vector< shared_ptr< struct ibv_qp > > m_stagedQps;
+ std::vector< shared_ptr<struct ibv_qp> > m_stagedQps;
 // Connected queue pairs
- std::vector< shared_ptr< struct ibv_qp > > m_connectedQps;
+ std::vector< shared_ptr<struct ibv_qp> > m_connectedQps;
+
+ std::vector<size_t> rcvdMsgCount;
+ std::vector<size_t> sentMsgCount;
+ std::vector<bool> slotActive;
 std::vector< struct ibv_send_wr > m_srs; // array of send requests
diff --git a/src/MPI/init.cpp b/src/MPI/init.cpp
index 68d16866..5971f925 100644
--- a/src/MPI/init.cpp
+++ b/src/MPI/init.cpp
@@ -54,9 +54,10 @@ namespace lpf {
 (engine.compare( "mpirma" ) == 0) ||
 (engine.compare( "mpimsg" ) == 0) ||
 (engine.compare( "ibverbs" ) == 0) ||
+ (engine.compare( "hicr" ) == 0) ||
 (engine.compare( "hybrid" ) == 0);
 if( !engine_is_MPI ) {
- (void) std::fprintf( stderr, "Warning: program was compiled for the mpirma, mpimsg, ibverbs, or hybrid engine but run-time requests the %s engine instead. For stable results please compile the program into a universal LPF program (by omitting the -engine flag to the lpfcc/lpfcxx utilities).\n", engine.c_str() );
+ (void) std::fprintf( stderr, "Warning: program was compiled for the mpirma, mpimsg, ibverbs, hicr, or hybrid engine but run-time requests the %s engine instead. 
For stable results please compile the program into a universal LPF program (by omitting the -engine flag to the lpfcc/lpfcxx utilities).\n", engine.c_str() ); } if( mpi_initializer_ran || !engine_is_MPI ) { diff --git a/src/MPI/interface.cpp b/src/MPI/interface.cpp index f1919f33..b1071c93 100644 --- a/src/MPI/interface.cpp +++ b/src/MPI/interface.cpp @@ -100,6 +100,76 @@ void Interface :: put( memslot_t srcSlot, size_t srcOffset, size ); } +// only for HiCR +//#ifdef + +void Interface :: lockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, + size_t size ) +{ + m_mesgQueue.lockSlot( srcSlot, srcOffset, + dstPid, dstSlot, dstOffset, + size ); +} + +void Interface :: unlockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, + size_t size ) +{ + m_mesgQueue.unlockSlot( srcSlot, srcOffset, + dstPid, dstSlot, dstOffset, + size ); +} + +void Interface :: getRcvdMsgCountPerSlot(size_t * msgs, SlotID slot) { + m_mesgQueue.getRcvdMsgCountPerSlot(msgs, slot); +} + +void Interface :: getSentMsgCountPerSlot(size_t * msgs, SlotID slot) { + m_mesgQueue.getSentMsgCountPerSlot(msgs, slot); +} + +void Interface :: flushSent() { + m_mesgQueue.flushSent(); +} + +void Interface :: flushReceived() { + m_mesgQueue.flushReceived(); +} + +void Interface :: getRcvdMsgCount(size_t * msgs) { + m_mesgQueue.getRcvdMsgCount(msgs); +} + +err_t Interface :: countingSyncPerSlot(memslot_t slot, size_t expected_sent, size_t expected_rcvd) +{ + if ( 0 == m_aborted ) + { + m_aborted = m_mesgQueue.countingSyncPerSlot(slot, expected_sent, expected_rcvd); + return LPF_SUCCESS; + } + else + { + return LPF_ERR_FATAL; + } +} + +err_t Interface :: syncPerSlot(memslot_t slot) +{ + if ( 0 == m_aborted ) + { + m_aborted = m_mesgQueue.syncPerSlot(slot); + return LPF_SUCCESS; + } + else + { + return LPF_ERR_FATAL; + } +} + +// only for HiCR +//#endif + void Interface :: get( pid_t srcPid, memslot_t srcSlot, size_t srcOffset, memslot_t dstSlot, size_t dstOffset, size_t size ) @@ -137,9 +207,16 @@ err_t Interface :: resizeMesgQueue( size_t nMsgs ) void Interface :: abort() { ASSERT( 0 == m_aborted ); +#ifdef LPF_CORE_MPI_USES_hicr + int vote = 1; + int voted; + m_comm.allreduceSum(&vote, &voted, 1); + m_aborted = voted; +#else // signal all other processes at the start of the next 'sync' that // this process aborted. 
m_aborted = m_mesgQueue.sync( true ); +#endif } pid_t Interface :: isAborted() const diff --git a/src/MPI/interface.hpp b/src/MPI/interface.hpp index 732f0a9b..02e48b3c 100644 --- a/src/MPI/interface.hpp +++ b/src/MPI/interface.hpp @@ -70,6 +70,34 @@ class _LPFLIB_LOCAL Interface static err_t hook( const mpi::Comm & comm , spmd_t spmd, args_t args ); + // only for HiCR + // #if + err_t countingSyncPerSlot(memslot_t slot, size_t expected_sent, size_t expected_rcvd); + + err_t syncPerSlot(memslot_t slot); + + typedef size_t SlotID; + + void getRcvdMsgCountPerSlot(size_t * msgs, SlotID slot); + + void getSentMsgCountPerSlot(size_t * msgs, SlotID slot); + + void getRcvdMsgCount(size_t * msgs); + + void flushSent(); + + void flushReceived(); + + void lockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, + size_t size ); + + void unlockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, + size_t size ); + + // only for HiCR +//#endif err_t rehook( spmd_t spmd, args_t args); void probe( machine_t & machine ) ; diff --git a/src/MPI/memorytable.cpp b/src/MPI/memorytable.cpp index 3bb7a792..7fe0abc5 100644 --- a/src/MPI/memorytable.cpp +++ b/src/MPI/memorytable.cpp @@ -23,7 +23,7 @@ namespace lpf { MemoryTable :: MemoryTable( Communication & comm -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr , mpi::IBVerbs & ibverbs #endif ) @@ -34,7 +34,7 @@ MemoryTable :: MemoryTable( Communication & comm , m_removed( 0, 0 ) , m_comm( comm ) #endif -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr , m_added( 0, 0 ) , m_ibverbs( ibverbs ) , m_comm( comm ) @@ -45,7 +45,7 @@ MemoryTable :: MemoryTable( Communication & comm MemoryTable :: Slot MemoryTable :: addLocal( void * mem, std::size_t size ) // nothrow { -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr Memory rec( mem, size, m_ibverbs.regLocal( mem, size)); #else Memory rec( mem, size); @@ -56,13 +56,13 @@ MemoryTable :: addLocal( void * mem, std::size_t size ) // nothrow MemoryTable :: Slot MemoryTable :: addGlobal( void * mem, std::size_t size ) // nothrow { -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr Memory rec(mem, size, -1); #else Memory rec(mem, size); #endif Slot slot = m_memreg.addGlobalReg(rec) ; -#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr m_added.insert( slot ); #endif return slot; @@ -92,7 +92,7 @@ void MemoryTable :: remove( Slot slot ) // nothrow m_memreg.removeReg( slot ); #endif -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr if (m_added.contains(slot)) { m_added.erase(slot); } @@ -123,7 +123,7 @@ void MemoryTable :: reserve( size_t size ) // throws bad_alloc, strong safe m_memreg.reserve( size ); #endif -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr m_memreg.reserve( size ); size_t range = m_memreg.range(); m_added.resize( range ); @@ -151,7 +151,7 @@ bool MemoryTable :: needsSync() const #ifdef LPF_CORE_MPI_USES_mpimsg return false; #endif -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr return 
 !m_added.empty();
 #endif
 }
@@ -194,7 +194,7 @@ void MemoryTable :: sync( )
 } // if
 #endif
-#ifdef LPF_CORE_MPI_USES_ibverbs
+#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr
 if ( !m_added.empty() ) {
 // Register the global with IBverbs
diff --git a/src/MPI/memorytable.hpp b/src/MPI/memorytable.hpp
index 18dd5038..7e24e6e1 100644
--- a/src/MPI/memorytable.hpp
+++ b/src/MPI/memorytable.hpp
@@ -24,7 +24,7 @@
 #include "assert.hpp"
 #include "linkage.hpp"
-#ifdef LPF_CORE_MPI_USES_ibverbs
+#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr
 #include "ibverbs.hpp"
 #endif
@@ -44,7 +44,7 @@ class _LPFLIB_LOCAL MemoryTable
 struct Memory {
 char *addr;
 size_t size;
-#ifdef LPF_CORE_MPI_USES_ibverbs
+#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr
 mpi::IBVerbs::SlotID slot;
 Memory( void * a, size_t s, mpi::IBVerbs::SlotID sl)
 : addr(static_cast<char *>(a))
@@ -65,7 +65,7 @@ class _LPFLIB_LOCAL MemoryTable
 static Slot invalidSlot()
 { return Register::invalidSlot(); }
-#ifdef LPF_CORE_MPI_USES_ibverbs
+#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr
 explicit MemoryTable( Communication & comm, mpi::IBVerbs & verbs );
 #else
 explicit MemoryTable( Communication & comm );
@@ -90,7 +90,7 @@ class _LPFLIB_LOCAL MemoryTable
 { return m_windows[ slot ]; }
 #endif
-#ifdef LPF_CORE_MPI_USES_ibverbs
+#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr
 mpi::IBVerbs::SlotID getVerbID( Slot slot ) const
 { return m_memreg.lookup( slot ).slot; }
 #endif
@@ -118,7 +118,7 @@ class _LPFLIB_LOCAL MemoryTable
 Communication & m_comm;
 #endif
-#ifdef LPF_CORE_MPI_USES_ibverbs
+#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr
 DirtyList m_added;
 mpi::IBVerbs & m_ibverbs;
 Communication & m_comm;
diff --git a/src/MPI/mesgqueue.cpp b/src/MPI/mesgqueue.cpp
index 0f610a52..e656a30c 100644
--- a/src/MPI/mesgqueue.cpp
+++ b/src/MPI/mesgqueue.cpp
@@ -97,13 +97,13 @@ MessageQueue :: MessageQueue( Communication & comm )
 , m_edgeRecv()
 , m_edgeSend()
 , m_edgeBuffer()
-#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs
+#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr
 , m_edgeBufferSlot( m_memreg.invalidSlot() )
 #endif
 , m_bodySends()
 , m_bodyRecvs()
 , m_comm( dynamic_cast<mpi::Comm &>(comm) )
-#ifdef LPF_CORE_MPI_USES_ibverbs
+#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr
 , m_ibverbs( m_comm )
 , m_memreg( m_comm, m_ibverbs )
 #else
@@ -179,7 +179,7 @@ err_t MessageQueue :: resizeMesgQueue( size_t nMsgs )
 #ifdef LPF_CORE_MPI_USES_mpimsg
 m_comm.reserveMsgs( 6* nMsgs ); //another factor three stems from sending edges separately .
 #endif
-#ifdef LPF_CORE_MPI_USES_ibverbs
+#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr
 m_ibverbs.resizeMesgq( 6*nMsgs);
 #endif
@@ -270,6 +270,14 @@ void MessageQueue :: get( pid_t srcPid, memslot_t srcSlot, size_t srcOffset,
 memslot_t dstSlot, size_t dstOffset,
 size_t size )
 {
+#ifdef LPF_CORE_MPI_USES_hicr
+ m_ibverbs.get(srcPid,
+ m_memreg.getVerbID( srcSlot),
+ srcOffset,
+ m_memreg.getVerbID( dstSlot),
+ dstOffset,
+ size );
+#else
 if (size > 0)
 {
 ASSERT( !
 m_memreg.isLocalSlot( srcSlot ) );
@@ -310,11 +318,36 @@ void MessageQueue :: get( pid_t srcPid, memslot_t srcSlot, size_t srcOffset,
 }
 }
 }
+#endif
+}
+
+void MessageQueue :: lockSlot( memslot_t srcSlot, size_t srcOffset,
+ pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size )
+{
+#ifdef LPF_CORE_MPI_USES_hicr
+    m_ibverbs.blockingCompareAndSwap(m_memreg.getVerbID(srcSlot), srcOffset, dstPid, m_memreg.getVerbID(dstSlot), dstOffset, size, 0ULL, 1ULL);
+#endif
+}
+
+void MessageQueue :: unlockSlot( memslot_t srcSlot, size_t srcOffset,
+ pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size )
+{
+#ifdef LPF_CORE_MPI_USES_hicr
+    m_ibverbs.blockingCompareAndSwap(m_memreg.getVerbID(srcSlot), srcOffset, dstPid, m_memreg.getVerbID(dstSlot), dstOffset, size, 1ULL, 0ULL);
+#endif
 }
 void MessageQueue :: put( memslot_t srcSlot, size_t srcOffset,
 pid_t dstPid, memslot_t dstSlot, size_t dstOffset,
 size_t size )
 {
+#ifdef LPF_CORE_MPI_USES_hicr
+ m_ibverbs.put( m_memreg.getVerbID( srcSlot),
+ srcOffset,
+ dstPid,
+ m_memreg.getVerbID( dstSlot),
+ dstOffset,
+ size);
+#else
 if (size > 0)
 {
 ASSERT( ! m_memreg.isLocalSlot( dstSlot ) );
@@ -348,10 +381,19 @@
 }
 }
 }
+#endif
+
 }
 int MessageQueue :: sync( bool abort )
 {
+#ifdef LPF_CORE_MPI_USES_hicr
+ // HiCR engine: complete pending slot registrations, then run the ibverbs sync
+ m_memreg.sync();
+ m_ibverbs.sync(m_resized);
+ m_resized = false;
+#else
+
 LOG(4, "mpi :: MessageQueue :: sync( abort " << (abort?"true":"false") << " )");
 using mpi::ipc::newMsg;
@@ -971,9 +1013,82 @@ int MessageQueue :: sync( bool abort )
 ASSERT( m_bodyRecvs.empty() );
 LOG(4, "End of synchronisation");
+#endif
 return 0;
+
 }
+int MessageQueue :: countingSyncPerSlot(SlotID slot, size_t expected_sent, size_t expected_rcvd)
+{
+
+#ifdef LPF_CORE_MPI_USES_hicr
+
+ // complete pending slot registrations, then wait on this slot's counters
+ m_memreg.sync();
+
+ m_ibverbs.countingSyncPerSlot(m_resized, slot, expected_sent, expected_rcvd);
+
+ m_resized = false;
+
+#endif
+ return 0;
+}
+
+int MessageQueue :: syncPerSlot(SlotID slot)
+{
+
+#ifdef LPF_CORE_MPI_USES_hicr
+
+ // complete pending slot registrations, then wait for this slot's scheduled traffic
+ m_memreg.sync();
+
+ m_ibverbs.syncPerSlot(m_resized, slot);
+
+ m_resized = false;
+
+#endif
+ return 0;
+}
+
+
+void MessageQueue :: getRcvdMsgCountPerSlot(size_t * msgs, SlotID slot)
+{
+
+#ifdef LPF_CORE_MPI_USES_hicr
+ *msgs = 0;
+ m_ibverbs.get_rcvd_msg_count_per_slot(msgs, slot);
+#endif
+}
+
+void MessageQueue :: getRcvdMsgCount(size_t * msgs)
+{
+#ifdef LPF_CORE_MPI_USES_hicr
+ *msgs = 0;
+ m_ibverbs.get_rcvd_msg_count(msgs);
+#endif
+}
+
+void MessageQueue :: getSentMsgCountPerSlot(size_t * msgs, SlotID slot)
+{
+#ifdef LPF_CORE_MPI_USES_hicr
+ *msgs = 0;
+ m_ibverbs.get_sent_msg_count_per_slot(msgs, slot);
+#endif
+}
+
+void MessageQueue :: flushSent()
+{
+#ifdef LPF_CORE_MPI_USES_hicr
+ m_ibverbs.flushSent();
+#endif
+}
+
+void MessageQueue :: flushReceived()
+{
+#ifdef LPF_CORE_MPI_USES_hicr
+ m_ibverbs.flushReceived();
+#endif
+}
 } // namespace lpf
diff --git a/src/MPI/mesgqueue.hpp b/src/MPI/mesgqueue.hpp
index 27e7beb5..5b9c70a1 100644
--- a/src/MPI/mesgqueue.hpp
+++ b/src/MPI/mesgqueue.hpp
@@ -33,14 +33,18 @@
 #include
 #endif
-#ifdef LPF_CORE_MPI_USES_ibverbs
+#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr
 #include "ibverbs.hpp"
 #endif
+//only for HiCR
+typedef size_t SlotID;
+
 namespace lpf {
 class _LPFLIB_LOCAL MessageQueue
 {
+
 public:
 explicit MessageQueue( Communication & comm );
@@ -62,6 +66,30 @@ class _LPFLIB_LOCAL MessageQueue
 // returns how many
processes have entered in an aborted state int sync( bool abort ); +//only for HiCR +//#ifdef + void lockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size ); + + void unlockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size ); + + void getRcvdMsgCountPerSlot(size_t * msgs, SlotID slot); + + void getRcvdMsgCount(size_t * msgs); + + void getSentMsgCountPerSlot(size_t * msgs, SlotID slot); + + void flushSent(); + + void flushReceived(); + + int countingSyncPerSlot(SlotID slot, size_t expected_sent, size_t expected_rcvd); + + int syncPerSlot(SlotID slot); +// end only for HiCR +//#endif + private: enum Msgs { BufPut , BufGet, BufGetReply, @@ -126,13 +154,13 @@ class _LPFLIB_LOCAL MessageQueue std::vector< Edge > m_edgeRecv; std::vector< Edge > m_edgeSend; std::vector< char > m_edgeBuffer; -#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr memslot_t m_edgeBufferSlot; #endif std::vector< Body > m_bodySends; std::vector< Body > m_bodyRecvs; mpi::Comm m_comm; -#ifdef LPF_CORE_MPI_USES_ibverbs +#if defined LPF_CORE_MPI_USES_ibverbs || defined LPF_CORE_MPI_USES_hicr mpi::IBVerbs m_ibverbs; #endif MemoryTable m_memreg; diff --git a/src/core-libraries/collectives.c b/src/core-libraries/collectives.c index ff952e1f..cc80a69b 100644 --- a/src/core-libraries/collectives.c +++ b/src/core-libraries/collectives.c @@ -390,6 +390,41 @@ lpf_err_t lpf_allgather( return LPF_SUCCESS; } + +lpf_err_t lpf_allgatherv( + lpf_coll_t coll, + lpf_memslot_t src, + lpf_memslot_t dst, + size_t *sizes, + bool exclude_myself + ) { + + ASSERT( coll.P > 0 ); + ASSERT( coll.s < coll.P ); + + size_t allgatherv_start_addresses[coll.P]; + + for (size_t i=0; i 0) { + for (size_t i=0; isync(); } +_LPFLIB_API lpf_err_t lpf_counting_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) +{ + (void) attr; + using namespace lpf::hybrid; + if (ctx == LPF_SINGLE_PROCESS) + return LPF_SUCCESS; + return realContext(ctx)->countingSyncPerSlot(slot, expected_sent, expected_rcvd); +} + +_LPFLIB_API lpf_err_t lpf_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot) +{ + (void) attr; + using namespace lpf::hybrid; + if (ctx == LPF_SINGLE_PROCESS) + return LPF_SUCCESS; + return realContext(ctx)->syncPerSlot(slot); +} _LPFLIB_API lpf_err_t lpf_probe( lpf_t ctx, lpf_machine_t * params ) { @@ -384,4 +401,40 @@ _LPFLIB_API lpf_err_t lpf_resize_memory_register( lpf_t ctx, size_t max_regs ) return LPF_SUCCESS; } +_LPFLIB_API lpf_err_t lpf_get_rcvd_msg_count( lpf_t ctx, size_t * rcvd_msgs) +{ + using namespace lpf::hybrid; + if (ctx == LPF_SINGLE_PROCESS) + return LPF_SUCCESS; + ThreadState * t = realContext(ctx); + if (!t->error()) + return t->getRcvdMsgCount(rcvd_msgs); + else + return LPF_SUCCESS; +} + +_LPFLIB_API lpf_err_t lpf_get_rcvd_msg_count_per_slot( lpf_t ctx, size_t * rcvd_msgs, lpf_memslot_t slot ) +{ + using namespace lpf::hybrid; + if (ctx == LPF_SINGLE_PROCESS) + return LPF_SUCCESS; + ThreadState * t = realContext(ctx); + if (!t->error()) + return t->getRcvdMsgCount(rcvd_msgs, slot); + else + return LPF_SUCCESS; +} + +_LPFLIB_API lpf_err_t lpf_get_sent_msg_count_per_slot( lpf_t ctx, size_t * sent_msgs, lpf_memslot_t slot ) +{ + using namespace lpf::hybrid; + if (ctx == LPF_SINGLE_PROCESS) + return LPF_SUCCESS; + ThreadState * 
t = realContext(ctx); + if (!t->error()) + return t->getSentMsgCount(sent_msgs, slot); + else + return LPF_SUCCESS; +} + } // extern "C" diff --git a/src/hybrid/dispatch.hpp b/src/hybrid/dispatch.hpp index 1235e513..833746bf 100644 --- a/src/hybrid/dispatch.hpp +++ b/src/hybrid/dispatch.hpp @@ -112,6 +112,21 @@ namespace lpf { namespace hybrid { err_t deregister( memslot_t memslot) { return USE_THREAD( deregister)(m_ctx, memslot); } + err_t get_rcvd_msg_count_per_slot( size_t * rcvd_msgs, lpf_memslot_t slot) + { return USE_THREAD( get_rcvd_msg_count_per_slot)(m_ctx, rcvd_msgs, slot); } + + err_t get_sent_msg_count_per_slot( size_t * sent_msgs, lpf_memslot_t slot) + { return USE_THREAD( get_sent_msg_count_per_slot)(m_ctx, sent_msgs, slot); } + + err_t get_rcvd_msg_count( size_t * rcvd_msgs) + { return USE_THREAD( get_rcvd_msg_count)(m_ctx, rcvd_msgs); } + + err_t flush_sent() + { return USE_THREAD(flush_sent)(m_ctx); } + + err_t flush_received() + { return USE_THREAD(flush_received)(m_ctx); } + err_t put( memslot_t src_slot, size_t src_offset, pid_t dst_pid, memslot_t dst_slot, size_t dst_offset, size_t size, msg_attr_t attr = MSG_DEFAULT ) @@ -127,6 +142,12 @@ namespace lpf { namespace hybrid { err_t sync( sync_attr_t attr = SYNC_DEFAULT ) { return USE_THREAD(sync)( m_ctx, attr ); } + err_t sync_per_slot( sync_attr_t attr = SYNC_DEFAULT, memslot_t slot = LPF_INVALID_MEMSLOT) + { return USE_THREAD(sync_per_slot)( m_ctx, attr, slot); } + + err_t counting_sync_per_slot( sync_attr_t attr = SYNC_DEFAULT, lpf_memslot_t slot = LPF_INVALID_MEMSLOT, size_t expected_sent = 0, size_t expected_recvd = 0) + { return USE_THREAD(counting_sync_per_slot)(m_ctx, attr, slot, expected_sent, expected_recvd); } + err_t probe( machine_t * params ) { return USE_THREAD(probe)(m_ctx, params ); } @@ -202,6 +223,21 @@ namespace lpf { namespace hybrid { err_t deregister( memslot_t memslot) { return USE_MPI( deregister)(m_ctx, memslot); } + err_t get_rcvd_msg_count_per_slot(size_t *rcvd_msgs, lpf_memslot_t slot) + { return USE_MPI( get_rcvd_msg_count_per_slot)( m_ctx, rcvd_msgs, slot); } + + err_t get_sent_msg_count_per_slot(size_t *sent_msgs, lpf_memslot_t slot) + { return USE_MPI( get_sent_msg_count_per_slot)( m_ctx, sent_msgs, slot); } + + err_t get_rcvd_msg_count( size_t * rcvd_msgs) + { return USE_MPI( get_rcvd_msg_count)(m_ctx, rcvd_msgs); } + + err_t flush_sent() + {return USE_MPI( flush_sent)(m_ctx);} + + err_t flush_received() + {return USE_MPI( flush_received)(m_ctx);} + err_t put( memslot_t src_slot, size_t src_offset, pid_t dst_pid, memslot_t dst_slot, size_t dst_offset, size_t size, msg_attr_t attr = MSG_DEFAULT ) @@ -217,6 +253,12 @@ namespace lpf { namespace hybrid { err_t sync( sync_attr_t attr = SYNC_DEFAULT ) { return USE_MPI(sync)( m_ctx, attr ); } + err_t sync_per_slot( sync_attr_t attr = SYNC_DEFAULT, lpf_memslot_t slot = LPF_INVALID_MEMSLOT ) + { return USE_MPI(sync_per_slot)( m_ctx, attr, slot); } + + err_t counting_sync_per_slot( sync_attr_t attr = SYNC_DEFAULT, lpf_memslot_t slot = LPF_INVALID_MEMSLOT, size_t expected_sent = 0, size_t expected_recvd = 0) + { return USE_MPI(counting_sync_per_slot)(m_ctx, attr, slot, expected_sent, expected_recvd); } + err_t probe( machine_t * params ) { return USE_MPI(probe)(m_ctx, params ); } diff --git a/src/hybrid/state.hpp b/src/hybrid/state.hpp index 6ae1dd3a..06e8faf3 100644 --- a/src/hybrid/state.hpp +++ b/src/hybrid/state.hpp @@ -111,6 +111,13 @@ class _LPFLIB_LOCAL NodeState { return m_mpi.sync(); } +// MPI::err_t counting_sync_per_slot(lpf_memslot_t 
slot, size_t expected_sent, size_t expected_rcvd)
+// {
+// m_memreg.flush( m_mpi );
+// m_msgQueue.flush( m_mpi, m_memreg );
+// return m_mpi.counting_sync_per_slot(slot, expected_sent, expected_rcvd);
+// }
+
 static double messageGap( lpf_pid_t nprocs,
 size_t minMsgSize, lpf_sync_attr_t attr)
 {
 (void) nprocs;
@@ -367,6 +374,16 @@ class _LPFLIB_LOCAL ThreadState {
 return LPF_SUCCESS;
 }
+ lpf_err_t countingSyncPerSlot(lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd)
+ {
+ return m_nodeState.mpi().counting_sync_per_slot(slot, expected_sent, expected_rcvd);
+ }
+
+ lpf_err_t syncPerSlot(lpf_memslot_t slot)
+ {
+ return m_nodeState.mpi().sync_per_slot(slot);
+ }
+
 ThreadState( NodeState * nodeState, Thread thread )
 : m_error(false)
 , m_threadId( thread.pid() )
@@ -405,6 +422,25 @@ class _LPFLIB_LOCAL ThreadState {
 bool error() const
 { return m_error; }
+ lpf_pid_t getRcvdMsgCount(size_t * rcvd_msgs, lpf_memslot_t slot) {
+
+ return m_nodeState.mpi().get_rcvd_msg_count_per_slot(rcvd_msgs, slot);
+ }
+
+ lpf_pid_t getSentMsgCount(size_t * sent_msgs, lpf_memslot_t slot) {
+
+ return m_nodeState.mpi().get_sent_msg_count_per_slot(sent_msgs, slot);
+ }
+
+ lpf_pid_t getRcvdMsgCount(size_t * rcvd_msgs) {
+
+ return m_nodeState.mpi().get_rcvd_msg_count(rcvd_msgs);
+ }
+
+ lpf_pid_t flush() {
+ // evaluate both flushes: a bare && would skip flush_received()
+ // whenever flush_sent() returned LPF_SUCCESS (== 0)
+ lpf_err_t sent = m_nodeState.mpi().flush_sent();
+ lpf_err_t rcvd = m_nodeState.mpi().flush_received();
+ return sent ? sent : rcvd;
+ }
+
 private:
 bool m_error;
diff --git a/src/imp/core.c b/src/imp/core.c
index 990e267c..7d3dde9f 100644
--- a/src/imp/core.c
+++ b/src/imp/core.c
@@ -137,6 +137,39 @@ lpf_err_t lpf_sync( lpf_t lpf, lpf_sync_attr_t attr )
 return LPF_SUCCESS;
 }
+lpf_err_t lpf_counting_sync_per_slot( lpf_t lpf, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd)
+{
+ (void) lpf;
+ (void) attr;
+ (void) slot;
+ (void) expected_sent;
+ (void) expected_rcvd;
+ return LPF_SUCCESS;
+}
+
+lpf_err_t lpf_lock_slot(
+ lpf_t ctx,
+ lpf_memslot_t src_slot,
+ size_t src_offset,
+ lpf_pid_t dst_pid,
+ lpf_memslot_t dst_slot,
+ size_t dst_offset,
+ size_t size,
+ lpf_msg_attr_t attr
+) {
+ (void) ctx; (void) src_slot; (void) src_offset; (void) dst_pid;
+ (void) dst_slot; (void) dst_offset; (void) size; (void) attr;
+ return LPF_SUCCESS;
+}
+
+lpf_err_t lpf_unlock_slot(
+ lpf_t ctx,
+ lpf_memslot_t src_slot,
+ size_t src_offset,
+ lpf_pid_t dst_pid,
+ lpf_memslot_t dst_slot,
+ size_t dst_offset,
+ size_t size,
+ lpf_msg_attr_t attr
+) {
+ (void) ctx; (void) src_slot; (void) src_offset; (void) dst_pid;
+ (void) dst_slot; (void) dst_offset; (void) size; (void) attr;
+ return LPF_SUCCESS;
+}
+
 static double messageGap( lpf_pid_t p, size_t min_msg_size,
 lpf_sync_attr_t attr)
 {
 (void) p;
@@ -179,3 +212,26 @@ lpf_err_t lpf_resize_memory_register( lpf_t lpf, size_t max_regs )
 return LPF_SUCCESS;
 }
+lpf_err_t lpf_get_rcvd_msg_count_per_slot( lpf_t lpf, size_t * rcvd_msgs, lpf_memslot_t slot) {
+ (void) lpf;
+ (void) slot;
+ *rcvd_msgs = 0;
+ return LPF_SUCCESS;
+}
+
+lpf_err_t lpf_get_rcvd_msg_count( lpf_t lpf, size_t * rcvd_msgs) {
+ (void) lpf;
+ *rcvd_msgs = 0;
+ return LPF_SUCCESS;
+}
+
+lpf_err_t lpf_get_sent_msg_count_per_slot( lpf_t lpf, size_t * sent_msgs, lpf_memslot_t slot) {
+ (void) lpf;
+ (void) slot;
+ *sent_msgs = 0;
+ return LPF_SUCCESS;
+}
+
+lpf_err_t lpf_flush( lpf_t lpf) {
+ (void) lpf;
+ return LPF_SUCCESS;
+}
+
diff --git a/src/pthreads/core.cpp b/src/pthreads/core.cpp
index 1d90588a..bfe44c58 100644
--- a/src/pthreads/core.cpp
+++ b/src/pthreads/core.cpp
@@ -330,6 +330,13 @@ lpf_err_t lpf_sync( lpf_t ctx, lpf_sync_attr_t attr )
 return realCtx(ctx)->sync();
 }
+lpf_err_t lpf_counting_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd)
+{
+ (void) attr; // ignore attr parameter since this implementation only
+ // implements core functionality
+ return realCtx(ctx)->countingSyncPerSlot(slot, expected_sent, expected_rcvd);
+}
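+
+/* A hypothetical caller sketch (the counting semantics are assumed from the
+ * MPI/ibverbs engine, where the call returns once the slot's sent and
+ * received message counters have caught up with the expected values):
+ *
+ * rc = lpf_counting_sync_per_slot( lpf, LPF_SYNC_DEFAULT, slot,
+ * 0, 1 ); // wait for one incoming message on `slot`
+ */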
+
 namespace {
 double messageGap( lpf_pid_t p, size_t min_msg_size,
@@ -378,3 +385,27 @@ lpf_err_t lpf_resize_memory_register( lpf_t ctx, size_t max_regs )
 return t->resizeMemreg(max_regs);
 }
+lpf_err_t lpf_get_rcvd_msg_count_per_slot(lpf_t ctx, size_t * msgs, lpf_memslot_t slot) {
+ (void) slot;
+ *msgs = 0;
+ lpf::ThreadLocalData * t = realCtx(ctx);
+ if (t->isAborted())
+ return LPF_SUCCESS;
+ return LPF_SUCCESS;
+}
+
+
+lpf_err_t lpf_get_rcvd_msg_count(lpf_t ctx, size_t * msgs) {
+ *msgs = 0;
+ lpf::ThreadLocalData * t = realCtx(ctx);
+ if (t->isAborted())
+ return LPF_SUCCESS;
+ return LPF_SUCCESS;
+}
+
+lpf_err_t lpf_get_sent_msg_count_per_slot(lpf_t ctx, size_t * msgs, lpf_memslot_t slot) {
+ (void) slot;
+ *msgs = 0;
+ lpf::ThreadLocalData * t = realCtx(ctx);
+ if (t->isAborted())
+ return LPF_SUCCESS;
+ return LPF_SUCCESS;
+}
diff --git a/src/pthreads/threadlocaldata.cpp b/src/pthreads/threadlocaldata.cpp
index 6bb358f1..6a62e4d3 100644
--- a/src/pthreads/threadlocaldata.cpp
+++ b/src/pthreads/threadlocaldata.cpp
@@ -423,7 +423,7 @@ err_t ThreadLocalData :: resizeMemreg( size_t nRegs ) // nothrow
 }
 }
-err_t ThreadLocalData :: sync( bool expectExit )
+err_t ThreadLocalData :: sync( bool expectExit)
 {
 if ( m_state->sync(m_pid) )
 {
@@ -441,6 +441,10 @@ err_t ThreadLocalData :: sync( bool expectExit )
 return LPF_SUCCESS;
 }
+err_t ThreadLocalData :: countingSyncPerSlot(bool expectExit, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) {
+ (void) expectExit; (void) slot; (void) expected_sent; (void) expected_rcvd;
+ return LPF_SUCCESS;
+}
+
 namespace {
 int getNumberOfProcs()
 {
diff --git a/src/pthreads/threadlocaldata.hpp b/src/pthreads/threadlocaldata.hpp
index 66d56160..c1a83706 100644
--- a/src/pthreads/threadlocaldata.hpp
+++ b/src/pthreads/threadlocaldata.hpp
@@ -105,6 +105,8 @@ class _LPFLIB_LOCAL ThreadLocalData {
 return m_atExit[0]; }
 err_t sync( bool expectExit = false ); // nothrow
+ err_t countingSyncPerSlot( bool expectExit = false, lpf_memslot_t slot = LPF_INVALID_MEMSLOT, size_t expected_sent = 0, size_t expected_rcvd = 0); // nothrow
+ err_t syncPerSlot( bool expectExit = false, lpf_memslot_t slot = LPF_INVALID_MEMSLOT); // nothrow
 private:
 ThreadLocalData( const ThreadLocalData & ) ; // prohibit copying
diff --git a/tests/functional/func_bsplib_example_lpf_sum_unsafemode.c b/tests/functional/func_bsplib_example_lpf_sum_unsafemode.c
deleted file mode 100644
index e81c1576..00000000
--- a/tests/functional/func_bsplib_example_lpf_sum_unsafemode.c
+++ /dev/null
@@ -1,85 +0,0 @@
-
-/*
- * Copyright 2021 Huawei Technologies Co., Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. 
- */ - -#include -#include "Test.h" - -#include - -void spmd( lpf_t lpf, lpf_pid_t pid, lpf_pid_t nprocs, lpf_args_t args) -{ - (void) args; // ignore any arguments passed through call to lpf_exec - - bsplib_err_t rc = BSPLIB_SUCCESS; - - bsplib_t bsplib; - rc = bsplib_create( lpf, pid, nprocs, 0, 2, &bsplib); - EXPECT_EQ( "%d", BSPLIB_SUCCESS, rc ); - - int i, j; - int n = 5; - int result = 0, p = bsplib_nprocs(bsplib); - int local_sums[p]; - int xs[n]; - memset( local_sums, 0, sizeof( local_sums ) ); - - for ( j = 0; j < n; ++j ) - xs[j] = j + bsplib_pid(bsplib); - - // All-set. Now compute the sum - - for ( j = 0; j < n; ++j ) - result += xs[j]; - - rc = bsplib_push_reg(bsplib, &result, sizeof( result ) ); - EXPECT_EQ( "%d", BSPLIB_SUCCESS, rc ); - rc = bsplib_sync(bsplib); - EXPECT_EQ( "%d", BSPLIB_SUCCESS, rc ); - - for ( i = 0; i < p; ++i ) { - rc = bsplib_hpget(bsplib, i, &result, 0, &local_sums[i], sizeof( int ) ); - EXPECT_EQ( "%d", BSPLIB_SUCCESS, rc ); - } - rc = bsplib_sync(bsplib); - EXPECT_EQ( "%d", BSPLIB_SUCCESS, rc ); - - result = 0; - for ( i = 0; i < p; ++i ) - result += local_sums[i]; - rc = bsplib_pop_reg(bsplib, &result ); - EXPECT_EQ( "%d", BSPLIB_SUCCESS, rc ); - - EXPECT_EQ( "%d", - p * ( n - 1 ) * n / 2 + n * ( p - 1 ) * p / 2, - result ); - - rc = bsplib_destroy( bsplib); - EXPECT_EQ( "%d", BSPLIB_SUCCESS, rc ); -} - -/** - * \test Tests an example from Hill's BSPlib paper in unsafe mode - * \pre P >= 1 - * \return Exit code: 0 - */ -TEST( func_bsplib_example_bsp_sum_unsafemode ) -{ - lpf_err_t rc = lpf_exec( LPF_ROOT, LPF_MAX_P, spmd, LPF_NO_ARGS); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - return 0; -} - diff --git a/tests/functional/func_bsplib_hpsend_many.c b/tests/functional/func_bsplib_hpsend_many.c deleted file mode 100644 index fc1f5089..00000000 --- a/tests/functional/func_bsplib_hpsend_many.c +++ /dev/null @@ -1,131 +0,0 @@ - -/* - * Copyright 2021 Huawei Technologies Co., Ltd. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include -#include -#include "Test.h" - -#include -#include - -void spmd( lpf_t lpf, lpf_pid_t pid, lpf_pid_t nprocs, lpf_args_t args) -{ - (void) args; // ignore any arguments passed through call to lpf_exec - - bsplib_err_t rc = BSPLIB_SUCCESS; - - bsplib_t bsplib; - size_t maxhpregs = (size_t) -1; - - const int pthread = 1, mpirma = 2, mpimsg = 3, hybrid = 4, ibverbs=5; - (void) pthread; (void) mpirma; (void) mpimsg; (void) hybrid; (void) ibverbs; - LPFLIB_IGNORE_TAUTOLOGIES - if (LPF_CORE_IMPL_ID == mpirma ) - { - maxhpregs = 10; // because MPI RMA only supports a limited number - // of memory registrations - } - LPFLIB_RESTORE_WARNINGS - - rc = bsplib_create( lpf, pid, nprocs, 1, maxhpregs, &bsplib); - EXPECT_EQ( "%d", BSPLIB_SUCCESS, rc ); - - int i, j; - size_t size; - const int m = 1000; - const int n = m*(m+1)/2; - uint32_t * memory = malloc( 2 + n *sizeof(uint32_t) ); - uint32_t *array = memory + 2; - EXPECT_EQ( "%d", BSPLIB_SUCCESS, rc ); - - for ( i = 0; i < n; ++i ) - { - memory[i] = 0xAAAAAAAAu; - } - - uint32_t value[m]; - for (i = 0; i < m; ++i) - { - value[i] = 0x12345678; - } - - size = bsplib_set_tagsize( bsplib, sizeof(j)); - EXPECT_EQ( "%zu", (size_t) 0, size); - - rc = bsplib_sync(bsplib); - EXPECT_EQ( "%d", BSPLIB_SUCCESS, rc ); - - - for (i = 1, j=0; i <= m; j += i, ++i) { - rc = bsplib_hpsend(bsplib, - ( bsplib_pid(bsplib) + 1 ) % bsplib_nprocs(bsplib), - &j, value, i*sizeof( uint32_t ) ); - EXPECT_EQ( "%d", BSPLIB_SUCCESS, rc ); - } - - - rc = bsplib_sync(bsplib); - EXPECT_EQ( "%d", BSPLIB_SUCCESS, rc ); - - const void * tag, *payload; - for ( i = 1; i <= m; ++i) { - size = bsplib_hpmove( bsplib, &tag, &payload); - EXPECT_NE("%zu", (size_t) -1, size ); - memcpy( &j, tag, sizeof(j)); - double size_approx = (1 + sqrt(1 + 8*j))/2; - size_t k = (size_t) (size_approx + 0.5*(1.0 - 1e-15)); - - EXPECT_EQ("%zu", k*sizeof(uint32_t), size ); - memcpy( array + j, payload, sizeof(uint32_t)*k); - } - size =bsplib_hpmove( bsplib, &tag, &payload); - EXPECT_EQ( "%zu", (size_t) -1, size ); - - for ( i = 0; i < n; ++i ) - { - if ( i < 2) - { - EXPECT_EQ( "%u", 0xAAAAAAAAu, memory[i] ); - } - else - { - EXPECT_EQ( "%u", 0x12345678u, memory[i] ); - } - } - - for ( i = 0; i < m; ++i ) { - EXPECT_EQ( "%u", 0x12345678u, value[i] ); - } - - rc = bsplib_destroy( bsplib); - EXPECT_EQ( "%d", BSPLIB_SUCCESS, rc ); - - free(memory); -} - -/** - * \test Tests sending a lot of messages through bsp_hpsend - * \pre P >= 1 - * \return Exit code: 0 - */ -TEST( func_bsplib_hpsend_many) -{ - lpf_err_t rc = lpf_exec( LPF_ROOT, LPF_MAX_P, spmd, LPF_NO_ARGS); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - return 0; -} - diff --git a/tests/functional/func_lpf_compare_and_swap.ibverbs.c b/tests/functional/func_lpf_compare_and_swap.ibverbs.c new file mode 100644 index 00000000..b4d84773 --- /dev/null +++ b/tests/functional/func_lpf_compare_and_swap.ibverbs.c @@ -0,0 +1,86 @@ + +/* + * Copyright 2021 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include
+#include
+#include "Test.h"
+
+void spmd( lpf_t lpf, lpf_pid_t pid, lpf_pid_t nprocs, lpf_args_t args)
+{
+ (void) args; // ignore args parameter
+ (void) nprocs; // unused in this test
+ lpf_err_t rc = LPF_SUCCESS;
+
+ // localSwap is the local buffer for the compare-and-swap result; it matters on non-root processes
+ uint64_t localSwap = 0ULL;
+ // globalSwap is the lock word registered globally at rank 0; it must be initialized to 0ULL
+ uint64_t globalSwap = 0ULL;
+ int x = 0;
+ int y = 0;
+ lpf_memslot_t localSwapSlot = LPF_INVALID_MEMSLOT;
+ lpf_memslot_t globalSwapSlot = LPF_INVALID_MEMSLOT;
+ size_t maxmsgs = 2 , maxregs = 4; // four memory slots are registered below
+ rc = lpf_resize_message_queue( lpf, maxmsgs);
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+ rc = lpf_resize_memory_register( lpf, maxregs );
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+ rc = lpf_sync( lpf, LPF_SYNC_DEFAULT );
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+ lpf_memslot_t xslot = LPF_INVALID_MEMSLOT;
+ lpf_memslot_t yslot = LPF_INVALID_MEMSLOT;
+ rc = lpf_register_local( lpf, &localSwap, sizeof(localSwap), &localSwapSlot );
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+ rc = lpf_register_local( lpf, &x, sizeof(x), &xslot );
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+ rc = lpf_register_global( lpf, &globalSwap, sizeof(globalSwap), &globalSwapSlot );
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+ rc = lpf_register_global( lpf, &y, sizeof(y), &yslot );
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+ rc = lpf_sync( lpf, LPF_SYNC_DEFAULT);
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+
+
+ // BLOCKING
+ rc = lpf_lock_slot(lpf, localSwapSlot, 0, 0 /* rank where global slot to lock resides */, globalSwapSlot, 0, sizeof(globalSwap), LPF_MSG_DEFAULT);
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+ rc = lpf_get( lpf, 0, yslot, 0, xslot, 0, sizeof(x), LPF_MSG_DEFAULT );
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+ rc = lpf_sync_per_slot( lpf, LPF_SYNC_DEFAULT, xslot);
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+ x = x + 1;
+ rc = lpf_put( lpf, xslot, 0, 0, yslot, 0, sizeof(x), LPF_MSG_DEFAULT );
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+ rc = lpf_sync_per_slot( lpf, LPF_SYNC_DEFAULT, xslot);
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+ // BLOCKING
+ rc = lpf_unlock_slot(lpf, localSwapSlot, 0, 0 /* rank where global slot to lock resides */, globalSwapSlot, 0, sizeof(globalSwap), LPF_MSG_DEFAULT);
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+ rc = lpf_sync(lpf, LPF_SYNC_DEFAULT);
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+ if (pid == 0)
+ printf("Rank %d: y = %d\n", pid, y);
+}
+
+/**
+ * \test Test atomic compare-and-swap on a global slot
+ * \pre P >= 1
+ * \return Exit code: 0
+ */
+TEST( func_lpf_compare_and_swap )
+{
+ lpf_err_t rc = lpf_exec( LPF_ROOT, LPF_MAX_P, spmd, LPF_NO_ARGS);
+ EXPECT_EQ( "%d", LPF_SUCCESS, rc );
+ return 0;
+}
diff --git a/tests/functional/func_lpf_probe_parallel_nested.c b/tests/functional/func_lpf_probe_parallel_nested.c
deleted file mode 100644
index bacafad8..00000000
--- a/tests/functional/func_lpf_probe_parallel_nested.c
+++ /dev/null
@@ -1,208 +0,0 @@
-
-/*
- * Copyright 2021 Huawei Technologies Co., Ltd.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. 
- */ - -#include -#include "Test.h" - -void spmd2( lpf_t lpf, lpf_pid_t pid, lpf_pid_t nprocs, lpf_args_t args) -{ - (void) args; // ignore any arguments passed through call to lpf_exec - - lpf_err_t rc = lpf_resize_message_queue( lpf, nprocs); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - rc = lpf_resize_memory_register( lpf, 1 ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - rc = lpf_sync(lpf, LPF_SYNC_DEFAULT ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - - lpf_machine_t machine[3] = { LPF_INVALID_MACHINE, LPF_INVALID_MACHINE, LPF_INVALID_MACHINE }; - lpf_memslot_t machineSlot = LPF_INVALID_MEMSLOT ; - rc = lpf_register_global( lpf, &machine[0], sizeof(machine), &machineSlot ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - rc = lpf_sync(lpf, LPF_SYNC_DEFAULT ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - if ( 0 == pid ) - { - machine[0] = ((lpf_machine_t * ) args.input)[0]; - machine[1] = ((lpf_machine_t * ) args.input)[1]; - EXPECT_EQ( "%zd", args.input_size, 2*sizeof(lpf_machine_t) ); - } - else - { - // broadcast machine info - rc = lpf_get( lpf, 0, machineSlot, 0, machineSlot, 0, 2*sizeof(machine[0]), LPF_MSG_DEFAULT ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - } - rc = lpf_sync(lpf, LPF_SYNC_DEFAULT ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - - - rc = lpf_probe( lpf, &machine[2] ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - - EXPECT_EQ( "%u", machine[0].p, machine[1].p ); - EXPECT_EQ( "%u", machine[0].p, machine[2].p ); - EXPECT_EQ( "%u", 1u, machine[2].free_p ); - EXPECT_LT( "%g", 0.0, (*(machine[2].g))(1, 0, LPF_SYNC_DEFAULT) ); - EXPECT_LT( "%g", 0.0, (*(machine[2].l))(1, 0, LPF_SYNC_DEFAULT) ); - EXPECT_LT( "%g", 0.0, (*(machine[2].g))(machine[0].p, 0, LPF_SYNC_DEFAULT) ); - EXPECT_LT( "%g", 0.0, (*(machine[2].l))(machine[0].p, 0, LPF_SYNC_DEFAULT) ); - EXPECT_LT( "%g", 0.0, (*(machine[2].g))(machine[0].p, (size_t)(-1), LPF_SYNC_DEFAULT) ); - EXPECT_LT( "%g", 0.0, (*(machine[2].l))(machine[0].p, (size_t)(-1), LPF_SYNC_DEFAULT) ); - - rc = lpf_deregister( lpf, machineSlot ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); -} - -void spmd1( lpf_t lpf, lpf_pid_t pid, lpf_pid_t nprocs, lpf_args_t args) -{ - (void) args; // ignore any arguments passed through call to lpf_exec - - lpf_pid_t p = 0; - lpf_machine_t subMachine; - lpf_err_t rc = lpf_probe( lpf, &subMachine ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - - rc = lpf_resize_message_queue( lpf, nprocs); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - rc = lpf_resize_memory_register( lpf, 1 ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - rc = lpf_sync(lpf, LPF_SYNC_DEFAULT ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - - lpf_machine_t machine ; - lpf_memslot_t machineSlot = LPF_INVALID_MEMSLOT ; - rc = lpf_register_global( lpf, &machine, sizeof(machine), &machineSlot ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - rc = lpf_sync(lpf, LPF_SYNC_DEFAULT ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - if ( 0 == pid ) - { - machine = * ( lpf_machine_t * ) args.input; - EXPECT_EQ( "%zd", args.input_size, sizeof(lpf_machine_t) ); - } - else - { - // broadcast machine info - rc = lpf_get( lpf, 0, machineSlot, 0, machineSlot, 0, sizeof(machine), LPF_MSG_DEFAULT ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - } - rc = lpf_sync(lpf, LPF_SYNC_DEFAULT ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - rc = lpf_deregister( lpf, machineSlot ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - - - // Do some checks - EXPECT_EQ( "%u", nprocs, subMachine.p / 2 ); - EXPECT_EQ( "%u", nprocs, machine.p / 2 ); - EXPECT_LT( "%g", 0.0, (*(subMachine.g))(1, 0, LPF_SYNC_DEFAULT) ); - EXPECT_LT( "%g", 0.0, (*(subMachine.l))(1, 0, LPF_SYNC_DEFAULT) 
); - EXPECT_LT( "%g", 0.0, (*(subMachine.g))(machine.p, 0, LPF_SYNC_DEFAULT) ); - EXPECT_LT( "%g", 0.0, (*(subMachine.l))(machine.p, 0, LPF_SYNC_DEFAULT) ); - EXPECT_LT( "%g", 0.0, (*(subMachine.g))(machine.p, (size_t)(-1), LPF_SYNC_DEFAULT) ); - EXPECT_LT( "%g", 0.0, (*(subMachine.l))(machine.p, (size_t)(-1), LPF_SYNC_DEFAULT) ); - - const int pthread = 1, mpirma = 1, mpimsg = 1, hybrid = 0, ibverbs=1; - (void) pthread; (void) mpirma; (void) mpimsg; (void) hybrid; (void) ibverbs; - if (LPF_CORE_IMPL_ID) // this part is disabled for the hybrid implementation, because - { // that one doesn't do generic nesting of lpf_exec's - EXPECT_EQ( "%d", 1, subMachine.free_p == 2 || subMachine.free_p == 3 ); - - // compute the sum of all 'free_p' values - lpf_pid_t * vec = malloc(sizeof(lpf_pid_t)*nprocs); - EXPECT_NE( "%p", NULL, vec ); - vec[ pid ] = subMachine.free_p; - - lpf_memslot_t vecSlot = LPF_INVALID_MEMSLOT; - rc = lpf_register_global( lpf, vec, sizeof(lpf_pid_t)*nprocs, &vecSlot); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - rc = lpf_sync( lpf, LPF_SYNC_DEFAULT ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - for (p = 0 ; p < nprocs; ++p) - { - if ( pid != p ) - { - rc = lpf_put( lpf, - vecSlot, pid*sizeof(vec[0]), - p, vecSlot, pid*sizeof(vec[0]), sizeof(vec[0]), LPF_MSG_DEFAULT ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - } - } - rc = lpf_sync( lpf, LPF_SYNC_DEFAULT ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - rc = lpf_deregister( lpf, vecSlot ); - lpf_pid_t sum = 0; - for (p = 0; p < nprocs; ++p) - { - sum += vec[p]; - } - EXPECT_EQ( "%u", sum, machine.p ); - EXPECT_EQ( "%u", sum, subMachine.p ); - - free(vec); - } - - // When running this spmd1 section, only half of the processes was started - // This time we try to run spmd2 with a number of processes depending on the - // pid. Of course, only max free_p processes are started. - lpf_machine_t multiMachine[2] = { machine, subMachine }; - args.input = multiMachine; - args.input_size = sizeof(multiMachine); - rc = lpf_exec( lpf, pid + 3, &spmd2, args ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); -} - - -/** - * \test Test lpf_probe function on a parallel section where all processes are used immediately. 
- * \note Extra lpfrun parameters: -probe 1.0 - * \pre P >= 2 - * \return Exit code: 0 - */ -TEST( func_lpf_probe_parallel_nested ) -{ - lpf_err_t rc = LPF_SUCCESS; - - lpf_machine_t machine; - - rc = lpf_probe( LPF_ROOT, &machine ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - - EXPECT_LE( "%u", 1u, machine.p ); - EXPECT_LE( "%u", 1u, machine.free_p ); - EXPECT_LE( "%u", machine.p, machine.free_p ); - EXPECT_LT( "%g", 0.0, (*(machine.g))(1, 0, LPF_SYNC_DEFAULT) ); - EXPECT_LT( "%g", 0.0, (*(machine.l))(1, 0, LPF_SYNC_DEFAULT) ); - EXPECT_LT( "%g", 0.0, (*(machine.g))(machine.p, 0, LPF_SYNC_DEFAULT) ); - EXPECT_LT( "%g", 0.0, (*(machine.l))(machine.p, 0, LPF_SYNC_DEFAULT) ); - EXPECT_LT( "%g", 0.0, (*(machine.g))(machine.p, (size_t)(-1), LPF_SYNC_DEFAULT) ); - EXPECT_LT( "%g", 0.0, (*(machine.l))(machine.p, (size_t)(-1), LPF_SYNC_DEFAULT) ); - - lpf_args_t args; - args.input = &machine; - args.input_size = sizeof(machine); - args.output = NULL; - args.output_size = 0; - args.f_symbols = NULL; - args.f_size = 0; - - rc = lpf_exec( LPF_ROOT, machine.p / 2, &spmd1, args ); - EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - - return 0; -} diff --git a/tests/functional/func_lpf_put_parallel_single.c b/tests/functional/func_lpf_put_parallel_single.c index 9fa85d84..78794fcf 100644 --- a/tests/functional/func_lpf_put_parallel_single.c +++ b/tests/functional/func_lpf_put_parallel_single.c @@ -38,7 +38,6 @@ void spmd( lpf_t lpf, lpf_pid_t pid, lpf_pid_t nprocs, lpf_args_t args) EXPECT_EQ( "%d", LPF_SUCCESS, rc ); rc = lpf_register_global( lpf, &y, sizeof(y), &yslot ); EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - rc = lpf_sync( lpf, LPF_SYNC_DEFAULT); EXPECT_EQ( "%d", LPF_SUCCESS, rc );