@@ -102,6 +102,11 @@ static const uint32_t ACK_MSG_RDMA_OK = 0x1;
102102static butil::Mutex* g_rdma_resource_mutex = NULL ;
103103static RdmaResource* g_rdma_resource_list = NULL ;
104104
// Tag written into ibv_send_wr::wr_id when a send work request is posted, and
// compared against ibv_wc::wr_id in HandleCompletion() to tell the two kinds
// of send completion apart.
105+ enum SendType {
106+ SEND_TYPE_RDMA_DATA = 0 , // assigned in CutFromIOBufList() on work requests flagged IBV_SEND_SIGNALED
107+ SEND_TYPE_RDMA_IMM, // assigned in SendImm(); HandleCompletion() returns early for this tag
108+ };
109+
105110struct HelloMessage {
106111 void Serialize (void * data) const ;
107112 void Deserialize (void * data);
@@ -189,7 +194,8 @@ RdmaEndpoint::RdmaEndpoint(Socket* s)
189194 , _rq_received(0 )
190195 , _local_window_capacity(0 )
191196 , _remote_window_capacity(0 )
192- , _window_size(0 )
197+ , _remote_rq_window_size(0 )
198+ , _sq_window_size(0 )
193199 , _new_rq_wrs(0 )
194200{
195201 if (_sq_size < MIN_QP_SIZE) {
@@ -227,7 +233,8 @@ void RdmaEndpoint::Reset() {
227233 _sq_unsignaled = 0 ;
228234 _local_window_capacity = 0 ;
229235 _remote_window_capacity = 0 ;
230- _window_size.store (0 , butil::memory_order_relaxed);
236+ _remote_rq_window_size.store (0 , butil::memory_order_relaxed);
237+ _sq_window_size.store (0 , butil::memory_order_relaxed);
231238 _new_rq_wrs = 0 ;
232239 _sq_sent = 0 ;
233240 _rq_received = 0 ;
@@ -513,7 +520,10 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
513520 std::min (ep->_sq_size , remote_msg.rq_size ) - RESERVED_WR_NUM;
514521 ep->_remote_window_capacity =
515522 std::min (ep->_rq_size , remote_msg.sq_size ) - RESERVED_WR_NUM,
516- ep->_window_size .store (ep->_local_window_capacity , butil::memory_order_relaxed);
523+ ep->_remote_rq_window_size .store (
524+ ep->_local_window_capacity , butil::memory_order_relaxed);
525+ ep->_sq_window_size .store (
526+ ep->_local_window_capacity , butil::memory_order_relaxed);
517527
518528 ep->_state = C_BRINGUP_QP;
519529 if (ep->BringUpQp (remote_msg.lid , remote_msg.gid , remote_msg.qp_num ) < 0 ) {
@@ -544,11 +554,11 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) {
544554 if (s->_rdma_state == Socket::RDMA_ON) {
545555 ep->_state = ESTABLISHED;
546556 LOG_IF (INFO, FLAGS_rdma_trace_verbose)
547- << " Handshake ends (use rdma) on " << s->description ();
557+ << " Client handshake ends (use rdma) on " << s->description ();
548558 } else {
549559 ep->_state = FALLBACK_TCP;
550560 LOG_IF (INFO, FLAGS_rdma_trace_verbose)
551- << " Handshake ends (use tcp) on " << s->description ();
561+ << " Client handshake ends (use tcp) on " << s->description ();
552562 }
553563
554564 errno = 0 ;
@@ -621,7 +631,10 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
621631 std::min (ep->_sq_size , remote_msg.rq_size ) - RESERVED_WR_NUM;
622632 ep->_remote_window_capacity =
623633 std::min (ep->_rq_size , remote_msg.sq_size ) - RESERVED_WR_NUM,
624- ep->_window_size .store (ep->_local_window_capacity , butil::memory_order_relaxed);
634+ ep->_remote_rq_window_size .store (
635+ ep->_local_window_capacity , butil::memory_order_relaxed);
636+ ep->_sq_window_size .store (
637+ ep->_local_window_capacity , butil::memory_order_relaxed);
625638
626639 ep->_state = S_ALLOC_QPCQ;
627640 if (ep->AllocateResources () < 0 ) {
@@ -697,13 +710,13 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) {
697710 s->_rdma_state = Socket::RDMA_ON;
698711 ep->_state = ESTABLISHED;
699712 LOG_IF (INFO, FLAGS_rdma_trace_verbose)
700- << " Handshake ends (use rdma) on " << s->description ();
713+ << " Server handshake ends (use rdma) on " << s->description ();
701714 }
702715 } else {
703716 s->_rdma_state = Socket::RDMA_OFF;
704717 ep->_state = FALLBACK_TCP;
705718 LOG_IF (INFO, FLAGS_rdma_trace_verbose)
706- << " Handshake ends (use tcp) on " << s->description ();
719+ << " Server handshake ends (use tcp) on " << s->description ();
707720 }
708721 ep->TryReadOnTcp ();
709722
@@ -716,7 +729,8 @@ bool RdmaEndpoint::IsWritable() const {
716729 return false ;
717730 }
718731
719- return _window_size.load (butil::memory_order_relaxed) > 0 ;
732+ return _remote_rq_window_size.load (butil::memory_order_relaxed) > 0 &&
733+ _sq_window_size.load (butil::memory_order_relaxed) > 0 ;
720734}
721735
722736// RdmaIOBuf inherits from IOBuf to provide a new function.
@@ -786,13 +800,16 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
786800
787801 size_t total_len = 0 ;
788802 size_t current = 0 ;
789- uint32_t window = 0 ;
803+ uint32_t remote_rq_window_size =
804+ _remote_rq_window_size.load (butil::memory_order_relaxed);
805+ uint32_t sq_window_size =
806+ _sq_window_size.load (butil::memory_order_relaxed);
790807 ibv_send_wr wr;
791808 int max_sge = GetRdmaMaxSge ();
792809 ibv_sge sglist[max_sge];
793810 while (current < ndata) {
794- window = _window_size. load (butil::memory_order_relaxed);
795- if (window == 0 ) {
811+ if (remote_rq_window_size == 0 || sq_window_size == 0 ) {
812+ // There is no space left in SQ or remote RQ.
796813 if (total_len > 0 ) {
797814 break ;
798815 } else {
@@ -811,7 +828,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
811828 size_t sge_index = 0 ;
812829 while (sge_index < (uint32_t )max_sge &&
813830 this_len < _remote_recv_block_size) {
814- if (data->size () == 0 ) {
831+ if (data->empty () ) {
815832 // The current IOBuf is empty, find next one
816833 ++current;
817834 if (current == ndata) {
@@ -841,7 +858,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
841858 wr.imm_data = butil::HostToNet32 (imm);
842859 // Avoid too many recv completion events to reduce the CPU overhead
843860 bool solicited = false ;
844- if (window == 1 || current + 1 >= ndata) {
861+ if (remote_rq_window_size == 1 || sq_window_size == 1 || current + 1 >= ndata) {
845862 // Only last message in the write queue or last message in the
846863 // current window will be flagged as solicited.
847864 solicited = true ;
@@ -874,6 +891,7 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
874891 // Refer to:
875892 // http://www.rdmamojo.com/2014/06/30/working-unsignaled-completions/
876893 wr.send_flags |= IBV_SEND_SIGNALED;
894+ wr.wr_id = SEND_TYPE_RDMA_DATA;
877895 _sq_unsignaled = 0 ;
878896 }
879897
@@ -883,7 +901,8 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
883901 // We use other way to guarantee the Send Queue is not full.
884902 // So we just consider this error as an unrecoverable error.
885903 LOG (WARNING) << " Fail to ibv_post_send: " << berror (err)
886- << " , window=" << window
904+ << " , remote_rq_window_size=" << remote_rq_window_size
905+ << " , sq_window_size=" << sq_window_size
887906 << " , sq_current=" << _sq_current;
888907 errno = err;
889908 return -1 ;
@@ -894,11 +913,14 @@ ssize_t RdmaEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
894913 _sq_current = 0 ;
895914 }
896915
897- // Update _window_size. Note that _window_size will never be negative.
916+ // Update `_remote_rq_window_size' and `_sq_window_size'. Note that
917+ // `_remote_rq_window_size' and `_sq_window_size' will never be negative.
898918 // This holds because at most one thread can enter this function for each
899- // Socket, and the other thread of HandleCompletion can only add this
900- // counter.
901- _window_size.fetch_sub (1 , butil::memory_order_relaxed);
919+ // Socket, and the other thread of HandleCompletion can only add these
920+ // counters.
921+ remote_rq_window_size =
922+ _remote_rq_window_size.fetch_sub (1 , butil::memory_order_relaxed) - 1 ;
923+ sq_window_size = _sq_window_size.fetch_sub (1 , butil::memory_order_relaxed) - 1 ;
902924 }
903925
904926 return total_len;
@@ -922,6 +944,7 @@ int RdmaEndpoint::SendImm(uint32_t imm) {
922944 wr.imm_data = butil::HostToNet32 (imm);
923945 wr.send_flags |= IBV_SEND_SOLICITED;
924946 wr.send_flags |= IBV_SEND_SIGNALED;
947+ wr.wr_id = SEND_TYPE_RDMA_IMM;
925948
926949 ibv_send_wr* bad = NULL ;
927950 int err = ibv_post_send (_resource->qp , &wr, &bad);
@@ -938,8 +961,16 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
938961 bool zerocopy = FLAGS_rdma_recv_zerocopy;
939962 switch (wc.opcode ) {
940963 case IBV_WC_SEND: { // send completion
941- // Do nothing
942- break ;
964+ if (SEND_TYPE_RDMA_IMM == wc.wr_id ) {
965+ // Do nothing for imm.
966+ return 0 ;
967+ }
968+ // Update window
969+ uint16_t wnd_to_update = _local_window_capacity / 4 ;
970+ _sq_window_size.fetch_add (wnd_to_update, butil::memory_order_relaxed);
971+ // Wake up writing thread right after every signaled sending cqe.
972+ _socket->WakeAsEpollOut ();
973+ return 0 ;
943974 }
944975 case IBV_WC_RECV: { // recv completion
945976 // Please note that only the first wc.byte_len bytes are valid
@@ -949,9 +980,7 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
949980 }
950981 CHECK (_state != FALLBACK_TCP);
951982 if (zerocopy) {
952- butil::IOBuf tmp;
953- _rbuf[_rq_received].cutn (&tmp, wc.byte_len );
954- _socket->_read_buf .append (tmp);
983+ _rbuf[_rq_received].cutn (&_socket->_read_buf , wc.byte_len );
955984 } else {
956985 // Copy data when the receive data is really small
957986 _socket->_read_buf .append (_rbuf_data[_rq_received], wc.byte_len );
@@ -972,9 +1001,9 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) {
9721001
9731002 // Update window
9741003 uint32_t wnd_thresh = _local_window_capacity / 8 ;
975- if (_window_size .fetch_add (acks, butil::memory_order_relaxed) >= wnd_thresh
1004+ if (_remote_rq_window_size .fetch_add (acks, butil::memory_order_relaxed) >= wnd_thresh
9761005 || acks >= wnd_thresh) {
977- // Do not wake up writing thread right after _window_size > 0.
1006+ // Do not wake up writing thread right after _remote_rq_window_size > 0.
9781007 // Otherwise the writing thread may switch to background too quickly.
9791008 _socket->WakeAsEpollOut ();
9801009 }
@@ -1490,7 +1519,9 @@ std::string RdmaEndpoint::GetStateStr() const {
14901519void RdmaEndpoint::DebugInfo (std::ostream& os) const {
14911520 os << " \n rdma_state=ON"
14921521 << " \n handshake_state=" << GetStateStr ()
1493- << " \n rdma_window_size=" << _window_size.load (butil::memory_order_relaxed)
1522+ << " \n rdma_remote_rq_window_size="
1523+ << _remote_rq_window_size.load (butil::memory_order_relaxed)
1524+ << " \n rdma_sq_window_size=" << _sq_window_size.load (butil::memory_order_relaxed)
14941525 << " \n rdma_local_window_capacity=" << _local_window_capacity
14951526 << " \n rdma_remote_window_capacity=" << _remote_window_capacity
14961527 << " \n rdma_sbuf_head=" << _sq_current
0 commit comments