Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion media-proxy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ include_directories(
add_library(media_proxy_lib STATIC ${proxy_srcs} ${PROTO_SRCS} ${PROTO_HDRS} ${GRPC_SRCS}
${GRPC_HDRS})
target_link_libraries(media_proxy_lib PUBLIC m ${MTL_LIB} ${MEMIF_LIB} ${LIBFABRIC}
gRPC::grpc++ protobuf::libprotobuf)
gRPC::grpc++ protobuf::libprotobuf rdmacm)

add_executable(media_proxy ${proxy_srcs} src/media_proxy.cc)
target_link_libraries(media_proxy PRIVATE media_proxy_lib)
Expand Down
2 changes: 2 additions & 0 deletions media-proxy/include/libfabric_cq.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ extern "C" {
#endif

#include <rdma/fi_eq.h>
#include <stdbool.h>

/* forward declaration */
typedef struct ep_ctx_t ep_ctx_t;
Expand All @@ -30,6 +31,7 @@ typedef struct {
uint64_t cq_cntr;
int cq_fd;
int (*eq_read)(ep_ctx_t *ep_ctx, struct fi_cq_err_entry *entry, int timeout);
bool external;
} cq_ctx_t;

#ifdef UNIT_TESTS_ENABLED
Expand Down
23 changes: 23 additions & 0 deletions media-proxy/include/libfabric_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,35 @@ extern "C" {
} \
} while (0)

/**
* Kind
*
* Definition of connection kinds.
*/
typedef enum {
FI_KIND_UNDEFINED = 0,
FI_KIND_TRANSMITTER,
FI_KIND_RECEIVER,
} conn_kind;
typedef struct {
struct fid_fabric *fabric;
struct fid_domain *domain;
struct fi_info *info;
conn_kind kind;
const char *local_ip;
const char *remote_ip;
const char *local_port;
const char *remote_port;
bool is_initialized; // Indicates if context is fully initialized
int ep_attr_type; // Store EP type for endpoint creation
int addr_format; // Store address format for endpoint creation
const char *provider_name; // Provider name (e.g., "tcp", "verbs")
} libfabric_ctx;

static void rdma_free_res(libfabric_ctx *rdma_ctx);
static void rdma_freehints(struct fi_info *hints);
static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints);

#ifdef UNIT_TESTS_ENABLED
int rdma_init(libfabric_ctx **ctx);
/* Deinitialize RDMA */
Expand Down
1 change: 1 addition & 0 deletions media-proxy/include/libfabric_ep.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ typedef struct {
rdma_addr remote_addr;
rdma_addr local_addr;
enum direction dir;
struct fid_cq *shared_rx_cq;
} ep_cfg_t;

/**
Expand Down
13 changes: 12 additions & 1 deletion media-proxy/include/mesh/conn_rdma.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <cstddef>
#include <atomic>
#include <queue>
#include <array>

#ifndef RDMA_DEFAULT_TIMEOUT
#define RDMA_DEFAULT_TIMEOUT 1
Expand All @@ -27,6 +28,9 @@
#ifndef PAGE_SIZE
#define PAGE_SIZE 4096
#endif
#ifndef RDMA_NUM_EPS
#define RDMA_NUM_EPS 1
#endif

namespace mesh::connection {

Expand Down Expand Up @@ -79,14 +83,21 @@ class Rdma : public Connection {

// RDMA-specific members
libfabric_ctx *m_dev_handle; // RDMA device handle
ep_ctx_t *ep_ctx; // RDMA endpoint context
// Configurable number of RDMA endpoints (default RDMA_NUM_EPS)
size_t rdma_num_eps = RDMA_NUM_EPS;
// Configurable provider: "tcp" or "verbs"
std::string rdma_provider = "verbs"; // Default provider
// Endpoint contexts (one pointer per EP)
std::vector<ep_ctx_t*> ep_ctxs;
ep_cfg_t ep_cfg; // RDMA endpoint configuration
size_t trx_sz; // Data transfer size
bool init; // Indicates if RDMA is initialized
void *buffer_block; // Pointer to the allocated buffer block
int queue_size; // Number of buffers in the queue
static std::atomic<int> active_connections; // Number of active RDMA connections

static constexpr size_t TRAILER = sizeof(uint64_t); // Size of the trailer for sequence number

// Queue for managing buffers
std::queue<void *> buffer_queue; // Queue holding available buffers
std::mutex queue_mutex; // Mutex for buffer queue synchronization
Expand Down
4 changes: 4 additions & 0 deletions media-proxy/include/mesh/conn_rdma_rx.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class RdmaRx : public Rdma {
// Receive data using RDMA
void process_buffers_thread(context::Context& ctx);
void rdma_cq_thread(context::Context& ctx);
std::atomic<uint32_t> next_rx_idx;
static constexpr size_t REORDER_WINDOW = 256; // > max expected out-of-order
std::array<void *, REORDER_WINDOW> reorder_ring{{nullptr}};
uint64_t reorder_head = UINT64_MAX;
};

} // namespace mesh::connection
Expand Down
3 changes: 3 additions & 0 deletions media-proxy/include/mesh/conn_rdma_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class RdmaTx : public Rdma {
protected:
virtual Result start_threads(context::Context& ctx);
void rdma_cq_thread(context::Context& ctx);
// one 64-bit counter shared by all RdmaTx
inline static std::atomic<uint64_t> global_seq{0};
std::atomic<uint32_t> next_tx_idx;
};

} // namespace mesh::connection
Expand Down
209 changes: 178 additions & 31 deletions media-proxy/src/libfabric_dev.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <getopt.h>

#include <rdma/fi_cm.h>
#include <rdma/rdma_verbs.h>
#include <rdma/rdma_cma.h>

#include "libfabric_dev.h"

Expand All @@ -23,6 +25,15 @@ static void rdma_freehints(struct fi_info *hints)
if (!hints)
return;

if (hints->tx_attr) {
free(hints->tx_attr);
hints->tx_attr = NULL;
}
if (hints->rx_attr) {
free(hints->rx_attr);
hints->rx_attr = NULL;
}

if (hints->domain_attr->name) {
free(hints->domain_attr->name);
hints->domain_attr->name = NULL;
Expand Down Expand Up @@ -53,7 +64,7 @@ static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints)
{
int ret;

ret = fi_getinfo(FI_VERSION(1, 21), NULL, NULL, 0, hints, &rdma_ctx->info);
ret = fi_getinfo(FI_VERSION(2, 0), NULL, NULL, 0, hints, &rdma_ctx->info);
if (ret) {
RDMA_PRINTERR("fi_getinfo", ret);
return ret;
Expand All @@ -65,19 +76,19 @@ static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints)
return ret;
}

ret = fi_domain(rdma_ctx->fabric, rdma_ctx->info, &rdma_ctx->domain, NULL);
if (ret) {
RDMA_PRINTERR("fi_domain", ret);
return ret;
}

/* TODO: future improvement: Add and monitor EQ to catch errors.
* ret = fi_eq_open(rdma_ctx->fabric, &eq_attr, &eq, NULL);
* if (ret) {
* RDMA_PRINTERR("fi_eq_open", ret);
* return ret;
* } */

ret = fi_domain(rdma_ctx->fabric, rdma_ctx->info, &rdma_ctx->domain, NULL);
if (ret) {
RDMA_PRINTERR("fi_domain", ret);
return ret;
}

/* TODO: future improvement: Add and monitor EQ to catch errors.
* ret = fi_domain_bind(rdma_ctx->domain, &eq->fid, 0);
* if (ret) {
Expand All @@ -90,42 +101,178 @@ static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints)

int rdma_init(libfabric_ctx **ctx)
{
struct fi_info *hints;
int op, ret = 0;
*ctx = calloc(1, sizeof(libfabric_ctx));
if (*ctx == NULL) {
RDMA_PRINTERR("calloc", -ENOMEM);
return -ENOMEM;
printf("[rdma_init] Entering function\n");
struct fi_info *hints = NULL;
struct rdma_addrinfo *rai_res = NULL;
struct rdma_addrinfo rai_hints;
int ret = 0;

if (!ctx || !(*ctx)) {
printf("[rdma_init] ERROR: ctx is NULL\n");
return -EINVAL;
}

// Clear any previous initialization
if ((*ctx)->fabric || (*ctx)->domain || (*ctx)->info) {
printf("[rdma_init] WARNING: Context already contains initialized resources\n");
rdma_free_res(*ctx);
}

memset(&rai_hints, 0, sizeof(rai_hints));
printf("[rdma_init] Initialized rai_hints to zero\n");

/* Allocate fi_info hints */
hints = fi_allocinfo();
if (!hints) {
RDMA_PRINTERR("fi_allocinfo", ret);
libfabric_dev_ops.rdma_deinit(ctx);
printf("[rdma_init] ERROR: fi_allocinfo failed\n");
return -ENOMEM;
}
printf("[rdma_init] fi_allocinfo returned valid hints at %p\n", hints);

hints->domain_attr->mr_mode =
FI_MR_LOCAL | FI_MR_ENDPOINT | (FI_MR_ALLOCATED | FI_MR_PROV_KEY | FI_MR_VIRT_ADDR);
hints->ep_attr->type = FI_EP_RDM;
hints->caps = FI_MSG;
hints->addr_format = FI_SOCKADDR_IN;
hints->fabric_attr->prov_name = strdup(LIB_FABRIC_ATTR_PROV_NAME_TCP);
hints->tx_attr->tclass = FI_TC_BULK_DATA;
hints->domain_attr->resource_mgmt = FI_RM_ENABLED;
hints->mode = FI_OPT_ENDPOINT;
hints->tx_attr->size = 32; // Transmit queue size
hints->rx_attr->size = 32; // Receive queue size
if ((*ctx)->provider_name && strcmp((*ctx)->provider_name, "verbs") == 0) {
hints->fabric_attr->prov_name = strdup(LIB_FABRIC_ATTR_PROV_NAME_VERBS);
hints->ep_attr->type = FI_EP_RDM; // Reliable datagram
hints->ep_attr->protocol = FI_PROTO_RXM;
hints->addr_format = FI_SOCKADDR_IN; // IPv4
hints->domain_attr->av_type = FI_AV_UNSPEC;
hints->domain_attr->mr_mode =
FI_MR_LOCAL | FI_MR_ALLOCATED | FI_MR_VIRT_ADDR | FI_MR_PROV_KEY;
hints->domain_attr->data_progress = FI_PROGRESS_AUTO; // Use auto progress

ret = rdma_init_fabric(*ctx, hints);
rdma_freehints(hints);
/* Adjust capabilities based on the direction */
if ((*ctx)->kind == FI_KIND_RECEIVER) {
hints->caps = FI_MSG | FI_RECV | FI_RMA | FI_REMOTE_READ | FI_LOCAL_COMM;
if (hints->rx_attr) {
hints->rx_attr->size = 1024; // Larger receive queue
}
} else {
hints->caps = FI_MSG | FI_SEND | FI_RMA | FI_REMOTE_WRITE | FI_LOCAL_COMM;
if (hints->tx_attr) {
hints->tx_attr->size = 1024; // Larger send queue
}
}
} else {
hints->domain_attr->mr_mode =
FI_MR_LOCAL | FI_MR_VIRT_ADDR;
hints->ep_attr->type = FI_EP_RDM;
hints->caps = FI_MSG;
hints->addr_format = FI_SOCKADDR_IN;
hints->fabric_attr->prov_name = strdup(LIB_FABRIC_ATTR_PROV_NAME_TCP);
hints->tx_attr->tclass = FI_TC_BULK_DATA;
hints->domain_attr->resource_mgmt = FI_RM_ENABLED;
hints->mode = FI_OPT_ENDPOINT;
hints->tx_attr->size = 1024; // Transmit queue size
hints->rx_attr->size = 1024; // Receive queue size
hints->domain_attr->threading = FI_THREAD_UNSPEC;
}

/* Store configuration for later endpoint use */
(*ctx)->is_initialized = false;
(*ctx)->ep_attr_type = hints->ep_attr->type;
(*ctx)->addr_format = hints->addr_format;

/* Adjust capabilities based on the connection kind */
if ((*ctx)->kind == FI_KIND_RECEIVER) {
/* For RX, resolve the local address */
ret = rdma_getaddrinfo((*ctx)->local_ip, (*ctx)->local_port, &rai_hints, &rai_res);
if (ret) {
printf("[rdma_init] ERROR: rdma_getaddrinfo (RX) returned %d\n", ret);
goto err;
}

hints->src_addr = malloc(rai_res->ai_src_len);
if (!hints->src_addr) {
ret = -ENOMEM;
goto err;
}

// Set the FI_SOURCE flag to tell provider to use src_addr for binding
hints->caps |= FI_SOURCE;

memcpy(hints->src_addr, rai_res->ai_src_addr, rai_res->ai_src_len);
hints->src_addrlen = rai_res->ai_src_len;
} else {
/* For TX, resolve the remote address */
ret = rdma_getaddrinfo((*ctx)->remote_ip, (*ctx)->remote_port, &rai_hints, &rai_res);
if (ret) {
printf("[rdma_init] ERROR: rdma_getaddrinfo (TX) returned %d\n", ret);
goto err;
}

// Handle source address
if (rai_res->ai_src_len > 0) {
hints->src_addr = malloc(rai_res->ai_src_len);
if (!hints->src_addr) {
ret = -ENOMEM;
goto err;
}
memcpy(hints->src_addr, rai_res->ai_src_addr, rai_res->ai_src_len);
hints->src_addrlen = rai_res->ai_src_len;
}

// Handle destination address
if (rai_res->ai_dst_len > 0) {
hints->dest_addr = malloc(rai_res->ai_dst_len);
if (!hints->dest_addr) {
ret = -ENOMEM;
goto err;
}
memcpy(hints->dest_addr, rai_res->ai_dst_addr, rai_res->ai_dst_len);
hints->dest_addrlen = rai_res->ai_dst_len;
}
}

/* Create fabric info, domain and fabric */
// For RX, pass hostname and port to enforce binding
if ((*ctx)->kind == FI_KIND_RECEIVER) {
ret = fi_getinfo(FI_VERSION(2, 0), (*ctx)->local_ip, (*ctx)->local_port,
FI_SOURCE, hints, &(*ctx)->info);
} else {
// For TX, use the remote address info to force connectivity to specific endpoint
printf("[rdma_init] TX - Enforcing connection to remote %s:%s\n", (*ctx)->remote_ip,
(*ctx)->remote_port);

ret = fi_getinfo(FI_VERSION(2, 0), (*ctx)->remote_ip, (*ctx)->remote_port, 0, hints,
&(*ctx)->info);
}
if (ret) {
libfabric_dev_ops.rdma_deinit(ctx);
ERROR("Failed to initialize RDMA device");
return ret;
fprintf(stderr, "[rdma_init] ERROR: fi_getinfo returned %d (%s)\n", ret, fi_strerror(-ret));
goto err;
}

ret = fi_fabric((*ctx)->info->fabric_attr, &(*ctx)->fabric, NULL);
if (ret) {
printf("[rdma_init] ERROR: fi_fabric returned %d\n", ret);
goto err_info;
}

ret = fi_domain((*ctx)->fabric, (*ctx)->info, &(*ctx)->domain, NULL);
if (ret) {
printf("[rdma_init] ERROR: fi_domain returned %d\n", ret);
goto err_fabric;
}

/* Clean up temporary resources */
rdma_freeaddrinfo(rai_res);
rdma_freehints(hints); // We've already stored the info in ctx->info

(*ctx)->is_initialized = true;
printf("[rdma_init] Device context successfully initialized\n");
return 0;

err_fabric:
fi_close(&(*ctx)->fabric->fid);

err_info:
fi_freeinfo((*ctx)->info);
free(*ctx);
*ctx = NULL;

err:
rdma_freeaddrinfo(rai_res);
rdma_freehints(hints);
fprintf(stderr, "[rdma_init] Failed with error %d\n", ret);
return ret;
}

static void rdma_free_res(libfabric_ctx *rdma_ctx)
Expand Down
Loading
Loading