diff --git a/media-proxy/CMakeLists.txt b/media-proxy/CMakeLists.txt index 5c2b423fb..79d1bbe94 100644 --- a/media-proxy/CMakeLists.txt +++ b/media-proxy/CMakeLists.txt @@ -139,7 +139,7 @@ include_directories( add_library(media_proxy_lib STATIC ${proxy_srcs} ${PROTO_SRCS} ${PROTO_HDRS} ${GRPC_SRCS} ${GRPC_HDRS}) target_link_libraries(media_proxy_lib PUBLIC m ${MTL_LIB} ${MEMIF_LIB} ${LIBFABRIC} - gRPC::grpc++ protobuf::libprotobuf) + gRPC::grpc++ protobuf::libprotobuf rdmacm) add_executable(media_proxy ${proxy_srcs} src/media_proxy.cc) target_link_libraries(media_proxy PRIVATE media_proxy_lib) diff --git a/media-proxy/include/libfabric_cq.h b/media-proxy/include/libfabric_cq.h index 549597384..6f886c845 100644 --- a/media-proxy/include/libfabric_cq.h +++ b/media-proxy/include/libfabric_cq.h @@ -12,6 +12,7 @@ extern "C" { #endif #include +#include /* forward declaration */ typedef struct ep_ctx_t ep_ctx_t; @@ -30,6 +31,7 @@ typedef struct { uint64_t cq_cntr; int cq_fd; int (*eq_read)(ep_ctx_t *ep_ctx, struct fi_cq_err_entry *entry, int timeout); + bool external; } cq_ctx_t; #ifdef UNIT_TESTS_ENABLED diff --git a/media-proxy/include/libfabric_dev.h b/media-proxy/include/libfabric_dev.h index 4444d5087..1dd2ef126 100644 --- a/media-proxy/include/libfabric_dev.h +++ b/media-proxy/include/libfabric_dev.h @@ -115,12 +115,35 @@ extern "C" { } \ } while (0) +/** + * Kind + * + * Definition of connection kinds. 
+ */ +typedef enum { + FI_KIND_UNDEFINED = 0, + FI_KIND_TRANSMITTER, + FI_KIND_RECEIVER, +} conn_kind; typedef struct { struct fid_fabric *fabric; struct fid_domain *domain; struct fi_info *info; + conn_kind kind; + const char *local_ip; + const char *remote_ip; + const char *local_port; + const char *remote_port; + bool is_initialized; // Indicates if context is fully initialized + int ep_attr_type; // Store EP type for endpoint creation + int addr_format; // Store address format for endpoint creation + const char *provider_name; // Provider name (e.g., "tcp", "verbs") } libfabric_ctx; +static void rdma_free_res(libfabric_ctx *rdma_ctx); +static void rdma_freehints(struct fi_info *hints); +static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints); + #ifdef UNIT_TESTS_ENABLED int rdma_init(libfabric_ctx **ctx); /* Deinitialize RDMA */ diff --git a/media-proxy/include/libfabric_ep.h b/media-proxy/include/libfabric_ep.h index 66ab5a781..6f60eadc9 100644 --- a/media-proxy/include/libfabric_ep.h +++ b/media-proxy/include/libfabric_ep.h @@ -43,6 +43,7 @@ typedef struct { rdma_addr remote_addr; rdma_addr local_addr; enum direction dir; + struct fid_cq *shared_rx_cq; } ep_cfg_t; /** diff --git a/media-proxy/include/mesh/conn_rdma.h b/media-proxy/include/mesh/conn_rdma.h index a49ff3398..a19c8a753 100644 --- a/media-proxy/include/mesh/conn_rdma.h +++ b/media-proxy/include/mesh/conn_rdma.h @@ -14,6 +14,7 @@ #include #include #include +#include #ifndef RDMA_DEFAULT_TIMEOUT #define RDMA_DEFAULT_TIMEOUT 1 @@ -27,6 +28,9 @@ #ifndef PAGE_SIZE #define PAGE_SIZE 4096 #endif +#ifndef RDMA_NUM_EPS +#define RDMA_NUM_EPS 1 +#endif namespace mesh::connection { @@ -79,7 +83,12 @@ class Rdma : public Connection { // RDMA-specific members libfabric_ctx *m_dev_handle; // RDMA device handle - ep_ctx_t *ep_ctx; // RDMA endpoint context + // Configurable number of RDMA endpoints (default RDMA_NUM_EPS) + size_t rdma_num_eps = RDMA_NUM_EPS; + // Configurable provider: "tcp" or 
"verbs" + std::string rdma_provider = "verbs"; // Default provider + // Endpoint contexts (one pointer per EP) + std::vector ep_ctxs; ep_cfg_t ep_cfg; // RDMA endpoint configuration size_t trx_sz; // Data transfer size bool init; // Indicates if RDMA is initialized @@ -87,6 +96,8 @@ class Rdma : public Connection { int queue_size; // Number of buffers in the queue static std::atomic active_connections; // Number of active RDMA connections + static constexpr size_t TRAILER = sizeof(uint64_t); // Size of the trailer for sequence number + // Queue for managing buffers std::queue buffer_queue; // Queue holding available buffers std::mutex queue_mutex; // Mutex for buffer queue synchronization diff --git a/media-proxy/include/mesh/conn_rdma_rx.h b/media-proxy/include/mesh/conn_rdma_rx.h index c8695fb90..4dceb8fbc 100644 --- a/media-proxy/include/mesh/conn_rdma_rx.h +++ b/media-proxy/include/mesh/conn_rdma_rx.h @@ -28,6 +28,10 @@ class RdmaRx : public Rdma { // Receive data using RDMA void process_buffers_thread(context::Context& ctx); void rdma_cq_thread(context::Context& ctx); + std::atomic next_rx_idx; + static constexpr size_t REORDER_WINDOW = 256; // > max expected out-of-order + std::array reorder_ring{{nullptr}}; + uint64_t reorder_head = UINT64_MAX; }; } // namespace mesh::connection diff --git a/media-proxy/include/mesh/conn_rdma_tx.h b/media-proxy/include/mesh/conn_rdma_tx.h index faae89f44..ce065af53 100644 --- a/media-proxy/include/mesh/conn_rdma_tx.h +++ b/media-proxy/include/mesh/conn_rdma_tx.h @@ -28,6 +28,9 @@ class RdmaTx : public Rdma { protected: virtual Result start_threads(context::Context& ctx); void rdma_cq_thread(context::Context& ctx); + // one 64-bit counter shared by all RdmaTx + inline static std::atomic global_seq{0}; + std::atomic next_tx_idx; }; } // namespace mesh::connection diff --git a/media-proxy/src/libfabric_dev.c b/media-proxy/src/libfabric_dev.c index f63858a7a..8632e7be3 100644 --- a/media-proxy/src/libfabric_dev.c +++ 
b/media-proxy/src/libfabric_dev.c @@ -9,6 +9,8 @@ #include #include +#include +#include #include "libfabric_dev.h" @@ -23,6 +25,15 @@ static void rdma_freehints(struct fi_info *hints) if (!hints) return; + if (hints->tx_attr) { + free(hints->tx_attr); + hints->tx_attr = NULL; + } + if (hints->rx_attr) { + free(hints->rx_attr); + hints->rx_attr = NULL; + } + if (hints->domain_attr->name) { free(hints->domain_attr->name); hints->domain_attr->name = NULL; @@ -53,7 +64,7 @@ static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints) { int ret; - ret = fi_getinfo(FI_VERSION(1, 21), NULL, NULL, 0, hints, &rdma_ctx->info); + ret = fi_getinfo(FI_VERSION(2, 0), NULL, NULL, 0, hints, &rdma_ctx->info); if (ret) { RDMA_PRINTERR("fi_getinfo", ret); return ret; @@ -65,6 +76,12 @@ static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints) return ret; } + ret = fi_domain(rdma_ctx->fabric, rdma_ctx->info, &rdma_ctx->domain, NULL); + if (ret) { + RDMA_PRINTERR("fi_domain", ret); + return ret; + } + /* TODO: future improvement: Add and monitor EQ to catch errors. * ret = fi_eq_open(rdma_ctx->fabric, &eq_attr, &eq, NULL); * if (ret) { @@ -72,12 +89,6 @@ static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints) * return ret; * } */ - ret = fi_domain(rdma_ctx->fabric, rdma_ctx->info, &rdma_ctx->domain, NULL); - if (ret) { - RDMA_PRINTERR("fi_domain", ret); - return ret; - } - /* TODO: future improvement: Add and monitor EQ to catch errors. 
* ret = fi_domain_bind(rdma_ctx->domain, &eq->fid, 0); * if (ret) { @@ -90,42 +101,178 @@ static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints) int rdma_init(libfabric_ctx **ctx) { - struct fi_info *hints; - int op, ret = 0; - *ctx = calloc(1, sizeof(libfabric_ctx)); - if (*ctx == NULL) { - RDMA_PRINTERR("calloc", -ENOMEM); - return -ENOMEM; + printf("[rdma_init] Entering function\n"); + struct fi_info *hints = NULL; + struct rdma_addrinfo *rai_res = NULL; + struct rdma_addrinfo rai_hints; + int ret = 0; + + if (!ctx || !(*ctx)) { + printf("[rdma_init] ERROR: ctx is NULL\n"); + return -EINVAL; + } + + // Clear any previous initialization + if ((*ctx)->fabric || (*ctx)->domain || (*ctx)->info) { + printf("[rdma_init] WARNING: Context already contains initialized resources\n"); + rdma_free_res(*ctx); } + memset(&rai_hints, 0, sizeof(rai_hints)); + printf("[rdma_init] Initialized rai_hints to zero\n"); + + /* Allocate fi_info hints */ hints = fi_allocinfo(); if (!hints) { - RDMA_PRINTERR("fi_allocinfo", ret); - libfabric_dev_ops.rdma_deinit(ctx); + printf("[rdma_init] ERROR: fi_allocinfo failed\n"); return -ENOMEM; } + printf("[rdma_init] fi_allocinfo returned valid hints at %p\n", hints); - hints->domain_attr->mr_mode = - FI_MR_LOCAL | FI_MR_ENDPOINT | (FI_MR_ALLOCATED | FI_MR_PROV_KEY | FI_MR_VIRT_ADDR); - hints->ep_attr->type = FI_EP_RDM; - hints->caps = FI_MSG; - hints->addr_format = FI_SOCKADDR_IN; - hints->fabric_attr->prov_name = strdup(LIB_FABRIC_ATTR_PROV_NAME_TCP); - hints->tx_attr->tclass = FI_TC_BULK_DATA; - hints->domain_attr->resource_mgmt = FI_RM_ENABLED; - hints->mode = FI_OPT_ENDPOINT; - hints->tx_attr->size = 32; // Transmit queue size - hints->rx_attr->size = 32; // Receive queue size + if ((*ctx)->provider_name && strcmp((*ctx)->provider_name, "verbs") == 0) { + hints->fabric_attr->prov_name = strdup(LIB_FABRIC_ATTR_PROV_NAME_VERBS); + hints->ep_attr->type = FI_EP_RDM; // Reliable datagram + hints->ep_attr->protocol = 
FI_PROTO_RXM; + hints->addr_format = FI_SOCKADDR_IN; // IPv4 + hints->domain_attr->av_type = FI_AV_UNSPEC; + hints->domain_attr->mr_mode = + FI_MR_LOCAL | FI_MR_ALLOCATED | FI_MR_VIRT_ADDR | FI_MR_PROV_KEY; + hints->domain_attr->data_progress = FI_PROGRESS_AUTO; // Use auto progress - ret = rdma_init_fabric(*ctx, hints); - rdma_freehints(hints); + /* Adjust capabilities based on the direction */ + if ((*ctx)->kind == FI_KIND_RECEIVER) { + hints->caps = FI_MSG | FI_RECV | FI_RMA | FI_REMOTE_READ | FI_LOCAL_COMM; + if (hints->rx_attr) { + hints->rx_attr->size = 1024; // Larger receive queue + } + } else { + hints->caps = FI_MSG | FI_SEND | FI_RMA | FI_REMOTE_WRITE | FI_LOCAL_COMM; + if (hints->tx_attr) { + hints->tx_attr->size = 1024; // Larger send queue + } + } + } else { + hints->domain_attr->mr_mode = + FI_MR_LOCAL | FI_MR_VIRT_ADDR; + hints->ep_attr->type = FI_EP_RDM; + hints->caps = FI_MSG; + hints->addr_format = FI_SOCKADDR_IN; + hints->fabric_attr->prov_name = strdup(LIB_FABRIC_ATTR_PROV_NAME_TCP); + hints->tx_attr->tclass = FI_TC_BULK_DATA; + hints->domain_attr->resource_mgmt = FI_RM_ENABLED; + hints->mode = FI_OPT_ENDPOINT; + hints->tx_attr->size = 1024; // Transmit queue size + hints->rx_attr->size = 1024; // Receive queue size + hints->domain_attr->threading = FI_THREAD_UNSPEC; + } + + /* Store configuration for later endpoint use */ + (*ctx)->is_initialized = false; + (*ctx)->ep_attr_type = hints->ep_attr->type; + (*ctx)->addr_format = hints->addr_format; + + /* Adjust capabilities based on the connection kind */ + if ((*ctx)->kind == FI_KIND_RECEIVER) { + /* For RX, resolve the local address */ + ret = rdma_getaddrinfo((*ctx)->local_ip, (*ctx)->local_port, &rai_hints, &rai_res); + if (ret) { + printf("[rdma_init] ERROR: rdma_getaddrinfo (RX) returned %d\n", ret); + goto err; + } + + hints->src_addr = malloc(rai_res->ai_src_len); + if (!hints->src_addr) { + ret = -ENOMEM; + goto err; + } + + // Set the FI_SOURCE flag to tell provider to use src_addr for 
binding + hints->caps |= FI_SOURCE; + + memcpy(hints->src_addr, rai_res->ai_src_addr, rai_res->ai_src_len); + hints->src_addrlen = rai_res->ai_src_len; + } else { + /* For TX, resolve the remote address */ + ret = rdma_getaddrinfo((*ctx)->remote_ip, (*ctx)->remote_port, &rai_hints, &rai_res); + if (ret) { + printf("[rdma_init] ERROR: rdma_getaddrinfo (TX) returned %d\n", ret); + goto err; + } + + // Handle source address + if (rai_res->ai_src_len > 0) { + hints->src_addr = malloc(rai_res->ai_src_len); + if (!hints->src_addr) { + ret = -ENOMEM; + goto err; + } + memcpy(hints->src_addr, rai_res->ai_src_addr, rai_res->ai_src_len); + hints->src_addrlen = rai_res->ai_src_len; + } + + // Handle destination address + if (rai_res->ai_dst_len > 0) { + hints->dest_addr = malloc(rai_res->ai_dst_len); + if (!hints->dest_addr) { + ret = -ENOMEM; + goto err; + } + memcpy(hints->dest_addr, rai_res->ai_dst_addr, rai_res->ai_dst_len); + hints->dest_addrlen = rai_res->ai_dst_len; + } + } + + /* Create fabric info, domain and fabric */ + // For RX, pass hostname and port to enforce binding + if ((*ctx)->kind == FI_KIND_RECEIVER) { + ret = fi_getinfo(FI_VERSION(2, 0), (*ctx)->local_ip, (*ctx)->local_port, + FI_SOURCE, hints, &(*ctx)->info); + } else { + // For TX, use the remote address info to force connectivity to specific endpoint + printf("[rdma_init] TX - Enforcing connection to remote %s:%s\n", (*ctx)->remote_ip, + (*ctx)->remote_port); + + ret = fi_getinfo(FI_VERSION(2, 0), (*ctx)->remote_ip, (*ctx)->remote_port, 0, hints, + &(*ctx)->info); + } if (ret) { - libfabric_dev_ops.rdma_deinit(ctx); - ERROR("Failed to initialize RDMA device"); - return ret; + fprintf(stderr, "[rdma_init] ERROR: fi_getinfo returned %d (%s)\n", ret, fi_strerror(-ret)); + goto err; + } + + ret = fi_fabric((*ctx)->info->fabric_attr, &(*ctx)->fabric, NULL); + if (ret) { + printf("[rdma_init] ERROR: fi_fabric returned %d\n", ret); + goto err_info; } + ret = fi_domain((*ctx)->fabric, (*ctx)->info, 
&(*ctx)->domain, NULL); + if (ret) { + printf("[rdma_init] ERROR: fi_domain returned %d\n", ret); + goto err_fabric; + } + + /* Clean up temporary resources */ + rdma_freeaddrinfo(rai_res); + rdma_freehints(hints); // We've already stored the info in ctx->info + + (*ctx)->is_initialized = true; + printf("[rdma_init] Device context successfully initialized\n"); return 0; + +err_fabric: + fi_close(&(*ctx)->fabric->fid); + +err_info: + fi_freeinfo((*ctx)->info); + free(*ctx); + *ctx = NULL; + +err: + rdma_freeaddrinfo(rai_res); + rdma_freehints(hints); + fprintf(stderr, "[rdma_init] Failed with error %d\n", ret); + return ret; } static void rdma_free_res(libfabric_ctx *rdma_ctx) diff --git a/media-proxy/src/libfabric_ep.c b/media-proxy/src/libfabric_ep.c index 1f465f312..ae08b716a 100644 --- a/media-proxy/src/libfabric_ep.c +++ b/media-proxy/src/libfabric_ep.c @@ -67,6 +67,10 @@ #include #include +#include +#include +#include + int get_interface_name_by_ip(const char *ip_str, char *if_name, size_t if_name_len) { struct ifaddrs *ifaddr, *ifa; int family; @@ -110,11 +114,26 @@ static int enable_ep(ep_ctx_t *ep_ctx) { int ret; - RDMA_EP_BIND(ep_ctx->ep, ep_ctx->av, 0); + // Only bind to AV if it exists + if (ep_ctx->av) { + ret = fi_ep_bind(ep_ctx->ep, &ep_ctx->av->fid, 0); + if (ret) { + RDMA_PRINTERR("fi_ep_bind (AV)", ret); + return ret; + } + } else { + // For endpoints that don't require AV (like FI_EP_MSG) + printf("[enable_ep] Note: No AV to bind (normal for some endpoint types)\n"); + } - /* TODO: Find out if unidirectional endpoint can be created */ - RDMA_EP_BIND(ep_ctx->ep, ep_ctx->cq_ctx.cq, FI_SEND | FI_RECV); + // CQ binding is required for all endpoint types + ret = fi_ep_bind(ep_ctx->ep, &ep_ctx->cq_ctx.cq->fid, FI_SEND | FI_RECV); + if (ret) { + RDMA_PRINTERR("fi_ep_bind (CQ)", ret); + return ret; + } + // Now enable the endpoint ret = fi_enable(ep_ctx->ep); if (ret) { RDMA_PRINTERR("fi_enable", ret); @@ -188,53 +207,54 @@ int ep_reg_mr(ep_ctx_t *ep_ctx, 
void *data_buf, size_t data_buf_size) return ret; } -int ep_send_buf(ep_ctx_t* ep_ctx, void* buf, size_t buf_size) { - if (!ep_ctx || !buf || buf_size == 0) { - ERROR("Invalid parameters provided to ep_send_buf: ep_ctx=%p, buf=%p, buf_size=%zu", - ep_ctx, buf, buf_size); +int ep_send_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size) +{ + if (!ep_ctx || !buf || buf_size == 0) + return -EINVAL; + + if (ep_ctx->dest_av_entry == FI_ADDR_UNSPEC) { + fprintf(stderr, "[ep_send_buf] ERROR: Invalid destination address\n"); return -EINVAL; } - int ret; - struct timespec start_time, current_time; - long elapsed_time_ms; + const int max_retries = 30; + const int backoff_us[5] = { 50, 100, 250, 500, 1000 }; /* capped */ - // Get the current time as the start time - clock_gettime(CLOCK_MONOTONIC, &start_time); + void *desc = ep_ctx->data_desc ? ep_ctx->data_desc : NULL; + int retry = 0; + int ret; - for (;;) { - // Check if the stop flag is set - if (ep_ctx->stop_flag) { - ERROR("RDMA stop flag is set. 
Aborting send."); + do { + if (ep_ctx->stop_flag) return -ECANCELED; - } - // Pass the buffer address as the context to fi_send - ret = fi_send(ep_ctx->ep, buf, buf_size, ep_ctx->data_desc, ep_ctx->dest_av_entry, buf); - if (ret == -EAGAIN) { - struct fi_cq_entry cq_entry; - (void)fi_cq_read(ep_ctx->cq_ctx.cq, &cq_entry, 1); // Drain CQ - } + ret = fi_send(ep_ctx->ep, buf, buf_size, desc, + ep_ctx->dest_av_entry, buf); + if (ret == -FI_EAGAIN || ret == -FI_ENODATA) { + + (void)fi_cq_read(ep_ctx->cq_ctx.cq, NULL, 0); - // Get the current time and calculate the elapsed time - clock_gettime(CLOCK_MONOTONIC, ¤t_time); - elapsed_time_ms = (current_time.tv_sec - start_time.tv_sec) * 1000 + - (current_time.tv_nsec - start_time.tv_nsec) / 1000000; + if (retry == 0 || retry % 5 == 0) + fprintf(stderr, + "[ep_send_buf] Connection not ready, retry %d/%d\n", + retry, max_retries); - if (ret == -EAGAIN) { - // Check if the elapsed time exceeds the timeout interval - if (elapsed_time_ms > 100) + if (++retry > max_retries) return -ETIMEDOUT; - else - continue; + + /* short exponential back-off without polling the CQ */ + int idx = retry < 5 ? 
retry : 4; + usleep(backoff_us[idx]); } + } while (ret == -FI_EAGAIN || ret == -FI_ENODATA); - break; - } + if (ret) + fprintf(stderr, "[ep_send_buf] fi_send failed: %s\n", fi_strerror(-ret)); - return ret; + return ret; /* 0 on success; <0 on error */ } + int ep_recv_buf(ep_ctx_t *ep_ctx, void *buf, size_t buf_size, void *buf_ctx) { int ret; @@ -269,122 +289,156 @@ int ep_cq_read(ep_ctx_t *ep_ctx, void **buf_ctx, int timeout) return 0; } +static void print_libfabric_error(const char *msg, int ret) +{ + fprintf(stderr, "%s: ret=%d (%s)\n", msg, ret, fi_strerror(-ret)); +} + int ep_init(ep_ctx_t **ep_ctx, ep_cfg_t *cfg) { int ret; - struct fi_info *fi; - struct fi_info *hints; + struct fi_info *old_info = NULL; + + if (!ep_ctx || !cfg) { + fprintf(stderr, "[ep_init] ERROR: ep_ctx/cfg is NULL\n"); + return -EINVAL; + } - if (!ep_ctx || !cfg) + if (!cfg->rdma_ctx) { + fprintf(stderr, "[ep_init] ERROR: cfg->rdma_ctx is NULL\n"); return -EINVAL; + } - if (cfg->rdma_ctx == NULL) + // Validate device context is properly initialized + if (!cfg->rdma_ctx->is_initialized || !cfg->rdma_ctx->fabric || !cfg->rdma_ctx->domain || + !cfg->rdma_ctx->info) { + fprintf(stderr, "[ep_init] ERROR: Device context not properly initialized\n"); return -EINVAL; + } + // Allocate endpoint context *ep_ctx = calloc(1, sizeof(ep_ctx_t)); if (!(*ep_ctx)) { - RDMA_PRINTERR("libfabric ep context malloc fail\n", -ENOMEM); + fprintf(stderr, "[ep_init] ERROR: Failed to allocate endpoint context\n"); return -ENOMEM; } + + // Store reference to device context (*ep_ctx)->rdma_ctx = cfg->rdma_ctx; (*ep_ctx)->stop_flag = false; - hints = fi_dupinfo((*ep_ctx)->rdma_ctx->info); - if (!hints) { - RDMA_PRINTERR("fi_dupinfo failed\n", -ENOMEM); - libfabric_ep_ops.ep_destroy(ep_ctx); - return -ENOMEM; + // Create the endpoint using the domain from the device context + ret = fi_endpoint(cfg->rdma_ctx->domain, cfg->rdma_ctx->info, &(*ep_ctx)->ep, NULL); + if (ret) { + fprintf(stderr, "[ep_init] ERROR: fi_endpoint 
returned %d\n", ret); + goto err; } - hints->src_addr = NULL; - hints->src_addrlen = 0; - hints->dest_addr = NULL; - hints->dest_addrlen = 0; - - char if_name[IF_NAMESIZE]; - - if (cfg->dir == RX) { - if (get_interface_name_by_ip(cfg->local_addr.ip, if_name, sizeof(if_name)) == 0) { - printf("Interface for IP %s is: %s\n", cfg->local_addr.ip, if_name); - } else { - printf("Interface for IP %s not found\n", cfg->local_addr.ip); - } - - hints->domain_attr->name = strdup(if_name); - ret = fi_getinfo(FI_VERSION(1, 21), cfg->local_addr.ip, cfg->local_addr.port, FI_SOURCE, - hints, &fi); + // Initialize completion queue but use caller-supplied CQ if present + if (cfg->shared_rx_cq) { + (*ep_ctx)->cq_ctx.cq = cfg->shared_rx_cq; + (*ep_ctx)->cq_ctx.external = true; // Indicate this CQ is owned by the caller + /* NOTE: do NOT close in ep_destroy – caller owns it */ } else { - if (get_interface_name_by_ip(cfg->local_addr.ip, if_name, sizeof(if_name)) == 0) { - printf("Interface for IP %s is: %s\n", cfg->local_addr.ip, if_name); - } else { - printf("Interface for IP %s not found\n", cfg->local_addr.ip); + ret = libfabric_cq_ops.rdma_cq_open(*ep_ctx, 0, RDMA_COMP_SREAD); + if (ret) { + fprintf(stderr, "[ep_init] ERROR: rdma_cq_open returned %d\n", ret); + goto err_ep; } + (*ep_ctx)->cq_ctx.external = false; + cfg->shared_rx_cq = (*ep_ctx)->cq_ctx.cq; // Store for caller + } - struct sockaddr_in local_sockaddr; - memset(&local_sockaddr, 0, sizeof(local_sockaddr)); - local_sockaddr.sin_family = AF_INET; - local_sockaddr.sin_port = htons(0); // Any available port - inet_pton(AF_INET, cfg->local_addr.ip, &local_sockaddr.sin_addr); - - hints->src_addr = (void *)&local_sockaddr; - hints->src_addrlen = sizeof(local_sockaddr); + // Create address vector if needed based on endpoint type + if (cfg->rdma_ctx->ep_attr_type == FI_EP_RDM || + cfg->rdma_ctx->ep_attr_type == FI_EP_DGRAM) { + + struct fi_av_attr av_attr = { + .type = FI_AV_MAP, + .count = 1 + }; - hints->domain_attr->name = 
strdup(if_name); - ret = fi_getinfo(FI_VERSION(1, 21), cfg->remote_addr.ip, cfg->remote_addr.port, 0, hints, - &fi); + ret = fi_av_open(cfg->rdma_ctx->domain, &av_attr, &(*ep_ctx)->av, NULL); + if (ret) { + fprintf(stderr, "[ep_init] ERROR: fi_av_open returned %d\n", ret); + goto err_cq; + } } + + // Enable the endpoint + ret = enable_ep(*ep_ctx); if (ret) { - RDMA_PRINTERR("fi_getinfo", ret); - libfabric_ep_ops.ep_destroy(ep_ctx); - return ret; + fprintf(stderr, "[ep_init] ERROR: enable_ep returned %d\n", ret); + goto err_av; } - ret = ep_alloc_res(*ep_ctx, (*ep_ctx)->rdma_ctx, fi, 1); - if (ret) { - RDMA_PRINTERR("ep_alloc_res fail\n", ret); - libfabric_ep_ops.ep_destroy(ep_ctx); - return ret; + // Insert destination address if available (TX mode) + if (cfg->dir == TX && cfg->rdma_ctx->info->dest_addr) { + ret = ep_av_insert((*ep_ctx)->rdma_ctx, (*ep_ctx)->av, + cfg->rdma_ctx->info->dest_addr, 1, + &(*ep_ctx)->dest_av_entry, 0, NULL); + if (ret) { + fprintf(stderr, "[ep_init] ERROR: ep_av_insert returned %d\n", ret); + goto err_av; + } } - ret = enable_ep((*ep_ctx)); - if (ret) { - RDMA_PRINTERR("ep_enable fail\n", ret); - libfabric_ep_ops.ep_destroy(ep_ctx); - return ret; + fprintf(stderr, "[ep_init] Endpoint successfully initialized\n"); + return 0; + +err_av: + if ((*ep_ctx)->av) { + fi_close(&(*ep_ctx)->av->fid); } - if (fi->dest_addr) { - ret = ep_av_insert((*ep_ctx)->rdma_ctx, (*ep_ctx)->av, fi->dest_addr, 1, - &(*ep_ctx)->dest_av_entry, 0, NULL); - if (ret) { - RDMA_PRINTERR("ep_av_insert fail\n", ret); - libfabric_ep_ops.ep_destroy(ep_ctx); - return ret; - } +err_cq: + if ((*ep_ctx)->cq_ctx.cq) { + fi_close(&(*ep_ctx)->cq_ctx.cq->fid); + } + if ((*ep_ctx)->cq_ctx.waitset) { + fi_close(&(*ep_ctx)->cq_ctx.waitset->fid); } - fi_freeinfo(fi); +err_ep: + if ((*ep_ctx)->ep) { + fi_close(&(*ep_ctx)->ep->fid); + } - return 0; +err: + free(*ep_ctx); + *ep_ctx = NULL; + return ret; } int ep_destroy(ep_ctx_t **ep_ctx) { - if (!ep_ctx || !(*ep_ctx)) return -EINVAL; - 
libfabric_mr_ops.rdma_unreg_mr((*ep_ctx)->data_mr); - RDMA_CLOSE_FID((*ep_ctx)->ep); + ep_ctx_t *ctx = *ep_ctx; - RDMA_CLOSE_FID((*ep_ctx)->cq_ctx.cq); - RDMA_CLOSE_FID((*ep_ctx)->av); - RDMA_CLOSE_FID((*ep_ctx)->cq_ctx.waitset); + /* 1. unregister MR (ignore ENOENT-like errors) */ + if (ctx->data_mr) + libfabric_mr_ops.rdma_unreg_mr(ctx->data_mr); - if (ep_ctx && *ep_ctx) { - free(*ep_ctx); - *ep_ctx = NULL; - } + /* 2. Close endpoint object itself */ + RDMA_CLOSE_FID(ctx->ep); + + /* 3. Close CQ only if this EP owns it + * (ctx->cq_ctx.external == true ⇒ shared CQ owned by caller) */ + if (ctx->cq_ctx.cq && !ctx->cq_ctx.external) + RDMA_CLOSE_FID(ctx->cq_ctx.cq); + + /* 3a. The waitset belongs to the CQ; close only if we closed the CQ */ + if (ctx->cq_ctx.waitset && !ctx->cq_ctx.external) + RDMA_CLOSE_FID(ctx->cq_ctx.waitset); + + /* 4. Address vector always owned by the EP */ + RDMA_CLOSE_FID(ctx->av); + + /* 5. finally free the context struct */ + free(ctx); + *ep_ctx = NULL; return 0; } diff --git a/media-proxy/src/mesh/conn_rdma.cc b/media-proxy/src/mesh/conn_rdma.cc index 0da76b910..b82d0f086 100644 --- a/media-proxy/src/mesh/conn_rdma.cc +++ b/media-proxy/src/mesh/conn_rdma.cc @@ -1,12 +1,20 @@ #include #include "conn_rdma.h" +#include // for sockaddr_in +#include // for ntohs/htons namespace mesh::connection { std::atomic Rdma::active_connections(0); -Rdma::Rdma() : ep_ctx(nullptr), init(false), trx_sz(0), m_dev_handle(nullptr), - buffer_block(nullptr), queue_size(0) { +Rdma::Rdma() + : ep_ctxs{} + , init(false) + , trx_sz(0) + , m_dev_handle(nullptr) + , buffer_block(nullptr) + , queue_size(0) +{ std::memset(&ep_cfg, 0, sizeof(ep_cfg)); ++active_connections; } @@ -176,6 +184,27 @@ Result Rdma::configure(context::Context& ctx, const mcm_conn_param& request, { trx_sz = request.payload_args.rdma_args.transfer_size; + // --- validate & assign RDMA provider --- + if (request.payload_args.rdma_args.provider + && *request.payload_args.rdma_args.provider) { + 
rdma_provider = request.payload_args.rdma_args.provider; + } else { + log::warn("RDMA provider not specified, defaulting to 'verbs'"); + rdma_provider = "verbs"; + } + + // --- validate & assign number of endpoints --- + uint32_t neps = request.payload_args.rdma_args.num_endpoints; + if (neps >= 1 && neps <= 8) { + rdma_num_eps = neps; + } else { + log::warn( + "RDMA num_endpoints {} out of valid range [1..8], defaulting to 1", + neps + ); + rdma_num_eps = 1; + } + _kind = kind(); std::memset(&ep_cfg, 0, sizeof(ep_cfg)); @@ -205,76 +234,216 @@ Result Rdma::configure(context::Context& ctx, const mcm_conn_param& request, */ Result Rdma::on_establish(context::Context& ctx) { + // 1) Already initialized? if (init) { log::error("RDMA device is already initialized")("state", "initialized"); set_state(ctx, State::active); return Result::error_already_initialized; } + // 2) Unblock buffer threads init_buf_available(); - int ret; + int ret; Result res; + // 3) Initialize the RDMA device if needed if (!m_dev_handle) { + m_dev_handle = (libfabric_ctx*)calloc(1, sizeof(libfabric_ctx)); + if (!m_dev_handle) { + log::error("Failed to allocate RDMA context")("error", strerror(errno)); + return Result::error_out_of_memory; + } + _kind = kind(); + m_dev_handle->kind = (_kind == Kind::receiver + ? 
FI_KIND_RECEIVER + : FI_KIND_TRANSMITTER); + m_dev_handle->local_ip = ep_cfg.local_addr.ip; + m_dev_handle->local_port = ep_cfg.local_addr.port; + m_dev_handle->remote_ip = ep_cfg.remote_addr.ip; + m_dev_handle->remote_port = ep_cfg.remote_addr.port; + m_dev_handle->provider_name = strdup(rdma_provider.c_str()); ret = libfabric_dev_ops.rdma_init(&m_dev_handle); + if (ret) { - log::error("Failed to initialize RDMA device")("ret", ret); + log::error("Failed to initialize RDMA device")("ret", ret) + ("error", fi_strerror(-ret)); + free(m_dev_handle); + m_dev_handle = nullptr; set_state(ctx, State::closed); return Result::error_initialization_failed; } + log::info("RDMA device successfully initialized"); } - ep_cfg.rdma_ctx = m_dev_handle; + auto bump_sock = [](sockaddr_in* sa, uint32_t delta) + { + unsigned short p = ntohs(sa->sin_port); + sa->sin_port = htons(static_cast(p + delta)); + }; - ret = libfabric_ep_ops.ep_init(&ep_ctx, &ep_cfg); - if (ret) { - log::error("Failed to initialize RDMA endpoint context")("error", fi_strerror(-ret)); - set_state(ctx, State::closed); - return Result::error_initialization_failed; + auto bump_port_str = [](char dst[6], const char src[6], uint32_t delta) + { + unsigned short p = static_cast(std::atoi(src)); + std::snprintf(dst, 6, "%u", p + delta); + }; + + /* -------------------------------------------------------------------- + * 4) Allocate per‐EP arrays by rdma_num_eps + * ------------------------------------------------------------------*/ + std::vector devs(rdma_num_eps, nullptr); + std::vector info_dups(rdma_num_eps, nullptr); + + // cleanup helper: free all clones [1..n) + auto cleanup_clones = [&](std::size_t upto) { + for (std::size_t j = 1; j < upto; ++j) { + if (info_dups[j]) { + fi_freeinfo(info_dups[j]); + info_dups[j] = nullptr; + } + if (devs[j]) { + free(devs[j]); + devs[j] = nullptr; + } + } + }; + + // resize our member‐vector of EP pointers + ep_ctxs.resize(rdma_num_eps); + + /* EP-0 re-uses the context already 
created in rdma_init() */ + devs[0] = m_dev_handle; // never null + info_dups[0] = m_dev_handle->info; // keep for uniform cleanup + + /* create clones for EP-1 … EP-N */ + for (uint32_t i = 1; i < rdma_num_eps; ++i) { + fi_info* dup = fi_dupinfo(m_dev_handle->info); + if (!dup) { + log::error("fi_dupinfo failed for EP %u", i)("kind", kind2str(_kind)); + // free any earlier clones + cleanup_clones(i); + set_state(ctx, State::closed); + return Result::error_initialization_failed; + } + info_dups[i] = dup; + + if (dup->src_addr && dup->src_addrlen == sizeof(sockaddr_in)) + bump_sock(reinterpret_cast(dup->src_addr), i); + if (dup->dest_addr && dup->dest_addrlen == sizeof(sockaddr_in)) + bump_sock(reinterpret_cast(dup->dest_addr), i); + + devs[i] = static_cast(calloc(1, sizeof(libfabric_ctx))); + if (!devs[i]) { + log::error("Failed to allocate RDMA context clone for EP %u", i)("error", + strerror(errno)); + fi_freeinfo(dup); + // free any earlier clones + cleanup_clones(i); + set_state(ctx, State::closed); + return Result::error_out_of_memory; + } + *devs[i] = *m_dev_handle; // shallow copy + devs[i]->info = dup; + devs[i]->is_initialized = true; } - if(queue_size == 0) { - log::error("RDMA queue size is not set")("queue_size", queue_size); - set_state(ctx, State::closed); - return Result::error_bad_argument; + + /* ---------- cfgs ---------------------------------------------------- */ + std::vector cfgs(rdma_num_eps, ep_cfg); + for (uint32_t i = 1; i < rdma_num_eps; ++i) { + bump_port_str(cfgs[i].local_addr.port, cfgs[0].local_addr.port, i); + bump_port_str(cfgs[i].remote_addr.port, cfgs[0].remote_addr.port, i); } - res = init_queue_with_elements(queue_size, trx_sz); - if (res != Result::success) { - log::error("Failed to initialize RDMA buffer queue")("trx_sz", trx_sz); - if (ep_ctx) { - libfabric_ep_ops.ep_destroy(&ep_ctx); + /* ---------- bring up endpoints ------------------------------------- */ + for (uint32_t i = 0; i < rdma_num_eps; ++i) { + cfgs[i].rdma_ctx 
= devs[i]; + if (i > 0) { + cfgs[i].shared_rx_cq = ep_ctxs[0]->cq_ctx.cq; // use EP-0 CQ for RX + } else { + cfgs[i].shared_rx_cq = nullptr; // no shared CQ for EP-0 + } + int ret = libfabric_ep_ops.ep_init(&ep_ctxs[i], &cfgs[i]); + if (ret) { + log::error("Failed to initialize RDMA endpoint #%u", i)("ret", ret)("error", + fi_strerror(-ret)); + // destroy all created endpoints + for (auto& e : ep_ctxs) { + if (e) { + libfabric_ep_ops.ep_destroy(&e); + } + } + cleanup_clones(i + 1); + set_state(ctx, State::closed); + return Result::error_initialization_failed; } - set_state(ctx, State::closed); - return res; } - // Configure RDMA endpoint - res = configure_endpoint(ctx); + /* ---------- queue & MR section (no duplicate ‘res’) ----------------- */ + res = init_queue_with_elements(queue_size, trx_sz + TRAILER); if (res != Result::success) { - log::error("RDMA configuring failed")("state", "closed"); - if (ep_ctx) { - libfabric_ep_ops.ep_destroy(&ep_ctx); + log::error("Failed to initialise RDMA buffer queue")("trx_sz", trx_sz); + for (auto &e : ep_ctxs) { + if (e) { + libfabric_ep_ops.ep_destroy(&e); + } } + cleanup_clones(rdma_num_eps); set_state(ctx, State::closed); return res; } + /* ------------------------------------------------------------------ + * 8) Register the **same** memory block on every endpoint + * -----------------------------------------------------------------*/ + { + std::lock_guard lock(queue_mutex); + + std::size_t aligned_sz = /* one slot */ + (((trx_sz + TRAILER) + PAGE_SIZE - 1) / PAGE_SIZE) * PAGE_SIZE; + std::size_t total_size = queue_size * aligned_sz; + + for (auto* ep : ep_ctxs) { + int rc = libfabric_ep_ops.ep_reg_mr(ep, buffer_block, total_size); + if (rc) { + log::error("Memory registration failed")("error", fi_strerror(-rc)); + for (auto &ee : ep_ctxs) { + if (ee) { + libfabric_ep_ops.ep_destroy(&ee); + } + } + cleanup_clones(rdma_num_eps); + set_state(ctx, State::closed); + return Result::error_memory_registration_failed; + } + } + } 
+ + /* ------------------------------------------------------------------ + * 9) Start TX / RX / CQ threads + * -----------------------------------------------------------------*/ init = true; - res = start_threads(ctx); + res = start_threads(ctx); if (res != Result::success) { log::error("Failed to start RDMA threads")("state", "closed"); - if (ep_ctx) { - libfabric_ep_ops.ep_destroy(&ep_ctx); + for (auto &e : ep_ctxs) if (e) libfabric_ep_ops.ep_destroy(&e); + for (std::size_t i = 1; i < rdma_num_eps; ++i) { + if (info_dups[i]) fi_freeinfo(info_dups[i]); + if (devs[i]) free(devs[i]); } set_state(ctx, State::closed); return res; } + /* ------------------------------------------------------------------ + * 10) Ready for traffic + * -----------------------------------------------------------------*/ set_state(ctx, State::active); return Result::success; + } + + /** * @brief Cleans up RDMA resources and resets the state. * @@ -352,27 +521,38 @@ void Rdma::on_delete(context::Context& ctx) */ Result Rdma::configure_endpoint(context::Context& ctx) { - if (!ep_ctx) { - log::error("RDMA endpoint context is not initialized"); - return Result::error_wrong_state; + // Ensure we have initialized endpoints + for (size_t i = 0; i < ep_ctxs.size(); ++i) { + if (!ep_ctxs[i]) { + log::error("RDMA endpoint context #%zu is not initialized", i) + ("kind", kind2str(_kind)); + return Result::error_wrong_state; + } } std::lock_guard lock(queue_mutex); if (!buffer_block) { - log::error("Memory block for RDMA buffer queue is not allocated"); + log::error("Memory block for RDMA buffer queue is not allocated") + ("kind", kind2str(_kind)); return Result::error_out_of_memory; } - // Calculate the total size of the memory block - size_t total_size = buffer_queue.size() * ((trx_sz + PAGE_SIZE - 1) / PAGE_SIZE * PAGE_SIZE); + // Total size in bytes of the contiguous buffer pool + size_t buf_count = buffer_queue.size(); + size_t aligned_sz = (((trx_sz + TRAILER) + PAGE_SIZE - 1) / PAGE_SIZE) * 
PAGE_SIZE; + size_t total_size = buf_count * aligned_sz; - // Register the entire memory block with the RDMA endpoint - int ret = libfabric_ep_ops.ep_reg_mr(ep_ctx, buffer_block, total_size); - if (ret) { - log::error("Memory registration failed for the RDMA buffer block") - ("error", fi_strerror(-ret)); - return Result::error_memory_registration_failed; + // Register the entire buffer block with each endpoint (QP) + for (size_t i = 0; i < ep_ctxs.size(); ++i) { + ep_ctx_t* e = ep_ctxs[i]; + int ret = libfabric_ep_ops.ep_reg_mr(e, buffer_block, total_size); + if (ret) { + log::error("Memory registration failed on endpoint #%zu", i) + ("error", fi_strerror(-ret)) + ("kind", kind2str(_kind)); + return Result::error_memory_registration_failed; + } } return Result::success; @@ -381,21 +561,28 @@ Result Rdma::configure_endpoint(context::Context& ctx) Result Rdma::cleanup_resources(context::Context& ctx) { - if (ep_ctx) { - int err = libfabric_ep_ops.ep_destroy(&ep_ctx); - if (err) { - log::error("Failed to destroy RDMA endpoint")("error", fi_strerror(-err)); - return Result::error_general_failure; + // Destroy each endpoint (QP) + for (size_t i = ep_ctxs.size(); i >= 1; --i) { + ep_ctx_t*& e = ep_ctxs[i-1]; + if (e) { + int err = libfabric_ep_ops.ep_destroy(&e); + if (err) { + log::error("Failed to destroy RDMA endpoint #%zu", i) + ("error", fi_strerror(-err)) + ("kind", kind2str(_kind)); + return Result::error_general_failure; + } + e = nullptr; } } - // Clean up the buffer queue + // Free and clear our buffer queue cleanup_queue(); - // Mark RDMA as uninitialized + // Mark as uninitialized so we can re-establish if needed init = false; - return Result::success; } + } // namespace mesh::connection diff --git a/media-proxy/src/mesh/conn_rdma_rx.cc b/media-proxy/src/mesh/conn_rdma_rx.cc index 02aa452f8..9035a9e01 100644 --- a/media-proxy/src/mesh/conn_rdma_rx.cc +++ b/media-proxy/src/mesh/conn_rdma_rx.cc @@ -1,11 +1,13 @@ #include "conn_rdma_rx.h" #include #include 
+#include namespace mesh::connection { RdmaRx::RdmaRx() : Rdma() { _kind = Kind::receiver; // Set the Kind in the constructor + next_rx_idx = 0; // Initialize the next receive index } RdmaRx::~RdmaRx() @@ -61,35 +63,52 @@ Result RdmaRx::start_threads(context::Context& ctx) { */ void RdmaRx::process_buffers_thread(context::Context& ctx) { - while (!ctx.cancelled()) { void *buf = nullptr; - // Process all available buffers in the queue + // Drain all currently queued buffers while (!ctx.cancelled()) { Result res = consume_from_queue(ctx, &buf); - if (res == Result::success && buf != nullptr) { - int err = libfabric_ep_ops.ep_recv_buf(ep_ctx, buf, trx_sz, buf); + if (res == Result::success && buf) { + // Round-robin receive postings across the two QPs + uint32_t idx = next_rx_idx.fetch_add(1, std::memory_order_relaxed) + % ep_ctxs.size(); + ep_ctx_t* chosen = ep_ctxs[idx]; + if (!chosen) { + log::error("RDMA rx endpoint #%u is null, skipping buffer") + ("idx", idx)("kind", kind2str(_kind)); + // Return buffer so it isn't lost + add_to_queue(buf); + break; + } + + int err = libfabric_ep_ops.ep_recv_buf(chosen, buf, trx_sz + TRAILER, buf); if (err) { - log::error("Failed to pass empty buffer to RDMA rx to receive into") - ("buffer_address", buf)("error", fi_strerror(-err)) - ("kind", kind2str(_kind)); + log::error("Failed to post recv buffer to RDMA rx") + ("buffer_address", buf) + ("error", fi_strerror(-err)) + ("kind", kind2str(_kind)); + // On error, put the buffer back on the queue res = add_to_queue(buf); if (res != Result::success) { - log::error("Failed to add buffer to RDMA rx queue")("error", result2str(res)) - ("kind", kind2str(_kind)); + log::error("Failed to re-queue buffer after recv error") + ("error", result2str(res)) + ("kind", kind2str(_kind)); break; } } } else { - break; // Exit the loop to avoid spinning on errors + // No more buffers available right now + break; } } - // Wait for the buffer to become available + + // Wait until new buffers are added 
wait_buf_available(); } } + /** * @brief Handles the RDMA completion queue (CQ) events in a dedicated thread. * @@ -110,73 +129,181 @@ void RdmaRx::process_buffers_thread(context::Context& ctx) * @param ctx The context for managing thread cancellation and operations. */ void RdmaRx::rdma_cq_thread(context::Context& ctx) { - constexpr int CQ_RETRY_DELAY_US = 100; // Retry delay for EAGAIN - struct fi_cq_entry cq_entries[CQ_BATCH_SIZE]; // Array to hold batch of CQ entries + constexpr int CQ_RETRY_DELAY_US = 100; + struct fi_cq_entry cq_entries[CQ_BATCH_SIZE]; while (!ctx.cancelled()) { - // Read a batch of completion events - int ret = fi_cq_read(ep_ctx->cq_ctx.cq, cq_entries, CQ_BATCH_SIZE); - if (ret > 0) { - for (int i = 0; i < ret; ++i) { - void *buf = cq_entries[i].op_context; - if (buf == nullptr) { - log::error("RDMA rx null buffer context, skipping...") - ("batch_index",i)("kind", kind2str(_kind)); - continue; - } + bool did_work = false; - // Process the buffer (e.g., transmit or handle) - Result res = transmit(ctx, buf, trx_sz); - if (res != Result::success) { - log::error("RDMA rx failed to transmit buffer")("buffer_address", buf) - ("size", trx_sz)("kind", kind2str(_kind)); - continue; - } + // Poll each *unique* CQ only once + struct fid_cq *last_cq = nullptr; + for (auto* ep : ep_ctxs) { + if (!ep) continue; - // Add the buffer back to the queue - res = add_to_queue(buf); - if (res == Result::success) { - // Notify that a buffer is available - notify_buf_available(); - } else { - log::error("Failed to add buffer back to the RDMA rx queue") - ("buffer_address", buf)("kind", kind2str(_kind)); + struct fid_cq *cq = ep->cq_ctx.cq; + if (cq == last_cq) // duplicate of the one we just handled + continue; + last_cq = cq; + + int ret = fi_cq_read(cq, cq_entries, CQ_BATCH_SIZE); + if (ret > 0) { + did_work = true; + + for (int i = 0; i < ret; ++i) { + void* buf = cq_entries[i].op_context; + if (!buf) { + log::error("RDMA rx null buffer context, skipping...") + 
("batch_index", i) + ("kind", kind2str(_kind)); + continue; + } + + // Read 64-bit trailer after payload + auto* trailer_ptr = reinterpret_cast( + reinterpret_cast(buf) + trx_sz); + uint64_t seq = *trailer_ptr; + + if (reorder_head == UINT64_MAX) { + reorder_head = seq; + } + // Slot into ring buffer + size_t idx = seq & (REORDER_WINDOW - 1); + reorder_ring[idx] = buf; + + // Flush any in-order entries + while (true) { + size_t head_idx = reorder_head & (REORDER_WINDOW - 1); + void* ready = reorder_ring[head_idx]; + if (!ready) break; + reorder_ring[head_idx] = nullptr; + + // Deliver payload (exclude trailer) + Result r = transmit(ctx, ready, trx_sz); + if (r != Result::success) { + log::error("RDMA rx failed to transmit buffer") + ("buffer_address", ready) + ("size", trx_sz) + ("kind", kind2str(_kind)); + } + + // Recycle buffer + r = add_to_queue(ready); + if (r == Result::success) { + notify_buf_available(); + } else { + log::error("Failed to recycle buffer to queue") + ("buffer_address", ready) + ("kind", kind2str(_kind)); + } + ++reorder_head; + } } } - } else if (ret == -EAGAIN) { - // No events to process, yield CPU briefly to avoid busy looping - std::this_thread::sleep_for(std::chrono::microseconds(CQ_RETRY_DELAY_US)); - } else if (ret == -FI_EAVAIL) { - // Read the error details - struct fi_cq_err_entry err_entry; - int err_ret = fi_cq_readerr(ep_ctx->cq_ctx.cq, &err_entry, 0); - if (err_ret >= 0) { - if (err_entry.err == -FI_ECONNRESET || err_entry.err == -FI_ENOTCONN) { - log::warn("RDMA connection reset or endpoint not connected. 
Waiting for new connection.")
-                    ("error", fi_strerror(err_entry.err))("kind",kind2str(_kind));
-                thread::Sleep(ctx, std::chrono::milliseconds(1000)); // Pause before retrying
+            else if (ret == -FI_EAVAIL) {
+                fi_cq_err_entry err_entry {};
+                int err_ret = fi_cq_readerr(ep->cq_ctx.cq, &err_entry, 0);
+                if (err_ret >= 0) {
+                    int err = err_entry.err;
+
+                    /* human-friendly diagnostics.
+                     * NOTE: fi_cq_readerr() reports a POSITIVE error code in
+                     * err_entry.err (we pass it to fi_strerror(err) as-is), so
+                     * compare against the positive FI_* constants — negated
+                     * values would never match. */
+                    if (err == FI_ECANCELED) {
+                        log::warn("RDMA rx operation canceled")
+                            ("error", fi_strerror(err))("kind", kind2str(_kind));
+                    } else if (err == FI_ECONNRESET || err == FI_ENOTCONN) {
+                        log::warn("RDMA connection reset/not connected; retrying")
+                            ("error", fi_strerror(err))("kind", kind2str(_kind));
+                        thread::Sleep(ctx, std::chrono::milliseconds(1000));
+                    } else if (err == FI_ECONNABORTED) {
+                        log::warn("RDMA rx connection aborted")
+                            ("error", fi_strerror(err))("kind", kind2str(_kind));
+                    } else {
+                        log::error("RDMA rx encountered CQ error")
+                            ("error", fi_strerror(err))("kind", kind2str(_kind));
+                    }
+
+                    /* recycle the buffer that was canceled / errored */
+                    if (err_entry.op_context) {
+                        if (add_to_queue(err_entry.op_context) == Result::success)
+                            notify_buf_available();
+                        else
+                            log::error("Failed to recycle buffer after CQ error")
+                                ("buffer_address", err_entry.op_context)
+                                ("kind", kind2str(_kind));
+                    }
+
+                    /* ---- if it was ECANCELED, repost done: keep reorder_head,
+                       just try to flush any packet that became in-order now ---- */
+                    if (err == FI_ECANCELED) {
+                        std::size_t flushed = 0;
+                        while (true) {
+                            size_t head_idx = reorder_head & (REORDER_WINDOW - 1);
+                            void* ready = reorder_ring[head_idx];
+                            if (!ready) break; // next frame not here yet
+                            reorder_ring[head_idx] = nullptr;
+
+                            Result r = transmit(ctx, ready, trx_sz);
+                            if (r != Result::success)
+                                log::error("RDMA rx failed to transmit buffer")
+                                    ("buffer_address", ready)("size", trx_sz)
+                                    ("kind", kind2str(_kind));
+
+                            if (add_to_queue(ready) == Result::success)
+                                notify_buf_available();
+                            else
+                                log::error("Failed to 
recycle buffer to queue") + ("buffer_address", ready)("kind", kind2str(_kind)); + ++reorder_head; + ++flushed; + } + log::debug("RX ECANCELED: flushed %zu frame%s waiting in ring", + flushed, flushed == 1 ? "" : "s") + ("kind", kind2str(_kind)); + } + + did_work = true; // we handled something – no sleep } else { - log::error("RDMA rx encountered an error in CQ")( - "error", fi_strerror(err_entry.err))("kind", kind2str(_kind)); + log::error("RDMA rx failed to read CQ error entry") + ("error", fi_strerror(-err_ret))("kind", kind2str(_kind)); } + } + else if (ret != -EAGAIN && ret != -FI_ENOTCONN) { + // Fatal CQ read error + log::error("RDMA rx cq read failed") + ("error", fi_strerror(-ret)) + ("kind", kind2str(_kind)); + goto shutdown; + } + // else: -EAGAIN or -FI_ENOTCONN → retry + } + + /* ---------- hybrid back-off when no CQ work was done ---------- */ + static constexpr int SPIN_LIMIT = 50; // ≈ 1–2 µs of busy-wait + static constexpr int YIELD_LIMIT = 200; // then ~200 sched_yield() calls + static thread_local int idle_cycles = 0; // per-thread counter + + if (!did_work) { + if (idle_cycles < SPIN_LIMIT) { + /* short spin: cheapest path at high packet rate */ + _mm_pause(); // pause instruction = 40–100 ns + } else if (idle_cycles < SPIN_LIMIT + YIELD_LIMIT) { + /* medium wait: let other threads run */ + std::this_thread::yield(); } else { - log::error("RDMA rx failed to read CQ error entry") - ("error", fi_strerror(-err_ret))("kind", kind2str(_kind)); + /* long wait: nothing for a while – real sleep */ + std::this_thread::sleep_for( + std::chrono::microseconds(CQ_RETRY_DELAY_US)); } - } else if (ret == -FI_ENOTCONN) { - // Handle disconnection (Transport endpoint is not connected) - log::warn("Transport endpoint is not connected. 
Waiting for new connection.")( - "error", fi_strerror(ret))("kind", kind2str(_kind)); - thread::Sleep(ctx, std::chrono::milliseconds(1000)); // Pause before retrying + ++idle_cycles; // back-off gets longer } else { - // Handle CQ read error - log::error("RDMA rx cq read failed")("error", fi_strerror(-ret)) - ("kind", kind2str(_kind)); - break; + idle_cycles = 0; // reset after we did useful work } } - ep_ctx->stop_flag = true; // Set the stop flag +shutdown: + // Signal all endpoints to stop + for (auto* ep : ep_ctxs) { + if (ep) ep->stop_flag = true; + } log::info("RDMA RX CQ thread stopped.")("kind", kind2str(_kind)); } diff --git a/media-proxy/src/mesh/conn_rdma_tx.cc b/media-proxy/src/mesh/conn_rdma_tx.cc index 5010fde6f..f1afb33c8 100644 --- a/media-proxy/src/mesh/conn_rdma_tx.cc +++ b/media-proxy/src/mesh/conn_rdma_tx.cc @@ -6,6 +6,7 @@ namespace mesh::connection { RdmaTx::RdmaTx() : Rdma() { _kind = Kind::transmitter; // Set the Kind in the constructor + next_tx_idx = 0; // Initialize the next transmit index } RdmaTx::~RdmaTx() @@ -51,61 +52,95 @@ Result RdmaTx::start_threads(context::Context& ctx) void RdmaTx::rdma_cq_thread(context::Context& ctx) { - constexpr uint32_t RETRY_INTERVAL_US = 100; // Retry interval of 100 us - constexpr uint32_t TIMEOUT_US = 1000000; // Total timeout of 1s + constexpr uint32_t RETRY_INTERVAL_US = 100; // 100 µs back-off + constexpr uint32_t TIMEOUT_US = 1'000'000; // 1 s max spin + + // Buffer for batched completions + struct fi_cq_entry cq_entries[CQ_BATCH_SIZE]; while (!ctx.cancelled()) { - // Wait for the buffer to become available after successful send + // Wait until at least one send has occurred wait_buf_available(); - struct fi_cq_entry cq_entries[CQ_BATCH_SIZE]; - - uint32_t elapsed_time = 0; - while (elapsed_time < TIMEOUT_US) { - int ret = fi_cq_read(ep_ctx->cq_ctx.cq, cq_entries, CQ_BATCH_SIZE); - - if (ret > 0) { - for (int i = 0; i < ret; i++) { - void *buf = cq_entries[i].op_context; - if (buf == nullptr) { - 
log::error("RDMA tx null buffer context, skipping...") - ("kind", kind2str(_kind)); - continue; + uint32_t elapsed = 0; + bool done = false; + + while (!ctx.cancelled() && elapsed < TIMEOUT_US) { + /* We use a single CQ for all QPs – skip duplicates */ + struct fid_cq *last_cq = nullptr; + for (auto *ep : ep_ctxs) { + if (!ep) continue; + + struct fid_cq *cq = ep->cq_ctx.cq; + if (cq == last_cq) // already handled in this iteration + continue; + last_cq = cq; + + int ret = fi_cq_read(cq, cq_entries, CQ_BATCH_SIZE); + if (ret > 0) { + // We got completions on this QP + for (int i = 0; i < ret; ++i) { + void *buf = cq_entries[i].op_context; + if (!buf) { + log::error("RDMA tx null buffer context, skipping...") + ("kind", kind2str(_kind)); + continue; + } + if (add_to_queue(buf) != Result::success) { + log::error("RDMA tx failed to add buffer back to queue") + ("buffer_address", buf) + ("kind", kind2str(_kind)); + } } - - // Replenish buffer - Result res = add_to_queue(buf); - if (res != Result::success) { - log::error("RDMA tx failed to add buffer back to queue") - ("buffer_address", buf)("kind", kind2str(_kind)); + done = true; + } + else if (ret == -FI_EAVAIL) { + // asynchronous error completions ― recycle buffers too + fi_cq_err_entry err {}; + if (fi_cq_readerr(ep->cq_ctx.cq, &err, 0) >= 0) { + log::error("RDMA tx CQ error")("error", fi_strerror(err.err)) + ("kind", kind2str(_kind)); + if (err.op_context) { + add_to_queue(err.op_context); // reclaim the failed buffer + } + } else { + log::error("RDMA tx failed to read CQ error entry") + ("kind", kind2str(_kind)); } + done = true; + } + else if (ret != -EAGAIN) { + // fatal CQ read error + log::error("RDMA tx cq read failed") + ("error", fi_strerror(-ret)) + ("kind", kind2str(_kind)); + done = true; } - break; // Exit retry loop as CQ events were successfully processed - } else if (ret == -EAGAIN) { - // No events, introduce short wait to avoid busy looping - 
std::this_thread::sleep_for(std::chrono::microseconds(RETRY_INTERVAL_US)); - elapsed_time += RETRY_INTERVAL_US; - } else { - // Handle errors - log::error("RDMA tx cq read failed") - ("error", fi_strerror(-ret))("kind", kind2str(_kind)); - break; // Exit retry loop on error - } - if (ctx.cancelled()) { - break; + if (done) break; // processed something on this iteration } + + if (done) break; // go back to outer loop + + // No events yet on either CQ, back off a bit + std::this_thread::sleep_for(std::chrono::microseconds(RETRY_INTERVAL_US)); + elapsed += RETRY_INTERVAL_US; } - // Log if timeout occurred without receiving any events after buffer should be already - // available - if (elapsed_time >= TIMEOUT_US) { - log::debug("RDMA tx cq read timed out after retries")("kind", kind2str(_kind)); + if (elapsed >= TIMEOUT_US) { + log::debug("RDMA tx cq read timed out after retries") + ("kind", kind2str(_kind)); } } - ep_ctx->stop_flag = true; // Set the stop flag + + // Signal shutdown on all endpoints + for (auto *ep : ep_ctxs) { + if (ep) ep->stop_flag = true; + } + log::info("RDMA TX CQ thread stopped.")("kind", kind2str(_kind)); } + /** * @brief Handles sending data through RDMA by consuming a buffer, copying data, and transmitting it. 
* @@ -122,64 +157,73 @@ void RdmaTx::rdma_cq_thread(context::Context& ctx) Result RdmaTx::on_receive(context::Context& ctx, void *ptr, uint32_t sz, uint32_t& sent) { void *reg_buf = nullptr; - constexpr uint32_t TIMEOUT_US = 500000; // 0.5-second timeout - constexpr uint32_t RETRY_INTERVAL_US = 100; // Retry interval of 100 us - uint32_t elapsed_time = 0; - - // Attempt to consume a buffer from the queue with a timeout - while (elapsed_time < TIMEOUT_US && !ctx.cancelled()) { - Result res = consume_from_queue(ctx, ®_buf); - if (res == Result::success) { - if (reg_buf != nullptr) { - break; // Successfully got a buffer - } else { - log::debug("RDMA tx buffer is null, retrying...")("kind", kind2str(_kind)); - } - } else if (res != Result::error_no_buffer) { - // Log non-retryable errors and exit - log::error("RDMA tx failed to consume buffer from queue")("result", static_cast(res)) - ("kind", kind2str(_kind)); + constexpr uint32_t TIMEOUT_US = 1000000; // 1-second timeout + constexpr uint32_t RETRY_INTERVAL_US = 100; // 100 µs + uint32_t elapsed = 0; + + // 1) Acquire a buffer from our pool, with timeout + while (elapsed < TIMEOUT_US && !ctx.cancelled()) { + Result r = consume_from_queue(ctx, ®_buf); + if (r == Result::success) { + if (reg_buf) break; + log::debug("RDMA tx buffer is null, retrying...")("kind", kind2str(_kind)); + } + else if (r != Result::error_no_buffer) { + log::error("RDMA tx failed to consume buffer from queue") + ("result", static_cast(r))("kind", kind2str(_kind)); sent = 0; - return res; + return r; } - - // Wait before retrying std::this_thread::sleep_for(std::chrono::microseconds(RETRY_INTERVAL_US)); - elapsed_time += RETRY_INTERVAL_US; + elapsed += RETRY_INTERVAL_US; } - // Check if we failed to get a buffer within the timeout - if (reg_buf == nullptr) { - log::error("RDMA tx failed to consume buffer within timeout")("timeout_ms", TIMEOUT_US) - ("kind", kind2str(_kind)); + if (!reg_buf) { + log::error("RDMA tx failed to consume buffer within 
timeout") + ("timeout_us", TIMEOUT_US)("kind", kind2str(_kind)); sent = 0; return Result::error_timeout; } - uint32_t tmp_sent = std::min(static_cast(trx_sz), sz); - if (tmp_sent != trx_sz) { - log::debug("RDMA tx sent size differs from transfer size")("requested_size", tmp_sent) - ("trx_sz", trx_sz)("kind", kind2str(_kind)); + // 2) Copy payload and pad to trx_sz + char* data_ptr = reinterpret_cast(reg_buf); + uint32_t to_send = std::min(trx_sz, sz); + std::memcpy(data_ptr, ptr, to_send); + if (to_send < trx_sz) // pad any unused space + std::memset(data_ptr + to_send, 0, trx_sz - to_send); + + // ---- write trailer at fixed offset ---- + uint64_t seq = global_seq.fetch_add(1, std::memory_order_relaxed); + *reinterpret_cast(data_ptr + trx_sz) = seq; + + // Always send full payload-slot + trailer + uint32_t total_len = trx_sz + static_cast(TRAILER); + + uint32_t idx = next_tx_idx.fetch_add(1, std::memory_order_relaxed) + % static_cast(ep_ctxs.size()); + ep_ctx_t* chosen = ep_ctxs[idx]; + if (!chosen) { + log::error("RDMA tx endpoint #%u is null, cannot send")("idx", idx); + sent = 0; + add_to_queue(reg_buf); + return Result::error_general_failure; } - std::memcpy(reg_buf, ptr, tmp_sent); - - // Transmit the buffer through RDMA - int err = libfabric_ep_ops.ep_send_buf(ep_ctx, reg_buf, tmp_sent); + int rc = libfabric_ep_ops.ep_send_buf(chosen, reg_buf, total_len); + // Signal that there’s now room for more sends notify_buf_available(); - sent = tmp_sent; + sent = to_send; - if (err) { - log::error("Failed to send buffer through RDMA tx")("error", fi_strerror(-err)) - ("kind", kind2str(_kind)); - // Add the buffer back to the queue in case of failure - Result res = add_to_queue(reg_buf); - if (res != Result::success) { - log::error("Failed to add buffer to RDMA tx queue")("error", result2str(res)) - ("kind", kind2str(_kind)); + if (rc) { + log::error("Failed to send buffer through RDMA tx") + ("error", fi_strerror(-rc))("kind", kind2str(_kind)); + // Return buffer to 
pool
+    Result qr = add_to_queue(reg_buf);
+    if (qr != Result::success) {
+        log::error("Failed to return buffer to queue after send error")
+            ("error", result2str(qr))("kind", kind2str(_kind));
         }
-        sent = 0;
         return Result::error_general_failure;
     }
@@ -187,4 +231,5 @@ Result RdmaTx::on_receive(context::Context& ctx, void *ptr, uint32_t sz, uint32_
     return Result::success;
 }
+
 } // namespace mesh::connection
\ No newline at end of file
diff --git a/media-proxy/src/mesh/manager_bridges.cc b/media-proxy/src/mesh/manager_bridges.cc
index 58c08efad..c0c56d4f8 100644
--- a/media-proxy/src/mesh/manager_bridges.cc
+++ b/media-proxy/src/mesh/manager_bridges.cc
@@ -228,6 +228,9 @@ int BridgesManager::create_bridge(context::Context& ctx, Connection*& bridge,
     req.payload_args.rdma_args.transfer_size = cfg.conn_config.buf_parts.total_size();
     req.payload_args.rdma_args.queue_size = 16;
+    req.payload_args.rdma_args.provider = strdup(cfg.conn_config.options.rdma.provider.c_str());
+    char* _rdma_provider_dup = req.payload_args.rdma_args.provider;
+    req.payload_args.rdma_args.num_endpoints = cfg.conn_config.options.rdma.num_endpoints;

     // Create Egress RDMA Bridge
     if (cfg.kind == Kind::transmitter) {
@@ -245,6 +248,7 @@
         if (res != Result::success) {
             log::error("Error configuring RDMA Egress bridge: %s", result2str(res));
+            free(_rdma_provider_dup);
             delete egress_bridge;
             return -1;
         }
@@ -254,6 +258,8 @@
     } else if (cfg.kind == Kind::receiver) {
         auto ingress_bridge = new(std::nothrow) RdmaRx;
-        if (!ingress_bridge)
+        /* Braces are required here: without them only the free() is guarded
+         * and the `return -ENOMEM;` executes unconditionally for every
+         * receiver request. */
+        if (!ingress_bridge) {
+            free(_rdma_provider_dup);
             return -ENOMEM;
+        }

         req.type = is_rx;
@@ -266,14 +271,18 @@
         if (res != Result::success) {
             log::error("Error configuring RDMA Ingress bridge: %s", result2str(res));
+            free(_rdma_provider_dup);
             delete ingress_bridge;
             return -1;
         }
bridge = ingress_bridge; } else { + free(_rdma_provider_dup); return -1; } + // we've handed off the provider into Rdma::configure, free our copy + free(_rdma_provider_dup); } // log::debug("BEFORE ESTABLISH"); diff --git a/media-proxy/tests/CMakeLists.txt b/media-proxy/tests/CMakeLists.txt index 0f215b342..e8b49fb27 100644 --- a/media-proxy/tests/CMakeLists.txt +++ b/media-proxy/tests/CMakeLists.txt @@ -51,6 +51,16 @@ file(GLOB PHYS_RDMA_TEST_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/conn_rdma_real_ep_test.cc" ) +# Find source files for physical-RDMA-specific tests +file(GLOB PHYS_RDMA_RX_TEST_SOURCES + "${CMAKE_CURRENT_SOURCE_DIR}/rdma_rx_test.cpp" +) + +# Find source files for physical-RDMA-specific tests +file(GLOB PHYS_RDMA_TX_TEST_SOURCES + "${CMAKE_CURRENT_SOURCE_DIR}/rdma_tx_test.cpp" +) + # Find source files for physical-RDMA-specific tests file(GLOB RDMA_BASE_TEST_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/conn_rdma_tests.cc" @@ -102,6 +112,26 @@ target_include_directories(conn_rdma_real_ep_test PUBLIC ${CMAKE_SOURCE_DIR}/sdk/3rdparty/libmemif/src ) +# Add an executable for physical-RDMA-RX-specific test +add_executable(rdma_rx_test ${PHYS_RDMA_RX_TEST_SOURCES}) +target_link_libraries(rdma_rx_test PRIVATE gtest gtest_main ${MP_LIB} rdmacm) +target_include_directories(rdma_rx_test PUBLIC + ${CMAKE_SOURCE_DIR}/media-proxy/include + ${CMAKE_BINARY_DIR}/media-proxy/generated + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/sdk/3rdparty/libmemif/src +) + +# Add an executable for physical-RDMA-TX-specific test +add_executable(rdma_tx_test ${PHYS_RDMA_TX_TEST_SOURCES}) +target_link_libraries(rdma_tx_test PRIVATE gtest gtest_main ${MP_LIB} rdmacm) +target_include_directories(rdma_tx_test PUBLIC + ${CMAKE_SOURCE_DIR}/media-proxy/include + ${CMAKE_BINARY_DIR}/media-proxy/generated + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/sdk/3rdparty/libmemif/src +) + # Add tests to CTest add_test(NAME conn_rdma_rx_tx_unit_tests COMMAND conn_rdma_rx_tx_unit_tests) add_test(NAME 
media_proxy_unit_tests COMMAND media_proxy_unit_tests) diff --git a/media-proxy/tests/conn_rdma_tests.cc b/media-proxy/tests/conn_rdma_tests.cc index eafe1c6a8..ab3854480 100644 --- a/media-proxy/tests/conn_rdma_tests.cc +++ b/media-proxy/tests/conn_rdma_tests.cc @@ -10,6 +10,13 @@ using namespace mesh; using namespace connection; using namespace mesh::log; +// Helper alias +constexpr std::size_t NUM_EPS = RDMA_NUM_EPS; + + +// ────────────────────────────────────────────────────────────────────────────── +// A thin wrapper that exposes the protected internals we want to poke at +// ────────────────────────────────────────────────────────────────────────────── class TestRdma : public Rdma { public: using Rdma::add_to_queue; @@ -19,7 +26,7 @@ class TestRdma : public Rdma { using Rdma::configure_endpoint; using Rdma::consume_from_queue; using Rdma::ep_cfg; - using Rdma::ep_ctx; + using Rdma::ep_ctxs; using Rdma::init; using Rdma::init_queue_with_elements; using Rdma::m_dev_handle; @@ -28,7 +35,9 @@ class TestRdma : public Rdma { using Rdma::on_shutdown; using Rdma::trx_sz; void set_kind(Kind kind) { _kind = kind; } - Result start_threads(mesh::context::Context& ctx) {return Result::success;} + + /* don’t really spin up threads in the unit-test build */ + Result start_threads(context::Context&) override { return Result::success; } }; // Test fixture @@ -93,21 +102,28 @@ TEST_F(RdmaTest, EstablishSuccess) { .WillOnce(::testing::DoAll(::testing::SetArgPointee<0>(dev_handle), ::testing::Return(0))); EXPECT_CALL(*mock_ep_ops, ep_init(::testing::_, ::testing::_)) - .WillOnce([](ep_ctx_t **ep_ctx, ep_cfg_t *cfg) -> int { + .Times(NUM_EPS) + .WillRepeatedly([](ep_ctx_t **ep_ctx, ep_cfg_t * /*cfg*/) -> int { *ep_ctx = new ep_ctx_t(); (*ep_ctx)->stop_flag = false; - (*ep_ctx)->ep = reinterpret_cast(0xdeadbeef); // Mock endpoint - return 0; // Success + (*ep_ctx)->ep = reinterpret_cast(0xdeadbeef); + return 0; }); - EXPECT_CALL(*mock_ep_ops, ep_reg_mr(_, _, 
_)).WillRepeatedly(Return(0)); - - EXPECT_CALL(*mock_ep_ops, ep_destroy(_)).WillOnce([](ep_ctx_t **ep_ctx) -> int { - delete *ep_ctx; - return 0; - }); + // Each endpoint registers its MR once: + EXPECT_CALL(*mock_ep_ops, ep_reg_mr(_, _, _)) + .Times(NUM_EPS) + .WillRepeatedly(Return(0)); + + // on_shutdown (called in Rdma’s destructor) must destroy all NUM_EPS ep_ctxs: + EXPECT_CALL(*mock_ep_ops, ep_destroy(_)) + .Times(NUM_EPS) + .WillRepeatedly([](ep_ctx_t **ep_ctx) -> int { + delete *ep_ctx; + return 0; + }); - auto result = rdma->on_establish(ctx); + auto result = rdma->establish(ctx); ASSERT_EQ(result, Result::success); ASSERT_EQ(rdma->state(), State::active); } @@ -121,10 +137,13 @@ TEST_F(RdmaTest, EstablishFailureEpInit) { EXPECT_CALL(*mock_dev_ops, rdma_init(::testing::_)) .WillOnce(::testing::DoAll(::testing::SetArgPointee<0>(dev_handle), ::testing::Return(0))); - // Simulate failure during ep_init - EXPECT_CALL(*mock_ep_ops, ep_init(_, _)).WillOnce(Return(-1)); + // If the very first ep_init fails, Rdma::establish should abort immediately + EXPECT_CALL(*mock_ep_ops, ep_init(_, _)) + .WillOnce(Return(-1)) + .RetiresOnSaturation(); + // No further ep_init calls are expected after a failure. 
- auto result = rdma->on_establish(ctx); + auto result = rdma->establish(ctx); ASSERT_EQ(result, Result::error_initialization_failed); ASSERT_EQ(rdma->state(), State::closed); } @@ -140,19 +159,28 @@ TEST_F(RdmaTest, CleanupResources) .WillOnce(::testing::DoAll(::testing::SetArgPointee<0>(dev_handle), ::testing::Return(0))); - EXPECT_CALL(*mock_ep_ops, ep_init(_, _)).WillOnce([](ep_ctx_t **ep_ctx, ep_cfg_t *cfg) -> int { - *ep_ctx = new ep_ctx_t(); - return 0; - }); - - EXPECT_CALL(*mock_ep_ops, ep_reg_mr(_, _, _)).WillRepeatedly(Return(0)); + // Expect rdma->establish to initialize all NUM_EPS endpoints: + EXPECT_CALL(*mock_ep_ops, ep_init(_, _)) + .Times(NUM_EPS) + .WillRepeatedly([](ep_ctx_t **ep_ctx, ep_cfg_t *) -> int { + *ep_ctx = new ep_ctx_t(); + return 0; + }); - EXPECT_CALL(*mock_ep_ops, ep_destroy(_)).WillOnce([](ep_ctx_t **ep_ctx) -> int { - delete *ep_ctx; - return 0; - }); + // Each endpoint also registers its MR once: + EXPECT_CALL(*mock_ep_ops, ep_reg_mr(_, _, _)) + .Times(NUM_EPS) + .WillRepeatedly(Return(0)); + + // on_shutdown should destroy all NUM_EPS ep_ctxs: + EXPECT_CALL(*mock_ep_ops, ep_destroy(_)) + .Times(NUM_EPS) + .WillRepeatedly([](ep_ctx_t **ep_ctx) -> int { + delete *ep_ctx; + return 0; + }); - auto result = rdma->on_establish(ctx); + auto result = rdma->establish(ctx); ASSERT_EQ(result, Result::success); // Trigger cleanup @@ -164,18 +192,27 @@ TEST_F(RdmaTest, CleanupResources) TEST_F(RdmaTest, ValidateStateTransitions) { - EXPECT_CALL(*mock_ep_ops, ep_init(_, _)).WillOnce([](ep_ctx_t **ep_ctx, ep_cfg_t *cfg) -> int { - *ep_ctx = new ep_ctx_t(); - return 0; // Success - }); + // establish will call ep_init() once per endpoint + EXPECT_CALL(*mock_ep_ops, ep_init(_, _)) + .Times(NUM_EPS) + .WillRepeatedly([](ep_ctx_t **ep_ctx, ep_cfg_t *) -> int { + *ep_ctx = new ep_ctx_t(); + return 0; // Success + }); - EXPECT_CALL(*mock_ep_ops, ep_destroy(_)).WillOnce([](ep_ctx_t **ep_ctx) -> int { - delete *ep_ctx; - *ep_ctx = nullptr; - return 
0; // Success - }); + // on_shutdown will destroy all endpoints + EXPECT_CALL(*mock_ep_ops, ep_destroy(_)) + .Times(NUM_EPS) + .WillRepeatedly([](ep_ctx_t **ep_ctx) -> int { + delete *ep_ctx; + *ep_ctx = nullptr; + return 0; // Success + }); - EXPECT_CALL(*mock_ep_ops, ep_reg_mr(_, _, _)).WillRepeatedly(Return(0)); + // Each endpoint will register its MR once + EXPECT_CALL(*mock_ep_ops, ep_reg_mr(_, _, _)) + .Times(NUM_EPS) + .WillRepeatedly(Return(0)); libfabric_ctx *dev_handle = nullptr; @@ -410,22 +447,30 @@ TEST_F(RdmaTest, RepeatedShutdown) { }); // Mock ep_init - EXPECT_CALL(*mock_ep_ops, ep_init(_, _)).WillOnce([](ep_ctx_t **ep_ctx, ep_cfg_t *cfg) -> int { - *ep_ctx = new ep_ctx_t(); - return 0; // Success - }); + EXPECT_CALL(*mock_ep_ops, ep_init(_, _)) + .Times(NUM_EPS) + .WillRepeatedly([](ep_ctx_t **ep_ctx, ep_cfg_t *) -> int { + *ep_ctx = new ep_ctx_t(); + return 0; // Success + }); // Mock ep_reg_mr - EXPECT_CALL(*mock_ep_ops, ep_reg_mr(_, _, _)).WillRepeatedly(Return(0)); + EXPECT_CALL(*mock_ep_ops, ep_reg_mr(_, _, _)) + .Times(NUM_EPS) + .WillRepeatedly(Return(0)); // Mock ep_destroy - EXPECT_CALL(*mock_ep_ops, ep_destroy(_)).Times(1).WillRepeatedly([](ep_ctx_t **ep_ctx) -> int { - delete *ep_ctx; - *ep_ctx = nullptr; - return 0; // Success - }); + EXPECT_CALL(*mock_ep_ops, ep_destroy(_)) + .Times(NUM_EPS) + .WillRepeatedly([](ep_ctx_t **ep_ctx) -> int { + delete *ep_ctx; + *ep_ctx = nullptr; + return 0; // Success + }); - EXPECT_CALL(*mock_dev_ops, rdma_deinit(::testing::_)).Times(1).WillOnce(::testing::Return(0)); + EXPECT_CALL(*mock_dev_ops, rdma_deinit(::testing::_)) + .Times(1) + .WillOnce(::testing::Return(0)); // Use ConfigureRdma helper to configure the RDMA instance ConfigureRdma(rdma, ctx, 1024, Kind::transmitter); diff --git a/media-proxy/tests/libfabric_dev_tests.cc b/media-proxy/tests/libfabric_dev_tests.cc index 29e536427..4032f6e74 100644 --- a/media-proxy/tests/libfabric_dev_tests.cc +++ b/media-proxy/tests/libfabric_dev_tests.cc @@ 
-1,154 +1,192 @@ -extern "C" { -#include "libfabric_dev.h" -} +// Define FFF globals exactly once. +#define DEFINE_FFF_GLOBALS +#include -#include -#include "libfabric_mocks.h" +#include "libfabric_dev_tests.h" +#include +#include +#include -// Mock functions +// --------------------------------------------------------------------------- +// Additional fake declarations not present in libfabric_mocks.h. FAKE_VALUE_FUNC(int, fi_fabric, struct fi_fabric_attr *, struct fid_fabric **, void *); FAKE_VALUE_FUNC(int, domain, struct fid_fabric *, struct fi_info *, struct fid_domain **, void *); -class LibfabricDevTest : public ::testing::Test -{ - protected: - void SetUp() override - { - ops_fabric = {.domain = domain}; - ops = {.close = custom_close}; - fabric = {.fid = {.ops = &ops}, .ops = &ops_fabric}; - dom = {.fid = {.ops = &ops}}; - - RESET_FAKE(fi_getinfo); - RESET_FAKE(fi_freeinfo); - RESET_FAKE(fi_fabric); - RESET_FAKE(domain); - } - - void TearDown() override {} - - static struct fi_ops_fabric ops_fabric; - static struct fid_domain dom; - static fid_fabric fabric; - static struct fi_ops ops; +// rdma_getaddrinfo and rdma_freeaddrinfo declarations matching the system header. +FAKE_VALUE_FUNC(int, rdma_getaddrinfo, const char*, const char*, const struct rdma_addrinfo*, struct rdma_addrinfo**); +FAKE_VOID_FUNC(rdma_freeaddrinfo, struct rdma_addrinfo*); + + +// --------------------------------------------------------------------------- +// Custom fake for rdma_getaddrinfo: allocate a minimal dummy structure. +static int rdma_getaddrinfo_custom_fake(const char *ip, const char *port, + const struct rdma_addrinfo *hints, + struct rdma_addrinfo **res) { + *res = (struct rdma_addrinfo *)malloc(sizeof(struct rdma_addrinfo)); + // Set minimal fields for both RX and TX. 
+ (*res)->ai_src_len = sizeof(struct sockaddr_in); + (*res)->ai_src_addr = (struct sockaddr *)malloc(sizeof(struct sockaddr_in)); + memset((*res)->ai_src_addr, 0, sizeof(struct sockaddr_in)); + (*res)->ai_dst_len = sizeof(struct sockaddr_in); + (*res)->ai_dst_addr = (struct sockaddr *)malloc(sizeof(struct sockaddr_in)); + memset((*res)->ai_dst_addr, 0, sizeof(struct sockaddr_in)); + return 0; +} - static int fi_fabric_custom_fake(struct fi_fabric_attr *attr, struct fid_fabric **fabric, - void *context) - { - *fabric = &LibfabricDevTest::fabric; - return 0; +// Custom fake for rdma_freeaddrinfo: free the allocated structure. +static void rdma_freeaddrinfo_custom_fake(struct rdma_addrinfo *res) { + if (res) { + if (res->ai_src_addr) + free(res->ai_src_addr); + if (res->ai_dst_addr) + free(res->ai_dst_addr); + free(res); } - static int domain_custom_fake(struct fid_fabric *fabric, struct fi_info *info, - struct fid_domain **dom, void *context) - { - *dom = &LibfabricDevTest::dom; - return 0; - } -}; +} -struct fid_fabric LibfabricDevTest::fabric; -struct fi_ops_fabric LibfabricDevTest::ops_fabric; -struct fi_ops LibfabricDevTest::ops; -struct fid_domain LibfabricDevTest::dom; +// --------------------------------------------------------------------------- +// Global objects for our additional mocks. +static struct fi_ops_fabric ops_fabric; +static struct fid_domain dom; +static fid_fabric fabric; +static struct fi_ops ops; + +// Custom fake implementations for fi_fabric and domain. 
+static int fi_fabric_custom_fake(struct fi_fabric_attr *attr, struct fid_fabric **fabric, void *context) { + *fabric = &::fabric; + return 0; +} -TEST_F(LibfabricDevTest, TestRdmaInitSuccess) -{ - libfabric_ctx *ctx = nullptr; - fi_getinfo_fake.custom_fake = fi_getinfo_custom_fake; +static int domain_custom_fake(struct fid_fabric *fabric, struct fi_info *info, + struct fid_domain **dom, void *context) { + *dom = &::dom; + return 0; +} + +// --------------------------------------------------------------------------- +// Test fixture implementation. +void LibfabricDevTest::SetUp() { + // Allocate a dummy libfabric context. + ctx = (libfabric_ctx*)malloc(sizeof(libfabric_ctx)); + memset(ctx, 0, sizeof(libfabric_ctx)); + // Set receiver parameters. + ctx->kind = FI_KIND_RECEIVER; + ctx->local_ip = strdup("127.0.0.1"); + ctx->local_port = strdup("12345"); + + // Initialize additional mock objects. + ops_fabric.domain = domain; + ops.close = custom_close; // custom_close is defined in libfabric_mocks.cc. + fabric.fid.ops = &ops; + fabric.ops = &ops_fabric; + dom.fid.ops = &ops; + + // Reset all fakes. + RESET_FAKE(fi_getinfo); + RESET_FAKE(fi_freeinfo); + RESET_FAKE(fi_fabric); + RESET_FAKE(domain); + RESET_FAKE(rdma_getaddrinfo); + RESET_FAKE(rdma_freeaddrinfo); + + // Set custom fake functions. + fi_getinfo_fake.custom_fake = fi_getinfo_custom_fake; // Defined in libfabric_mocks.cc. 
fi_fabric_fake.custom_fake = fi_fabric_custom_fake; domain_fake.return_val = 0; + rdma_getaddrinfo_fake.custom_fake = rdma_getaddrinfo_custom_fake; + rdma_freeaddrinfo_fake.custom_fake = rdma_freeaddrinfo_custom_fake; +} - int ret = libfabric_dev_ops.rdma_init(&ctx); +void LibfabricDevTest::TearDown() { + if (ctx) + libfabric_dev_ops.rdma_deinit(&ctx); +} +// --------------------------------------------------------------------------- +// Tests + +TEST_F(LibfabricDevTest, TestRdmaInitSuccess) { + int ret = libfabric_dev_ops.rdma_init(&ctx); ASSERT_EQ(ret, 0); ASSERT_NE(ctx, nullptr); + // Expect one call each for fi_getinfo, fi_fabric, and domain, and one free of hints. ASSERT_EQ(fi_getinfo_fake.call_count, 1); ASSERT_EQ(fi_fabric_fake.call_count, 1); ASSERT_EQ(domain_fake.call_count, 1); ASSERT_EQ(fi_freeinfo_fake.call_count, 1); - libfabric_dev_ops.rdma_deinit(&ctx); } -TEST_F(LibfabricDevTest, TestRdmaInitFailGetinfo) -{ - libfabric_ctx *ctx = nullptr; +TEST_F(LibfabricDevTest, TestRdmaInitFailGetinfo) { + fi_getinfo_fake.custom_fake = NULL; fi_getinfo_fake.return_val = -1; - + std::string captured_stderr; testing::internal::CaptureStderr(); int ret = libfabric_dev_ops.rdma_init(&ctx); - captured_stderr = testing::internal::GetCapturedStderr(); EXPECT_FALSE(captured_stderr.empty()); - + ASSERT_EQ(ret, -1); - ASSERT_EQ(ctx, nullptr); + ASSERT_FALSE(ctx->is_initialized); + ASSERT_EQ(fi_getinfo_fake.call_count, 1); ASSERT_EQ(fi_fabric_fake.call_count, 0); ASSERT_EQ(domain_fake.call_count, 0); ASSERT_EQ(fi_freeinfo_fake.call_count, 1); } -TEST_F(LibfabricDevTest, TestRdmaInitFailFabric) -{ - libfabric_ctx *ctx = nullptr; +TEST_F(LibfabricDevTest, TestRdmaInitFailFabric) { fi_getinfo_fake.custom_fake = fi_getinfo_custom_fake; + fi_fabric_fake.custom_fake = NULL; fi_fabric_fake.return_val = -1; - std::string captured_stderr; testing::internal::CaptureStderr(); int ret = libfabric_dev_ops.rdma_init(&ctx); - captured_stderr = testing::internal::GetCapturedStderr(); 
EXPECT_FALSE(captured_stderr.empty()); - ASSERT_EQ(ret, -1); ASSERT_EQ(ctx, nullptr); ASSERT_EQ(fi_getinfo_fake.call_count, 1); ASSERT_EQ(fi_fabric_fake.call_count, 1); ASSERT_EQ(domain_fake.call_count, 0); + // Expect two freeinfo calls: one for hints and one for partially allocated info. ASSERT_EQ(fi_freeinfo_fake.call_count, 2); } -TEST_F(LibfabricDevTest, TestRdmaInitFailDomain) -{ - libfabric_ctx *ctx = nullptr; +TEST_F(LibfabricDevTest, TestRdmaInitFailDomain) { fi_getinfo_fake.custom_fake = fi_getinfo_custom_fake; fi_fabric_fake.custom_fake = fi_fabric_custom_fake; + domain_fake.custom_fake = NULL; domain_fake.return_val = -1; - std::string captured_stderr; testing::internal::CaptureStderr(); int ret = libfabric_dev_ops.rdma_init(&ctx); - captured_stderr = testing::internal::GetCapturedStderr(); EXPECT_FALSE(captured_stderr.empty()); - ASSERT_EQ(ret, -1); ASSERT_EQ(ctx, nullptr); ASSERT_EQ(fi_getinfo_fake.call_count, 1); ASSERT_EQ(fi_fabric_fake.call_count, 1); ASSERT_EQ(domain_fake.call_count, 1); + // Expect two freeinfo calls: one for hints and one for partially allocated info. ASSERT_EQ(fi_freeinfo_fake.call_count, 2); } -TEST_F(LibfabricDevTest, TestRdmaDeinitSuccess) -{ - libfabric_ctx *ctx = nullptr; - fi_getinfo_fake.custom_fake = fi_getinfo_custom_fake; - fi_fabric_fake.custom_fake = fi_fabric_custom_fake; - domain_fake.return_val = 0; - - libfabric_dev_ops.rdma_init(&ctx); +TEST_F(LibfabricDevTest, TestRdmaDeinitSuccess) { + int ret = libfabric_dev_ops.rdma_init(&ctx); ASSERT_NE(ctx, nullptr); - - int ret = libfabric_dev_ops.rdma_deinit(&ctx); - + ret = libfabric_dev_ops.rdma_deinit(&ctx); ASSERT_EQ(ret, 0); ASSERT_EQ(ctx, nullptr); + // Expect a total of two freeinfo calls. 
ASSERT_EQ(fi_freeinfo_fake.call_count, 2); } + +TEST_F(LibfabricDevTest, TestRdmaInitNullContext) { + int ret = libfabric_dev_ops.rdma_init(nullptr); + ASSERT_EQ(ret, -EINVAL); +} diff --git a/media-proxy/tests/libfabric_dev_tests.h b/media-proxy/tests/libfabric_dev_tests.h new file mode 100644 index 000000000..9efe88cd5 --- /dev/null +++ b/media-proxy/tests/libfabric_dev_tests.h @@ -0,0 +1,17 @@ +#ifndef LIBFABRIC_DEV_TESTS_H +#define LIBFABRIC_DEV_TESTS_H + +#include +#include +#include "libfabric_dev.h" +#include "libfabric_mocks.h" // This header declares fi_getinfo_custom_fake and custom_close. + +class LibfabricDevTest : public ::testing::Test { + protected: + void SetUp() override; + void TearDown() override; + // Test context used by libfabric_dev. + libfabric_ctx* ctx; +}; + +#endif // LIBFABRIC_DEV_TESTS_H \ No newline at end of file diff --git a/media-proxy/tests/libfabric_ep_tests.cc b/media-proxy/tests/libfabric_ep_tests.cc index 4f0d673d8..1cf5bb127 100644 --- a/media-proxy/tests/libfabric_ep_tests.cc +++ b/media-proxy/tests/libfabric_ep_tests.cc @@ -4,7 +4,6 @@ extern "C" { } #include - #include "libfabric_mocks.h" FAKE_VALUE_FUNC(int, av_insert, struct fid_av *, const void *, size_t, fi_addr_t *, uint64_t, @@ -44,7 +43,20 @@ class LibfabricEpTest : public ::testing::Test domain = {.ops = &ops_domain}; info = fi_allocinfo(); info->ep_attr->type = FI_EP_RDM; - rdma_ctx = {.domain = &domain, .info = info}; + + /* dummy destination so av_insert() executes on TX paths */ + static uint8_t dummy_dest = 0; // just needs to be non-NULL + info->dest_addr = &dummy_dest; + + /* fabricate a fabric object – ep_init() sanity-check needs it */ + static struct fid_fabric fabric = {}; + + memset(&rdma_ctx, 0, sizeof(rdma_ctx)); + rdma_ctx.info = info; + rdma_ctx.domain = &domain; + rdma_ctx.fabric = &fabric; + rdma_ctx.ep_attr_type = FI_EP_RDM; + rdma_ctx.is_initialized = true; ep_ctx = {.ep = &ep, .av = &av, .cq_ctx = cq_ctx, .rdma_ctx = &rdma_ctx}; @@ -273,7 +285,7 
@@ TEST_F(LibfabricEpTest, TestEpInitSuccessRX) ASSERT_EQ(ret, 0); ASSERT_NE(ep_ctx_ptr, nullptr); - ASSERT_EQ(fi_getinfo_fake.call_count, 1); + ASSERT_EQ(fi_getinfo_fake.call_count, 0); ASSERT_EQ(endpoint_fake.call_count, 1); ASSERT_EQ(rdma_cq_open_mock_fake.call_count, 1); ASSERT_EQ(av_open_fake.call_count, 1); @@ -304,7 +316,7 @@ TEST_F(LibfabricEpTest, TestEpInitSuccessTX) ASSERT_EQ(ret, 0); ASSERT_NE(ep_ctx_ptr, nullptr); - ASSERT_EQ(fi_getinfo_fake.call_count, 1); + ASSERT_EQ(fi_getinfo_fake.call_count, 0); ASSERT_EQ(endpoint_fake.call_count, 1); ASSERT_EQ(rdma_cq_open_mock_fake.call_count, 1); ASSERT_EQ(av_open_fake.call_count, 1); @@ -332,7 +344,7 @@ TEST_F(LibfabricEpTest, TestEpInitSuccessDefault) ASSERT_EQ(ret, 0); ASSERT_NE(ep_ctx_ptr, nullptr); - ASSERT_EQ(fi_getinfo_fake.call_count, 1); + ASSERT_EQ(fi_getinfo_fake.call_count, 0); ASSERT_EQ(endpoint_fake.call_count, 1); ASSERT_EQ(rdma_cq_open_mock_fake.call_count, 1); ASSERT_EQ(av_open_fake.call_count, 1); @@ -346,6 +358,8 @@ TEST_F(LibfabricEpTest, TestEpInitGetinfoFail) { fi_getinfo_fake.return_val = -1; + /* Force ep_init() to hit the ‘device ctx not initialised’ guard */ + rdma_ctx.fabric = nullptr; endpoint_fake.custom_fake = endpoint_custom_fake; rdma_cq_open_mock_fake.custom_fake = rdma_cq_open_custom_fake; av_open_fake.custom_fake = av_open_custom_fake; @@ -364,9 +378,10 @@ TEST_F(LibfabricEpTest, TestEpInitGetinfoFail) captured_stderr = testing::internal::GetCapturedStderr(); EXPECT_FALSE(captured_stderr.empty()); - ASSERT_EQ(ret, -1); + rdma_ctx.fabric = nullptr; + ASSERT_EQ(ret, -EINVAL); ASSERT_EQ(ep_ctx_ptr, nullptr); - ASSERT_EQ(fi_getinfo_fake.call_count, 1); + ASSERT_EQ(fi_getinfo_fake.call_count, 0); ASSERT_EQ(endpoint_fake.call_count, 0); ASSERT_EQ(av_open_fake.call_count, 0); ASSERT_EQ(rdma_cq_open_mock_fake.call_count, 0); @@ -400,7 +415,7 @@ TEST_F(LibfabricEpTest, TestEpInitEndpointFail) ASSERT_EQ(ret, -1); ASSERT_EQ(ep_ctx_ptr, nullptr); - ASSERT_EQ(fi_getinfo_fake.call_count, 
1); + ASSERT_EQ(fi_getinfo_fake.call_count, 0); ASSERT_EQ(endpoint_fake.call_count, 1); ASSERT_EQ(rdma_cq_open_mock_fake.call_count, 0); ASSERT_EQ(av_open_fake.call_count, 0); @@ -434,7 +449,7 @@ TEST_F(LibfabricEpTest, TestEpInitAv_openFail) ASSERT_EQ(ret, -1); ASSERT_EQ(ep_ctx_ptr, nullptr); - ASSERT_EQ(fi_getinfo_fake.call_count, 1); + ASSERT_EQ(fi_getinfo_fake.call_count, 0); ASSERT_EQ(endpoint_fake.call_count, 1); ASSERT_EQ(rdma_cq_open_mock_fake.call_count, 1); ASSERT_EQ(av_open_fake.call_count, 1); @@ -468,7 +483,7 @@ TEST_F(LibfabricEpTest, TestEpInitRdma_cq_openFail) ASSERT_EQ(ret, -1); ASSERT_EQ(ep_ctx_ptr, nullptr); - ASSERT_EQ(fi_getinfo_fake.call_count, 1); + ASSERT_EQ(fi_getinfo_fake.call_count, 0); ASSERT_EQ(endpoint_fake.call_count, 1); ASSERT_EQ(rdma_cq_open_mock_fake.call_count, 1); ASSERT_EQ(av_open_fake.call_count, 0); @@ -503,7 +518,7 @@ TEST_F(LibfabricEpTest, TestEpInitEnableFail) ASSERT_EQ(ret, -1); ASSERT_EQ(ep_ctx_ptr, nullptr); - ASSERT_EQ(fi_getinfo_fake.call_count, 1); + ASSERT_EQ(fi_getinfo_fake.call_count, 0); ASSERT_EQ(endpoint_fake.call_count, 1); ASSERT_EQ(rdma_cq_open_mock_fake.call_count, 1); ASSERT_EQ(av_open_fake.call_count, 1); @@ -537,7 +552,7 @@ TEST_F(LibfabricEpTest, TestEpInitAv_insertFail) ASSERT_EQ(ret, -1); ASSERT_EQ(ep_ctx_ptr, nullptr); - ASSERT_EQ(fi_getinfo_fake.call_count, 1); + ASSERT_EQ(fi_getinfo_fake.call_count, 0); ASSERT_EQ(endpoint_fake.call_count, 1); ASSERT_EQ(rdma_cq_open_mock_fake.call_count, 1); ASSERT_EQ(av_open_fake.call_count, 1); @@ -571,7 +586,7 @@ TEST_F(LibfabricEpTest, TestEpInitAv_insertReturnsNot1) ASSERT_EQ(ret, -EINVAL); ASSERT_EQ(ep_ctx_ptr, nullptr); - ASSERT_EQ(fi_getinfo_fake.call_count, 1); + ASSERT_EQ(fi_getinfo_fake.call_count, 0); ASSERT_EQ(endpoint_fake.call_count, 1); ASSERT_EQ(rdma_cq_open_mock_fake.call_count, 1); ASSERT_EQ(av_open_fake.call_count, 1); @@ -605,7 +620,7 @@ TEST_F(LibfabricEpTest, TestEpInitBindFail) ASSERT_EQ(ret, -1); ASSERT_EQ(ep_ctx_ptr, nullptr); - 
ASSERT_EQ(fi_getinfo_fake.call_count, 1); + ASSERT_EQ(fi_getinfo_fake.call_count, 0); ASSERT_EQ(endpoint_fake.call_count, 1); ASSERT_EQ(rdma_cq_open_mock_fake.call_count, 1); ASSERT_EQ(av_open_fake.call_count, 1); @@ -733,9 +748,10 @@ TEST_F(LibfabricEpTest, TestEpDestroySuccess) int ret = libfabric_ep_ops.ep_destroy(&ep_ctx_ptr); + ep_ctx.data_mr = reinterpret_cast(0x1); + ASSERT_EQ(ret, 0); ASSERT_EQ(ep_ctx_ptr, nullptr); - ASSERT_EQ(rdma_unreg_mr_mock_fake.call_count, 1); ASSERT_EQ(custom_close_fake.call_count, 3); // ep, cq, av } diff --git a/media-proxy/tests/metrics.h b/media-proxy/tests/metrics.h new file mode 100644 index 000000000..d1a6f79e3 --- /dev/null +++ b/media-proxy/tests/metrics.h @@ -0,0 +1,20 @@ +#pragma once +#include + +/* wire header that prefixes every RDMA payload */ +#pragma pack(push, 1) +struct FrameHdr { + uint32_t frame; /* network-order */ + uint64_t tx_ns; /* network-order, CLOCK_REALTIME in ns */ +}; +#pragma pack(pop) + +/*UDP message RX→TX with results */ +struct StatsMsg { + uint32_t payload_mb; /* MB for this run */ + uint32_t queue; /* queue size */ + double ttlb_spaced_ms; /* TTLB @60fps */ + double ttlb_full_ms; /* TTLB @ max throughput */ + double cpu_tx_pct; /* TX process CPU-load percent */ + double cpu_rx_pct; /* RX process CPU-load percent */ +}; \ No newline at end of file diff --git a/media-proxy/tests/rdma_rx_test.cpp b/media-proxy/tests/rdma_rx_test.cpp new file mode 100644 index 000000000..3bca07f3b --- /dev/null +++ b/media-proxy/tests/rdma_rx_test.cpp @@ -0,0 +1,381 @@ +#include +#include +#include +#include +#include + +// Include your RDMA Rx and logging interfaces +#include "mesh/conn_rdma_rx.h" +#include "mesh/concurrency.h" +#include "logger.h" +#include + +#include "metrics.h" // FrameHdr + StatsMsg +#include +#include +#include +#include + +using namespace mesh::log; + +namespace mesh { +namespace connection { + +/** Logging level (optional) **/ +Level log_level = Level::fatal; + +/** 
-------------------------------------------------------------------- + * PerfReceiver: + * - Extends Connection to handle incoming data with minimal overhead + * - Increments a counter per receive, does no string copy or print + * ------------------------------------------------------------------ **/ +class PerfReceiver : public connection::Connection { +public: + std::atomic received_count{0}; + // Track last frame and total missing + bool have_last_ = false; + uint32_t last_frame_ = 0; + std::atomic missing_frames_{0}; + std::atomic first_tx_ns{0}; + std::atomic first_rx_ns{0}; + std::atomic last_rx_ns{0}; + std::atomic ttlb_seen{0}; + std::atomic ttlb_ns_sum{0}; + std::atomic first_ttlb_tx_ns{0}; + std::atomic last_ttlb_rx_ns{0}; + // paced‐probe (phase A) stats + std::atomic ttlb_spaced_ns_sum{0}; + std::atomic ttlb_spaced_seen {0}; + std::vector ttlb_spaced_samples_; + + // full‐speed (phase B) stats + std::atomic ttlb_full_ns_sum {0}; + std::atomic ttlb_full_seen {0}; + std::vector ttlb_full_samples_; + + std::vector ttlb_samples_; + void clearLatencySamples() { + ttlb_spaced_samples_.clear(); + ttlb_full_samples_.clear(); + } + const std::vector& getTtlbSpacedSamples() const { return ttlb_spaced_samples_; } + const std::vector& getTtlbFullSamples() const { return ttlb_full_samples_; } + + PerfReceiver(context::Context& ctx) + { + _kind = connection::Kind::receiver; + set_state(ctx, connection::State::configured); + } + + connection::Result on_establish(context::Context& ctx) override + { + set_state(ctx, connection::State::active); + return connection::Result::success; + } + + connection::Result on_shutdown(context::Context& ctx) override + { + return connection::Result::success; + } + + connection::Result on_receive(context::Context& /*ctx*/, + void* ptr, + uint32_t sz, + uint32_t& sent) override + { + static constexpr uint32_t TTLB_ITERS = 200; // frames per phase + + if (sz < sizeof(FrameHdr)) { + std::cerr << "[RX] Packet too small (" << sz << " B)\n"; + 
return connection::Result::error_bad_argument; + } + + // parse header + timestamps + auto* hdr = reinterpret_cast(ptr); + uint32_t frame = ntohl(hdr->frame); + uint64_t tx_ns = be64toh(hdr->tx_ns); + + uint64_t rx_ns = std::chrono::time_point_cast( + std::chrono::high_resolution_clock::now()) + .time_since_epoch() + .count(); + + // —————— TTLB book-keeping —————— + // Warmup phase: first 200 frames + if (frame < TTLB_ITERS) { + // ignore + } + // Phase A: paced probes (frames 0…199) + else if (frame < 2 * TTLB_ITERS) { + uint64_t dt = rx_ns - tx_ns; + ttlb_spaced_ns_sum.fetch_add(dt, std::memory_order_relaxed); + ttlb_spaced_seen.fetch_add(1, std::memory_order_relaxed); + ttlb_spaced_samples_.push_back(dt); + } + // Phase B: full-speed probes (frames 200…399) + else if (frame < 3 * TTLB_ITERS) { + uint64_t dt = rx_ns - tx_ns; + ttlb_full_ns_sum.fetch_add(dt, std::memory_order_relaxed); + ttlb_full_seen.fetch_add(1, std::memory_order_relaxed); + ttlb_full_samples_.push_back(dt); + } + + // —————— first / last arrival stamps (diagnostics) —————— + { + uint64_t exp = 0; + first_tx_ns.compare_exchange_strong(exp, tx_ns, std::memory_order_relaxed); + exp = 0; + first_rx_ns.compare_exchange_strong(exp, rx_ns, std::memory_order_relaxed); + last_rx_ns.store(rx_ns, std::memory_order_relaxed); + } + + // —————— loss / ordering tracking (unchanged) —————— + if (have_last_) { + if (frame == last_frame_ + 1) { + // in‐order + } else if (frame > last_frame_ + 1) { + uint32_t gap = frame - last_frame_ - 1; + missing_frames_.fetch_add(gap, std::memory_order_relaxed); + std::cerr << "[RX] Missing " << gap << " between " << last_frame_ << " and " + << frame << "\n"; + } else if (frame != last_frame_) { + std::cerr << "[RX] Out-of-order " << frame << " after " << last_frame_ << "\n"; + } + } else { + have_last_ = true; + } + last_frame_ = frame; + + // —————— count it and consume —————— + received_count.fetch_add(1, std::memory_order_relaxed); + sent = 0; + return 
connection::Result::success; + } + + connection::Result configure(context::Context& ctx) + { + set_state(ctx, connection::State::configured); + return connection::Result::success; + } +}; + +/** -------------------------------------------------------------------- + * RdmaRealEndpointsRxTest: + * - GTest fixture for the RX side + * ------------------------------------------------------------------ **/ +class RdmaRealEndpointsRxTest : public ::testing::Test { +protected: + context::Context ctx; + RdmaRx* conn_rx; + PerfReceiver* perf_rx; + libfabric_ctx* rx_dev_handle; + std::atomic keep_running; + + void SetUp() override + { + ctx = context::WithCancel(context::Background()); + + // Create RDMA receiver connection + conn_rx = new RdmaRx(); + perf_rx = new PerfReceiver(ctx); + rx_dev_handle = nullptr; + keep_running = true; + + // Adjust these for your environment: + // A larger transfer_size ensures the posted buffer is big enough + // for the largest messages you expect from the transmitter. + // A bigger queue_size can boost throughput by posting multiple receives. 
+ mcm_conn_param rx_request = {}; + rx_request.type = is_rx; + rx_request.local_addr = {.ip = "192.168.2.30", .port = "9002"}; + // rx_request.local_addr = {.ip = "192.168.2.20", .port = "9002"}; + rx_request.payload_args.rdma_args.transfer_size = 3840ULL * 2160ULL * 4ULL; + rx_request.payload_args.rdma_args.queue_size = 64; + + // Configure & establish the RDMA Rx + ASSERT_EQ(conn_rx->configure(ctx, rx_request, rx_dev_handle), + connection::Result::success); + ASSERT_EQ(conn_rx->establish(ctx), connection::Result::success); + + // Configure & establish our PerfReceiver + ASSERT_EQ(perf_rx->configure(ctx), connection::Result::success); + ASSERT_EQ(perf_rx->establish(ctx), connection::Result::success); + + // Link the RDMA Rx to our PerfReceiver + conn_rx->set_link(ctx, perf_rx); + } + + void TearDown() override + { + keep_running = false; + ASSERT_EQ(conn_rx->shutdown(ctx), connection::Result::success); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + delete conn_rx; + delete perf_rx; + } +}; + +// --------------------------------------------------------------------------- +// RdmaRealEndpointsRxTest +// Matches the three-phase TX scheme: +// +// • first 200 normal-size frames → average **TTLB** +// • remaining frames (to 16 GiB) → throughput, no timing +// +// The averages are accumulated inside PerfReceiver with +// ttlb_seen / ttfb_ns_sum / ttlb_ns_sum +// --------------------------------------------------------------------------- +TEST_F(RdmaRealEndpointsRxTest, MultipleReception) +{ + /* --------------------------- test matrix ------------------------------ */ + const size_t payload_sizes[] = { + 568ULL * 320ULL * 4ULL, // 320p + 1280ULL * 720ULL * 4ULL, // 720p + 1920ULL * 1080ULL * 4ULL, // 1080p + 3840ULL * 2160ULL * 4ULL // 4K + }; + const int queue_sizes[] = {1, 4, 16}; + char* providers[] = {"tcp", "verbs"}; + const int endpoint_counts[] = {1, 2, 4}; + + const size_t TOTAL_STREAM_BYTES = 16ULL * 1024ULL * 1024ULL * 1024ULL; // 16 GiB + 
+ /* --------------------------- constants -------------------------------- */ + static constexpr size_t TTLB_ITERS = 200; + + constexpr char TX_IP[] = "192.168.2.20"; + constexpr int METRICS_PORT = 9999; + + auto cpu_seconds = []() -> double { + rusage ru{}; getrusage(RUSAGE_SELF, &ru); + return ru.ru_utime.tv_sec + ru.ru_utime.tv_usec/1e6 + + ru.ru_stime.tv_sec + ru.ru_stime.tv_usec/1e6; + }; + + /* ===================================================================== */ + for (auto prov : providers) + for (int num_eps : endpoint_counts) + for (int qsz : queue_sizes) + for (size_t psz : payload_sizes) + { + if (prov == "tcp" && num_eps > 1) { + std::cerr << "[RX] ⚠ TCP provider does not support multiple endpoints\n"; + continue; + } + + /* ---- clean any prior connection --------------------------------- */ + if (conn_rx) { + EXPECT_EQ(conn_rx->shutdown(ctx), connection::Result::success); + delete conn_rx; conn_rx = nullptr; + } + + /* ---- create fresh RX endpoint ----------------------------------- */ + conn_rx = new RdmaRx(); + mcm_conn_param p{}; p.type = is_rx; + p.local_addr = {.ip = "192.168.2.30", .port = "9002"}; + p.payload_args.rdma_args.transfer_size = psz; + p.payload_args.rdma_args.queue_size = qsz; + p.payload_args.rdma_args.provider = prov; + p.payload_args.rdma_args.num_endpoints = num_eps; + + ASSERT_EQ(conn_rx->configure(ctx, p, rx_dev_handle), + connection::Result::success); + ASSERT_EQ(conn_rx->establish(ctx), connection::Result::success); + conn_rx->set_link(ctx, perf_rx); + + /* ---- reset per-run counters in PerfReceiver --------------------- */ + const size_t msgs_expected = + TOTAL_STREAM_BYTES/psz + 3 * TTLB_ITERS; // only TTLB probes + + perf_rx->received_count .store(0, std::memory_order_relaxed); + perf_rx->missing_frames_.store(0, std::memory_order_relaxed); + perf_rx->have_last_ = false; + + perf_rx->ttlb_full_ns_sum .store(0, std::memory_order_relaxed); + perf_rx->ttlb_full_seen .store(0, std::memory_order_relaxed); + 
perf_rx->ttlb_spaced_ns_sum.store(0, std::memory_order_relaxed); + perf_rx->ttlb_spaced_seen .store(0, std::memory_order_relaxed); + perf_rx->clearLatencySamples(); + + std::cout << "\n[RX] waiting for " << msgs_expected + << " msgs of " << (psz/1024/1024) << " MiB," + << " q" << qsz + << " Prov " << prov + << " #EP " << num_eps << " …\n"; + + /* ---- CPU-load anchors ------------------------------------------ */ + auto wall_start = std::chrono::steady_clock::now(); + double cpu_start = cpu_seconds(); + + /* ---- wait until all expected frames have arrived --------------- */ + while (perf_rx->received_count.load(std::memory_order_relaxed) + < msgs_expected) + { + std::this_thread::sleep_for(std::chrono::microseconds(100)); + } + + /* ---- compute TTLB average --------------------------------------- */ + double spaced_avg_ms = + double(perf_rx->ttlb_spaced_ns_sum.load()) / perf_rx->ttlb_spaced_seen.load() / 1e6; + double full_avg_ms = + double(perf_rx->ttlb_full_ns_sum.load()) / perf_rx->ttlb_full_seen.load() / 1e6; + + auto wall_end = std::chrono::steady_clock::now(); + double wall_sec = std::chrono::duration(wall_end - wall_start).count(); + double cpu_pct = 100.0*(cpu_seconds() - cpu_start)/wall_sec; + + /* percentile helper */ + auto computePercentileMs = [&](std::vector v, double p) { + if (v.empty()) return 0.0; + size_t idx = size_t(p * (v.size() - 1)); + std::nth_element(v.begin(), v.begin() + idx, v.end()); + return double(v[idx]) / 1e6; + }; + + /* fetch TTLB samples */ + auto ttlb_samples = perf_rx->getTtlbFullSamples(); + + /* print TTLB percentiles */ + std::cout << "[RX] TTLB avg=" << full_avg_ms << " ms " + << "P25=" << computePercentileMs(ttlb_samples, 0.25) << " ms " + << "P50=" << computePercentileMs(ttlb_samples, 0.50) << " ms " + << "P90=" << computePercentileMs(ttlb_samples, 0.90) << " ms " + << "P99=" << computePercentileMs(ttlb_samples, 0.99) << " ms\n"; + + std::cout << "[RX] done " << (psz/1024/1024) + << " MiB,q" << qsz + << " missing=" << 
perf_rx->missing_frames_.load() + << " TTLB=" << full_avg_ms + << " ms CPU=" << cpu_pct << "%\n"; + + /* ---- send StatsMsg to the transmitter -------------------------- */ + StatsMsg sm{ + .payload_mb = static_cast(psz/(1024*1024)), + .queue = static_cast(qsz), + .ttlb_spaced_ms = spaced_avg_ms, + .ttlb_full_ms = full_avg_ms, + .cpu_tx_pct = 0.0, + .cpu_rx_pct = cpu_pct + }; + + int s = socket(AF_INET, SOCK_DGRAM, 0); + sockaddr_in tx{}; tx.sin_family = AF_INET; + tx.sin_port = htons(METRICS_PORT); + inet_pton(AF_INET, TX_IP, &tx.sin_addr); + sendto(s, &sm, sizeof(sm), 0, + reinterpret_cast(&tx), sizeof(tx)); + close(s); + } +} + +} // namespace connection +} // namespace mesh + +/** + * main() for the RX side test executable. + */ +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/media-proxy/tests/rdma_tx_test.cpp b/media-proxy/tests/rdma_tx_test.cpp new file mode 100644 index 000000000..fbe8c7d0a --- /dev/null +++ b/media-proxy/tests/rdma_tx_test.cpp @@ -0,0 +1,403 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +// Include your RDMA Tx and logging interfaces +#include "mesh/conn_rdma_tx.h" +#include "mesh/concurrency.h" +#include "logger.h" +#include +#include "metrics.h" +#include +#include +#include +#include + +using namespace mesh::log; + +namespace mesh { +namespace connection { + +/** Logging level (optional) **/ +Level log_level = Level::fatal; + +/** -------------------------------------------------------------------- + * EmulatedTransmitter: + * - Minimal transmitter that sends data. 
+ * ------------------------------------------------------------------ **/ +class EmulatedTransmitter : public connection::Connection { +public: + EmulatedTransmitter(context::Context& ctx) { + _kind = connection::Kind::transmitter; + set_state(ctx, connection::State::configured); + } + + connection::Result on_establish(context::Context& ctx) override { + set_state(ctx, connection::State::active); + return connection::Result::success; + } + + connection::Result on_shutdown(context::Context& ctx) override { + return connection::Result::success; + } + + connection::Result configure(context::Context& ctx) { + set_state(ctx, connection::State::configured); + return connection::Result::success; + } + + connection::Result transmit_plaintext(context::Context& ctx, const void *ptr, size_t sz) { + // RDMA transmit expects a void*, so cast away const + return transmit(ctx, const_cast(ptr), sz); + } +}; + +/** -------------------------------------------------------------------- + * RdmaRealEndpointsTxTest: + * - GTest fixture for the TX side + * ------------------------------------------------------------------ **/ +class RdmaRealEndpointsTxTest : public ::testing::Test { +protected: + context::Context ctx; + RdmaTx *conn_tx; + EmulatedTransmitter *emulated_tx; + libfabric_ctx *tx_dev_handle; + std::atomic keep_running; + + void SetUp() override { + // We won't establish a connection here, because we want to mimic + // the single-machine approach of re-creating connections for each + // (payload_size, queue_size) pair. We'll do that in the test itself. 
+ ctx = context::WithCancel(context::Background()); + conn_tx = nullptr; + emulated_tx = nullptr; + tx_dev_handle = nullptr; + keep_running = true; + } + + void TearDown() override { + // If a connection is active at the end, clean it up here + if (conn_tx || emulated_tx) { + CleanupRdmaConnectionsTx(); + } + } + +public: + /** + * SetupRdmaConnectionsTx: + * Similar to single-machine SetupRdmaConnections, but only + * creates and configures the TX side (since RX is on another machine). + */ + void SetupRdmaConnectionsTx(size_t payload_size, + int queue_size, + char* provider_name, + int num_endpoints) { + // Create new transmitter components + conn_tx = new RdmaTx(); + emulated_tx = new EmulatedTransmitter(ctx); + tx_dev_handle = nullptr; + + // Prepare connection params + mcm_conn_param tx_request = {}; + tx_request.type = is_tx; + tx_request.local_addr = {.ip = "192.168.2.20", .port = "9003"}; // adjust to your local + tx_request.remote_addr = {.ip = "192.168.2.30", .port = "9002"}; // the remote Rx + tx_request.payload_args.rdma_args.transfer_size = payload_size; + tx_request.payload_args.rdma_args.queue_size = queue_size; + tx_request.payload_args.rdma_args.provider = provider_name; + tx_request.payload_args.rdma_args.num_endpoints = num_endpoints; + + // Configure & establish + ASSERT_EQ(conn_tx->configure(ctx, tx_request, tx_dev_handle), connection::Result::success); + ASSERT_EQ(conn_tx->establish(ctx), connection::Result::success); + + // Configure & establish the emulated Tx + ASSERT_EQ(emulated_tx->configure(ctx), connection::Result::success); + ASSERT_EQ(emulated_tx->establish(ctx), connection::Result::success); + + // Link them + emulated_tx->set_link(ctx, conn_tx); + + keep_running = true; + } + + /** + * CleanupRdmaConnectionsTx: + * Mimic the single-machine CleanupRdmaConnections approach + */ + void CleanupRdmaConnectionsTx() { + keep_running = false; + // In single-machine code, there's an Rx as well; here we only handle Tx side + if (conn_tx) { + 
ASSERT_EQ(conn_tx->shutdown(ctx), connection::Result::success); + } + // Give some time for the shutdown + std::this_thread::sleep_for(std::chrono::milliseconds(2500)); + + delete conn_tx; + delete emulated_tx; + + conn_tx = nullptr; + emulated_tx = nullptr; + } +}; + +// /** +// * Test 1: Basic Transmission Example +// * - Sends a small amount of data 5 times. +// * (Adapted from simpler usage.) +// */ +// TEST_F(RdmaRealEndpointsTxTest, ConcurrentTransmission) +// { +// // Setup connections with some default size & queue +// SetupRdmaConnectionsTx(/*payload_size*/ 1024, /*queue_size*/ 8); + +// std::thread tx_thread([&]() { +// const char* test_data = "Hello RDMA World!"; +// size_t data_size = strlen(test_data) + 1; + +// for (int i = 0; i < 5 && keep_running; ++i) { +// auto res = emulated_tx->transmit_plaintext(ctx, test_data, data_size); +// EXPECT_EQ(res, connection::Result::success); +// std::cout << "[TX] Sent iteration #" << (i + 1) +// << " data: " << test_data << std::endl; +// std::this_thread::sleep_for(std::chrono::milliseconds(100)); +// } +// keep_running = false; +// }); + +// tx_thread.join(); + +// // Cleanup when done +// CleanupRdmaConnectionsTx(); +// } + +// --------------------------------------------------------------------------- +// RdmaRealEndpointsTxTest +// End-to-end matrix with +// • 200 × 16-byte frames → average **TTFB** +// • 200 × normal-size frames → average **TTLB** +// • 16 GiB raw stream → **Throughput + CPU-load** +// The RX side calculates the two latencies and ships them back in a +// single StatsMsg; the TX side prints the full table. 
+// ---------------------------------------------------------------------------
+TEST_F(RdmaRealEndpointsTxTest,
+ LatencyAndBandwidthForVaryingPayloadSizesAndQueueSizes)
+{
+ /* ---------- test matrix ------------------------------------------------- */
+ const size_t payload_sizes[] = {568ULL * 320ULL * 4ULL, // 320p
+ 1280ULL * 720ULL * 4ULL, // 720p
+ 1920ULL * 1080ULL * 4ULL, // 1080p
+ 3840ULL * 2160ULL * 4ULL}; // 4k
+ const int queue_sizes[] = {1, 4, 16};
+ char* providers[] = { "tcp", "verbs" };
+ const int endpoint_counts[] = { 1, 2, 4 };
+
+ const size_t TOTAL_STREAM_BYTES = 16ULL * 1024ULL * 1024ULL * 1024ULL; // 16 GiB
+
+ /* ---------- latency-probe parameters ------------------------------------ */
+ static constexpr size_t TTLB_ITERS = 200; // normal frames
+
+ const char filler = 'A';
+
+ /* ---------- helpers ----------------------------------------------------- */
+ constexpr int METRICS_PORT = 9999;
+
+ auto cpu_seconds = []() -> double {
+ rusage ru{}; getrusage(RUSAGE_SELF, &ru);
+ return ru.ru_utime.tv_sec + ru.ru_utime.tv_usec/1e6 +
+ ru.ru_stime.tv_sec + ru.ru_stime.tv_usec/1e6;
+ };
+ auto now_ns = []() -> uint64_t {
+ return std::chrono::time_point_cast<std::chrono::nanoseconds>(
+ std::chrono::high_resolution_clock::now())
+ .time_since_epoch().count();
+ };
+
+ /* ---------- UDP socket for StatsMsg ------------------------------------- */
+ int stat_sock = socket(AF_INET, SOCK_DGRAM, 0);
+ sockaddr_in si{}; si.sin_family = AF_INET;
+ si.sin_addr.s_addr = INADDR_ANY;
+ si.sin_port = htons(METRICS_PORT);
+ bind(stat_sock, reinterpret_cast<sockaddr*>(&si), sizeof(si));
+ timeval tv{.tv_sec = 5, .tv_usec = 0};
+ setsockopt(stat_sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
+
+ // { provider, num_endpoints, payload_mb, queue_size,
+ // ttlb_spaced_ms, ttlb_full_ms, throughput_gbps,
+ // cpu_tx_pct, cpu_rx_pct }
+ std::vector<
+ std::tuple<
+ std::string, // provider
+ int, // #endpoints
+ double, // payload size (MB)
+ int, // queue size
+ double, // TTLB @60 fps (ms)
+ 
double, // TTLB full-speed (ms)
+ double, // throughput (GB/s)
+ double, // CPU-TX (%)
+ double // CPU-RX (%)
+ >
+ > results;
+
+ /* ======================================================================= */
+ for (auto prov : providers)
+ for (int num_eps : endpoint_counts)
+ for (int qsz : queue_sizes)
+ for (size_t psz : payload_sizes)
+ {
+ if (std::string(prov) == "tcp" && num_eps > 1) { // compare contents, not char* pointers
+ std::cerr << "[TX] ⚠ TCP provider does not support multiple endpoints\n";
+ continue; // skip TCP with multiple endpoints
+ }
+
+ std::cout << "\n[TX] payload " << (psz / 1024 / 1024) << " MB, queue " << qsz << " Prov "
+ << prov << " #EP " << num_eps << " …\n";
+ sleep(1);
+
+ /* ---- fresh RDMA connection --------------------------------------- */
+ SetupRdmaConnectionsTx(psz, qsz, prov, num_eps);
+
+ /* ---- pre-allocate buffers ---------------------------------- */
+
+ std::vector<char> buf_big(psz, filler); // phase A
+ auto* hdr_big = reinterpret_cast(buf_big.data());
+
+ std::vector<char> buf_raw(psz, filler); // phase B
+ auto* hdr_raw = reinterpret_cast(buf_raw.data());
+ hdr_raw->tx_ns = 0; // no timestamp
+
+ uint32_t frame = 0;
+
+ /* ---------------------- Warmup Phase ------------------ */
+ for (size_t i = 0; i < TTLB_ITERS; ++i) {
+ hdr_big->frame = htonl(frame++);
+ hdr_big->tx_ns = htobe64(now_ns());
+ EXPECT_EQ(emulated_tx->transmit_plaintext(ctx,
+ buf_big.data(), buf_big.size()),
+ connection::Result::success);
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(200)); // sleep(0.2) truncated to sleep(0)
+
+ /* ---------------------- (A) normal-probe → TTLB ------------------ */
+ for (size_t i = 0; i < TTLB_ITERS; ++i) {
+ hdr_big->frame = htonl(frame++);
+ hdr_big->tx_ns = htobe64(now_ns());
+ EXPECT_EQ(emulated_tx->transmit_plaintext(ctx,
+ buf_big.data(), buf_big.size()),
+ connection::Result::success);
+ std::this_thread::sleep_for(std::chrono::milliseconds(16));
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(200));
+
+ /* ---------------------- (B) normal-probe → TTLB ------------------ */
+ for (size_t i = 0; i < TTLB_ITERS; ++i) {
+ hdr_big->frame = htonl(frame++);
+ 
hdr_big->tx_ns = htobe64(now_ns());
+ EXPECT_EQ(emulated_tx->transmit_plaintext(ctx,
+ buf_big.data(), buf_big.size()),
+ connection::Result::success);
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(200)); // sleep(0.2) truncated to sleep(0)
+
+ /* ---------------------- (C) raw throughput ----------------------- */
+ size_t sends_needed = TOTAL_STREAM_BYTES / psz;
+ auto thr_start = std::chrono::steady_clock::now();
+ double cpu_start = cpu_seconds();
+
+ for (size_t i = 0; i < sends_needed; ++i) {
+ hdr_raw->frame = htonl(frame++); // still unique, no ts
+ EXPECT_EQ(emulated_tx->transmit_plaintext(ctx,
+ buf_raw.data(), buf_raw.size()),
+ connection::Result::success);
+ }
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(200));
+
+ auto thr_end = std::chrono::steady_clock::now();
+ double cpu_end = cpu_seconds();
+ double thr_sec = std::chrono::duration<double>(thr_end - thr_start).count();
+ double cpu_tx_pct = 100.0 * (cpu_end - cpu_start) / thr_sec;
+
+ double total_gib = static_cast<double>(psz) * sends_needed /
+ (1024.0*1024.0*1024.0);
+ double gbps = total_gib / thr_sec;
+
+ /* ---- wait for StatsMsg from RX ----------------------------------- */
+ StatsMsg sm{};
+ sockaddr_in src{}; socklen_t slen = sizeof(src);
+ ssize_t n = recvfrom(stat_sock, &sm, sizeof(sm), 0,
+ reinterpret_cast<sockaddr*>(&src), &slen);
+
+ if (n == sizeof(sm)) {
+ results.emplace_back(
+ std::string(prov), // provider
+ num_eps, // #endpoints
+ static_cast<double>(psz)/(1024*1024), // payload MB
+ qsz, // queue
+ sm.ttlb_spaced_ms, sm.ttlb_full_ms, // TTLB
+ gbps, // throughput
+ cpu_tx_pct, sm.cpu_rx_pct); // CPU
+
+ std::cout << "[TX]" << " ms ttlb @60fps=" << sm.ttlb_spaced_ms
+ << " ms ttlb @max=" << sm.ttlb_full_ms
+ << " ms thr=" << gbps << " GB/s CPU-TX=" << cpu_tx_pct
+ << "% CPU-RX=" << sm.cpu_rx_pct << "%\n";
+ } else {
+ std::cerr << "[TX] ⚠ no StatsMsg for "
+ << (psz/1024/1024) << " MB,q" << qsz << '\n';
+ }
+
+ CleanupRdmaConnectionsTx();
+ sleep(1);
+ }
+ /* ======================================================================= */
+
+ /* ---------- pretty-print table ----------------------------------------- 
*/ + std::cout << "\n+----------+-----------+-------------------+-------------+-----------+--------------+--------------------+------------+------------+\n" + << "| Provider | #Endpoints| Payload Size (MB) | Queue Size | TTLB (ms) | TTLB (ms) | Maximum Throughput | CPU-TX (%) | CPU-RX (%) |\n" + << "| | | | | @60 fps | @max thr. | (GB/s) | (100% is 1 core) |\n" + << "+----------+-----------+-------------------+-------------+-----------+--------------+--------------------+------------+------------+\n"; + + for (const auto& r : results) { + std::cout << "| " << std::setw(8) << std::get<0>(r) // provider + << " | " << std::setw(9) << std::get<1>(r) // #endpoints + << " | " << std::setw(17) << std::fixed << std::setprecision(2) + << std::get<2>(r) // payload MB + << " | " << std::setw(11) << std::get<3>(r) // queue + << " | " << std::setw(9) << std::fixed << std::setprecision(3) + << std::get<4>(r) // TTLB spaced + << " | " << std::setw(12) << std::fixed << std::setprecision(3) + << std::get<5>(r) // TTLB full-speed + << " | " << std::setw(18) << std::fixed << std::setprecision(3) + << std::get<6>(r) // throughput + << " | " << std::setw(10) << std::fixed << std::setprecision(1) + << std::get<7>(r) // CPU-TX + << " | " << std::setw(10) << std::fixed << std::setprecision(1) + << std::get<8>(r) // CPU-RX + << " |\n"; + } + + std::cout << "+----------+-----------+-------------------+-------------+-----------+--------------+--------------------+------------+------------+\n"; + + +} + +} // namespace connection +} // namespace mesh + +/** + * main() for the TX side test executable. 
+ */ +int main(int argc, char **argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/sdk/include/mcm_dp.h b/sdk/include/mcm_dp.h index b012def66..099191ddc 100644 --- a/sdk/include/mcm_dp.h +++ b/sdk/include/mcm_dp.h @@ -204,6 +204,8 @@ typedef struct { typedef struct { size_t transfer_size; int queue_size; + char *provider; + uint16_t num_endpoints; } mcm_rdma_args; typedef struct {