diff --git a/CMakeLists.txt b/CMakeLists.txt index 00b78dde..35c74f2f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -112,8 +112,8 @@ if (TR1_ARRAY) set(CMAKE_CXX_STANDARD 98) set(CMAKE_CXX_STANDARD_REQUIRED YES) else() - message(STATUS "Governing C++ standard is C++11") - set(CMAKE_CXX_STANDARD 11) + message(STATUS "Governing C++ standard is C++14") + set(CMAKE_CXX_STANDARD 14) set(CMAKE_CXX_STANDARD_REQUIRED YES) endif() @@ -462,5 +462,7 @@ install(DIRECTORY "include/bsp" DESTINATION ${INSTALL_HEADERS}) install(DIRECTORY "include/debug" DESTINATION ${INSTALL_HEADERS}/lpf ) # Post install actions -add_subdirectory(post-install) +# NOTE: the post-install steps are disabled because they consistently fail. +# TODO: fix the post-install targets and re-enable this subdirectory. +# add_subdirectory(post-install) diff --git a/examples/rc_pingpong.c b/examples/rc_pingpong.c new file mode 100644 index 00000000..b38a3de7 --- /dev/null +++ b/examples/rc_pingpong.c @@ -0,0 +1,888 @@ +/* + * Copyright (c) 2005 Topspin Communications. All rights reserved. + * + * This software is available to you under a choice of one of two + * licenses. You may choose to be licensed under the terms of the GNU + * General Public License (GPL) Version 2, available from the file + * COPYING in the main directory of this source tree, or the + * OpenIB.org BSD license below: + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +#define _GNU_SOURCE +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "pingpong.h" + +#include + +#include "mpi.h" + +enum { + PINGPONG_RECV_WRID = 1, + PINGPONG_SEND_WRID = 2, + SWAP_WRID = 3, +}; + +static int page_size; +static int implicit_odp; +static int prefetch_mr; +static int validate_buf; + +struct pingpong_context { + struct ibv_context *context; + struct ibv_comp_channel *channel; + struct ibv_pd *pd; + struct ibv_mr *mr; + struct ibv_dm *dm; + union { + struct ibv_cq *cq; + struct ibv_cq_ex *cq_ex; + } cq_s; + struct ibv_qp *qp; + struct ibv_qp_ex *qpx; + char *buf; + int size; + int send_flags; + int rx_depth; + int pending; + struct ibv_port_attr portinfo; + uint64_t completion_timestamp_mask; +}; + +static struct ibv_cq *pp_cq(struct pingpong_context *ctx) +{ + return ctx->cq_s.cq; +} + +struct pingpong_dest { + int lid; + int qpn; + int psn; + uint32_t key; + uint64_t addr; + union ibv_gid gid; +}; + +static int pp_connect_ctx(struct pingpong_context *ctx, int port, int my_psn, + enum ibv_mtu mtu, int sl, + struct pingpong_dest *dest, int sgid_idx) +{ + struct ibv_qp_attr attr = { + .qp_state = IBV_QPS_RTR, + .path_mtu = mtu, + .dest_qp_num = dest->qpn, + .rq_psn = dest->psn, + .max_dest_rd_atomic = 1, + .min_rnr_timer = 12, + .ah_attr = { + .is_global = 0, + .dlid = dest->lid, + .sl = sl, + .src_path_bits = 0, + .port_num = port + } + }; + + if 
(dest->gid.global.interface_id) { + attr.ah_attr.is_global = 1; + attr.ah_attr.grh.hop_limit = 1; + attr.ah_attr.grh.dgid = dest->gid; + attr.ah_attr.grh.sgid_index = sgid_idx; + } + if (ibv_modify_qp(ctx->qp, &attr, + IBV_QP_STATE | + IBV_QP_AV | + IBV_QP_PATH_MTU | + IBV_QP_DEST_QPN | + IBV_QP_RQ_PSN | + IBV_QP_MAX_DEST_RD_ATOMIC | + IBV_QP_MIN_RNR_TIMER)) { + fprintf(stderr, "Failed to modify QP to RTR\n"); + return 1; + } + + attr.qp_state = IBV_QPS_RTS; + attr.timeout = 14; + attr.retry_cnt = 7; + attr.rnr_retry = 7; + attr.sq_psn = my_psn; + attr.max_rd_atomic = 1; + if (ibv_modify_qp(ctx->qp, &attr, + IBV_QP_STATE | + IBV_QP_TIMEOUT | + IBV_QP_RETRY_CNT | + IBV_QP_RNR_RETRY | + IBV_QP_SQ_PSN | + IBV_QP_MAX_QP_RD_ATOMIC)) { + fprintf(stderr, "Failed to modify QP to RTS\n"); + return 1; + } + + return 0; +} + +static struct pingpong_dest *pp_client_exch_dest(const char *servername, int port, + const struct pingpong_dest *my_dest) +{ + struct pingpong_dest *rem_dest = NULL; + char gid[33]; + + + gid_to_wire_gid(&my_dest->gid, gid); + + MPI_Send(&(my_dest->lid), 1, MPI_INT, 0, 0, MPI_COMM_WORLD); + MPI_Send(&(my_dest->qpn), 1, MPI_INT, 0, 0, MPI_COMM_WORLD); + MPI_Send(&(my_dest->psn), 1, MPI_INT, 0, 0, MPI_COMM_WORLD); + MPI_Send(gid, 33, MPI_CHAR, 0, 0, MPI_COMM_WORLD); + + rem_dest = malloc(sizeof *rem_dest); + + MPI_Recv(&(rem_dest->lid), 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&(rem_dest->qpn), 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&(rem_dest->psn), 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(gid, 33, MPI_CHAR, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&(rem_dest->key), 1, MPI_UINT32_T, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&(rem_dest->addr), 1, MPI_UINT64_T, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + wire_gid_to_gid(gid, &rem_dest->gid); + + return rem_dest; +} + +static struct pingpong_dest *pp_server_exch_dest(struct pingpong_context *ctx, + int ib_port, 
enum ibv_mtu mtu, + int port, int sl, + const struct pingpong_dest *my_dest, + int sgid_idx) +{ + + printf("Server process\n"); + struct pingpong_dest *rem_dest = NULL; + char gid[33]; + + + rem_dest = malloc(sizeof *rem_dest); + + MPI_Recv(&(rem_dest->lid), 1, MPI_INT, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&(rem_dest->qpn), 1, MPI_INT, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(&(rem_dest->psn), 1, MPI_INT, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + MPI_Recv(gid, 33, MPI_CHAR, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE); + + wire_gid_to_gid(gid, &rem_dest->gid); + + gid_to_wire_gid(&my_dest->gid, gid); + + MPI_Send(&(my_dest->lid), 1, MPI_INT, 1, 0, MPI_COMM_WORLD); + MPI_Send(&(my_dest->qpn), 1, MPI_INT, 1, 0, MPI_COMM_WORLD); + MPI_Send(&(my_dest->psn), 1, MPI_INT, 1, 0, MPI_COMM_WORLD); + MPI_Send(gid, 33, MPI_CHAR, 1, 0, MPI_COMM_WORLD); + uint32_t lkey = ctx->mr->lkey; + MPI_Send(&lkey, 1, MPI_UINT32_T, 1, 0, MPI_COMM_WORLD); + uint64_t addr = (uint64_t) ctx->buf; + MPI_Send(&addr, 1, MPI_UINT64_T, 1, 0, MPI_COMM_WORLD); + + if (pp_connect_ctx(ctx, ib_port, my_dest->psn, mtu, sl, rem_dest, + sgid_idx)) { + fprintf(stderr, "Couldn't connect to remote QP\n"); + free(rem_dest); + rem_dest = NULL; + return rem_dest; + } + + return rem_dest; +} + +static struct pingpong_context *pp_init_ctx(struct ibv_device *ib_dev, int size, + int rx_depth, int port) +{ + struct pingpong_context *ctx; + int access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_ATOMIC; + + ctx = calloc(1, sizeof *ctx); + if (!ctx) + return NULL; + + ctx->size = size; + ctx->send_flags = IBV_SEND_SIGNALED; + ctx->rx_depth = rx_depth; + + ctx->buf = memalign(page_size, size); + if (!ctx->buf) { + fprintf(stderr, "Couldn't allocate work buf.\n"); + goto clean_ctx; + } + + /* FIXME memset(ctx->buf, 0, size); */ + memset(ctx->buf, 0x7b, size); + + ctx->context = ibv_open_device(ib_dev); + if (!ctx->context) { + 
fprintf(stderr, "Couldn't get context for %s\n", + ibv_get_device_name(ib_dev)); + goto clean_buffer; + } + + ctx->channel = NULL; + + ctx->pd = ibv_alloc_pd(ctx->context); + if (!ctx->pd) { + fprintf(stderr, "Couldn't allocate PD\n"); + goto clean_comp_channel; + } + + + if (implicit_odp) { + ctx->mr = ibv_reg_mr(ctx->pd, NULL, SIZE_MAX, access_flags); + } else { + ctx->mr = ibv_reg_mr(ctx->pd, ctx->buf, size, access_flags); + } + + if (!ctx->mr) { + fprintf(stderr, "Couldn't register MR\n"); + goto clean_dm; + } + + if (prefetch_mr) { + struct ibv_sge sg_list; + int ret; + + sg_list.lkey = ctx->mr->lkey; + sg_list.addr = (uintptr_t)ctx->buf; + sg_list.length = size; + + ret = ibv_advise_mr(ctx->pd, IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE, + IB_UVERBS_ADVISE_MR_FLAG_FLUSH, + &sg_list, 1); + + if (ret) + fprintf(stderr, "Couldn't prefetch MR(%d). Continue anyway\n", ret); + } + + ctx->cq_s.cq = ibv_create_cq(ctx->context, rx_depth + 1, NULL, + ctx->channel, 0); + + if (!pp_cq(ctx)) { + fprintf(stderr, "Couldn't create CQ\n"); + goto clean_mr; + } + + { + struct ibv_qp_attr attr; + struct ibv_qp_init_attr init_attr = { + .send_cq = pp_cq(ctx), + .recv_cq = pp_cq(ctx), + .cap = { + .max_send_wr = 1, + .max_recv_wr = rx_depth, + .max_send_sge = 1, + .max_recv_sge = 1 + }, + .qp_type = IBV_QPT_RC + }; + + ctx->qp = ibv_create_qp(ctx->pd, &init_attr); + + if (!ctx->qp) { + fprintf(stderr, "Couldn't create QP\n"); + goto clean_cq; + } + + ibv_query_qp(ctx->qp, &attr, IBV_QP_CAP, &init_attr); + if (init_attr.cap.max_inline_data >= size ) + ctx->send_flags |= IBV_SEND_INLINE; + } + + { + struct ibv_qp_attr attr = { + .qp_state = IBV_QPS_INIT, + .pkey_index = 0, + .port_num = port, + .qp_access_flags = IBV_ACCESS_REMOTE_ATOMIC, + }; + + if (ibv_modify_qp(ctx->qp, &attr, + IBV_QP_STATE | + IBV_QP_PKEY_INDEX | + IBV_QP_PORT | + IBV_QP_ACCESS_FLAGS)) { + fprintf(stderr, "Failed to modify QP to INIT\n"); + goto clean_qp; + } + } + + return ctx; + +clean_qp: + 
ibv_destroy_qp(ctx->qp); + +clean_cq: + ibv_destroy_cq(pp_cq(ctx)); + +clean_mr: + ibv_dereg_mr(ctx->mr); + +clean_dm: + if (ctx->dm) + ibv_free_dm(ctx->dm); + +clean_pd: + ibv_dealloc_pd(ctx->pd); + +clean_comp_channel: + if (ctx->channel) + ibv_destroy_comp_channel(ctx->channel); + +clean_device: + ibv_close_device(ctx->context); + +clean_buffer: + free(ctx->buf); + +clean_ctx: + free(ctx); + + return NULL; +} + +static int pp_close_ctx(struct pingpong_context *ctx) +{ + if (ibv_destroy_qp(ctx->qp)) { + fprintf(stderr, "Couldn't destroy QP\n"); + return 1; + } + + if (ibv_destroy_cq(pp_cq(ctx))) { + fprintf(stderr, "Couldn't destroy CQ\n"); + return 1; + } + + if (ibv_dereg_mr(ctx->mr)) { + fprintf(stderr, "Couldn't deregister MR\n"); + return 1; + } + + if (ctx->dm) { + if (ibv_free_dm(ctx->dm)) { + fprintf(stderr, "Couldn't free DM\n"); + return 1; + } + } + + if (ibv_dealloc_pd(ctx->pd)) { + fprintf(stderr, "Couldn't deallocate PD\n"); + return 1; + } + + if (ctx->channel) { + if (ibv_destroy_comp_channel(ctx->channel)) { + fprintf(stderr, "Couldn't destroy completion channel\n"); + return 1; + } + } + + if (ibv_close_device(ctx->context)) { + fprintf(stderr, "Couldn't release context\n"); + return 1; + } + + free(ctx->buf); + free(ctx); + + return 0; +} + +static int pp_post_recv(struct pingpong_context *ctx, int n) +{ + struct ibv_sge list = { + .addr = (uintptr_t) ctx->buf, + .length = ctx->size, + .lkey = ctx->mr->lkey + }; + struct ibv_recv_wr wr = { + .wr_id = PINGPONG_RECV_WRID, + .sg_list = &list, + .num_sge = 1, + }; + struct ibv_recv_wr *bad_wr; + int i; + + for (i = 0; i < n; ++i) + if (ibv_post_recv(ctx->qp, &wr, &bad_wr)) + break; + + return i; +} + +static int pp_post_send(struct pingpong_context *ctx) +{ + struct ibv_sge list = { + .addr = (uintptr_t) ctx->buf, + .length = ctx->size, + .lkey = ctx->mr->lkey + }; + struct ibv_send_wr wr = { + .wr_id = PINGPONG_SEND_WRID, + .sg_list = &list, + .num_sge = 1, + .opcode = IBV_WR_SEND, + .send_flags = 
ctx->send_flags, + }; + struct ibv_send_wr *bad_wr; + + return ibv_post_send(ctx->qp, &wr, &bad_wr); +} + +static int pp_post_swap(struct pingpong_context *ctx, struct pingpong_dest *rem_dest, uint64_t compare_add, uint64_t swap) +{ + struct ibv_sge list = { + .addr = (uintptr_t) ctx->buf, + .length = ctx->size, + .lkey = ctx->mr->lkey + }; + struct ibv_send_wr wr = { + .wr_id = SWAP_WRID, + .sg_list = &list, + .num_sge = 1, + .opcode = IBV_WR_ATOMIC_CMP_AND_SWP, + .send_flags = IBV_SEND_SIGNALED, + .wr.atomic.remote_addr = rem_dest->addr, + .wr.atomic.compare_add = compare_add, + .wr.atomic.swap = swap, + .wr.atomic.rkey = rem_dest->key, + }; + struct ibv_send_wr *bad_wr; + + return ibv_post_send(ctx->qp, &wr, &bad_wr); +} + +struct ts_params { + uint64_t comp_recv_max_time_delta; + uint64_t comp_recv_min_time_delta; + uint64_t comp_recv_total_time_delta; + uint64_t comp_recv_prev_time; + int last_comp_with_ts; + unsigned int comp_with_time_iters; +}; + +static inline int parse_single_wc(struct pingpong_context *ctx, int *scnt, + int *rcnt, int *routs, int iters, + uint64_t wr_id, enum ibv_wc_status status, + uint64_t completion_timestamp, + struct ts_params *ts, + struct pingpong_dest *rem_dest + ) +{ + if (status != IBV_WC_SUCCESS) { + fprintf(stderr, "Failed status %s (%d) for wr_id %d\n", + ibv_wc_status_str(status), + status, (int)wr_id); + return 1; + } + + int rank; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + printf("Rank %d will process single wc = %"PRIu64"\n", rank, wr_id); + switch ((int)wr_id) { + case PINGPONG_SEND_WRID: + ++(*scnt); + break; + + case PINGPONG_RECV_WRID: + if (--(*routs) <= 1) { + //printf("Calling pp_post_recv\n"); + *routs += pp_post_recv(ctx, ctx->rx_depth - *routs); + if (*routs < ctx->rx_depth) { + fprintf(stderr, + "Couldn't post receive (%d)\n", + *routs); + return 1; + } + } + + ++(*rcnt); + ts->last_comp_with_ts = 0; + + break; + + default: + fprintf(stderr, "Completion for unknown wr_id %d\n", + (int)wr_id); + return 1; + } + 
+ ctx->pending &= ~(int)wr_id; + if (*scnt < iters && !ctx->pending) { + if (pp_post_swap(ctx, &rem_dest, 0ULL, 1ULL)) { + fprintf(stderr, "Couldn't post send\n"); + return 1; + } + printf("After pp_post_swap\n"); + ctx->pending = PINGPONG_RECV_WRID | + PINGPONG_SEND_WRID; + } + + return 0; +} + +static void usage(const char *argv0) +{ + printf("Usage:\n"); + printf(" %s start a server and wait for connection\n", argv0); + printf(" %s <host> connect to server at <host>\n", argv0); + printf("\n"); + printf("Options:\n"); + printf(" -p, --port=<port> listen on/connect to port <port> (default 18515)\n"); + printf(" -d, --ib-dev=<dev> use IB device <dev> (default first device found)\n"); + printf(" -i, --ib-port=<port> use port <port> of IB device (default 1)\n"); + printf(" -s, --size=<size> size of message to exchange (default 4096)\n"); + printf(" -m, --mtu=<size> path MTU <size> (default 1024)\n"); + printf(" -r, --rx-depth=<dep> number of receives to post at a time (default 500)\n"); + printf(" -n, --iters=<iters> number of exchanges (default 1000)\n"); + printf(" -l, --sl=<sl> service level value\n"); + printf(" -g, --gid-idx=<gid index> local port gid index\n"); + printf(" -o, --odp use on demand paging\n"); + printf(" -O, --iodp use implicit on demand paging\n"); + printf(" -P, --prefetch prefetch an ODP MR\n"); + printf(" -t, --ts get CQE with timestamp\n"); + printf(" -c, --chk validate received buffer\n"); + printf(" -j, --dm use device memory\n"); +} + +int main(int argc, char *argv[]) +{ + struct ibv_device **dev_list; + struct ibv_device *ib_dev; + struct pingpong_context *ctx; + struct pingpong_dest my_dest; + struct pingpong_dest *rem_dest; + struct timeval start, end; + char *ib_devname = NULL; + char *servername = NULL; + unsigned int port = 18515; + int ib_port = 1; + unsigned int size = 4096; + enum ibv_mtu mtu = IBV_MTU_1024; + unsigned int rx_depth = 500; + unsigned int iters = 1000; + int routs; + int rcnt, scnt; + int num_cq_events = 0; + int sl = 0; + int gidx = -1; + char gid[33]; + struct ts_params ts; + int comm_rank, comm_size; + + 
srand48(getpid() * time(NULL)); + + MPI_Init(&argc, &argv); + MPI_Comm_rank(MPI_COMM_WORLD, &comm_rank); + MPI_Comm_size(MPI_COMM_WORLD, &comm_size); + + while (1) { + int c; + + static struct option long_options[] = { + { .name = "port", .has_arg = 1, .val = 'p' }, + { .name = "ib-dev", .has_arg = 1, .val = 'd' }, + { .name = "ib-port", .has_arg = 1, .val = 'i' }, + { .name = "size", .has_arg = 1, .val = 's' }, + { .name = "mtu", .has_arg = 1, .val = 'm' }, + { .name = "rx-depth", .has_arg = 1, .val = 'r' }, + { .name = "iters", .has_arg = 1, .val = 'n' }, + { .name = "sl", .has_arg = 1, .val = 'l' }, + { .name = "events", .has_arg = 0, .val = 'e' }, + { .name = "gid-idx", .has_arg = 1, .val = 'g' }, + { .name = "odp", .has_arg = 0, .val = 'o' }, + { .name = "iodp", .has_arg = 0, .val = 'O' }, + { .name = "prefetch", .has_arg = 0, .val = 'P' }, + { .name = "ts", .has_arg = 0, .val = 't' }, + { .name = "chk", .has_arg = 0, .val = 'c' }, + { .name = "dm", .has_arg = 0, .val = 'j' }, + { .name = "new_send", .has_arg = 0, .val = 'N' }, + {} + }; + + c = getopt_long(argc, argv, "p:d:i:s:m:r:n:l:eg:oOPtcjN", + long_options, NULL); + + if (c == -1) + break; + + switch (c) { + case 'p': + port = strtoul(optarg, NULL, 0); + if (port > 65535) { + usage(argv[0]); + return 1; + } + break; + + case 'd': + ib_devname = strdupa(optarg); + break; + + case 'i': + ib_port = strtol(optarg, NULL, 0); + if (ib_port < 1) { + usage(argv[0]); + return 1; + } + break; + + case 's': + size = strtoul(optarg, NULL, 0); + break; + + case 'm': + mtu = pp_mtu_to_enum(strtol(optarg, NULL, 0)); + if (mtu == 0) { + usage(argv[0]); + return 1; + } + break; + + case 'r': + rx_depth = strtoul(optarg, NULL, 0); + break; + + case 'n': + iters = strtoul(optarg, NULL, 0); + break; + + case 'l': + sl = strtol(optarg, NULL, 0); + break; + + case 'g': + gidx = strtol(optarg, NULL, 0); + break; + + case 'P': + prefetch_mr = 1; + break; + case 'c': + validate_buf = 1; + break; + + default: + usage(argv[0]); + 
return 1; + } + } + + if (optind == argc - 1) + servername = strdupa(argv[optind]); + else if (optind < argc) { + usage(argv[0]); + return 1; + } + + if ( prefetch_mr) { + fprintf(stderr, "prefetch is valid only with on-demand memory region\n"); + return 1; + } + + page_size = sysconf(_SC_PAGESIZE); + + dev_list = ibv_get_device_list(NULL); + if (!dev_list) { + perror("Failed to get IB devices list"); + return 1; + } + + if (!ib_devname) { + ib_dev = *dev_list; + if (!ib_dev) { + fprintf(stderr, "No IB devices found\n"); + return 1; + } + } else { + int i; + for (i = 0; dev_list[i]; ++i) + if (!strcmp(ibv_get_device_name(dev_list[i]), ib_devname)) + break; + ib_dev = dev_list[i]; + if (!ib_dev) { + fprintf(stderr, "IB device %s not found\n", ib_devname); + return 1; + } + } + + ctx = pp_init_ctx(ib_dev, size, rx_depth, ib_port); + if (!ctx) + return 1; + + routs = pp_post_recv(ctx, ctx->rx_depth); + if (routs < ctx->rx_depth) { + fprintf(stderr, "Couldn't post receive (%d)\n", routs); + return 1; + } + + + if (pp_get_port_info(ctx->context, ib_port, &ctx->portinfo)) { + fprintf(stderr, "Couldn't get port info\n"); + return 1; + } + + my_dest.lid = ctx->portinfo.lid; + if (ctx->portinfo.link_layer != IBV_LINK_LAYER_ETHERNET && + !my_dest.lid) { + fprintf(stderr, "Couldn't get local LID\n"); + return 1; + } + + if (gidx >= 0) { + if (ibv_query_gid(ctx->context, ib_port, gidx, &my_dest.gid)) { + fprintf(stderr, "can't read sgid of index %d\n", gidx); + return 1; + } + } else + memset(&my_dest.gid, 0, sizeof my_dest.gid); + + my_dest.qpn = ctx->qp->qp_num; + my_dest.psn = lrand48() & 0xffffff; + inet_ntop(AF_INET6, &my_dest.gid, gid, sizeof gid); + printf(" local address: LID 0x%04x, QPN 0x%06x, PSN 0x%06x, GID %s\n", + my_dest.lid, my_dest.qpn, my_dest.psn, gid); + + + if (comm_rank == 0) + rem_dest = pp_server_exch_dest(ctx, ib_port, mtu, port, sl, &my_dest, gidx); + else + rem_dest = pp_client_exch_dest(servername, port, &my_dest); + + if (!rem_dest) + return 1; + + 
inet_ntop(AF_INET6, &rem_dest->gid, gid, sizeof gid); + printf(" remote address: LID 0x%04x, QPN 0x%06x, PSN 0x%06x, GID %s\n", + rem_dest->lid, rem_dest->qpn, rem_dest->psn, gid); + + if (comm_rank != 0) + if (pp_connect_ctx(ctx, ib_port, my_dest.psn, mtu, sl, rem_dest, + gidx)) + return 1; + + ctx->pending = PINGPONG_RECV_WRID; + + if (comm_rank != 0) { + if (validate_buf) + for (int i = 0; i < size; i += page_size) + ctx->buf[i] = i / page_size % sizeof(char); + + if (pp_post_swap(ctx, rem_dest, 0ULL, 1ULL)) { + //if (pp_post_send(ctx)) { + fprintf(stderr, "Couldn't post send\n"); + return 1; + } + printf("After pp_post_swap\n"); + ctx->pending |= PINGPONG_SEND_WRID; + } + + if (gettimeofday(&start, NULL)) { + perror("gettimeofday"); + return 1; + } + + rcnt = scnt = 0; + if (comm_rank == 0) { + + } + while (rcnt < iters || scnt < iters) { + int ret; + + + int ne, i; + struct ibv_wc wc[2]; + + do { + ne = ibv_poll_cq(pp_cq(ctx), 2, wc); + if (ne < 0) { + fprintf(stderr, "poll CQ failed %d\n", ne); + return 1; + } + } while (ne < 1); + + for (i = 0; i < ne; ++i) { + ret = parse_single_wc(ctx, &scnt, &rcnt, &routs, + iters, + wc[i].wr_id, + wc[i].status, + 0, &ts, rem_dest); + if (ret) { + fprintf(stderr, "parse WC failed %d\n", ne); + return 1; + } + } + } + + if (gettimeofday(&end, NULL)) { + perror("gettimeofday"); + return 1; + } + + { + float usec = (end.tv_sec - start.tv_sec) * 1000000 + + (end.tv_usec - start.tv_usec); + long long bytes = (long long) size * iters * 2; + + printf("%lld bytes in %.2f seconds = %.2f Mbit/sec\n", + bytes, usec / 1000000., bytes * 8. 
/ usec); + printf("%d iters in %.2f seconds = %.2f usec/iter\n", + iters, usec / 1000000., usec / iters); + + if ((comm_rank == 0) && (validate_buf)) { + for (int i = 0; i < size; i += page_size) + if (ctx->buf[i] != i / page_size % sizeof(char)) + printf("invalid data in page %d\n", + i / page_size); + } + } + + ibv_ack_cq_events(pp_cq(ctx), num_cq_events); + + if (pp_close_ctx(ctx)) + return 1; + + ibv_free_device_list(dev_list); + free(rem_dest); + + MPI_Finalize(); + + return 0; +} diff --git a/include/debug/lpf/core.h b/include/debug/lpf/core.h index 028e015f..03e5f6aa 100644 --- a/include/debug/lpf/core.h +++ b/include/debug/lpf/core.h @@ -64,6 +64,12 @@ extern "C" { #define lpf_sync( ctx, attrs ) \ lpf_debug_sync( __FILE__, __LINE__, (ctx), (attrs) ) +#define lpf_counting_sync_per_tag( ctx, attrs, slot, expected_sends, expected_rcvs ) \ + lpf_debug_counting_sync_per_tag( __FILE__, __LINE__, (ctx), (attrs), (slot), (expected_sends), (expected_rcvs) ) + +#define lpf_sync_per_tag( ctx, attrs, slot) \ + lpf_debug_sync_per_tag( __FILE__, __LINE__, (ctx), (attrs), (slot)) + #define lpf_resize_memory_register( ctx, size ) \ lpf_debug_resize_memory_register( __FILE__, __LINE__, (ctx), (size) ) @@ -125,6 +131,9 @@ extern _LPFLIB_API lpf_err_t lpf_debug_sync( const char * file, int line, lpf_t ctx, lpf_sync_attr_t attr ); +lpf_err_t lpf_debug_counting_sync_per_tag( const char * file, int line, + lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sends, size_t expected_rcvs); + extern _LPFLIB_API lpf_err_t lpf_debug_resize_memory_register( const char * file, int line, lpf_t ctx, size_t max_regs ); diff --git a/include/lpf/collectives.h b/include/lpf/collectives.h index 4304c5f0..871b7f27 100644 --- a/include/lpf/collectives.h +++ b/include/lpf/collectives.h @@ -116,6 +116,16 @@ typedef void (*lpf_combiner_t) (size_t n, const void * combine, void * into ); */ extern _LPFLIB_API const lpf_coll_t LPF_INVALID_COLL; +/** + * ToDo: document allgatherv + */ 
+lpf_err_t lpf_allgatherv( + lpf_coll_t coll, + lpf_memslot_t src, + lpf_memslot_t dst, + size_t *sizes, + bool exclude_myself + ); /** * Initialises a collectives struct, which allows the scheduling of collective * calls. The initialised struct is only valid after a next call to lpf_sync(). diff --git a/include/lpf/core.h b/include/lpf/core.h index 42872f15..90a26b9e 100644 --- a/include/lpf/core.h +++ b/include/lpf/core.h @@ -2058,6 +2058,25 @@ lpf_err_t lpf_get( extern _LPFLIB_API lpf_err_t lpf_sync( lpf_t ctx, lpf_sync_attr_t attr ); +/** + * This synchronisation waits on memory slot @slot to complete sending + * and receiving @expected_sent and @expected_rcvd messages. The counts are + * checked in the ibv_poll_cq calls and associated to certain LPF slots. + * This call is only implemented for IB verbs at the moment. + */ +extern _LPFLIB_API +lpf_err_t lpf_counting_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd); + +/** + * This synchronisation waits on memory slot @slot to complete sending + * or receiving all outstanding messages. For the current implementation + * in IB verbs, this means all scheduled sends via ibv_post_send are + * checked for completion via ibv_poll_cq. Currently, there is no logic + * scheduling receives, but only sends -- for either get or put. + */ +extern _LPFLIB_API +lpf_err_t lpf_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot); + /** * This primitive allows a user to inspect the machine that this LPF program * has been assigned. 
All resources reported in the #lpf_machine_t struct are @@ -2315,6 +2334,68 @@ lpf_err_t lpf_resize_memory_register( lpf_t ctx, size_t max_regs ); extern _LPFLIB_API lpf_err_t lpf_resize_message_queue( lpf_t ctx, size_t max_msgs ); +extern _LPFLIB_API +lpf_err_t lpf_lock_slot( + lpf_t ctx, + lpf_memslot_t src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +); + +extern _LPFLIB_API +lpf_err_t lpf_unlock_slot( + lpf_t ctx, + lpf_memslot_t src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +); + +/** + * This function returns in @rcvd_msgs the received message count on LPF slot @slot + */ +extern _LPFLIB_API +lpf_err_t lpf_get_rcvd_msg_count_per_slot( lpf_t ctx, size_t *rcvd_msgs, lpf_memslot_t slot); + +/** + * This function returns in @rcvd_msgs the total received message count + */ +extern _LPFLIB_API +lpf_err_t lpf_get_rcvd_msg_count( lpf_t ctx, size_t *rcvd_msgs); + +/** + * This function returns in @sent_msgs the sent message count on LPF slot @slot + */ +extern _LPFLIB_API +lpf_err_t lpf_get_sent_msg_count_per_slot( lpf_t ctx, size_t *sent_msgs, lpf_memslot_t slot); + +/** + * This function blocks until all the scheduled send messages + * (via ibv_post_send) are actually registered as sent (via ibv_poll_cq). + * No concept of slots is used here. + * This allows to reuse the send buffers e.g. in higher-level channel + * libraries. + */ +extern _LPFLIB_API +lpf_err_t lpf_flush_sent( lpf_t ctx); + +/** + * This function blocks until all the incoming received messages + * waiting on the receive completion queue are handled (via ibv_poll_cq). + * No concept of slots is used here. + * This allows to reuse the send buffers e.g. in higher-level channel + * libraries. 
+ */ +extern _LPFLIB_API +lpf_err_t lpf_flush_received( lpf_t ctx); + #ifdef __cplusplus } #endif diff --git a/include/lpf/static_dispatch.h b/include/lpf/static_dispatch.h index 8df6a092..7cd24263 100644 --- a/include/lpf/static_dispatch.h +++ b/include/lpf/static_dispatch.h @@ -40,8 +40,15 @@ #undef lpf_get #undef lpf_put #undef lpf_sync +#undef lpf_counting_sync_per_slot +#undef lpf_sync_per_slot #undef lpf_register_local +#undef lpf_get_rcvd_msg_count +#undef lpf_get_rcvd_msg_count_per_slot +#undef lpf_get_sent_msg_count_per_slot #undef lpf_register_global +#undef lpf_flush_sent +#undef lpf_flush_received #undef lpf_deregister #undef lpf_probe #undef lpf_resize_memory_register @@ -83,7 +90,14 @@ #define lpf_get LPF_FUNC(get) #define lpf_put LPF_FUNC(put) #define lpf_sync LPF_FUNC(sync) +#define lpf_counting_sync_per_slot LPF_FUNC(counting_sync_per_slot) +#define lpf_sync_per_slot LPF_FUNC(sync_per_slot) #define lpf_register_local LPF_FUNC(register_local) +#define lpf_get_rcvd_msg_count LPF_FUNC(get_rcvd_msg_count) +#define lpf_get_rcvd_msg_count_per_slot LPF_FUNC(get_rcvd_msg_count_per_slot) +#define lpf_get_sent_msg_count_per_slot LPF_FUNC(get_sent_msg_count_per_slot) +#define lpf_flush_sent LPF_FUNC(flush_sent) +#define lpf_flush_received LPF_FUNC(flush_received) #define lpf_register_global LPF_FUNC(register_global) #define lpf_deregister LPF_FUNC(deregister) #define lpf_probe LPF_FUNC(probe) diff --git a/src/MPI/core.cpp b/src/MPI/core.cpp index 112403e6..04ed3cfc 100644 --- a/src/MPI/core.cpp +++ b/src/MPI/core.cpp @@ -217,6 +217,43 @@ lpf_err_t lpf_deregister( return LPF_SUCCESS; } + +lpf_err_t lpf_lock_slot( lpf_t ctx, + lpf_memslot_t src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +) +{ + (void) attr; // ignore parameter 'msg' since this implementation only + // implements core functionality + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) + 
i->lockSlot( src_slot, src_offset, dst_pid, dst_slot, dst_offset, size ); + return LPF_SUCCESS; +} + +lpf_err_t lpf_unlock_slot( lpf_t ctx, + lpf_memslot_t src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +) +{ + (void) attr; // ignore parameter 'msg' since this implementation only + // implements core functionality + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) + i->unlockSlot( src_slot, src_offset, dst_pid, dst_slot, dst_offset, size ); + return LPF_SUCCESS; +} + lpf_err_t lpf_put( lpf_t ctx, lpf_memslot_t src_slot, size_t src_offset, @@ -262,6 +299,65 @@ lpf_err_t lpf_sync( lpf_t ctx, lpf_sync_attr_t attr ) return realContext(ctx)->sync(); } +lpf_err_t lpf_counting_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) +{ + (void) attr; // ignore attr parameter since this implementation only + // implements core functionality + return realContext(ctx)->countingSyncPerSlot(slot, expected_sent, expected_rcvd); +} + +lpf_err_t lpf_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot) +{ + (void) attr; // ignore attr parameter since this implementation only + // implements core functionality + return realContext(ctx)->syncPerSlot(slot); +} + +lpf_err_t lpf_get_rcvd_msg_count_per_slot( lpf_t ctx, size_t * rcvd_msgs, lpf_memslot_t slot) +{ + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) { + i->getRcvdMsgCountPerSlot(rcvd_msgs, slot); + } + return LPF_SUCCESS; +} + +lpf_err_t lpf_get_rcvd_msg_count( lpf_t ctx, size_t * rcvd_msgs) +{ + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) { + i->getRcvdMsgCount(rcvd_msgs); + } + return LPF_SUCCESS; +} + +lpf_err_t lpf_get_sent_msg_count_per_slot( lpf_t ctx, size_t * sent_msgs, lpf_memslot_t slot) +{ + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) { + i->getSentMsgCountPerSlot(sent_msgs, slot); + } + return 
LPF_SUCCESS; +} + +lpf_err_t lpf_flush_sent( lpf_t ctx) +{ + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) { + i->flushSent(); + } + return LPF_SUCCESS; +} + +lpf_err_t lpf_flush_received( lpf_t ctx) +{ + lpf::Interface * i = realContext(ctx); + if (!i->isAborted()) { + i->flushReceived(); + } + return LPF_SUCCESS; +} + lpf_err_t lpf_probe( lpf_t ctx, lpf_machine_t * params ) { lpf::Interface * i = realContext(ctx); diff --git a/src/MPI/ibverbs.cpp b/src/MPI/ibverbs.cpp index 44852caa..5a191ac8 100644 --- a/src/MPI/ibverbs.cpp +++ b/src/MPI/ibverbs.cpp @@ -22,6 +22,11 @@ #include #include +#include +#include + +#define POLL_BATCH 64 +#define MAX_POLLING 128 namespace lpf { namespace mpi { @@ -59,7 +64,8 @@ IBVerbs :: IBVerbs( Communication & comm ) , m_maxSrs(0) , m_device() , m_pd() - , m_cq() + , m_cqLocal() + , m_cqRemote() , m_stagedQps( m_nprocs ) , m_connectedQps( m_nprocs ) , m_srs() @@ -68,11 +74,18 @@ IBVerbs :: IBVerbs( Communication & comm ) , m_activePeers(0, m_nprocs) , m_peerList() , m_sges() - , m_wcs(m_nprocs) , m_memreg() , m_dummyMemReg() , m_dummyBuffer() , m_comm( comm ) + , m_cqSize(1) + , m_postCount(0) + , m_recvCount(0) + , m_numMsgs(0) + //, m_sendTotalInitMsgCount(0) + , m_recvTotalInitMsgCount(0) + , m_sentMsgs(0) + , m_recvdMsgs(0) { m_peerList.reserve( m_nprocs ); @@ -183,12 +196,28 @@ IBVerbs :: IBVerbs( Communication & comm ) } LOG(3, "Opened protection domain"); - struct ibv_cq * const ibv_cq_new_p = ibv_create_cq( m_device.get(), m_nprocs, NULL, NULL, 0 ); - if( ibv_cq_new_p == NULL ) - m_cq.reset(); - else - m_cq.reset( ibv_cq_new_p, ibv_destroy_cq ); - if (!m_cq) { + m_cqLocal.reset(ibv_create_cq( m_device.get(), 1, NULL, NULL, 0 ), ibv_destroy_cq); + m_cqRemote.reset(ibv_create_cq( m_device.get(), m_nprocs, NULL, NULL, 0 ), ibv_destroy_cq); + /** + * New notification functionality for HiCR + */ + struct ibv_srq_init_attr srq_init_attr; + srq_init_attr.srq_context = NULL; + srq_init_attr.attr.max_wr = 
m_deviceAttr.max_srq_wr; + srq_init_attr.attr.max_sge = m_deviceAttr.max_srq_sge; + srq_init_attr.attr.srq_limit = 0; + m_srq.reset(ibv_create_srq(m_pd.get(), &srq_init_attr ), + ibv_destroy_srq); + + + m_cqLocal.reset(ibv_create_cq( m_device.get(), m_cqSize, NULL, NULL, 0), ibv_destroy_cq); + if (!m_cqLocal) { + LOG(1, "Could not allocate completion queue with '" + << m_nprocs << " entries" ); + throw Exception("Could not allocate completion queue"); + } + m_cqRemote.reset(ibv_create_cq( m_device.get(), m_cqSize * m_nprocs, NULL, NULL, 0), ibv_destroy_cq); + if (!m_cqLocal) { LOG(1, "Could not allocate completion queue with '" << m_nprocs << " entries" ); throw Exception("Could not allocate completion queue"); @@ -211,13 +240,46 @@ IBVerbs :: IBVerbs( Communication & comm ) throw Exception("Could not register memory region"); } - // Wait for all peers to finish LOG(3, "Queue pairs have been successfully initialized"); + } IBVerbs :: ~IBVerbs() -{ +{ } + +inline void IBVerbs :: tryIncrement(Op op, Phase phase, SlotID slot) { + + switch (phase) { + case Phase::INIT: + rcvdMsgCount[slot] = 0; + m_recvInitMsgCount[slot] = 0; + sentMsgCount[slot] = 0; + m_sendInitMsgCount[slot] = 0; + break; + case Phase::PRE: + if (op == Op::SEND) { + m_numMsgs++; + //m_sendTotalInitMsgCount++; + m_sendInitMsgCount[slot]++; + } + if (op == Op::RECV || op == Op::GET) { + m_recvTotalInitMsgCount++; + m_recvInitMsgCount[slot]++; + } + break; + case Phase::POST: + if (op == Op::RECV || op == Op::GET) { + m_recvTotalInitMsgCount++; + m_recvdMsgs ++; + rcvdMsgCount[slot]++; + } + if (op == Op::SEND) { + m_sentMsgs++; + sentMsgCount[slot]++; + } + break; + } } void IBVerbs :: stageQPs( size_t maxMsgs ) @@ -229,10 +291,11 @@ void IBVerbs :: stageQPs( size_t maxMsgs ) attr.qp_type = IBV_QPT_RC; // we want reliable connection attr.sq_sig_all = 0; // only wait for selected messages - attr.send_cq = m_cq.get(); - attr.recv_cq = m_cq.get(); - attr.cap.max_send_wr = std::min(maxMsgs + 
m_minNrMsgs,m_maxSrs); - attr.cap.max_recv_wr = 1; // one for the dummy + attr.send_cq = m_cqLocal.get(); + attr.recv_cq = m_cqRemote.get(); + attr.srq = m_srq.get(); + attr.cap.max_send_wr = std::min(maxMsgs + m_minNrMsgs,m_maxSrs/4); + attr.cap.max_recv_wr = std::min(maxMsgs + m_minNrMsgs,m_maxSrs/4); attr.cap.max_send_sge = 1; attr.cap.max_recv_sge = 1; @@ -247,10 +310,72 @@ void IBVerbs :: stageQPs( size_t maxMsgs ) throw std::bad_alloc(); } - LOG(3, "Created new Queue pair for " << m_pid << " -> " << i ); + LOG(3, "Created new Queue pair for " << m_pid << " -> " << i << " with qp_num = " << ibv_new_qp_p->qp_num); } } +void IBVerbs :: doRemoteProgress() { + struct ibv_wc wcs[POLL_BATCH]; + struct ibv_recv_wr wr; + struct ibv_sge sg; + struct ibv_recv_wr *bad_wr; + sg.addr = (uint64_t) NULL; + sg.length = 0; + sg.lkey = 0; + wr.next = NULL; + wr.sg_list = &sg; + wr.num_sge = 0; + wr.wr_id = 66; + int pollResult, totalResults = 0; + do { + pollResult = ibv_poll_cq(m_cqRemote.get(), POLL_BATCH, wcs); + if (pollResult > 0) { + LOG(3, "Process " << m_pid << " signals: I received " << pollResult << " remote messages in doRemoteProgress"); + } + else if (pollResult < 0) + { + LOG( 1, "Failed to poll IB completion queue" ); + throw Exception("Poll CQ failure"); + } + + for(int i = 0; i < pollResult; i++) { + if (wcs[i].status != IBV_WC_SUCCESS) { + LOG( 2, "Got bad completion status from IB message." + " status = 0x" << std::hex << wcs[i].status + << ", vendor syndrome = 0x" << std::hex + << wcs[i].vendor_err ); + } + else { + LOG(2, "Process " << m_pid << " Recv wcs[" << i << "].src_qp = "<< wcs[i].src_qp); + LOG(2, "Process " << m_pid << " Recv wcs[" << i << "].slid = "<< wcs[i].slid); + LOG(2, "Process " << m_pid << " Recv wcs[" << i << "].wr_id = "<< wcs[i].wr_id); + LOG(2, "Process " << m_pid << " Recv wcs[" << i << "].imm_data = "<< wcs[i].imm_data); + + /** + * Here is a trick: + * The sender sends relatively generic LPF memslot ID. 
+ * But for IB Verbs, we need to translate that into + * an IB Verbs slot via @getVerbID -- or there will be + * a mismatch when IB Verbs looks up the slot ID + */ + + // Note: Ignore compare-and-swap atomics! + if (wcs[i].opcode != IBV_WC_COMP_SWAP) { + SlotID slot; + // This receive is from a PUT call + if (wcs[i].opcode == IBV_WC_RECV_RDMA_WITH_IMM) { + slot = wcs[i].imm_data; + tryIncrement(Op::RECV, Phase::POST, slot); + LOG(3, "Rank " << m_pid << " increments received message count to " << rcvdMsgCount[slot] << " for LPF slot " << slot); + } + } + ibv_post_srq_recv(m_srq.get(), &wr, &bad_wr); + } + } + if(pollResult > 0) totalResults += pollResult; + } while (pollResult == POLL_BATCH && totalResults < MAX_POLLING); +} + void IBVerbs :: reconnectQPs() { ASSERT( m_stagedQps[0] ); @@ -306,7 +431,7 @@ void IBVerbs :: reconnectQPs() attr.qp_state = IBV_QPS_INIT; attr.port_num = m_ibPort; attr.pkey_index = 0; - attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE; + attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_ATOMIC; flags = IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS; if ( ibv_modify_qp(m_stagedQps[i].get(), &attr, flags) ) { LOG(1, "Cannot bring state of QP " << i << " to INIT"); @@ -322,15 +447,10 @@ void IBVerbs :: reconnectQPs() sge.length = m_dummyBuffer.size(); sge.lkey = m_dummyMemReg->lkey; rr.next = NULL; - rr.wr_id = 0; + rr.wr_id = 46; rr.sg_list = &sge; rr.num_sge = 1; - if (ibv_post_recv(m_stagedQps[i].get(), &rr, &bad_wr)) { - LOG(1, "Cannot post a single receive request to QP " << i ); - throw Exception("Could not post dummy receive request"); - } - // Bring QP to RTR std::memset(&attr, 0, sizeof(attr)); attr.qp_state = IBV_QPS_RTR; @@ -365,13 +485,13 @@ void IBVerbs :: reconnectQPs() std::memset(&attr, 0, sizeof(attr)); attr.qp_state = IBV_QPS_RTS; attr.timeout = 0x12; - attr.retry_cnt = 6; - attr.rnr_retry = 
0; + attr.retry_cnt = 0;//7; + attr.rnr_retry = 0;//7; attr.sq_psn = 0; attr.max_rd_atomic = 1; flags = IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC; - if( ibv_modify_qp(m_stagedQps[i].get(), &attr, flags) ) { + if( ibv_modify_qp(m_stagedQps[i].get(), &attr, flags)) { LOG(1, "Cannot bring state of QP " << i << " to RTS" ); throw Exception("Failed to bring QP's state to RTS" ); } @@ -380,24 +500,23 @@ void IBVerbs :: reconnectQPs() } // for each peer } - catch(...) { - m_comm.allreduceOr( true ); - throw; - } + catch(...) { + m_comm.allreduceOr( true ); + throw; + } - if (m_comm.allreduceOr( false )) - throw Exception("Another peer failed to set-up Infiniband queue pairs"); + if (m_comm.allreduceOr( false )) + throw Exception("Another peer failed to set-up Infiniband queue pairs"); - LOG(3, "All staged queue pairs have been connected" ); + LOG(3, "All staged queue pairs have been connected" ); - m_connectedQps.swap( m_stagedQps ); - for (int i = 0; i < m_nprocs; ++i) - m_stagedQps[i].reset(); + m_connectedQps.swap( m_stagedQps ); - LOG(3, "All old queue pairs have been removed"); + LOG(3, "All old queue pairs have been removed"); + + m_comm.barrier(); + } - m_comm.barrier(); -} void IBVerbs :: resizeMemreg( size_t size ) { @@ -421,18 +540,36 @@ void IBVerbs :: resizeMemreg( size_t size ) void IBVerbs :: resizeMesgq( size_t size ) { - ASSERT( m_srs.max_size() > m_minNrMsgs ); - - if ( size > m_srs.max_size() - m_minNrMsgs ) - { - LOG(2, "Could not increase message queue, because integer will overflow"); - throw Exception("Could not increase message queue"); - } - - m_srs.reserve( size + m_minNrMsgs ); - m_sges.reserve( size + m_minNrMsgs ); - stageQPs(size); + m_cqSize = std::min(size,m_maxSrs/4); + size_t remote_size = std::min(m_cqSize*m_nprocs,m_maxSrs/4); + if (m_cqLocal) { + ibv_resize_cq(m_cqLocal.get(), m_cqSize); + } + if(remote_size >= m_postCount){ + if (m_cqRemote) { + 
ibv_resize_cq(m_cqRemote.get(), remote_size); + } + } + stageQPs(m_cqSize); + if(remote_size >= m_postCount){ + if (m_srq) { + struct ibv_recv_wr wr; + struct ibv_sge sg; + struct ibv_recv_wr *bad_wr; + sg.addr = (uint64_t) NULL; + sg.length = 0; + sg.lkey = 0; + wr.next = NULL; + wr.sg_list = &sg; + wr.num_sge = 0; + wr.wr_id = m_pid; + for(int i = m_postCount; i < (int)remote_size; ++i){ + ibv_post_srq_recv(m_srq.get(), &wr, &bad_wr); + m_postCount++; + } + } + } LOG(4, "Message queue has been reallocated to size " << size ); } @@ -445,7 +582,7 @@ IBVerbs :: SlotID IBVerbs :: regLocal( void * addr, size_t size ) LOG(4, "Registering locally memory area at " << addr << " of size " << size ); struct ibv_mr * const ibv_mr_new_p = ibv_reg_mr( m_pd.get(), addr, size, - IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE + IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_ATOMIC ); if( ibv_mr_new_p == NULL ) slot.mr.reset(); @@ -464,6 +601,7 @@ IBVerbs :: SlotID IBVerbs :: regLocal( void * addr, size_t size ) local.rkey = size?slot.mr->rkey:0; SlotID id = m_memreg.addLocalReg( slot ); + tryIncrement(Op::SEND/* <- dummy for init */, Phase::INIT, id); m_memreg.update( id ).glob.resize( m_nprocs ); m_memreg.update( id ).glob[m_pid] = local; @@ -480,7 +618,7 @@ IBVerbs :: SlotID IBVerbs :: regGlobal( void * addr, size_t size ) LOG(4, "Registering globally memory area at " << addr << " of size " << size ); struct ibv_mr * const ibv_mr_new_p = ibv_reg_mr( m_pd.get(), addr, size, - IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE + IBV_ACCESS_REMOTE_READ | IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_ATOMIC ); if( ibv_mr_new_p == NULL ) slot.mr.reset(); @@ -497,6 +635,7 @@ IBVerbs :: SlotID IBVerbs :: regGlobal( void * addr, size_t size ) throw Exception("Another process could not register memory area"); SlotID id = m_memreg.addGlobalReg( slot ); + 
tryIncrement(Op::SEND/* <- dummy for init */, Phase::INIT, id); MemorySlot & ref = m_memreg.update(id); // exchange memory registration info globally ref.glob.resize(m_nprocs); @@ -520,203 +659,385 @@ void IBVerbs :: dereg( SlotID id ) LOG(4, "Memory area of slot " << id << " has been deregistered"); } + +void IBVerbs :: blockingCompareAndSwap(SlotID srcSlot, size_t srcOffset, int dstPid, SlotID dstSlot, size_t dstOffset, size_t size, uint64_t compare_add, uint64_t swap) +{ + const MemorySlot & src = m_memreg.lookup( srcSlot ); + const MemorySlot & dst = m_memreg.lookup( dstSlot); + + char * localAddr + = static_cast(src.glob[m_pid].addr) + srcOffset; + const char * remoteAddr + = static_cast(dst.glob[dstPid].addr) + dstOffset; + + struct ibv_sge sge; + memset(&sge, 0, sizeof(sge)); + sge.addr = reinterpret_cast( localAddr ); + sge.length = std::min(size, m_maxMsgSize ); + sge.lkey = src.mr->lkey; + + struct ibv_wc wcs[POLL_BATCH]; + struct ibv_send_wr wr; + memset(&wr, 0, sizeof(wr)); + wr.wr_id = srcSlot; + wr.sg_list = &sge; + wr.next = NULL; // this needs to be set, otherwise EINVAL return error in ibv_post_send + wr.num_sge = 1; + wr.opcode = IBV_WR_ATOMIC_CMP_AND_SWP; + wr.send_flags = IBV_SEND_SIGNALED; + wr.wr.atomic.remote_addr = reinterpret_cast(remoteAddr); + wr.wr.atomic.compare_add = compare_add; + wr.wr.atomic.swap = swap; + wr.wr.atomic.rkey = dst.glob[dstPid].rkey; + struct ibv_send_wr *bad_wr; + int error; + std::vector opcodes; + +blockingCompareAndSwap: + if (int err = ibv_post_send(m_connectedQps[dstPid].get(), &wr, &bad_wr )) + { + LOG(1, "Error while posting RDMA requests: " << std::strerror(err) ); + throw Exception("Error while posting RDMA requests"); + } + + /** + * Keep waiting on a completion of events until you + * register a completed atomic compare-and-swap + */ + do { + opcodes = wait_completion(error); + if (error) { + LOG(1, "Error in wait_completion"); + std::abort(); + } + } while (std::find(opcodes.begin(), opcodes.end(), 
IBV_WC_COMP_SWAP) == opcodes.end()); + + uint64_t * remoteValueFound = reinterpret_cast(localAddr); + /* + * if we fetched the value we expected, then + * we are holding the lock now (that is, we swapped successfully!) + * else, re-post your request for the lock + */ + if (remoteValueFound[0] != compare_add) { + LOG(4, "Process " << m_pid << " couldn't get the lock. remoteValue = " << remoteValueFound[0] << " compare_add = " << compare_add << " go on, iterate\n"); + goto blockingCompareAndSwap; + } + else { + LOG(4, "Process " << m_pid << " reads value " << remoteValueFound[0] << " and expected = " << compare_add <<" gets the lock, done\n"); + } + // else we hold the lock and swap value into the remote slot ... +} + void IBVerbs :: put( SlotID srcSlot, size_t srcOffset, - int dstPid, SlotID dstSlot, size_t dstOffset, size_t size ) + int dstPid, SlotID dstSlot, size_t dstOffset, size_t size) { const MemorySlot & src = m_memreg.lookup( srcSlot ); const MemorySlot & dst = m_memreg.lookup( dstSlot ); ASSERT( src.mr ); - while (size > 0 ) { - struct ibv_sge sge; std::memset(&sge, 0, sizeof(sge)); - struct ibv_send_wr sr; std::memset(&sr, 0, sizeof(sr)); + int numMsgs = size/m_maxMsgSize + (size % m_maxMsgSize > 0); //+1 if last msg size < m_maxMsgSize + if (size == 0) numMsgs = 1; + struct ibv_sge sges[numMsgs]; + struct ibv_send_wr srs[numMsgs]; + struct ibv_sge *sge; + struct ibv_send_wr *sr; + for (int i=0; i < numMsgs; i++) { + sge = &sges[i]; std::memset(sge, 0, sizeof(ibv_sge)); + sr = &srs[i]; std::memset(sr, 0, sizeof(ibv_send_wr)); const char * localAddr = static_cast(src.glob[m_pid].addr) + srcOffset; const char * remoteAddr = static_cast(dst.glob[dstPid].addr) + dstOffset; - sge.addr = reinterpret_cast( localAddr ); - sge.length = std::min(size, m_maxMsgSize ); - sge.lkey = src.mr->lkey; - m_sges.push_back( sge ); + sge->addr = reinterpret_cast( localAddr ); + sge->length = std::min(size, m_maxMsgSize ); + sge->lkey = src.mr->lkey; - bool lastMsg = ! 
m_activePeers.contains( dstPid ); - sr.next = lastMsg ? NULL : &m_srs[ m_srsHeads[ dstPid ] ]; + bool lastMsg = (i == numMsgs-1); + sr->next = lastMsg ? NULL : &m_srs[ i+1]; // since reliable connection guarantees keeps packets in order, // we only need a signal from the last message in the queue - sr.send_flags = lastMsg ? IBV_SEND_SIGNALED : 0; - - sr.wr_id = 0; // don't need an identifier - sr.sg_list = &m_sges.back(); - sr.num_sge = 1; - sr.opcode = IBV_WR_RDMA_WRITE; - sr.wr.rdma.remote_addr = reinterpret_cast( remoteAddr ); - sr.wr.rdma.rkey = dst.glob[dstPid].rkey; - - m_srsHeads[ dstPid ] = m_srs.size(); - m_srs.push_back( sr ); - m_activePeers.insert( dstPid ); - m_nMsgsPerPeer[ dstPid ] += 1; + sr->send_flags = lastMsg ? IBV_SEND_SIGNALED : 0; + sr->opcode = lastMsg? IBV_WR_RDMA_WRITE_WITH_IMM : IBV_WR_RDMA_WRITE; + /* use wr_id to later demultiplex srcSlot */ + sr->wr_id = srcSlot; + /* + * In HiCR, we need to know at receiver end which slot + * has received the message. But here is a trick: + */ + sr->imm_data = dstSlot; + + sr->sg_list = sge; + sr->num_sge = 1; + sr->wr.rdma.remote_addr = reinterpret_cast( remoteAddr ); + sr->wr.rdma.rkey = dst.glob[dstPid].rkey; + + size -= sge->length; + srcOffset += sge->length; + dstOffset += sge->length; + + LOG(4, "PID " << m_pid << ": Enqueued put message of " << sge->length << " bytes to " << dstPid << " on slot" << dstSlot ); - size -= sge.length; - srcOffset += sge.length; - dstOffset += sge.length; - - LOG(4, "Enqueued put message of " << sge.length << " bytes to " << dstPid ); } + struct ibv_send_wr *bad_wr = NULL; + if (int err = ibv_post_send(m_connectedQps[dstPid].get(), &srs[0], &bad_wr )) + { + LOG(1, "Error while posting RDMA requests: " << std::strerror(err) ); + throw Exception("Error while posting RDMA requests"); + } + tryIncrement(Op::SEND, Phase::PRE, srcSlot); } void IBVerbs :: get( int srcPid, SlotID srcSlot, size_t srcOffset, SlotID dstSlot, size_t dstOffset, size_t size ) { const MemorySlot 
& src = m_memreg.lookup( srcSlot ); - const MemorySlot & dst = m_memreg.lookup( dstSlot ); - - ASSERT( dst.mr ); - - while (size > 0) { - - struct ibv_sge sge; std::memset(&sge, 0, sizeof(sge)); - struct ibv_send_wr sr; std::memset(&sr, 0, sizeof(sr)); + const MemorySlot & dst = m_memreg.lookup( dstSlot ); + + ASSERT( dst.mr ); + + int numMsgs = size/m_maxMsgSize + (size % m_maxMsgSize > 0); //+1 if last msg size < m_maxMsgSize + + struct ibv_sge sges[numMsgs+1]; + struct ibv_send_wr srs[numMsgs+1]; + struct ibv_sge *sge; + struct ibv_send_wr *sr; + + + for(int i = 0; i< numMsgs; i++){ + sge = &sges[i]; std::memset(sge, 0, sizeof(ibv_sge)); + sr = &srs[i]; std::memset(sr, 0, sizeof(ibv_send_wr)); + + const char * localAddr + = static_cast(dst.glob[m_pid].addr) + dstOffset; + const char * remoteAddr + = static_cast(src.glob[srcPid].addr) + srcOffset; + + sge->addr = reinterpret_cast( localAddr ); + sge->length = std::min(size, m_maxMsgSize ); + sge->lkey = dst.mr->lkey; + + sr->next = NULL; // &srs[i+1]; + sr->send_flags = IBV_SEND_SIGNALED; //0; + + sr->sg_list = sge; + sr->num_sge = 1; + sr->opcode = IBV_WR_RDMA_READ; + sr->wr.rdma.remote_addr = reinterpret_cast( remoteAddr ); + sr->wr.rdma.rkey = src.glob[srcPid].rkey; + // This logic is reversed compared to ::put + // (not srcSlot, as this slot is remote) + sr->wr_id = dstSlot; // <= DO NOT CHANGE THIS !!! 
+ sr->imm_data = srcSlot; // This is irrelevant as we don't send _WITH_IMM + + size -= sge->length; + srcOffset += sge->length; + dstOffset += sge->length; + } + + // add extra "message" to do the local and remote completion + //sge = &sges[numMsgs]; std::memset(sge, 0, sizeof(ibv_sge)); + //sr = &srs[numMsgs]; std::memset(sr, 0, sizeof(ibv_send_wr)); + + /* + const char * localAddr = static_cast(dst.glob[m_pid].addr); + const char * remoteAddr = static_cast(src.glob[srcPid].addr); + + sge->addr = reinterpret_cast( localAddr ); + sge->length = 0; + sge->lkey = dst.mr->lkey; + + sr->next = NULL; + // since reliable connection guarantees keeps packets in order, + // we only need a signal from the last message in the queue + sr->send_flags = IBV_SEND_SIGNALED; + sr->opcode = IBV_WR_RDMA_WRITE_WITH_IMM; + sr->sg_list = sge; + sr->num_sge = 0; + // Should srcSlot and dstSlot be reversed for get? + sr->wr_id = srcSlot; + sr->imm_data = dstSlot; + sr->wr.rdma.remote_addr = reinterpret_cast( remoteAddr ); + sr->wr.rdma.rkey = src.glob[srcPid].rkey; + + //Send + */ + struct ibv_send_wr *bad_wr = NULL; + if (int err = ibv_post_send(m_connectedQps[srcPid].get(), &srs[0], &bad_wr )) + { + + LOG(1, "Error while posting RDMA requests: " << std::strerror(err) ); + if (err == ENOMEM) { + LOG(1, "Specific error code: ENOMEM (send queue is full or no resources)"); + } + throw Exception("Error while posting RDMA requests"); + } + tryIncrement(Op::GET, Phase::PRE, dstSlot); - const char * localAddr - = static_cast(dst.glob[m_pid].addr) + dstOffset; - const char * remoteAddr - = static_cast(src.glob[srcPid].addr) + srcOffset; +} - sge.addr = reinterpret_cast( localAddr ); - sge.length = std::min(size, m_maxMsgSize ); - sge.lkey = dst.mr->lkey; - m_sges.push_back( sge ); +void IBVerbs :: get_rcvd_msg_count(size_t * rcvd_msgs) { + *rcvd_msgs = m_recvdMsgs; +} - bool lastMsg = ! m_activePeers.contains( srcPid ); - sr.next = lastMsg ? 
NULL : &m_srs[ m_srsHeads[ srcPid ] ]; - // since reliable connection guarantees keeps packets in order, - // we only need a signal from the last message in the queue - sr.send_flags = lastMsg ? IBV_SEND_SIGNALED : 0; - - sr.wr_id = 0; // don't need an identifier - sr.sg_list = &m_sges.back(); - sr.num_sge = 1; - sr.opcode = IBV_WR_RDMA_READ; - sr.wr.rdma.remote_addr = reinterpret_cast( remoteAddr ); - sr.wr.rdma.rkey = src.glob[srcPid].rkey; - - m_srsHeads[ srcPid ] = m_srs.size(); - m_srs.push_back( sr ); - m_activePeers.insert( srcPid ); - m_nMsgsPerPeer[ srcPid ] += 1; - - size -= sge.length; - srcOffset += sge.length; - dstOffset += sge.length; - LOG(4, "Enqueued get message of " << sge.length << " bytes from " << srcPid ); - } +void IBVerbs :: get_rcvd_msg_count_per_slot(size_t * rcvd_msgs, SlotID slot) +{ + *rcvd_msgs = rcvdMsgCount[slot]; } -void IBVerbs :: sync( bool reconnect ) +void IBVerbs :: get_sent_msg_count_per_slot(size_t * sent_msgs, SlotID slot) { - if (reconnect) reconnectQPs(); + *sent_msgs = sentMsgCount.at(slot); +} - while ( !m_activePeers.empty() ) { - m_peerList.clear(); +std::vector IBVerbs :: wait_completion(int& error) { - // post all requests - typedef SparseSet< pid_t> :: const_iterator It; - for (It p = m_activePeers.begin(); p != m_activePeers.end(); ++p ) - { - size_t head = m_srsHeads[ *p ]; - m_peerList.push_back( *p ); - - if ( m_nMsgsPerPeer[*p] > m_maxSrs ) { - // then there are more messages than maximally allowed - // so: dequeue the top m_maxMsgs and post them - struct ibv_send_wr * const pBasis = &m_srs[0]; - struct ibv_send_wr * pLast = &m_srs[ head ]; - for (size_t i = 0 ; i < m_maxSrs-1; ++i ) - pLast = pLast->next; - - ASSERT( pLast != NULL ); - ASSERT( pLast->next != NULL ); // because m_nMsgsperPeer[*p] > m_maxSrs - - ASSERT( pLast->next - pBasis ); // since all send requests are stored in an array - - // now do the dequeueing - m_srsHeads[*p] = pLast->next - pBasis; - pLast->next = NULL; - pLast->send_flags = 
IBV_SEND_SIGNALED; - LOG(4, "Posting " << m_maxSrs << " of " << m_nMsgsPerPeer[*p] - << " messages from " << m_pid << " -> " << *p ); - m_nMsgsPerPeer[*p] -= m_maxSrs; + error = 0; + LOG(5, "Polling for messages" ); + struct ibv_wc wcs[POLL_BATCH]; + int pollResult = ibv_poll_cq(m_cqLocal.get(), POLL_BATCH, wcs); + std::vector opcodes; + if ( pollResult > 0) { + LOG(3, "Process " << m_pid << ": Received " << pollResult << " acknowledgements"); + + for (int i = 0; i < pollResult ; ++i) { + if (wcs[i].status != IBV_WC_SUCCESS) + { + LOG( 2, "Got bad completion status from IB message." + " status = 0x" << std::hex << wcs[i].status + << ", vendor syndrome = 0x" << std::hex + << wcs[i].vendor_err ); + const char * status_descr; + status_descr = ibv_wc_status_str(wcs[i].status); + LOG( 2, "The work completion status string: " << status_descr); + error = 1; } else { - // signal that we're done - LOG(4, "Posting remaining " << m_nMsgsPerPeer[*p] - << " messages " << m_pid << " -> " << *p ); - m_nMsgsPerPeer[*p] = 0; + LOG(3, "Process " << m_pid << " Send wcs[" << i << "].src_qp = "<< wcs[i].src_qp); + LOG(3, "Process " << m_pid << " Send wcs[" << i << "].slid = "<< wcs[i].slid); + LOG(3, "Process " << m_pid << " Send wcs[" << i << "].wr_id = "<< wcs[i].wr_id); + LOG(3, "Process " << m_pid << " Send wcs[" << i << "].imm_data = "<< wcs[i].imm_data); } - struct ibv_send_wr * bad_wr = NULL; - struct ibv_qp * const ibv_qp_p = m_connectedQps[*p].get(); - ASSERT( ibv_qp_p != NULL ); - if (int err = ibv_post_send(ibv_qp_p, &m_srs[ head ], &bad_wr )) - { - LOG(1, "Error while posting RDMA requests: " << std::strerror(err) ); - throw Exception("Error while posting RDMA requests"); + SlotID slot = wcs[i].wr_id; + opcodes.push_back(wcs[i].opcode); + // Ignore compare-and-swap atomics! + if (wcs[i].opcode != IBV_WC_COMP_SWAP) { + // This receive is from a GET call! 
+ if (wcs[i].opcode == IBV_WC_RDMA_READ) { + tryIncrement(Op::GET, Phase::POST, slot); + } + if (wcs[i].opcode == IBV_WC_RDMA_WRITE) + tryIncrement(Op::SEND, Phase::POST, slot); + + LOG(3, "Rank " << m_pid << " increments sent message count to " << sentMsgCount[slot] << " for LPF slot " << slot); } } + } + else if (pollResult < 0) + { + LOG( 5, "Failed to poll IB completion queue" ); + throw Exception("Poll CQ failure"); + } + return opcodes; +} - // wait for completion +void IBVerbs :: flushReceived() { + doRemoteProgress(); +} - int n = m_activePeers.size(); - int error = 0; - while (n > 0) - { - LOG(5, "Polling for " << n << " messages" ); - int pollResult = ibv_poll_cq(m_cq.get(), n, m_wcs.data() ); - if ( pollResult > 0) { - LOG(4, "Received " << pollResult << " acknowledgements"); - n-= pollResult; - - for (int i = 0; i < pollResult ; ++i) { - if (m_wcs[i].status != IBV_WC_SUCCESS) - { - LOG( 2, "Got bad completion status from IB message." - " status = 0x" << std::hex << m_wcs[i].status - << ", vendor syndrome = 0x" << std::hex - << m_wcs[i].vendor_err ); - error = 1; - } +void IBVerbs :: flushSent() +{ + int error = 0; + + bool sendsComplete; + do { + sendsComplete = true; + for (auto it = m_sendInitMsgCount.begin(); it != m_sendInitMsgCount.end(); it++) { + if (it->second > sentMsgCount[it->first]) { + sendsComplete = false; + wait_completion(error); + if (error) { + LOG(1, "Error in wait_completion. 
Most likely issue is that receiver is not calling ibv_post_srq!\n"); + std::abort(); } } - else if (pollResult < 0) - { - LOG( 1, "Failed to poll IB completion queue" ); - throw Exception("Poll CQ failure"); - } } + } while (!sendsComplete); + + +} + +void IBVerbs :: countingSyncPerSlot(bool resized, SlotID slot, size_t expectedSent, size_t expectedRecvd) { + if (resized) reconnectQPs(); + size_t actualRecvd; + size_t actualSent; + int error; + do { + // this call triggers doRemoteProgress + doRemoteProgress(); + wait_completion(error); if (error) { - throw Exception("Error occurred during polling"); + LOG(1, "Error in wait_completion"); + std::abort(); } + get_rcvd_msg_count_per_slot(&actualRecvd, slot); + get_sent_msg_count_per_slot(&actualSent, slot); - for ( unsigned p = 0; p < m_peerList.size(); ++p) { - if (m_nMsgsPerPeer[ m_peerList[p] ] == 0 ) - m_activePeers.erase( m_peerList[p] ); + } while ((expectedSent > actualSent) || (expectedRecvd > actualRecvd)); + +} + +void IBVerbs :: syncPerSlot(bool resized, SlotID slot) { + if (resized) reconnectQPs(); + int error; + + do { + wait_completion(error); + if (error) { + LOG(1, "Error in wait_completion"); + std::abort(); } + doRemoteProgress(); } + while ((rcvdMsgCount.at(slot) < m_recvInitMsgCount.at(slot)) || (sentMsgCount.at(slot) < m_sendInitMsgCount.at(slot))); + + /** + * A subsequent barrier is a controversial decision: + * - if we use it, the sync guarantees that + * receiver has received all that it is supposed to + * receive. However, it loses all performance advantages + * of waiting "only on certain tags" + * - if we do not barrier, we only make sure the slot + * completes all sends and receives that HAVE ALREADY + * BEEN ISSUED. However, a receiver of an RMA put + * cannot know if it is supposed to receive more messages. + * It can only know if it is receiving via an RMA get. 
+ * Therefore, now this operation is commented + */ + //m_comm.barrier(); - // clear all tables - m_activePeers.clear(); - m_srs.clear(); - std::fill( m_srsHeads.begin(), m_srsHeads.end(), 0u ); - std::fill( m_nMsgsPerPeer.begin(), m_nMsgsPerPeer.end(), 0u ); - m_sges.clear(); +} - // synchronize +void IBVerbs :: sync(bool resized) +{ + + if (resized) reconnectQPs(); + + int error = 0; + + // flush send queues + flushSent(); + // flush receive queues + flushReceived(); + + LOG(1, "Process " << m_pid << " will call barrier\n"); m_comm.barrier(); + + } diff --git a/src/MPI/ibverbs.hpp b/src/MPI/ibverbs.hpp index b79ec53a..ee7cb80c 100644 --- a/src/MPI/ibverbs.hpp +++ b/src/MPI/ibverbs.hpp @@ -19,12 +19,16 @@ #define LPF_CORE_MPI_IBVERBS_HPP #include +#include #include -#if __cplusplus >= 201103L - #include -#else - #include -#endif +#include +#include +#include +//#if __cplusplus >= 201103L +// #include +//#else +// #include +//#endif #include @@ -33,6 +37,18 @@ #include "sparseset.hpp" #include "memreg.hpp" +typedef enum Op { + SEND, + RECV, + GET +} Op; + +typedef enum Phase { + INIT, + PRE, + POST +} Phase; + namespace lpf { class Communication; @@ -62,24 +78,48 @@ class _LPFLIB_LOCAL IBVerbs SlotID regGlobal( void * addr, size_t size ); void dereg( SlotID id ); + void blockingCompareAndSwap(SlotID srSlot, size_t srcOffset, int dstPid, SlotID dstSlot, size_t dstOffset, size_t size, uint64_t compare_add, uint64_t swap); + void put( SlotID srcSlot, size_t srcOffset, - int dstPid, SlotID dstSlot, size_t dstOffset, size_t size ); + int dstPid, SlotID dstSlot, size_t dstOffset, size_t size); void get( int srcPid, SlotID srcSlot, size_t srcOffset, SlotID dstSlot, size_t dstOffset, size_t size ); + void flushSent(); + + void flushReceived(); + + void doRemoteProgress(); + + void countingSyncPerSlot(bool resized, SlotID tag, size_t sent, size_t recvd); + /** + * @syncPerSlot only guarantees that all already scheduled sends (via put), + * or receives (via get) associated 
with a slot are completed. It does + * not guarantee that not scheduled operations will be scheduled (e.g. + * no guarantee that a remote process will wait til data is put into its + * memory, as it does schedule the operation (one-sided). + */ + void syncPerSlot(bool resized, SlotID slot); // Do the communication and synchronize - // 'Reconnect' must be a globally replicated value - void sync( bool reconnect); + void sync(bool resized); + void get_rcvd_msg_count(size_t * rcvd_msgs); + void get_rcvd_msg_count_per_slot(size_t * rcvd_msgs, SlotID slot); + void get_sent_msg_count_per_slot(size_t * sent_msgs, SlotID slot); private: IBVerbs & operator=(const IBVerbs & ); // assignment prohibited IBVerbs( const IBVerbs & ); // copying prohibited void stageQPs(size_t maxMsgs ); void reconnectQPs(); + void tryLock(SlotID id, int dstPid); + void tryUnlock(SlotID id, int dstPid); + std::vector wait_completion(int& error); + void doProgress(); + void tryIncrement(Op op, Phase phase, SlotID slot); struct MemoryRegistration { void * addr; @@ -93,8 +133,20 @@ class _LPFLIB_LOCAL IBVerbs std::vector< MemoryRegistration > glob; // array for global registrations }; + struct UserContext { + size_t lkey; + }; + int m_pid; // local process ID int m_nprocs; // number of processes + std::atomic_size_t m_numMsgs; + //std::atomic_size_t m_sendTotalInitMsgCount; + std::atomic_size_t m_recvTotalInitMsgCount; + std::atomic_size_t m_sentMsgs; + std::atomic_size_t m_recvdMsgs; + std::map m_recvInitMsgCount; + std::map m_getInitMsgCount; + std::map m_sendInitMsgCount; std::string m_devName; // IB device name int m_ibPort; // local IB port to work with @@ -104,18 +156,23 @@ class _LPFLIB_LOCAL IBVerbs struct ibv_device_attr m_deviceAttr; size_t m_maxRegSize; size_t m_maxMsgSize; + size_t m_cqSize; size_t m_minNrMsgs; size_t m_maxSrs; // maximum number of sends requests per QP + size_t m_postCount; + size_t m_recvCount; shared_ptr< struct ibv_context > m_device; // device handle shared_ptr< 
struct ibv_pd > m_pd; // protection domain - shared_ptr< struct ibv_cq > m_cq; // complation queue + shared_ptr< struct ibv_cq > m_cqLocal; // completion queue + shared_ptr< struct ibv_cq > m_cqRemote; // completion queue + shared_ptr< struct ibv_srq > m_srq; // shared receive queue // Disconnected queue pairs - std::vector< shared_ptr< struct ibv_qp > > m_stagedQps; + std::vector< shared_ptr > m_stagedQps; // Connected queue pairs - std::vector< shared_ptr< struct ibv_qp > > m_connectedQps; + std::vector< shared_ptr > m_connectedQps; std::vector< struct ibv_send_wr > m_srs; // array of send requests @@ -123,9 +180,11 @@ class _LPFLIB_LOCAL IBVerbs std::vector< size_t > m_nMsgsPerPeer; // number of messages per peer SparseSet< pid_t > m_activePeers; // std::vector< pid_t > m_peerList; + shared_ptr progressThread; + std::map rcvdMsgCount; + std::map sentMsgCount; std::vector< struct ibv_sge > m_sges; // array of scatter/gather entries - std::vector< struct ibv_wc > m_wcs; // array of work completions CombinedMemoryRegister< MemorySlot > m_memreg; diff --git a/src/MPI/interface.cpp b/src/MPI/interface.cpp index f1919f33..9510b518 100644 --- a/src/MPI/interface.cpp +++ b/src/MPI/interface.cpp @@ -91,6 +91,16 @@ catch ( const std::bad_alloc & e) throw; } + +void Interface :: lockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, + size_t size ) +{ + m_mesgQueue.lockSlot( srcSlot, srcOffset, + dstPid, dstSlot, dstOffset, + size ); +} + void Interface :: put( memslot_t srcSlot, size_t srcOffset, pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size ) @@ -100,6 +110,35 @@ void Interface :: put( memslot_t srcSlot, size_t srcOffset, size ); } +void Interface :: unlockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, + size_t size ) +{ + m_mesgQueue.unlockSlot( srcSlot, srcOffset, + dstPid, dstSlot, dstOffset, + size ); +} + +void Interface :: getRcvdMsgCountPerSlot(size_t * 
msgs, SlotID slot) { + m_mesgQueue.getRcvdMsgCountPerSlot(msgs, slot); +} + +void Interface :: getSentMsgCountPerSlot(size_t * msgs, SlotID slot) { + m_mesgQueue.getSentMsgCountPerSlot(msgs, slot); +} + +void Interface :: flushSent() { + m_mesgQueue.flushSent(); +} + +void Interface :: flushReceived() { + m_mesgQueue.flushReceived(); +} + +void Interface :: getRcvdMsgCount(size_t * msgs) { + m_mesgQueue.getRcvdMsgCount(msgs); +} + void Interface :: get( pid_t srcPid, memslot_t srcSlot, size_t srcOffset, memslot_t dstSlot, size_t dstOffset, size_t size ) @@ -139,7 +178,10 @@ void Interface :: abort() ASSERT( 0 == m_aborted ); // signal all other processes at the start of the next 'sync' that // this process aborted. - m_aborted = m_mesgQueue.sync( true ); + int vote = 1; + int voted; + m_comm.allreduceSum(&vote, &voted, 1); + m_aborted = voted; } pid_t Interface :: isAborted() const @@ -151,11 +193,33 @@ err_t Interface :: sync() { if ( 0 == m_aborted ) { - m_aborted = m_mesgQueue.sync( false ); + m_aborted = m_mesgQueue.sync(); + return LPF_SUCCESS; } - + else + { + return LPF_ERR_FATAL; + } +} + +err_t Interface :: countingSyncPerSlot(memslot_t slot, size_t expected_sent, size_t expected_rcvd) +{ + if ( 0 == m_aborted ) + { + m_aborted = m_mesgQueue.countingSyncPerSlot(slot, expected_sent, expected_rcvd); + return LPF_SUCCESS; + } + else + { + return LPF_ERR_FATAL; + } +} + +err_t Interface :: syncPerSlot(memslot_t slot) +{ if ( 0 == m_aborted ) { + m_aborted = m_mesgQueue.syncPerSlot(slot); return LPF_SUCCESS; } else diff --git a/src/MPI/interface.hpp b/src/MPI/interface.hpp index 732f0a9b..5b2e5171 100644 --- a/src/MPI/interface.hpp +++ b/src/MPI/interface.hpp @@ -38,6 +38,14 @@ class _LPFLIB_LOCAL Interface return s_root; } + void lockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, + size_t size ); + + void unlockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, + 
size_t size ); + _LPFLIB_API static void initRoot(int *argc, char ***argv); @@ -65,11 +73,20 @@ class _LPFLIB_LOCAL Interface pid_t isAborted() const ; err_t sync(); // nothrow + err_t countingSyncPerSlot(memslot_t slot, size_t expected_sent, size_t expected_rcvd); // nothrow + err_t syncPerSlot(memslot_t slot); // nothrow err_t exec( pid_t P, spmd_t spmd, args_t args ) ; static err_t hook( const mpi::Comm & comm , spmd_t spmd, args_t args ); + typedef size_t SlotID; + void getRcvdMsgCountPerSlot(size_t * msgs, SlotID slot); + void getSentMsgCountPerSlot(size_t * msgs, SlotID slot); + void getRcvdMsgCount(size_t * msgs); + void flushSent(); + void flushReceived(); + err_t rehook( spmd_t spmd, args_t args); void probe( machine_t & machine ) ; diff --git a/src/MPI/memorytable.hpp b/src/MPI/memorytable.hpp index 18dd5038..ffe6b314 100644 --- a/src/MPI/memorytable.hpp +++ b/src/MPI/memorytable.hpp @@ -92,7 +92,8 @@ class _LPFLIB_LOCAL MemoryTable #ifdef LPF_CORE_MPI_USES_ibverbs mpi::IBVerbs::SlotID getVerbID( Slot slot ) const - { return m_memreg.lookup( slot ).slot; } + { + return m_memreg.lookup( slot ).slot; } #endif void reserve( size_t size ); // throws bad_alloc, strong safe diff --git a/src/MPI/mesgqueue.cpp b/src/MPI/mesgqueue.cpp index 0f610a52..fe39ee04 100644 --- a/src/MPI/mesgqueue.cpp +++ b/src/MPI/mesgqueue.cpp @@ -270,711 +270,141 @@ void MessageQueue :: removeReg( memslot_t slot ) void MessageQueue :: get( pid_t srcPid, memslot_t srcSlot, size_t srcOffset, memslot_t dstSlot, size_t dstOffset, size_t size ) { - if (size > 0) - { - ASSERT( ! 
m_memreg.isLocalSlot( srcSlot ) ); - void * address = m_memreg.getAddress( dstSlot, dstOffset ); - if ( srcPid == static_cast(m_pid) ) - { - std::memcpy( address, m_memreg.getAddress( srcSlot, srcOffset), size); - } - else - { - using mpi::ipc::newMsg; - - if (size <= m_tinyMsgSize ) - { - // send immediately the request to the source - newMsg( BufGet, m_tinyMsgBuf.data(), m_tinyMsgBuf.size() ) - .write( DstPid , m_pid ) - .write( SrcSlot, srcSlot) - .write( DstSlot, dstSlot) - .write( SrcOffset, srcOffset ) - .write( DstOffset, dstOffset ) - .write( Size, size ) - .send( *m_firstQueue, srcPid ); - } - else - { - // send the request to the destination process (this process) - // for write conflict resolution - newMsg( HpGet, m_tinyMsgBuf.data(), m_tinyMsgBuf.size() ) - .write( SrcPid, srcPid ) - .write( DstPid, m_pid ) - .write( SrcSlot, srcSlot ) - .write( DstSlot, dstSlot ) - .write( SrcOffset, srcOffset ) - .write( DstOffset, dstOffset ) - .write( Size, size ) - . send( *m_firstQueue, m_pid ); - } - } - } +#ifdef LPF_CORE_MPI_USES_ibverbs + m_ibverbs.get(srcPid, + m_memreg.getVerbID( srcSlot), + srcOffset, + m_memreg.getVerbID( dstSlot), + dstOffset, + size ); +#endif } -void MessageQueue :: put( memslot_t srcSlot, size_t srcOffset, +void MessageQueue :: lockSlot( memslot_t srcSlot, size_t srcOffset, pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size ) { - if (size > 0) - { - ASSERT( ! m_memreg.isLocalSlot( dstSlot ) ); - void * address = m_memreg.getAddress( srcSlot, srcOffset ); - if ( dstPid == static_cast(m_pid) ) - { - std::memcpy( m_memreg.getAddress( dstSlot, dstOffset), address, size); - } - else - { - using mpi::ipc::newMsg; - if (size <= m_tinyMsgSize ) - { - newMsg( BufPut, m_tinyMsgBuf.data(), m_tinyMsgBuf.size() ) - .write( DstSlot, dstSlot ) - .write( DstOffset, dstOffset ) - .write( Payload, address, size ) - . 
send( *m_firstQueue, dstPid ); - } - else - { - newMsg( HpPut, m_tinyMsgBuf.data(), m_tinyMsgBuf.size() ) - .write( SrcPid, m_pid ) - .write( DstPid, dstPid ) - .write( SrcSlot, srcSlot ) - .write( DstSlot, dstSlot ) - .write( SrcOffset, srcOffset ) - .write( DstOffset, dstOffset ) - .write( Size, size ) - .send( *m_firstQueue, dstPid ); - } - } - } +#ifdef LPF_CORE_MPI_USES_ibverbs +m_ibverbs.blockingCompareAndSwap(m_memreg.getVerbID(srcSlot), srcOffset, dstPid, m_memreg.getVerbID(dstSlot), dstOffset, size, 0ULL, 1ULL); +#else + std::cerr << "Only IBVerbs::lockSlot available in this backend, abort\n"; + std::abort(); +#endif } -int MessageQueue :: sync( bool abort ) +void MessageQueue :: unlockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size ) { - LOG(4, "mpi :: MessageQueue :: sync( abort " << (abort?"true":"false") - << " )"); - using mpi::ipc::newMsg; - using mpi::ipc::recvMsg; - - // 1. communicate all requests to their destination and also - // communicate the buffered gets to the source - const int trials = 5; - bool randomize = false; - m_vote[0] = abort?1:0; - m_vote[1] = m_resized?1:0; - LOG(4, "Executing 1st meta-data exchange"); - if ( m_firstQueue->exchange(m_comm, randomize, m_vote.data(), trials) ) - { - LOG(2, "All " << trials << " sparse all-to-all attempts have failed"); - throw std::runtime_error("All sparse all-to-all attempts have failed"); - } - if ( m_vote[0] != 0 ) { - LOG(2, "Abort detected by sparse all-to-all"); - return m_vote[0]; - } - - m_resized = (m_vote[1] > 0); - - // Synchronize the memory registrations -#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs - if (m_resized) { - if (m_edgeBufferSlot != m_memreg.invalidSlot()) - { - m_memreg.remove( m_edgeBufferSlot ); - m_edgeBufferSlot = m_memreg.invalidSlot(); - } - ASSERT( m_edgeBufferSlot == m_memreg.invalidSlot() ); - - LOG(4, "Registering edge buffer slot of size " - << m_edgeBuffer.capacity() ); - 
- m_edgeBufferSlot - = m_memreg.addGlobal(m_edgeBuffer.data(), m_edgeBuffer.capacity()); - } -#endif - - LOG(4, "Syncing memory table" ); - m_memreg.sync(); - - // shrink memory register if necessary - ASSERT( m_nextMemRegSize <= m_memreg.capacity() ); - if ( m_memreg.capacity() > m_nextMemRegSize ) - { - LOG(4, "Reducing size of memory table "); - m_memreg.reserve( m_nextMemRegSize ); - } - - - LOG(4, "Processing message meta-data" ); - -#ifdef LPF_CORE_MPI_USES_mpimsg - int tagger = 0; +#ifdef LPF_CORE_MPI_USES_ibverbs +m_ibverbs.blockingCompareAndSwap(m_memreg.getVerbID(srcSlot), srcOffset, dstPid, m_memreg.getVerbID(dstSlot), dstOffset, size, 1ULL, 0ULL); +#else + std::cerr << "Only IBVerbs::unlockSlot available in this backend, abort\n"; + std::abort(); #endif - MessageSort :: MsgId newMsgId = 0; - - // 2. Schedule unbuffered comm for write conflict resolution, - // and process buffered communication - while ( !m_firstQueue->empty() ) - { - mpi::IPCMesg msg = recvMsg( *m_firstQueue, m_tinyMsgBuf.data(), m_tinyMsgBuf.size()); +} - switch ( msg.type() ) - { - case BufPut: { - /* execute them now so, we don't have to think about them anymore */ - memslot_t dstSlot; - size_t dstOffset; - msg.read( DstSlot, dstSlot) - .read( DstOffset, dstOffset ); - - void * addr = m_memreg.getAddress( dstSlot, dstOffset); - - msg.read( Payload, addr, msg.bytesLeft() ); - /* that's a relief :-) */ - break; - } - - case BufGet: { - /* process the buffered get now, and put it in the second queue */ - memslot_t srcSlot, dstSlot; - pid_t dstPid; - size_t srcOffset, dstOffset; - size_t size; - - msg .read( DstPid, dstPid ) - .read( SrcSlot, srcSlot) - .read( DstSlot, dstSlot) - .read( SrcOffset, srcOffset ) - .read( DstOffset, dstOffset ) - .read( Size, size ); - - ASSERT( msg.bytesLeft() == 0 ); - - void * addr = m_memreg.getAddress(srcSlot, srcOffset); - - newMsg( BufGetReply, m_tinyMsgBuf.data(), m_tinyMsgBuf.size() ) - .write( DstSlot, dstSlot ) - .write( DstOffset, dstOffset ) - 
.write( Payload, addr, size ) - . send( *m_secondQueue, dstPid ); - break; - } - - case HpGet: - case HpPut: { - ASSERT( newMsgId < m_bodyRequests.size() ); - ASSERT( newMsgId < m_edgeRecv.size() ); - MessageSort :: MsgId id = newMsgId++; /* give it a unique ID */ - - /* store the edges of a put in a separate queue */ - pid_t srcPid, dstPid; - memslot_t srcSlot, dstSlot; - size_t srcOffset, dstOffset; - size_t size; - msg .read( SrcPid, srcPid ) - .read( DstPid, dstPid ) - .read( SrcSlot, srcSlot ) - .read( DstSlot, dstSlot ) - .read( SrcOffset, srcOffset ) - .read( DstOffset, dstOffset ) - .read( Size, size ); - - Body body; - body.id = id; -#ifdef LPF_CORE_MPI_USES_mpimsg - body.tag = -1; -#endif - body.srcPid = srcPid; - body.dstPid = dstPid; - body.srcSlot = srcSlot; - body.dstSlot = dstSlot; - body.srcOffset = srcOffset; - body.dstOffset = dstOffset; - body.roundedDstOffset = dstOffset; - body.roundedSize = size; - body.size = size; - - if (size >= m_smallMsgSize ) { - /* add it to the write conflict resolution table - * and align the boundaries */ - m_msgsort.pushWrite( id, body.dstSlot, - body.roundedDstOffset, body.roundedSize ); - } - else - { - body.roundedSize = 0; - } - /* store it in a lookup table */ - m_bodyRequests[ id ] = body; - - /* Send a request out for the edge */ - Edge edge ; - edge.id = id; -#ifdef LPF_CORE_MPI_USES_mpimsg - edge.tag = -1; +void MessageQueue :: put( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size ) +{ +#ifdef LPF_CORE_MPI_USES_ibverbs + m_ibverbs.put( m_memreg.getVerbID( srcSlot), + srcOffset, + dstPid, + m_memreg.getVerbID( dstSlot), + dstOffset, + size); +#else + std::cerr << "Only IBVerbs::put available in this backend, abort\n"; + std::abort(); #endif - edge.canWriteHead = false; - edge.canWriteTail = false; - edge.srcPid = srcPid; - edge.dstPid = dstPid; - edge.srcSlot = srcSlot; - edge.dstSlot = dstSlot; - edge.srcOffset = srcOffset; - edge.dstOffset = dstOffset; - 
edge.bufOffset = static_cast(-1); - edge.size = size; - edge.roundedDstOffset = body.roundedDstOffset; - edge.roundedSize = body.roundedSize; - m_edgeRecv[id] = edge; - - break; - } - - default: ASSERT(!"Unexpected message"); break; - } - } - - LOG(4, "Processing message edges" ); - - /* Figure out which edge requests require further processing */ - const size_t localNumberOfEdges = newMsgId; - for (size_t id = 0 ; id < localNumberOfEdges; ++id ) - { - Edge & edge = m_edgeRecv[id]; - - size_t headSize = edge.roundedDstOffset - edge.dstOffset; - size_t tailSize = edge.size - edge.roundedSize - headSize; - bool canWriteHead = headSize > 0 - && m_msgsort.canWrite( id, edge.dstSlot, edge.dstOffset); - - bool canWriteTail = tailSize > 0 - && m_msgsort.canWrite( id, edge.dstSlot, edge.dstOffset + edge.size-1) ; +} - if ( canWriteHead || canWriteTail ) - { - edge.bufOffset = m_edgeBuffer.size(); -#ifdef LPF_CORE_MPI_USES_mpimsg - edge.tag = tagger; - tagger += (canWriteHead + canWriteTail ); -#endif - edge.canWriteHead = canWriteHead; - edge.canWriteTail = canWriteTail; +int MessageQueue :: sync() +{ - m_edgeBuffer.resize( m_edgeBuffer.size() + - (canWriteHead ? headSize : 0) + - (canWriteTail ? 
tailSize : 0) ); + // if not, deal with normal sync + m_memreg.sync(); -#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs - if ( !m_memreg.isLocalSlot( edge.dstSlot ) ) /* was this from a put?*/ -#endif - { - newMsg( HpEdges, m_tinyMsgBuf.data(), m_tinyMsgBuf.size() ) - .write( MsgId, edge.id) -#ifdef LPF_CORE_MPI_USES_mpimsg - .write( Tag, edge.tag ) +#ifdef LPF_CORE_MPI_USES_ibverbs + m_ibverbs.sync(m_resized); #endif - .write( Head, edge.canWriteHead ) - .write( Tail, edge.canWriteTail ) - .write( SrcPid, edge.srcPid ) - .write( DstPid, edge.dstPid ) - .write( SrcSlot, edge.srcSlot ) - .write( DstSlot, edge.dstSlot ) - .write( SrcOffset, edge.srcOffset ) - .write( DstOffset, edge.dstOffset ) - .write( BufOffset, edge.bufOffset ) - .write( RoundedDstOffset, edge.roundedDstOffset ) - .write( RoundedSize, edge.roundedSize ) - .write( Size, edge.size ) - .send( *m_secondQueue, edge.srcPid ); - } - } - - ASSERT( !edge.canWriteHead || edge.bufOffset + headSize <= m_edgeBuffer.size() ); - ASSERT( !edge.canWriteTail || edge.bufOffset + (edge.canWriteHead?headSize:0) - + tailSize <= m_edgeBuffer.size() ); - } - - ASSERT( m_bodyRecvs.empty() ); - LOG(4, "Resolving write conflicts" ); - - // 3. Read out the conflict free message requests, and adjust them - // note: this may double the number of messages! 
- { MessageSort::MsgId msgId = 0; char * addr = 0; size_t size = 0; - while ( m_msgsort.popWrite( msgId, addr, size ) ) - { - Body body = m_bodyRequests[ msgId ]; - - /* Note: Get's and put's are handled the same */ + m_resized = false; - ASSERT( body.dstPid == static_cast(m_pid) ); - ASSERT( body.srcPid != static_cast(m_pid) ); + return 0; +} - char * origRoundedAddr = static_cast( - m_memreg.getAddress( body.dstSlot, body.roundedDstOffset) - ); - ptrdiff_t shift = addr - origRoundedAddr ; +int MessageQueue :: countingSyncPerSlot(SlotID slot, size_t expected_sent, size_t expected_rcvd) +{ - Body bodyPart = body; - bodyPart.roundedDstOffset += shift ; - bodyPart.roundedSize = size; -#ifdef LPF_CORE_MPI_USES_mpimsg - bodyPart.tag = tagger++; // generate unique ids for MPI message tags -#endif + // if not, deal with normal sync + m_memreg.sync(); -#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs - if ( m_memreg.isLocalSlot( bodyPart.dstSlot) ) /* handle gets at their dest */ -#endif - { - m_bodyRecvs.push_back( bodyPart ); - } -#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs - else /* handle puts at their src */ -#endif - { - newMsg( HpBodyReply, m_tinyMsgBuf.data(), m_tinyMsgBuf.size() ) - .write( MsgId, bodyPart.id ) -#ifdef LPF_CORE_MPI_USES_mpimsg - .write( Tag, bodyPart.tag ) +#ifdef LPF_CORE_MPI_USES_ibverbs + m_ibverbs.countingSyncPerSlot(m_resized, slot, expected_sent, expected_rcvd); #endif - .write( SrcPid, bodyPart.srcPid ) - .write( DstPid, bodyPart.dstPid ) - .write( SrcSlot, bodyPart.srcSlot ) - .write( DstSlot, bodyPart.dstSlot ) - .write( SrcOffset, bodyPart.srcOffset ) - .write( DstOffset, bodyPart.dstOffset ) - .write( Size, bodyPart.size ) - .write( RoundedDstOffset, bodyPart.roundedDstOffset ) - .write( RoundedSize, bodyPart.roundedSize ) - .send( *m_secondQueue, body.srcPid ); - } - } } - - // 4. 
exchange the messages to their destination - LOG(4, "Executing 2nd meta-data exchange"); - if ( m_secondQueue->exchange( m_comm, randomize, m_vote.data(), trials )) { - LOG(2, "All " << trials << " sparse all-to-all attempts have failed"); - throw std::runtime_error("All sparse all-to-all attempts have failed"); - } - - ASSERT( m_bodySends.empty() ); - ASSERT( m_edgeSend.empty() ); - - LOG(4, "Processing message meta-data" ); - // 5. Execute buffered gets and process get edges - // postpone unbuffered comm just a little while. - while( !m_secondQueue->empty() ) - { - mpi::IPCMesg msg = recvMsg( *m_secondQueue, m_tinyMsgBuf.data(), m_tinyMsgBuf.size() ); - - switch ( msg.type() ) - { - case BufGetReply: { /* handle the response of a buffered get */ - memslot_t dstSlot; - size_t dstOffset; - msg.read( DstSlot, dstSlot) - .read( DstOffset, dstOffset ); - void * addr = m_memreg.getAddress( dstSlot, dstOffset); + m_resized = false; - msg.read( Payload, addr, msg.bytesLeft() ); - break; - } + return 0; +} - case HpEdges : { - Edge e ; - msg .read( MsgId, e.id) -#ifdef LPF_CORE_MPI_USES_mpimsg - .read( Tag, e.tag ) -#endif - .read( Head, e.canWriteHead ) - .read( Tail, e.canWriteTail ) - .read( SrcPid, e.srcPid ) - .read( DstPid, e.dstPid ) - .read( SrcSlot, e.srcSlot ) - .read( DstSlot, e.dstSlot ) - .read( SrcOffset, e.srcOffset ) - .read( DstOffset, e.dstOffset ) - .read( BufOffset, e.bufOffset ) - .read( RoundedDstOffset, e.roundedDstOffset ) - .read( RoundedSize, e.roundedSize ) - .read( Size, e.size ); - m_edgeSend.push_back( e ); - break; - } - - case HpBodyReply: { /* handle all unbuffered comm */ - Body bodyPart; - msg .read( MsgId, bodyPart.id ) -#ifdef LPF_CORE_MPI_USES_mpimsg - .read( Tag, bodyPart.tag ) -#endif - .read( SrcPid, bodyPart.srcPid ) - .read( DstPid, bodyPart.dstPid ) - .read( SrcSlot, bodyPart.srcSlot ) - .read( DstSlot, bodyPart.dstSlot ) - .read( SrcOffset, bodyPart.srcOffset ) - .read( DstOffset, bodyPart.dstOffset ) - .read( Size, 
bodyPart.size ) - .read( RoundedDstOffset, bodyPart.roundedDstOffset ) - .read( RoundedSize, bodyPart.roundedSize ); - - m_bodySends.push_back( bodyPart ); - break; - } - - default: - ASSERT( !"Unexpected message" ); - break; - } - } +int MessageQueue :: syncPerSlot(SlotID slot) +{ -#ifdef LPF_CORE_MPI_USES_mpirma - // Make sure that no MPI put or was operating before this line - if (m_nprocs > 1) - m_comm.fenceAll(); -#endif - LOG(4, "Exchanging large payloads "); - // 6. Execute unbuffered communications - const size_t maxInt = std::numeric_limits::max(); + // if not, deal with normal sync + m_memreg.sync(); - for (size_t i = 0; i < localNumberOfEdges; ++i) - { - Edge & e = m_edgeRecv[i]; - size_t headSize = e.roundedDstOffset - e.dstOffset ; - size_t tailSize = e.size - e.roundedSize - headSize ; -#if defined LPF_CORE_MPI_USES_mpimsg || defined LPF_CORE_MPI_USES_mpirma - char * head = m_edgeBuffer.data() + e.bufOffset; - char * tail = head + (e.canWriteHead?headSize:0); -#endif -#ifdef LPF_CORE_MPI_USES_mpirma - if ( m_memreg.isLocalSlot( e.dstSlot ) ) { - size_t tailOffset = e.roundedDstOffset + e.roundedSize - - e.dstOffset + e.srcOffset; - - if (e.canWriteHead) { - m_comm.get( e.srcPid, m_memreg.getWindow( e.srcSlot), - e.srcOffset, head, headSize ); - } - - if (e.canWriteTail) { - m_comm.get( e.srcPid, m_memreg.getWindow( e.srcSlot), - tailOffset, tail, tailSize ); - } - } -#endif #ifdef LPF_CORE_MPI_USES_ibverbs - if ( m_memreg.isLocalSlot( e.dstSlot ) ) { - size_t tailOffset = e.roundedDstOffset + e.roundedSize - - e.dstOffset + e.srcOffset; - - if (e.canWriteHead) { - m_ibverbs.get( e.srcPid, m_memreg.getVerbID( e.srcSlot), - e.srcOffset, - m_memreg.getVerbID( m_edgeBufferSlot ), e.bufOffset, - headSize ); - } - - if (e.canWriteTail) { - m_ibverbs.get( e.srcPid, m_memreg.getVerbID( e.srcSlot), - tailOffset, - m_memreg.getVerbID( m_edgeBufferSlot ), - e.bufOffset + (e.canWriteHead?headSize:0), - tailSize ); - } - } + m_ibverbs.syncPerSlot(m_resized, slot); 
#endif -#ifdef LPF_CORE_MPI_USES_mpimsg - if (e.canWriteHead) - m_comm.irecv( head, headSize, e.srcPid, e.tag ); - if (e.canWriteTail) - m_comm.irecv( tail, tailSize, e.srcPid, e.tag + e.canWriteHead ); -#endif - } - /* note: maintain m_edgeRecv until they have been copied */ + m_resized = false; -#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_ibverbs - ASSERT( m_edgeBufferSlot == m_memreg.invalidSlot() - || m_memreg.getAddress(m_edgeBufferSlot, 0) == m_edgeBuffer.data() ); - ASSERT( m_edgeBufferSlot == m_memreg.invalidSlot() - ||m_memreg.getSize(m_edgeBufferSlot) == m_edgeBuffer.capacity() ); -#endif - for (size_t i = 0; i < m_edgeSend.size(); ++i) - { - Edge & e = m_edgeSend[i]; - size_t headSize = e.roundedDstOffset - e.dstOffset ; - size_t tailOffset = e.roundedDstOffset + e.roundedSize - e.dstOffset; - size_t tailSize = e.size - headSize - e.roundedSize ; + return 0; +} -#if defined LPF_CORE_MPI_USES_mpirma || defined LPF_CORE_MPI_USES_mpimsg - char * head = static_cast( - m_memreg.getAddress( e.srcSlot, e.srcOffset) - ); - char * tail = head + tailOffset; -#endif -#ifdef LPF_CORE_MPI_USES_mpirma - ASSERT( ! m_memreg.isLocalSlot( e.dstSlot ) ) ; - if (e.canWriteHead) - m_comm.put( head, e.dstPid, m_memreg.getWindow( m_edgeBufferSlot ), - e.bufOffset, headSize ); - - if (e.canWriteTail) - m_comm.put( tail, e.dstPid, m_memreg.getWindow( m_edgeBufferSlot ), - e.bufOffset + (e.canWriteHead?headSize:0), tailSize); -#endif +void MessageQueue :: getRcvdMsgCountPerSlot(size_t * msgs, SlotID slot) +{ + *msgs = 0; #ifdef LPF_CORE_MPI_USES_ibverbs - ASSERT( ! 
m_memreg.isLocalSlot( e.dstSlot ) ) ; - if (e.canWriteHead) - m_ibverbs.put( m_memreg.getVerbID( e.srcSlot), e.srcOffset, - e.dstPid, m_memreg.getVerbID( m_edgeBufferSlot ), - e.bufOffset, headSize ); - - if (e.canWriteTail) - m_ibverbs.put( m_memreg.getVerbID( e.srcSlot), - e.srcOffset + tailOffset , - e.dstPid, m_memreg.getVerbID( m_edgeBufferSlot ), - e.bufOffset + (e.canWriteHead?headSize:0), tailSize); + m_ibverbs.get_rcvd_msg_count_per_slot(msgs, slot); #endif -#ifdef LPF_CORE_MPI_USES_mpimsg - if (e.canWriteHead) - m_comm.isend( head, headSize, e.dstPid, e.tag ); - - if (e.canWriteTail) - m_comm.isend( tail, tailSize, e.dstPid, e.tag + e.canWriteHead ); -#endif - } - m_edgeSend.clear(); +} - for (size_t i = 0; i < m_bodyRecvs.size() ; ++i ) - { - Body & r = m_bodyRecvs[i]; - ASSERT( r.size > 0 ); - ASSERT( maxInt > 0 ); -#if defined LPF_CORE_MPI_USES_mpimsg || defined LPF_CORE_MPI_USES_mpirma - char * addr = static_cast( - m_memreg.getAddress( r.dstSlot, r.roundedDstOffset) - ); -#endif -#ifdef LPF_CORE_MPI_USES_mpirma - size_t shift = r.roundedDstOffset - r.dstOffset; - m_comm.get( r.srcPid, - m_memreg.getWindow( r.srcSlot), - r.srcOffset + shift, - addr, - r.roundedSize ); -#endif +void MessageQueue :: getRcvdMsgCount(size_t * msgs) +{ + *msgs = 0; #ifdef LPF_CORE_MPI_USES_ibverbs - size_t shift = r.roundedDstOffset - r.dstOffset; - m_ibverbs.get( r.srcPid, - m_memreg.getVerbID( r.srcSlot), - r.srcOffset + shift, - m_memreg.getVerbID( r.dstSlot), r.roundedDstOffset, - r.roundedSize ); + m_ibverbs.get_rcvd_msg_count(msgs); #endif -#ifdef LPF_CORE_MPI_USES_mpimsg - ASSERT( r.tag < maxInt ); - m_comm.irecv( addr, r.roundedSize, r.srcPid, r.tag ); -#endif - } - m_bodyRecvs.clear(); +} - for (size_t i = 0; i < m_bodySends.size() ; ++i ) - { - Body & r = m_bodySends[i]; - ASSERT( r.size > 0 ); - ASSERT( maxInt > 0 ); - size_t shift = r.roundedDstOffset - r.dstOffset; -#if defined LPF_CORE_MPI_USES_mpimsg || defined LPF_CORE_MPI_USES_mpirma - char * addr = 
static_cast( - m_memreg.getAddress( r.srcSlot, r.srcOffset + shift) - ); -#endif -#ifdef LPF_CORE_MPI_USES_mpirma - m_comm.put( addr, - r.dstPid, - m_memreg.getWindow( r.dstSlot), - r.roundedDstOffset, - r.roundedSize ); -#endif +void MessageQueue :: getSentMsgCountPerSlot(size_t * msgs, SlotID slot) +{ + *msgs = 0; #ifdef LPF_CORE_MPI_USES_ibverbs - m_ibverbs.put( m_memreg.getVerbID( r.srcSlot), - r.srcOffset + shift, - r.dstPid, - m_memreg.getVerbID( r.dstSlot), - r.roundedDstOffset, - r.roundedSize ); -#endif -#ifdef LPF_CORE_MPI_USES_mpimsg - ASSERT( r.tag < maxInt ); - m_comm.isend( addr, r.roundedSize, r.dstPid, r.tag ); + m_ibverbs.get_sent_msg_count_per_slot(msgs, slot); #endif - } - m_bodySends.clear(); +} -#ifdef LPF_CORE_MPI_USES_mpimsg - m_comm.iwaitall(); +void MessageQueue :: flushSent() +{ +#ifdef LPF_CORE_MPI_USES_ibverbs + m_ibverbs.flushSent(); #endif +} -#ifdef LPF_CORE_MPI_USES_mpirma - // Make sure that all MPI puts and gets have finished - if (m_nprocs > 1) - m_comm.fenceAll(); -#endif +void MessageQueue :: flushReceived() +{ #ifdef LPF_CORE_MPI_USES_ibverbs - m_ibverbs.sync( m_resized ); + m_ibverbs.flushReceived(); #endif - LOG(4, "Copying edges" ); - - /* 8. 
now copy the edges */ - for (size_t i = 0; i < localNumberOfEdges; ++i) - { - Edge & edge = m_edgeRecv[i]; - ASSERT( edge.size != 0); - char * addr = static_cast( - m_memreg.getAddress( edge.dstSlot, edge.dstOffset) - ); - size_t size = edge.size; - size_t headSize = edge.roundedDstOffset - edge.dstOffset ; - size_t tailSize = edge.size - headSize - edge.roundedSize ; - - ASSERT( !edge.canWriteHead || edge.bufOffset + headSize <= m_edgeBuffer.size() ); - ASSERT( !edge.canWriteTail || edge.bufOffset + (edge.canWriteHead?headSize:0) - + tailSize <= m_edgeBuffer.size() ); - - char * head = m_edgeBuffer.data() + edge.bufOffset; - char * tail = head + (edge.canWriteHead?headSize:0); - if (edge.canWriteHead) - std::memcpy( addr, head, headSize); - - if (edge.canWriteTail) - std::memcpy( addr + size - tailSize , tail, tailSize ); - } - - LOG(4, "Cleaning up"); - - m_firstQueue->clear(); - m_secondQueue->clear(); - m_edgeBuffer.clear(); - m_resized = false; - ASSERT( m_firstQueue->empty() ); - ASSERT( m_secondQueue->empty() ); - ASSERT( m_msgsort.empty() ); - ASSERT( m_edgeSend.empty() ); - ASSERT( m_edgeBuffer.empty() ); - ASSERT( m_bodySends.empty() ); - ASSERT( m_bodyRecvs.empty() ); - - LOG(4, "End of synchronisation"); - return 0; } - } // namespace lpf diff --git a/src/MPI/mesgqueue.hpp b/src/MPI/mesgqueue.hpp index 27e7beb5..f303e918 100644 --- a/src/MPI/mesgqueue.hpp +++ b/src/MPI/mesgqueue.hpp @@ -41,6 +41,8 @@ namespace lpf { class _LPFLIB_LOCAL MessageQueue { + + typedef size_t SlotID; public: explicit MessageQueue( Communication & comm ); @@ -55,12 +57,30 @@ class _LPFLIB_LOCAL MessageQueue void get( pid_t srcPid, memslot_t srcSlot, size_t srcOffset, memslot_t dstSlot, size_t dstOffset, size_t size ); + void lockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size ); + + void unlockSlot( memslot_t srcSlot, size_t srcOffset, + pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size ); + void put( 
memslot_t srcSlot, size_t srcOffset, pid_t dstPid, memslot_t dstSlot, size_t dstOffset, size_t size ); + void getRcvdMsgCountPerSlot(size_t * msgs, SlotID slot); + + void getRcvdMsgCount(size_t * msgs); + + void getSentMsgCountPerSlot(size_t * msgs, SlotID slot); + + void flushSent(); + + void flushReceived(); + // returns how many processes have entered in an aborted state - int sync( bool abort ); + int sync(); + int countingSyncPerSlot(SlotID slot, size_t expected_sent, size_t expected_rcvd); + int syncPerSlot(SlotID slot); private: enum Msgs { BufPut , diff --git a/src/MPI/process.cpp b/src/MPI/process.cpp index eb7a5724..a3f543e5 100644 --- a/src/MPI/process.cpp +++ b/src/MPI/process.cpp @@ -25,6 +25,7 @@ #include "log.hpp" #include "assert.hpp" + namespace lpf { Process :: Process( const mpi::Comm & comm ) @@ -256,6 +257,8 @@ err_t Process :: hook( const mpi::Comm & machine, Process & subprocess, if ( runtime.isAborted() != pid_t(machine.nprocs()) ) { // in which case I stopped early + LOG(2, "This process called lpf_sync fewer times than in" + " the other processes. runtime.isAborted() = " << runtime.isAborted() << " nprocs = " << pid_t(machine.nprocs())); LOG(2, "This process called lpf_sync fewer times than in" " the other processes" ); status = LPF_ERR_FATAL; @@ -282,7 +285,8 @@ err_t Process :: hook( const mpi::Comm & machine, Process & subprocess, { LOG(1, "Caught exception of unknown type while executing " "user SPMD function. Aborting..." 
); -/*S=3*/ runtime.abort(); + /*S=3*/ runtime.abort(); + status = LPF_ERR_FATAL; } } diff --git a/src/MPI/spall2all.c b/src/MPI/spall2all.c index 610bd09f..cfeccabc 100644 --- a/src/MPI/spall2all.c +++ b/src/MPI/spall2all.c @@ -258,6 +258,7 @@ static int sparse_all_to_all_pop( sparse_all_to_all_t * obj, int n, *pid = -1; *interm_pid = -1; } + return error ; } diff --git a/src/core-libraries/collectives.c b/src/core-libraries/collectives.c index ff952e1f..cc80a69b 100644 --- a/src/core-libraries/collectives.c +++ b/src/core-libraries/collectives.c @@ -390,6 +390,41 @@ lpf_err_t lpf_allgather( return LPF_SUCCESS; } + +lpf_err_t lpf_allgatherv( + lpf_coll_t coll, + lpf_memslot_t src, + lpf_memslot_t dst, + size_t *sizes, + bool exclude_myself + ) { + + ASSERT( coll.P > 0 ); + ASSERT( coll.s < coll.P ); + + size_t allgatherv_start_addresses[coll.P]; + + for (size_t i=0; i 0) { + for (size_t i=0; isync(); } +_LPFLIB_API lpf_err_t lpf_counting_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) +{ + (void) attr; + using namespace lpf::hybrid; + if (ctx == LPF_SINGLE_PROCESS) + return LPF_SUCCESS; + return realContext(ctx)->countingSyncPerSlot(slot, expected_sent, expected_rcvd); +} + +_LPFLIB_API lpf_err_t lpf_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot) +{ + (void) attr; + using namespace lpf::hybrid; + if (ctx == LPF_SINGLE_PROCESS) + return LPF_SUCCESS; + return realContext(ctx)->syncPerSlot(slot); +} _LPFLIB_API lpf_err_t lpf_probe( lpf_t ctx, lpf_machine_t * params ) { @@ -384,4 +401,40 @@ _LPFLIB_API lpf_err_t lpf_resize_memory_register( lpf_t ctx, size_t max_regs ) return LPF_SUCCESS; } +_LPFLIB_API lpf_err_t lpf_get_rcvd_msg_count( lpf_t ctx, size_t * rcvd_msgs) +{ + using namespace lpf::hybrid; + if (ctx == LPF_SINGLE_PROCESS) + return LPF_SUCCESS; + ThreadState * t = realContext(ctx); + if (!t->error()) + return t->getRcvdMsgCount(rcvd_msgs); + else + return LPF_SUCCESS; +} + 
+_LPFLIB_API lpf_err_t lpf_get_rcvd_msg_count_per_slot( lpf_t ctx, size_t * rcvd_msgs, lpf_memslot_t slot ) +{ + using namespace lpf::hybrid; + if (ctx == LPF_SINGLE_PROCESS) + return LPF_SUCCESS; + ThreadState * t = realContext(ctx); + if (!t->error()) + return t->getRcvdMsgCount(rcvd_msgs, slot); + else + return LPF_SUCCESS; +} + +_LPFLIB_API lpf_err_t lpf_get_sent_msg_count_per_slot( lpf_t ctx, size_t * sent_msgs, lpf_memslot_t slot ) +{ + using namespace lpf::hybrid; + if (ctx == LPF_SINGLE_PROCESS) + return LPF_SUCCESS; + ThreadState * t = realContext(ctx); + if (!t->error()) + return t->getSentMsgCount(sent_msgs, slot); + else + return LPF_SUCCESS; +} + } // extern "C" diff --git a/src/hybrid/dispatch.hpp b/src/hybrid/dispatch.hpp index 1235e513..833746bf 100644 --- a/src/hybrid/dispatch.hpp +++ b/src/hybrid/dispatch.hpp @@ -112,6 +112,21 @@ namespace lpf { namespace hybrid { err_t deregister( memslot_t memslot) { return USE_THREAD( deregister)(m_ctx, memslot); } + err_t get_rcvd_msg_count_per_slot( size_t * rcvd_msgs, lpf_memslot_t slot) + { return USE_THREAD( get_rcvd_msg_count_per_slot)(m_ctx, rcvd_msgs, slot); } + + err_t get_sent_msg_count_per_slot( size_t * sent_msgs, lpf_memslot_t slot) + { return USE_THREAD( get_sent_msg_count_per_slot)(m_ctx, sent_msgs, slot); } + + err_t get_rcvd_msg_count( size_t * rcvd_msgs) + { return USE_THREAD( get_rcvd_msg_count)(m_ctx, rcvd_msgs); } + + err_t flush_sent() + { return USE_THREAD(flush_sent)(m_ctx); } + + err_t flush_received() + { return USE_THREAD(flush_received)(m_ctx); } + err_t put( memslot_t src_slot, size_t src_offset, pid_t dst_pid, memslot_t dst_slot, size_t dst_offset, size_t size, msg_attr_t attr = MSG_DEFAULT ) @@ -127,6 +142,12 @@ namespace lpf { namespace hybrid { err_t sync( sync_attr_t attr = SYNC_DEFAULT ) { return USE_THREAD(sync)( m_ctx, attr ); } + err_t sync_per_slot( sync_attr_t attr = SYNC_DEFAULT, memslot_t slot = LPF_INVALID_MEMSLOT) + { return USE_THREAD(sync_per_slot)( m_ctx, attr, 
slot); } + + err_t counting_sync_per_slot( sync_attr_t attr = SYNC_DEFAULT, lpf_memslot_t slot = LPF_INVALID_MEMSLOT, size_t expected_sent = 0, size_t expected_recvd = 0) + { return USE_THREAD(counting_sync_per_slot)(m_ctx, attr, slot, expected_sent, expected_recvd); } + err_t probe( machine_t * params ) { return USE_THREAD(probe)(m_ctx, params ); } @@ -202,6 +223,21 @@ namespace lpf { namespace hybrid { err_t deregister( memslot_t memslot) { return USE_MPI( deregister)(m_ctx, memslot); } + err_t get_rcvd_msg_count_per_slot(size_t *rcvd_msgs, lpf_memslot_t slot) + { return USE_MPI( get_rcvd_msg_count_per_slot)( m_ctx, rcvd_msgs, slot); } + + err_t get_sent_msg_count_per_slot(size_t *sent_msgs, lpf_memslot_t slot) + { return USE_MPI( get_sent_msg_count_per_slot)( m_ctx, sent_msgs, slot); } + + err_t get_rcvd_msg_count( size_t * rcvd_msgs) + { return USE_MPI( get_rcvd_msg_count)(m_ctx, rcvd_msgs); } + + err_t flush_sent() + {return USE_MPI( flush_sent)(m_ctx);} + + err_t flush_received() + {return USE_MPI( flush_received)(m_ctx);} + err_t put( memslot_t src_slot, size_t src_offset, pid_t dst_pid, memslot_t dst_slot, size_t dst_offset, size_t size, msg_attr_t attr = MSG_DEFAULT ) @@ -217,6 +253,12 @@ namespace lpf { namespace hybrid { err_t sync( sync_attr_t attr = SYNC_DEFAULT ) { return USE_MPI(sync)( m_ctx, attr ); } + err_t sync_per_slot( sync_attr_t attr = SYNC_DEFAULT, lpf_memslot_t slot = LPF_INVALID_MEMSLOT ) + { return USE_MPI(sync_per_slot)( m_ctx, attr, slot); } + + err_t counting_sync_per_slot( sync_attr_t attr = SYNC_DEFAULT, lpf_memslot_t slot = LPF_INVALID_MEMSLOT, size_t expected_sent = 0, size_t expected_recvd = 0) + { return USE_MPI(counting_sync_per_slot)(m_ctx, attr, slot, expected_sent, expected_recvd); } + err_t probe( machine_t * params ) { return USE_MPI(probe)(m_ctx, params ); } diff --git a/src/hybrid/state.hpp b/src/hybrid/state.hpp index 6ae1dd3a..06e8faf3 100644 --- a/src/hybrid/state.hpp +++ b/src/hybrid/state.hpp @@ -111,6 +111,13 @@ 
class _LPFLIB_LOCAL NodeState { return m_mpi.sync(); } +// MPI::err_t counting_sync_per_slot(lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) +// { +// m_memreg.flush( m_mpi ); +// m_msgQueue.flush( m_mpi, m_memreg ); +// return m_mpi.counting_sync_per_slot(slot, expected_sent, expected_rcvd); +// } + static double messageGap( lpf_pid_t nprocs, size_t minMsgSize, lpf_sync_attr_t attr) { (void) nprocs; @@ -367,6 +374,16 @@ class _LPFLIB_LOCAL ThreadState { return LPF_SUCCESS; } + lpf_err_t countingSyncPerSlot(lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) + { + return m_nodeState.mpi().counting_sync_per_slot(slot, expected_sent, expected_rcvd); + } + + lpf_err_t syncPerSlot(lpf_memslot_t slot) + { + return m_nodeState.mpi().sync_per_slot(slot); + } + ThreadState( NodeState * nodeState, Thread thread ) : m_error(false) , m_threadId( thread.pid() ) @@ -405,6 +422,25 @@ class _LPFLIB_LOCAL ThreadState { bool error() const { return m_error; } + lpf_pid_t getRcvdMsgCount(size_t * rcvd_msgs, lpf_memslot_t slot) { + + return m_nodeState.mpi().get_rcvd_msg_count_per_slot(rcvd_msgs, slot); + } + + lpf_pid_t getSentMsgCount(size_t * sent_msgs, lpf_memslot_t slot) { + + return m_nodeState.mpi().get_sent_msg_count_per_slot(sent_msgs, slot); + } + + lpf_pid_t getRcvdMsgCount(size_t * rcvd_msgs) { + + return m_nodeState.mpi().get_rcvd_msg_count(rcvd_msgs); + } + + lpf_pid_t flush() { + return (m_nodeState.mpi().flush_sent() && m_nodeState.mpi().flush_received()); + } + private: bool m_error; diff --git a/src/imp/core.c b/src/imp/core.c index 990e267c..7d3dde9f 100644 --- a/src/imp/core.c +++ b/src/imp/core.c @@ -137,6 +137,39 @@ lpf_err_t lpf_sync( lpf_t lpf, lpf_sync_attr_t attr ) return LPF_SUCCESS; } +lpf_err_t lpf_counting_sync_per_slot( lpf_t lpf, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) +{ + (void) lpf; + (void) attr; + return LPF_SUCCESS; +} + +lpf_err_t lpf_lock_slot( + lpf_t ctx, + lpf_memslot_t 
src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +) { + return LPF_SUCCESS; +} + +lpf_err_t lpf_unlock_slot( + lpf_t ctx, + lpf_memslot_t src_slot, + size_t src_offset, + lpf_pid_t dst_pid, + lpf_memslot_t dst_slot, + size_t dst_offset, + size_t size, + lpf_msg_attr_t attr +) { + return LPF_SUCCESS; +} + static double messageGap( lpf_pid_t p, size_t min_msg_size, lpf_sync_attr_t attr) { (void) p; @@ -179,3 +212,26 @@ lpf_err_t lpf_resize_memory_register( lpf_t lpf, size_t max_regs ) return LPF_SUCCESS; } +lpf_err_t lpf_get_rcvd_msg_count_per_slot( lpf_t lpf, size_t * rcvd_msgs, lpf_memslot_t slot) { + (void) lpf; + *rcvd_msgs = 0; + return LPF_SUCCESS; +} + +lpf_err_t lpf_get_rcvd_msg_count( lpf_t lpf, size_t * rcvd_msgs) { + (void) lpf; + *rcvd_msgs = 0; + return LPF_SUCCESS; +} + +lpf_err_t lpf_get_sent_msg_count_per_slot( lpf_t lpf, size_t * sent_msgs, lpf_memslot_t slot) { + (void) lpf; + *sent_msgs = 0; + return LPF_SUCCESS; +} + +lpf_err_t lpf_flush( lpf_t lpf) { + (void) lpf; + return LPF_SUCCESS; +} + diff --git a/src/pthreads/core.cpp b/src/pthreads/core.cpp index 1d90588a..bfe44c58 100644 --- a/src/pthreads/core.cpp +++ b/src/pthreads/core.cpp @@ -330,6 +330,13 @@ lpf_err_t lpf_sync( lpf_t ctx, lpf_sync_attr_t attr ) return realCtx(ctx)->sync(); } +lpf_err_t lpf_counting_sync_per_slot( lpf_t ctx, lpf_sync_attr_t attr, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) +{ + (void) attr; // ignore attr parameter since this implementation only + // implements core functionality + return realCtx(ctx)->countingSyncPerSlot(slot, expected_sent, expected_rcvd); +} + namespace { double messageGap( lpf_pid_t p, size_t min_msg_size, @@ -378,3 +385,27 @@ lpf_err_t lpf_resize_memory_register( lpf_t ctx, size_t max_regs ) return t->resizeMemreg(max_regs); } +lpf_err_t lpf_get_rcvd_msg_count_per_slot(lpf_t ctx, size_t * msgs, lpf_memslot_t slot) { + *msgs = 0; + 
lpf::ThreadLocalData * t = realCtx(ctx); + if (t->isAborted()) + return LPF_SUCCESS; + return LPF_SUCCESS; +} + + +lpf_err_t lpf_get_rcvd_msg_count(lpf_t ctx, size_t * msgs) { + *msgs = 0; + lpf::ThreadLocalData * t = realCtx(ctx); + if (t->isAborted()) + return LPF_SUCCESS; + return LPF_SUCCESS; +} + +lpf_err_t lpf_get_sent_msg_count_per_slot(lpf_t ctx, size_t * msgs, lpf_memslot_t slot) { + *msgs = 0; + lpf::ThreadLocalData * t = realCtx(ctx); + if (t->isAborted()) + return LPF_SUCCESS; + return LPF_SUCCESS; +} diff --git a/src/pthreads/threadlocaldata.cpp b/src/pthreads/threadlocaldata.cpp index 6bb358f1..6a62e4d3 100644 --- a/src/pthreads/threadlocaldata.cpp +++ b/src/pthreads/threadlocaldata.cpp @@ -423,7 +423,7 @@ err_t ThreadLocalData :: resizeMemreg( size_t nRegs ) // nothrow } } -err_t ThreadLocalData :: sync( bool expectExit ) +err_t ThreadLocalData :: sync( bool expectExit) { if ( m_state->sync(m_pid) ) { @@ -441,6 +441,10 @@ err_t ThreadLocalData :: sync( bool expectExit ) return LPF_SUCCESS; } +err_t ThreadLocalData :: countingSyncPerSlot(bool expectExit, lpf_memslot_t slot, size_t expected_sent, size_t expected_rcvd) { + return LPF_SUCCESS; +} + namespace { int getNumberOfProcs() { diff --git a/src/pthreads/threadlocaldata.hpp b/src/pthreads/threadlocaldata.hpp index 66d56160..c1a83706 100644 --- a/src/pthreads/threadlocaldata.hpp +++ b/src/pthreads/threadlocaldata.hpp @@ -105,6 +105,8 @@ class _LPFLIB_LOCAL ThreadLocalData { return m_atExit[0]; } err_t sync( bool expectExit = false ); // nothrow + err_t countingSyncPerSlot( bool expectExit = false, lpf_memslot_t slot = LPF_INVALID_MEMSLOT, size_t expected_sent = 0, size_t expected_rcvd = 0); // nothrow + err_t syncPerSlot( bool expectExit = false, lpf_memslot_t slot = LPF_INVALID_MEMSLOT); // nothrow private: ThreadLocalData( const ThreadLocalData & ) ; // prohibit copying diff --git a/tests/functional/func_lpf_compare_and_swap.ibverbs.c b/tests/functional/func_lpf_compare_and_swap.ibverbs.c new file 
mode 100644 index 00000000..b4d84773 --- /dev/null +++ b/tests/functional/func_lpf_compare_and_swap.ibverbs.c @@ -0,0 +1,86 @@ + +/* + * Copyright 2021 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "Test.h" + +void spmd( lpf_t lpf, lpf_pid_t pid, lpf_pid_t nprocs, lpf_args_t args) +{ + (void) args; // ignore args parameter + lpf_err_t rc = LPF_SUCCESS; + + // local x is the compare-and-swap value and is important at non-root + uint64_t localSwap = 0ULL; + // global y is the global slot at 0, and should be initialized to 0ULL + uint64_t globalSwap = 0ULL; + int x = 0; + int y = 0; + lpf_memslot_t localSwapSlot = LPF_INVALID_MEMSLOT; + lpf_memslot_t globalSwapSlot = LPF_INVALID_MEMSLOT; + size_t maxmsgs = 2 , maxregs = 2; + rc = lpf_resize_message_queue( lpf, maxmsgs); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_resize_memory_register( lpf, maxregs ); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_sync( lpf, LPF_SYNC_DEFAULT ); + lpf_memslot_t xslot = LPF_INVALID_MEMSLOT; + lpf_memslot_t yslot = LPF_INVALID_MEMSLOT; + rc = lpf_register_local( lpf, &localSwap, sizeof(localSwap), &localSwapSlot ); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_register_local( lpf, &x, sizeof(x), &xslot ); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_register_global( lpf, &globalSwap, sizeof(globalSwap), &globalSwapSlot ); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_register_global( lpf, &y, 
sizeof(y), &yslot ); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_sync( lpf, LPF_SYNC_DEFAULT); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + + + // BLOCKING + rc = lpf_lock_slot(lpf, localSwapSlot, 0, 0 /* rank where global slot to lock resides*/, globalSwapSlot, 0, sizeof(globalSwapSlot), LPF_MSG_DEFAULT); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_get( lpf, 0, yslot, 0, xslot, 0, sizeof(x), LPF_MSG_DEFAULT ); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_sync_per_slot( lpf, LPF_SYNC_DEFAULT, xslot); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + x = x + 1; + rc = lpf_put( lpf, xslot, 0, 0, yslot, 0, sizeof(x), LPF_MSG_DEFAULT ); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + rc = lpf_sync_per_slot( lpf, LPF_SYNC_DEFAULT, xslot); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + // BLOCKING + lpf_unlock_slot(lpf, localSwapSlot, 0, 0 /* rank where global slot to lock resides*/, globalSwapSlot, 0, sizeof(globalSwapSlot), LPF_MSG_DEFAULT); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + lpf_sync(lpf, LPF_MSG_DEFAULT); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + if (pid == 0) + printf("Rank %d: y = %d\n", pid, y); +} + +/** + * \test Test atomic compare-and-swap on a global slot + * \pre P >= 1 + * \return Exit code: 0 + */ +TEST( func_lpf_compare_and_swap ) +{ + lpf_err_t rc = lpf_exec( LPF_ROOT, LPF_MAX_P, spmd, LPF_NO_ARGS); + EXPECT_EQ( "%d", LPF_SUCCESS, rc ); + return 0; +} diff --git a/tests/functional/func_lpf_put_parallel_single.c b/tests/functional/func_lpf_put_parallel_single.c index 9fa85d84..78794fcf 100644 --- a/tests/functional/func_lpf_put_parallel_single.c +++ b/tests/functional/func_lpf_put_parallel_single.c @@ -38,7 +38,6 @@ void spmd( lpf_t lpf, lpf_pid_t pid, lpf_pid_t nprocs, lpf_args_t args) EXPECT_EQ( "%d", LPF_SUCCESS, rc ); rc = lpf_register_global( lpf, &y, sizeof(y), &yslot ); EXPECT_EQ( "%d", LPF_SUCCESS, rc ); - rc = lpf_sync( lpf, LPF_SYNC_DEFAULT); EXPECT_EQ( "%d", LPF_SUCCESS, rc );