Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion media-proxy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ include_directories(
add_library(media_proxy_lib STATIC ${proxy_srcs} ${PROTO_SRCS} ${PROTO_HDRS} ${GRPC_SRCS}
${GRPC_HDRS})
target_link_libraries(media_proxy_lib PUBLIC m ${MTL_LIB} ${MEMIF_LIB} ${LIBFABRIC}
gRPC::grpc++ protobuf::libprotobuf)
gRPC::grpc++ protobuf::libprotobuf rdmacm)

add_executable(media_proxy ${proxy_srcs} src/media_proxy.cc)
target_link_libraries(media_proxy PRIVATE media_proxy_lib)
Expand Down
2 changes: 2 additions & 0 deletions media-proxy/include/libfabric_cq.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ extern "C" {
#endif

#include <rdma/fi_eq.h>
#include <stdbool.h>

/* forward declaration */
typedef struct ep_ctx_t ep_ctx_t;
Expand All @@ -30,6 +31,7 @@ typedef struct {
uint64_t cq_cntr;
int cq_fd;
int (*eq_read)(ep_ctx_t *ep_ctx, struct fi_cq_err_entry *entry, int timeout);
bool external;
} cq_ctx_t;

#ifdef UNIT_TESTS_ENABLED
Expand Down
23 changes: 23 additions & 0 deletions media-proxy/include/libfabric_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,35 @@ extern "C" {
} \
} while (0)

/**
 * conn_kind
 *
 * Role of a connection described by a device context.
 * KIND_UNDEFINED is the zero value, so a zero-initialized context
 * starts out with no role selected.
 */
typedef enum {
    KIND_UNDEFINED   = 0, /* role not selected */
    KIND_TRANSMITTER = 1, /* sending side */
    KIND_RECEIVER    = 2, /* receiving side */
} conn_kind;
/**
 * libfabric_ctx
 *
 * Device-level libfabric context. The caller fills in the connection
 * parameters (kind, addresses, provider) and rdma_init() then populates
 * the fabric, domain and info handles.
 */
typedef struct {
struct fid_fabric *fabric; // Fabric handle opened with fi_fabric()
struct fid_domain *domain; // Domain handle opened with fi_domain()
struct fi_info *info; // Provider info returned by fi_getinfo()
conn_kind kind; // Receiver resolves/binds the local address; otherwise the remote address is used
const char *local_ip; // Local node string used for address resolution on the receiver side
const char *remote_ip; // Remote node string used for address resolution on the transmitter side
const char *local_port; // Local service string paired with local_ip
const char *remote_port; // Remote service string paired with remote_ip
bool is_initialized; // Indicates if context is fully initialized
int ep_attr_type; // Store EP type for endpoint creation
int addr_format; // Store address format for endpoint creation
const char *provider_name; // Provider name (e.g., "tcp", "verbs")
} libfabric_ctx;

/*
 * Forward declarations of file-local helpers used by rdma_init().
 *
 * NOTE(review): these are `static` prototypes placed in a header, so every
 * translation unit that includes this header gets its own internal-linkage
 * declaration. Units that never define them can trigger "declared 'static'
 * but never defined" warnings. Consider moving these prototypes to the top
 * of the .c file that defines the helpers -- confirm no other file relies
 * on them first.
 */
static void rdma_free_res(libfabric_ctx *rdma_ctx);
static void rdma_freehints(struct fi_info *hints);
static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints);

#ifdef UNIT_TESTS_ENABLED
int rdma_init(libfabric_ctx **ctx);
/* Deinitialize RDMA */
Expand Down
1 change: 1 addition & 0 deletions media-proxy/include/libfabric_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ typedef struct {
rdma_addr remote_addr;
rdma_addr local_addr;
enum direction dir;
struct fid_cq *shared_rx_cq;
} ep_cfg_t;

/**
Expand Down
18 changes: 17 additions & 1 deletion media-proxy/include/mesh/conn_rdma.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <cstddef>
#include <atomic>
#include <queue>
#include <array>

#ifndef RDMA_DEFAULT_TIMEOUT
#define RDMA_DEFAULT_TIMEOUT 1
Expand All @@ -27,6 +28,9 @@
#ifndef PAGE_SIZE
#define PAGE_SIZE 4096
#endif
// Default number of RDMA endpoints; used as the initial value of rdma_num_eps.
// The #ifndef guard lets a build define RDMA_NUM_EPS beforehand to override it.
#ifndef RDMA_NUM_EPS
#define RDMA_NUM_EPS 1
#endif

namespace mesh::connection {

Expand Down Expand Up @@ -79,14 +83,26 @@ class Rdma : public Connection {

// RDMA-specific members
libfabric_ctx *m_dev_handle; // RDMA device handle
ep_ctx_t *ep_ctx; // RDMA endpoint context
// Configurable number of RDMA endpoints (default RDMA_NUM_EPS)
size_t rdma_num_eps = RDMA_NUM_EPS;
// Configurable provider: "tcp" or "verbs"
std::string rdma_provider = "verbs"; // Default provider
// Endpoint contexts (one pointer per EP)
std::vector<ep_ctx_t*> ep_ctxs;
std::atomic<uint32_t> next_tx_idx;
std::atomic<uint32_t> next_rx_idx;
ep_cfg_t ep_cfg; // RDMA endpoint configuration
size_t trx_sz; // Data transfer size
bool init; // Indicates if RDMA is initialized
void *buffer_block; // Pointer to the allocated buffer block
int queue_size; // Number of buffers in the queue
static std::atomic<int> active_connections; // Number of active RDMA connections

static constexpr size_t REORDER_WINDOW = 256; // > max expected out-of-order
std::array<void*,REORDER_WINDOW> reorder_ring{{nullptr}};
uint64_t reorder_head = UINT64_MAX;
static constexpr size_t TRAILER = sizeof(uint64_t); // Size of the trailer for sequence number

// Queue for managing buffers
std::queue<void *> buffer_queue; // Queue holding available buffers
std::mutex queue_mutex; // Mutex for buffer queue synchronization
Expand Down
2 changes: 2 additions & 0 deletions media-proxy/include/mesh/conn_rdma_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class RdmaTx : public Rdma {
protected:
virtual Result start_threads(context::Context& ctx);
void rdma_cq_thread(context::Context& ctx);
// one 64-bit counter shared by all RdmaTx
inline static std::atomic<uint64_t> global_seq{0};
};

} // namespace mesh::connection
Expand Down
210 changes: 178 additions & 32 deletions media-proxy/src/libfabric_dev.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <getopt.h>

#include <rdma/fi_cm.h>
#include <rdma/rdma_verbs.h>
#include <rdma/rdma_cma.h>

#include "libfabric_dev.h"

Expand Down Expand Up @@ -53,7 +55,7 @@ static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints)
{
int ret;

ret = fi_getinfo(FI_VERSION(1, 21), NULL, NULL, 0, hints, &rdma_ctx->info);
ret = fi_getinfo(FI_VERSION(2, 0), NULL, NULL, 0, hints, &rdma_ctx->info);
if (ret) {
RDMA_PRINTERR("fi_getinfo", ret);
return ret;
Expand All @@ -65,19 +67,19 @@ static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints)
return ret;
}

ret = fi_domain(rdma_ctx->fabric, rdma_ctx->info, &rdma_ctx->domain, NULL);
if (ret) {
RDMA_PRINTERR("fi_domain", ret);
return ret;
}

/* TODO: future improvement: Add and monitor EQ to catch errors.
* ret = fi_eq_open(rdma_ctx->fabric, &eq_attr, &eq, NULL);
* if (ret) {
* RDMA_PRINTERR("fi_eq_open", ret);
* return ret;
* } */

ret = fi_domain(rdma_ctx->fabric, rdma_ctx->info, &rdma_ctx->domain, NULL);
if (ret) {
RDMA_PRINTERR("fi_domain", ret);
return ret;
}

/* TODO: future improvement: Add and monitor EQ to catch errors.
* ret = fi_domain_bind(rdma_ctx->domain, &eq->fid, 0);
* if (ret) {
Expand All @@ -90,42 +92,186 @@ static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints)

/**
 * Duplicate a resolved socket address into a malloc'd buffer for fi_info hints.
 *
 * @param dst      Receives the newly allocated copy.
 * @param dst_len  Receives the length of the copy.
 * @param src      Address to copy; may be NULL.
 * @param src_len  Length of @p src in bytes.
 * @return 0 on success (including the "nothing to copy" case, where @p dst and
 *         @p dst_len are left untouched), -ENOMEM if the allocation fails.
 */
static int rdma_dup_hint_addr(void **dst, size_t *dst_len, const void *src, size_t src_len)
{
    void *copy;

    /* An empty result is not an error; it also avoids malloc(0)/memcpy(NULL). */
    if (!src || src_len == 0)
        return 0;

    copy = malloc(src_len);
    if (!copy)
        return -ENOMEM;

    memcpy(copy, src, src_len);
    *dst = copy;
    *dst_len = src_len;
    return 0;
}

/**
 * Initialize the libfabric device context supplied by the caller.
 *
 * The caller allocates *ctx and fills in kind, provider_name and the
 * local/remote address strings. On success the context holds open info,
 * fabric and domain handles and is_initialized is set to true.
 *
 * @param ctx  Address of a non-NULL, caller-allocated context pointer.
 * @return 0 on success; -EINVAL if ctx or *ctx is NULL; -ENOMEM on allocation
 *         failure; otherwise the error returned by rdma_getaddrinfo(),
 *         fi_getinfo(), fi_fabric() or fi_domain().
 *
 * NOTE(review): error-path ownership is asymmetric and is preserved as-is.
 * Failures of fi_fabric()/fi_domain() free *ctx and set it to NULL, while
 * earlier failures leave *ctx allocated. Callers must check *ctx after an
 * error -- confirm this is the intended contract.
 */
int rdma_init(libfabric_ctx **ctx)
{
    struct fi_info *hints = NULL;
    struct rdma_addrinfo *rai_res = NULL;
    struct rdma_addrinfo rai_hints;
    libfabric_ctx *dev;
    int ret = 0;

    printf("[rdma_init] Entering function\n");

    if (!ctx || !(*ctx)) {
        printf("[rdma_init] ERROR: ctx is NULL\n");
        return -EINVAL;
    }
    dev = *ctx;

    /* Clear any previous initialization */
    if (dev->fabric || dev->domain || dev->info) {
        printf("[rdma_init] WARNING: Context already contains initialized resources\n");
        rdma_free_res(dev);
    }

    memset(&rai_hints, 0, sizeof(rai_hints));
    printf("[rdma_init] Initialized rai_hints to zero\n");

    /* Allocate fi_info hints */
    hints = fi_allocinfo();
    if (!hints) {
        printf("[rdma_init] ERROR: fi_allocinfo failed\n");
        return -ENOMEM;
    }
    printf("[rdma_init] fi_allocinfo returned valid hints at %p\n", (void *)hints);

    if (dev->provider_name && strcmp(dev->provider_name, "verbs") == 0) {
        hints->fabric_attr->prov_name = strdup(LIB_FABRIC_ATTR_PROV_NAME_VERBS);
        hints->ep_attr->type = FI_EP_RDM; /* Reliable datagram */
        hints->ep_attr->protocol = FI_PROTO_RXM;
        hints->addr_format = FI_SOCKADDR_IN; /* IPv4 */
        hints->domain_attr->av_type = FI_AV_UNSPEC;
        hints->domain_attr->mr_mode =
            FI_MR_LOCAL | FI_MR_ALLOCATED | FI_MR_VIRT_ADDR | FI_MR_PROV_KEY;
        hints->domain_attr->data_progress = FI_PROGRESS_AUTO; /* Use auto progress */

        /*
         * Adjust capabilities based on the direction. The rx/tx attribute
         * structures already come from fi_allocinfo(); they are updated in
         * place rather than replaced, which would leak the originals.
         */
        if (dev->kind == KIND_RECEIVER) {
            hints->caps = FI_MSG | FI_RECV | FI_RMA | FI_REMOTE_READ | FI_LOCAL_COMM;
            hints->rx_attr->size = 1024; /* Larger receive queue */
        } else {
            hints->caps = FI_MSG | FI_SEND | FI_RMA | FI_REMOTE_WRITE | FI_LOCAL_COMM;
            hints->tx_attr->size = 1024; /* Larger send queue */
        }
    } else {
        hints->domain_attr->mr_mode = FI_MR_LOCAL | FI_MR_VIRT_ADDR;
        hints->ep_attr->type = FI_EP_RDM;
        hints->caps = FI_MSG;
        hints->addr_format = FI_SOCKADDR_IN;
        hints->fabric_attr->prov_name = strdup(LIB_FABRIC_ATTR_PROV_NAME_TCP);
        hints->tx_attr->tclass = FI_TC_BULK_DATA;
        hints->domain_attr->resource_mgmt = FI_RM_ENABLED;
        hints->mode = FI_OPT_ENDPOINT;
        hints->tx_attr->size = 1024; /* Transmit queue size */
        hints->rx_attr->size = 1024; /* Receive queue size */
        hints->domain_attr->threading = FI_THREAD_UNSPEC;
    }

    /* A NULL provider name would silently match any provider. */
    if (!hints->fabric_attr->prov_name) {
        ret = -ENOMEM;
        goto err;
    }

    /* Store configuration for later endpoint use */
    dev->is_initialized = false;
    dev->ep_attr_type = hints->ep_attr->type;
    dev->addr_format = hints->addr_format;

    if (dev->kind == KIND_RECEIVER) {
        /* For RX, resolve the local address */
        ret = rdma_getaddrinfo(dev->local_ip, dev->local_port, &rai_hints, &rai_res);
        if (ret) {
            printf("[rdma_init] ERROR: rdma_getaddrinfo (RX) returned %d\n", ret);
            goto err;
        }

        ret = rdma_dup_hint_addr(&hints->src_addr, &hints->src_addrlen,
                                 rai_res->ai_src_addr, rai_res->ai_src_len);
        if (ret)
            goto err;

        /* Set the FI_SOURCE flag to tell provider to use src_addr for binding */
        hints->caps |= FI_SOURCE;
    } else {
        /* For TX, resolve the remote address */
        ret = rdma_getaddrinfo(dev->remote_ip, dev->remote_port, &rai_hints, &rai_res);
        if (ret) {
            printf("[rdma_init] ERROR: rdma_getaddrinfo (TX) returned %d\n", ret);
            goto err;
        }

        /* Handle source address */
        ret = rdma_dup_hint_addr(&hints->src_addr, &hints->src_addrlen,
                                 rai_res->ai_src_addr, rai_res->ai_src_len);
        if (ret)
            goto err;

        /* Handle destination address */
        ret = rdma_dup_hint_addr(&hints->dest_addr, &hints->dest_addrlen,
                                 rai_res->ai_dst_addr, rai_res->ai_dst_len);
        if (ret)
            goto err;
    }

    /* Create fabric info, domain and fabric */
    if (dev->kind == KIND_RECEIVER) {
        /* For RX, pass hostname and port to enforce binding */
        ret = fi_getinfo(FI_VERSION(2, 0), dev->local_ip, dev->local_port,
                         FI_SOURCE, hints, &dev->info);
    } else {
        /* For TX, use the remote address info to force connectivity to specific endpoint */
        printf("[rdma_init] TX - Enforcing connection to remote %s:%s\n", dev->remote_ip,
               dev->remote_port);

        ret = fi_getinfo(FI_VERSION(2, 0), dev->remote_ip, dev->remote_port, 0, hints,
                         &dev->info);
    }
    if (ret) {
        fprintf(stderr, "[rdma_init] ERROR: fi_getinfo returned %d (%s)\n", ret, fi_strerror(-ret));
        goto err;
    }

    ret = fi_fabric(dev->info->fabric_attr, &dev->fabric, NULL);
    if (ret) {
        printf("[rdma_init] ERROR: fi_fabric returned %d\n", ret);
        goto err_info;
    }

    ret = fi_domain(dev->fabric, dev->info, &dev->domain, NULL);
    if (ret) {
        printf("[rdma_init] ERROR: fi_domain returned %d\n", ret);
        goto err_fabric;
    }

    /* Clean up temporary resources */
    rdma_freeaddrinfo(rai_res);
    rdma_freehints(hints); /* The provider info is already stored in ctx->info */

    dev->is_initialized = true;
    printf("[rdma_init] Device context successfully initialized\n");
    return 0;

err_fabric:
    fi_close(&(*ctx)->fabric->fid);
    (*ctx)->fabric = NULL;

err_info:
    fi_freeinfo((*ctx)->info);
    (*ctx)->info = NULL;
    /* Existing behavior: from this point on the context itself is released. */
    free(*ctx);
    *ctx = NULL;

err:
    if (rai_res) {
        rdma_freeaddrinfo(rai_res);
    }
    if (hints) {
        rdma_freehints(hints);
    }
    fprintf(stderr, "[rdma_init] Failed with error %d\n", ret);
    return ret;
}

static void rdma_free_res(libfabric_ctx *rdma_ctx)
Expand Down
Loading
Loading