Skip to content

Commit 81e563b

Browse files
authored
Refactor and enhance RDMA implementation (verbs and tcp providers) (#400)
* Refactor and enhance RDMA implementation (verbs and tcp providers)
  - Introduced a new test suite for libfabric development, including comprehensive tests for RDMA initialization and teardown.
  - Added custom fake functions for RDMA address info handling to improve test isolation and reliability.
  - Created a new header file for libfabric development tests to encapsulate test setup and teardown logic.
  - Implemented performance testing for the RDMA receiver and transmitter, focusing on TTLB and bandwidth metrics across varying payload and queue sizes.
  - Enhanced the `mcm_rdma_args` structure to include the provider and the number of endpoints for better configuration flexibility.
  - Updated existing tests to remove unnecessary calls to `fi_getinfo`, ensuring tests focus on relevant functionality.
  - Improved memory management in tests to prevent leaks and ensure proper cleanup of allocated resources.
* Remove unused reorder buffer and window size constants from RDMA processing threads
* Refactor RDMA connection handling and enhance performance metrics
  - Moved the variables `next_tx_idx` and `next_rx_idx` from the `Rdma` constructor to `RdmaTx` and `RdmaRx`, respectively.
  - Updated the default value of `rdma_num_eps` to 1 in the `Rdma::configure` method.
  - Changed endpoint kind definitions from `KIND_RECEIVER` and `KIND_TRANSMITTER` to `FI_KIND_RECEIVER` and `FI_KIND_TRANSMITTER`.
  - Introduced a cleanup helper function to manage memory for endpoint clones in `Rdma::on_establish`.
  - Improved error handling and resource cleanup during RDMA endpoint initialization.
  - Added `metrics.h` to define structures for wire headers and statistics messages.
  - Enhanced the `PerfReceiver` class to track latency and throughput metrics.
  - Updated RDMA receiver and transmitter tests to incorporate the new metrics and validate performance.
  - Implemented a new test structure for measuring latency and bandwidth across varying payload sizes and queue sizes.
* Fix memory leak by freeing the duplicated RDMA provider string in `create_bridge`
* Rename `KIND_UNDEFINED` to `FI_KIND_UNDEFINED` for consistency and clarity in the connection kind enumeration. Remove unnecessary double allocation of certain attributes in hints.
1 parent 63087fe commit 81e563b

22 files changed

+2046
-481
lines changed

media-proxy/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -139,7 +139,7 @@ include_directories(
139139
add_library(media_proxy_lib STATIC ${proxy_srcs} ${PROTO_SRCS} ${PROTO_HDRS} ${GRPC_SRCS}
140140
${GRPC_HDRS})
141141
target_link_libraries(media_proxy_lib PUBLIC m ${MTL_LIB} ${MEMIF_LIB} ${LIBFABRIC}
142-
gRPC::grpc++ protobuf::libprotobuf)
142+
gRPC::grpc++ protobuf::libprotobuf rdmacm)
143143

144144
add_executable(media_proxy ${proxy_srcs} src/media_proxy.cc)
145145
target_link_libraries(media_proxy PRIVATE media_proxy_lib)

media-proxy/include/libfabric_cq.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@ extern "C" {
1212
#endif
1313

1414
#include <rdma/fi_eq.h>
15+
#include <stdbool.h>
1516

1617
/* forward declaration */
1718
typedef struct ep_ctx_t ep_ctx_t;
@@ -30,6 +31,7 @@ typedef struct {
3031
uint64_t cq_cntr;
3132
int cq_fd;
3233
int (*eq_read)(ep_ctx_t *ep_ctx, struct fi_cq_err_entry *entry, int timeout);
34+
bool external;
3335
} cq_ctx_t;
3436

3537
#ifdef UNIT_TESTS_ENABLED

media-proxy/include/libfabric_dev.h

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -115,12 +115,35 @@ extern "C" {
115115
} \
116116
} while (0)
117117

118+
/**
119+
* Kind
120+
*
121+
* Definition of connection kinds.
122+
*/
123+
typedef enum {
124+
FI_KIND_UNDEFINED = 0,
125+
FI_KIND_TRANSMITTER,
126+
FI_KIND_RECEIVER,
127+
} conn_kind;
118128
typedef struct {
119129
struct fid_fabric *fabric;
120130
struct fid_domain *domain;
121131
struct fi_info *info;
132+
conn_kind kind;
133+
const char *local_ip;
134+
const char *remote_ip;
135+
const char *local_port;
136+
const char *remote_port;
137+
bool is_initialized; // Indicates if context is fully initialized
138+
int ep_attr_type; // Store EP type for endpoint creation
139+
int addr_format; // Store address format for endpoint creation
140+
const char *provider_name; // Provider name (e.g., "tcp", "verbs")
122141
} libfabric_ctx;
123142

143+
static void rdma_free_res(libfabric_ctx *rdma_ctx);
144+
static void rdma_freehints(struct fi_info *hints);
145+
static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints);
146+
124147
#ifdef UNIT_TESTS_ENABLED
125148
int rdma_init(libfabric_ctx **ctx);
126149
/* Deinitialize RDMA */

media-proxy/include/libfabric_ep.h

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,7 @@ typedef struct {
4343
rdma_addr remote_addr;
4444
rdma_addr local_addr;
4545
enum direction dir;
46+
struct fid_cq *shared_rx_cq;
4647
} ep_cfg_t;
4748

4849
/**

media-proxy/include/mesh/conn_rdma.h

Lines changed: 12 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414
#include <cstddef>
1515
#include <atomic>
1616
#include <queue>
17+
#include <array>
1718

1819
#ifndef RDMA_DEFAULT_TIMEOUT
1920
#define RDMA_DEFAULT_TIMEOUT 1
@@ -27,6 +28,9 @@
2728
#ifndef PAGE_SIZE
2829
#define PAGE_SIZE 4096
2930
#endif
31+
#ifndef RDMA_NUM_EPS
32+
#define RDMA_NUM_EPS 1
33+
#endif
3034

3135
namespace mesh::connection {
3236

@@ -79,14 +83,21 @@ class Rdma : public Connection {
7983

8084
// RDMA-specific members
8185
libfabric_ctx *m_dev_handle; // RDMA device handle
82-
ep_ctx_t *ep_ctx; // RDMA endpoint context
86+
// Configurable number of RDMA endpoints (default RDMA_NUM_EPS)
87+
size_t rdma_num_eps = RDMA_NUM_EPS;
88+
// Configurable provider: "tcp" or "verbs"
89+
std::string rdma_provider = "verbs"; // Default provider
90+
// Endpoint contexts (one pointer per EP)
91+
std::vector<ep_ctx_t*> ep_ctxs;
8392
ep_cfg_t ep_cfg; // RDMA endpoint configuration
8493
size_t trx_sz; // Data transfer size
8594
bool init; // Indicates if RDMA is initialized
8695
void *buffer_block; // Pointer to the allocated buffer block
8796
int queue_size; // Number of buffers in the queue
8897
static std::atomic<int> active_connections; // Number of active RDMA connections
8998

99+
static constexpr size_t TRAILER = sizeof(uint64_t); // Size of the trailer for sequence number
100+
90101
// Queue for managing buffers
91102
std::queue<void *> buffer_queue; // Queue holding available buffers
92103
std::mutex queue_mutex; // Mutex for buffer queue synchronization

media-proxy/include/mesh/conn_rdma_rx.h

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,10 @@ class RdmaRx : public Rdma {
2828
// Receive data using RDMA
2929
void process_buffers_thread(context::Context& ctx);
3030
void rdma_cq_thread(context::Context& ctx);
31+
std::atomic<uint32_t> next_rx_idx;
32+
static constexpr size_t REORDER_WINDOW = 256; // > max expected out-of-order
33+
std::array<void *, REORDER_WINDOW> reorder_ring{{nullptr}};
34+
uint64_t reorder_head = UINT64_MAX;
3135
};
3236

3337
} // namespace mesh::connection

media-proxy/include/mesh/conn_rdma_tx.h

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,9 @@ class RdmaTx : public Rdma {
2828
protected:
2929
virtual Result start_threads(context::Context& ctx);
3030
void rdma_cq_thread(context::Context& ctx);
31+
// one 64-bit counter shared by all RdmaTx
32+
inline static std::atomic<uint64_t> global_seq{0};
33+
std::atomic<uint32_t> next_tx_idx;
3134
};
3235

3336
} // namespace mesh::connection

media-proxy/src/libfabric_dev.c

Lines changed: 178 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,8 @@
99
#include <getopt.h>
1010

1111
#include <rdma/fi_cm.h>
12+
#include <rdma/rdma_verbs.h>
13+
#include <rdma/rdma_cma.h>
1214

1315
#include "libfabric_dev.h"
1416

@@ -23,6 +25,15 @@ static void rdma_freehints(struct fi_info *hints)
2325
if (!hints)
2426
return;
2527

28+
if (hints->tx_attr) {
29+
free(hints->tx_attr);
30+
hints->tx_attr = NULL;
31+
}
32+
if (hints->rx_attr) {
33+
free(hints->rx_attr);
34+
hints->rx_attr = NULL;
35+
}
36+
2637
if (hints->domain_attr->name) {
2738
free(hints->domain_attr->name);
2839
hints->domain_attr->name = NULL;
@@ -53,7 +64,7 @@ static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints)
5364
{
5465
int ret;
5566

56-
ret = fi_getinfo(FI_VERSION(1, 21), NULL, NULL, 0, hints, &rdma_ctx->info);
67+
ret = fi_getinfo(FI_VERSION(2, 0), NULL, NULL, 0, hints, &rdma_ctx->info);
5768
if (ret) {
5869
RDMA_PRINTERR("fi_getinfo", ret);
5970
return ret;
@@ -65,19 +76,19 @@ static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints)
6576
return ret;
6677
}
6778

79+
ret = fi_domain(rdma_ctx->fabric, rdma_ctx->info, &rdma_ctx->domain, NULL);
80+
if (ret) {
81+
RDMA_PRINTERR("fi_domain", ret);
82+
return ret;
83+
}
84+
6885
/* TODO: future improvement: Add and monitor EQ to catch errors.
6986
* ret = fi_eq_open(rdma_ctx->fabric, &eq_attr, &eq, NULL);
7087
* if (ret) {
7188
* RDMA_PRINTERR("fi_eq_open", ret);
7289
* return ret;
7390
* } */
7491

75-
ret = fi_domain(rdma_ctx->fabric, rdma_ctx->info, &rdma_ctx->domain, NULL);
76-
if (ret) {
77-
RDMA_PRINTERR("fi_domain", ret);
78-
return ret;
79-
}
80-
8192
/* TODO: future improvement: Add and monitor EQ to catch errors.
8293
* ret = fi_domain_bind(rdma_ctx->domain, &eq->fid, 0);
8394
* if (ret) {
@@ -90,42 +101,178 @@ static int rdma_init_fabric(libfabric_ctx *rdma_ctx, struct fi_info *hints)
90101

91102
int rdma_init(libfabric_ctx **ctx)
92103
{
93-
struct fi_info *hints;
94-
int op, ret = 0;
95-
*ctx = calloc(1, sizeof(libfabric_ctx));
96-
if (*ctx == NULL) {
97-
RDMA_PRINTERR("calloc", -ENOMEM);
98-
return -ENOMEM;
104+
printf("[rdma_init] Entering function\n");
105+
struct fi_info *hints = NULL;
106+
struct rdma_addrinfo *rai_res = NULL;
107+
struct rdma_addrinfo rai_hints;
108+
int ret = 0;
109+
110+
if (!ctx || !(*ctx)) {
111+
printf("[rdma_init] ERROR: ctx is NULL\n");
112+
return -EINVAL;
113+
}
114+
115+
// Clear any previous initialization
116+
if ((*ctx)->fabric || (*ctx)->domain || (*ctx)->info) {
117+
printf("[rdma_init] WARNING: Context already contains initialized resources\n");
118+
rdma_free_res(*ctx);
99119
}
100120

121+
memset(&rai_hints, 0, sizeof(rai_hints));
122+
printf("[rdma_init] Initialized rai_hints to zero\n");
123+
124+
/* Allocate fi_info hints */
101125
hints = fi_allocinfo();
102126
if (!hints) {
103-
RDMA_PRINTERR("fi_allocinfo", ret);
104-
libfabric_dev_ops.rdma_deinit(ctx);
127+
printf("[rdma_init] ERROR: fi_allocinfo failed\n");
105128
return -ENOMEM;
106129
}
130+
printf("[rdma_init] fi_allocinfo returned valid hints at %p\n", hints);
107131

108-
hints->domain_attr->mr_mode =
109-
FI_MR_LOCAL | FI_MR_ENDPOINT | (FI_MR_ALLOCATED | FI_MR_PROV_KEY | FI_MR_VIRT_ADDR);
110-
hints->ep_attr->type = FI_EP_RDM;
111-
hints->caps = FI_MSG;
112-
hints->addr_format = FI_SOCKADDR_IN;
113-
hints->fabric_attr->prov_name = strdup(LIB_FABRIC_ATTR_PROV_NAME_TCP);
114-
hints->tx_attr->tclass = FI_TC_BULK_DATA;
115-
hints->domain_attr->resource_mgmt = FI_RM_ENABLED;
116-
hints->mode = FI_OPT_ENDPOINT;
117-
hints->tx_attr->size = 32; // Transmit queue size
118-
hints->rx_attr->size = 32; // Receive queue size
132+
if ((*ctx)->provider_name && strcmp((*ctx)->provider_name, "verbs") == 0) {
133+
hints->fabric_attr->prov_name = strdup(LIB_FABRIC_ATTR_PROV_NAME_VERBS);
134+
hints->ep_attr->type = FI_EP_RDM; // Reliable datagram
135+
hints->ep_attr->protocol = FI_PROTO_RXM;
136+
hints->addr_format = FI_SOCKADDR_IN; // IPv4
137+
hints->domain_attr->av_type = FI_AV_UNSPEC;
138+
hints->domain_attr->mr_mode =
139+
FI_MR_LOCAL | FI_MR_ALLOCATED | FI_MR_VIRT_ADDR | FI_MR_PROV_KEY;
140+
hints->domain_attr->data_progress = FI_PROGRESS_AUTO; // Use auto progress
119141

120-
ret = rdma_init_fabric(*ctx, hints);
121-
rdma_freehints(hints);
142+
/* Adjust capabilities based on the direction */
143+
if ((*ctx)->kind == FI_KIND_RECEIVER) {
144+
hints->caps = FI_MSG | FI_RECV | FI_RMA | FI_REMOTE_READ | FI_LOCAL_COMM;
145+
if (hints->rx_attr) {
146+
hints->rx_attr->size = 1024; // Larger receive queue
147+
}
148+
} else {
149+
hints->caps = FI_MSG | FI_SEND | FI_RMA | FI_REMOTE_WRITE | FI_LOCAL_COMM;
150+
if (hints->tx_attr) {
151+
hints->tx_attr->size = 1024; // Larger send queue
152+
}
153+
}
154+
} else {
155+
hints->domain_attr->mr_mode =
156+
FI_MR_LOCAL | FI_MR_VIRT_ADDR;
157+
hints->ep_attr->type = FI_EP_RDM;
158+
hints->caps = FI_MSG;
159+
hints->addr_format = FI_SOCKADDR_IN;
160+
hints->fabric_attr->prov_name = strdup(LIB_FABRIC_ATTR_PROV_NAME_TCP);
161+
hints->tx_attr->tclass = FI_TC_BULK_DATA;
162+
hints->domain_attr->resource_mgmt = FI_RM_ENABLED;
163+
hints->mode = FI_OPT_ENDPOINT;
164+
hints->tx_attr->size = 1024; // Transmit queue size
165+
hints->rx_attr->size = 1024; // Receive queue size
166+
hints->domain_attr->threading = FI_THREAD_UNSPEC;
167+
}
168+
169+
/* Store configuration for later endpoint use */
170+
(*ctx)->is_initialized = false;
171+
(*ctx)->ep_attr_type = hints->ep_attr->type;
172+
(*ctx)->addr_format = hints->addr_format;
173+
174+
/* Adjust capabilities based on the connection kind */
175+
if ((*ctx)->kind == FI_KIND_RECEIVER) {
176+
/* For RX, resolve the local address */
177+
ret = rdma_getaddrinfo((*ctx)->local_ip, (*ctx)->local_port, &rai_hints, &rai_res);
178+
if (ret) {
179+
printf("[rdma_init] ERROR: rdma_getaddrinfo (RX) returned %d\n", ret);
180+
goto err;
181+
}
182+
183+
hints->src_addr = malloc(rai_res->ai_src_len);
184+
if (!hints->src_addr) {
185+
ret = -ENOMEM;
186+
goto err;
187+
}
188+
189+
// Set the FI_SOURCE flag to tell provider to use src_addr for binding
190+
hints->caps |= FI_SOURCE;
191+
192+
memcpy(hints->src_addr, rai_res->ai_src_addr, rai_res->ai_src_len);
193+
hints->src_addrlen = rai_res->ai_src_len;
194+
} else {
195+
/* For TX, resolve the remote address */
196+
ret = rdma_getaddrinfo((*ctx)->remote_ip, (*ctx)->remote_port, &rai_hints, &rai_res);
197+
if (ret) {
198+
printf("[rdma_init] ERROR: rdma_getaddrinfo (TX) returned %d\n", ret);
199+
goto err;
200+
}
201+
202+
// Handle source address
203+
if (rai_res->ai_src_len > 0) {
204+
hints->src_addr = malloc(rai_res->ai_src_len);
205+
if (!hints->src_addr) {
206+
ret = -ENOMEM;
207+
goto err;
208+
}
209+
memcpy(hints->src_addr, rai_res->ai_src_addr, rai_res->ai_src_len);
210+
hints->src_addrlen = rai_res->ai_src_len;
211+
}
212+
213+
// Handle destination address
214+
if (rai_res->ai_dst_len > 0) {
215+
hints->dest_addr = malloc(rai_res->ai_dst_len);
216+
if (!hints->dest_addr) {
217+
ret = -ENOMEM;
218+
goto err;
219+
}
220+
memcpy(hints->dest_addr, rai_res->ai_dst_addr, rai_res->ai_dst_len);
221+
hints->dest_addrlen = rai_res->ai_dst_len;
222+
}
223+
}
224+
225+
/* Create fabric info, domain and fabric */
226+
// For RX, pass hostname and port to enforce binding
227+
if ((*ctx)->kind == FI_KIND_RECEIVER) {
228+
ret = fi_getinfo(FI_VERSION(2, 0), (*ctx)->local_ip, (*ctx)->local_port,
229+
FI_SOURCE, hints, &(*ctx)->info);
230+
} else {
231+
// For TX, use the remote address info to force connectivity to specific endpoint
232+
printf("[rdma_init] TX - Enforcing connection to remote %s:%s\n", (*ctx)->remote_ip,
233+
(*ctx)->remote_port);
234+
235+
ret = fi_getinfo(FI_VERSION(2, 0), (*ctx)->remote_ip, (*ctx)->remote_port, 0, hints,
236+
&(*ctx)->info);
237+
}
122238
if (ret) {
123-
libfabric_dev_ops.rdma_deinit(ctx);
124-
ERROR("Failed to initialize RDMA device");
125-
return ret;
239+
fprintf(stderr, "[rdma_init] ERROR: fi_getinfo returned %d (%s)\n", ret, fi_strerror(-ret));
240+
goto err;
241+
}
242+
243+
ret = fi_fabric((*ctx)->info->fabric_attr, &(*ctx)->fabric, NULL);
244+
if (ret) {
245+
printf("[rdma_init] ERROR: fi_fabric returned %d\n", ret);
246+
goto err_info;
126247
}
127248

249+
ret = fi_domain((*ctx)->fabric, (*ctx)->info, &(*ctx)->domain, NULL);
250+
if (ret) {
251+
printf("[rdma_init] ERROR: fi_domain returned %d\n", ret);
252+
goto err_fabric;
253+
}
254+
255+
/* Clean up temporary resources */
256+
rdma_freeaddrinfo(rai_res);
257+
rdma_freehints(hints); // We've already stored the info in ctx->info
258+
259+
(*ctx)->is_initialized = true;
260+
printf("[rdma_init] Device context successfully initialized\n");
128261
return 0;
262+
263+
err_fabric:
264+
fi_close(&(*ctx)->fabric->fid);
265+
266+
err_info:
267+
fi_freeinfo((*ctx)->info);
268+
free(*ctx);
269+
*ctx = NULL;
270+
271+
err:
272+
rdma_freeaddrinfo(rai_res);
273+
rdma_freehints(hints);
274+
fprintf(stderr, "[rdma_init] Failed with error %d\n", ret);
275+
return ret;
129276
}
130277

131278
static void rdma_free_res(libfabric_ctx *rdma_ctx)

0 commit comments

Comments
 (0)