Skip to content

Commit 985c3b3

Browse files
committed
Deallocate polling cq
1 parent ea03983 commit 985c3b3

File tree

3 files changed

+23
-26
lines changed

3 files changed

+23
-26
lines changed

src/brpc/rdma/rdma_endpoint.cpp

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ void RdmaEndpoint::Reset() {
230230
_rq_received = 0;
231231
_local_window_capacity = 0;
232232
_remote_window_capacity = 0;
233+
_sq_imm_window_size = 0;
233234
_remote_rq_window_size.store(0, butil::memory_order_relaxed);
234235
_sq_window_size.store(0, butil::memory_order_relaxed);
235236
_new_rq_wrs.store(0, butil::memory_order_relaxed);
@@ -841,8 +842,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
841842
}
842843

843844
ssize_t len = data->cut_into_sglist_and_iobuf(
844-
sglist, &sge_index, to, max_sge,
845-
_remote_recv_block_size - this_len);
845+
sglist, &sge_index, to, max_sge, _remote_recv_block_size - this_len);
846846
if (len < 0) {
847847
return -1;
848848
}
@@ -957,6 +957,10 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
957957
LOG(WARNING) << "Fail to ibv_post_send: " << berror(err) << " " << oss.str();
958958
return -1;
959959
}
960+
961+
// `_sq_imm_window_size' will never be negative.
962+
// Because IMM can only be sent if
963+
// `_sq_imm_window_size` is greater than 0.
960964
_sq_imm_window_size -= 1;
961965
return 0;
962966
}
@@ -967,6 +971,7 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
967971
case IBV_WC_SEND: { // send completion
968972
if (0 == wc.wr_id) {
969973
_sq_imm_window_size += 1;
974+
// If there are any unacknowledged recvs, send an ack.
970975
SendAck(0);
971976
return 0;
972977
}
@@ -1298,14 +1303,11 @@ int RdmaEndpoint::BringUpQp(uint16_t lid, ibv_gid gid, uint32_t qp_num) {
12981303
return 0;
12991304
}
13001305

1301-
void RdmaEndpoint::DeallocateCq(ibv_cq* cq, unsigned int cq_events) {
1306+
static void DeallocateCq(ibv_cq* cq) {
13021307
if (NULL == cq) {
13031308
return;
13041309
}
13051310

1306-
if (cq_events > 0) {
1307-
IbvAckCqEvents(cq, cq_events);
1308-
}
13091311
int err = IbvDestroyCq(cq);
13101312
LOG_IF(WARNING, 0 != err) << "Fail to destroy CQ: " << berror(err);
13111313
}
@@ -1328,6 +1330,13 @@ void RdmaEndpoint::DeallocateResources() {
13281330
}
13291331
}
13301332

1333+
if (NULL != _resource->send_cq) {
1334+
IbvAckCqEvents(_resource->send_cq, _send_cq_events);
1335+
}
1336+
if (NULL != _resource->recv_cq) {
1337+
IbvAckCqEvents(_resource->recv_cq, _recv_cq_events);
1338+
}
1339+
13311340
bool remove_consumer = true;
13321341
if (!move_to_rdma_resource_list) {
13331342
if (NULL != _resource->qp) {
@@ -1336,8 +1345,9 @@ void RdmaEndpoint::DeallocateResources() {
13361345
_resource->qp = NULL;
13371346
}
13381347

1339-
DeallocateCq(_resource->send_cq, _send_cq_events);
1340-
DeallocateCq(_resource->recv_cq, _recv_cq_events);
1348+
DeallocateCq(_resource->polling_cq);
1349+
DeallocateCq(_resource->send_cq);
1350+
DeallocateCq(_resource->recv_cq);
13411351

13421352
if (NULL != _resource->comp_channel) {
13431353
// Destroy send_comp_channel will destroy this fd,
@@ -1350,6 +1360,7 @@ void RdmaEndpoint::DeallocateResources() {
13501360

13511361
}
13521362

1363+
_resource->polling_cq = NULL;
13531364
_resource->send_cq = NULL;
13541365
_resource->recv_cq = NULL;
13551366
_resource->comp_channel = NULL;
@@ -1369,12 +1380,6 @@ void RdmaEndpoint::DeallocateResources() {
13691380
}
13701381

13711382
if (move_to_rdma_resource_list) {
1372-
if (NULL != _resource->send_cq) {
1373-
IbvAckCqEvents(_resource->send_cq, _send_cq_events);
1374-
}
1375-
if (NULL != _resource->recv_cq) {
1376-
IbvAckCqEvents(_resource->recv_cq, _recv_cq_events);
1377-
}
13781383
BAIDU_SCOPED_LOCK(*g_rdma_resource_mutex);
13791384
_resource->next = g_rdma_resource_list;
13801385
g_rdma_resource_list = _resource;
@@ -1399,7 +1404,7 @@ int RdmaEndpoint::GetAndAckEvents(SocketUniquePtr& s) {
13991404
}
14001405
if (cq == _resource->send_cq) {
14011406
++_send_cq_events;
1402-
} else {
1407+
} else if (cq == _resource->recv_cq) {
14031408
++_recv_cq_events;
14041409
}
14051410
}
@@ -1418,7 +1423,8 @@ int RdmaEndpoint::GetAndAckEvents(SocketUniquePtr& s) {
14181423

14191424
int RdmaEndpoint::ReqNotifyCq(bool send_cq) {
14201425
errno = ibv_req_notify_cq(
1421-
send_cq ? _resource->send_cq : _resource->recv_cq, send_cq ? 0 : 1);
1426+
send_cq ? _resource->send_cq : _resource->recv_cq,
1427+
send_cq ? 0 : 1);
14221428
if (0 != errno) {
14231429
const int saved_errno = errno;
14241430
PLOG(WARNING) << "Fail to arm" << (send_cq ? "send" : "recv")

src/brpc/rdma/rdma_endpoint.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,6 @@ friend class Socket;
134134
// Process handshake at the server
135135
static void* ProcessHandshakeAtServer(void* arg);
136136

137-
// Deallocate CQ resource.
138-
static void DeallocateCq(ibv_cq* cq, unsigned int cq_events);
139-
140137
// Allocate resources
141138
// Return 0 if success, -1 if failed and errno set
142139
int AllocateResources();
@@ -277,8 +274,6 @@ friend class Socket;
277274
butil::atomic<uint16_t> _sq_window_size;
278275
// The number of new WRs posted in the local Recv Queue
279276
butil::atomic<uint16_t> _new_rq_wrs;
280-
// The number of inflight send IMM.
281-
butil::atomic<uint16_t> _imm_inflight;
282277

283278
// butex for inform read events on TCP fd during handshake
284279
butil::atomic<int> *_read_butex;

test/brpc_rdma_unittest.cpp

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,8 +209,7 @@ TEST_F(RdmaTest, client_hello_msg_invalid_magic_str) {
209209

210210
uint8_t data[RDMA_HELLO_MSG_LEN];
211211
memcpy(data, "PRPC", 4); // send as normal baidu_std protocol
212-
memset(data + 4, 0, 32);
213-
ASSERT_EQ(38, write(sockfd, data, 38));
212+
ASSERT_EQ(4, write(sockfd, data, 4));
214213
usleep(100000); // wait for server to handle the msg
215214
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
216215

@@ -660,9 +659,6 @@ TEST_F(RdmaTest, client_send_data_on_tcp_after_ack_send) {
660659
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
661660
ASSERT_EQ(sizeof(flags), write(sockfd1, &flags, sizeof(flags)));
662661
usleep(100000);
663-
ASSERT_EQ(rdma::RdmaEndpoint::FALLBACK_TCP, s->_rdma_ep->_state);
664-
close(sockfd1);
665-
usleep(100000); // wait for server to handle the msg
666662
ASSERT_EQ(NULL, GetSocketFromServer(0));
667663

668664
butil::fd_guard sockfd2(socket(AF_INET, SOCK_STREAM, 0));

0 commit comments

Comments
 (0)