Skip to content

Commit c8753e3

Browse files
authored
Fix some issues with transport (#3206)
1. The return value of CreateTransport should be std::unique_ptr. 2. Delete BAIDU_REGISTER_ERRNO in transport_factory.h. 3. Optimize some code formatting.
1 parent 834484a commit c8753e3

File tree

11 files changed

+40
-40
lines changed

11 files changed

+40
-40
lines changed

src/brpc/input_messenger.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,8 @@ void InputMessenger::OnNewMessages(Socket* m) {
377377
}
378378
}
379379

380-
if (messenger->ProcessNewMessage(m, nr, read_eof, received_us, base_realtime, last_msg) < 0) {
380+
if (messenger->ProcessNewMessage(m, nr, read_eof, received_us,
381+
base_realtime, last_msg) < 0) {
381382
return;
382383
}
383384
}

src/brpc/rdma_transport.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ void RdmaTransport::Init(Socket *socket, const SocketOptions &options) {
3535
if (!_rdma_ep) {
3636
const int saved_errno = errno;
3737
PLOG(ERROR) << "Fail to create RdmaEndpoint";
38-
socket->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
39-
berror(saved_errno));
38+
socket->SetFailed(
39+
saved_errno, "Fail to create RdmaEndpoint: %s", berror(saved_errno));
4040
}
4141
_rdma_state = RDMA_UNKNOWN;
4242
} else {
@@ -95,8 +95,7 @@ ssize_t RdmaTransport::CutFromIOBufList(butil::IOBuf **buf, size_t ndata) {
9595
int RdmaTransport::WaitEpollOut(butil::atomic<int> *_epollout_butex,
9696
bool pollin, const timespec duetime) {
9797
if (_rdma_state == RDMA_ON) {
98-
const int expected_val = _epollout_butex
99-
->load(butil::memory_order_acquire);
98+
const int expected_val = _epollout_butex->load(butil::memory_order_acquire);
10099
CHECK(_rdma_ep != NULL);
101100
if (!_rdma_ep->IsWritable()) {
102101
g_vars->nwaitepollout << 1;
@@ -105,9 +104,9 @@ int RdmaTransport::WaitEpollOut(butil::atomic<int> *_epollout_butex,
105104
const int saved_errno = errno;
106105
PLOG(WARNING) << "Fail to wait rdma window of " << _socket;
107106
_socket->SetFailed(saved_errno,
108-
"Fail to wait rdma window of %s: %s",
109-
_socket->description().c_str(),
110-
berror(saved_errno));
107+
"Fail to wait rdma window of %s: %s",
108+
_socket->description().c_str(),
109+
berror(saved_errno));
111110
}
112111
if (_socket->Failed()) {
113112
// NOTE:
@@ -140,7 +139,8 @@ void RdmaTransport::ProcessEvent(bthread_attr_t attr) {
140139
}
141140
}
142141

143-
void RdmaTransport::QueueMessage(InputMessageClosure& input_msg, int* num_bthread_created, bool last_msg) {
142+
void RdmaTransport::QueueMessage(InputMessageClosure& input_msg,
143+
int* num_bthread_created, bool last_msg) {
144144
if (last_msg && !rdma::FLAGS_rdma_use_polling) {
145145
return;
146146
}
@@ -234,5 +234,5 @@ bool RdmaTransport::OptionsAvailableOverRdma(const ServerOptions* opt) {
234234
}
235235
return true;
236236
}
237-
}
237+
} // namespace brpc
238238
#endif

src/brpc/rdma_transport.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,6 @@ class RdmaTransport : public Transport {
6060
RdmaState _rdma_state;
6161
std::shared_ptr<TcpTransport> _tcp_transport;
6262
};
63-
}
63+
} // namespace brpc
6464
#endif // BRPC_WITH_RDMA
6565
#endif //BRPC_RDMA_TRANSPORT_H

src/brpc/socket.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,6 @@ Socket::Socket(Forbidden f)
474474
, _ssl_state(SSL_UNKNOWN)
475475
, _ssl_session(NULL)
476476
, _socket_mode(SOCKET_MODE_TCP)
477-
, _transport(nullptr)
478477
, _connection_type_for_progressive_read(CONNECTION_TYPE_UNKNOWN)
479478
, _controller_released_socket(false)
480479
, _overcrowded(false)

src/brpc/socket.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -926,8 +926,8 @@ friend class TransportFactory;
926926
std::shared_ptr<SocketSSLContext> _ssl_ctx;
927927

928928
// Should use SOCKET_MODE_RDMA or SOCKET_MODE_TCP or Other, default is SOCKET_MODE_TCP Transport
929-
SocketMode _socket_mode{SOCKET_MODE_TCP};
930-
std::shared_ptr<Transport> _transport;
929+
SocketMode _socket_mode;
930+
std::unique_ptr<Transport> _transport;
931931

932932
// Pass from controller, for progressive reading.
933933
ConnectionType _connection_type_for_progressive_read;

src/brpc/socket_mode.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
#ifndef BRPC_COMMON_H
19-
#define BRPC_COMMON_H
18+
#ifndef BRPC_SOCKET_MODE_H
19+
#define BRPC_SOCKET_MODE_H
2020
namespace brpc {
2121
enum SocketMode {
2222
SOCKET_MODE_TCP = 0,
2323
SOCKET_MODE_RDMA = 1
2424
};
25-
}
26-
#endif //BRPC_COMMON_H
25+
} // namespace brpc
26+
#endif //BRPC_SOCKET_MODE_H

src/brpc/tcp_transport.cpp

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
#include "tcp_transport.h"
18+
#include "brpc/tcp_transport.h"
19+
1920
namespace brpc {
2021
DECLARE_bool(usercode_in_coroutine);
2122
DECLARE_bool(usercode_in_pthread);
@@ -49,14 +50,15 @@ ssize_t TcpTransport::CutFromIOBufList(butil::IOBuf** buf, size_t ndata) {
4950
return butil::IOBuf::cut_multiple_into_file_descriptor(_socket->fd(), buf, ndata);
5051
}
5152

52-
int TcpTransport::WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, const timespec duetime) {
53+
int TcpTransport::WaitEpollOut(butil::atomic<int>* _epollout_butex,
54+
bool pollin, timespec duetime) {
5355
g_vars->nwaitepollout << 1;
5456
const int rc = _socket->WaitEpollOut(_socket->fd(), pollin, &duetime);
5557
if (rc < 0 && errno != ETIMEDOUT) {
5658
const int saved_errno = errno;
5759
PLOG(WARNING) << "Fail to wait epollout of " << _socket;
5860
_socket->SetFailed(saved_errno, "Fail to wait epollout of %s: %s",
59-
_socket->description().c_str(), berror(saved_errno));
61+
_socket->description().c_str(), berror(saved_errno));
6062
return 1;
6163
}
6264
return 0;
@@ -71,15 +73,18 @@ void TcpTransport::ProcessEvent(bthread_attr_t attr) {
7173
OnEdge(_socket);
7274
}
7375
}
74-
void TcpTransport::QueueMessage(InputMessageClosure& input_msg, int* num_bthread_created, bool last_msg) {
76+
void TcpTransport::QueueMessage(InputMessageClosure& input_msg,
77+
int* num_bthread_created, bool) {
7578
InputMessageBase* to_run_msg = input_msg.release();
7679
if (!to_run_msg) {
7780
return;
7881
}
7982
// Create bthread for last_msg. The bthread is not scheduled
8083
// until bthread_flush() is called (in the worse case).
8184
bthread_t th;
82-
bthread_attr_t tmp = (FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) | BTHREAD_NOSIGNAL;
85+
bthread_attr_t tmp =
86+
(FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL) |
87+
BTHREAD_NOSIGNAL;
8388
tmp.keytable_pool = _socket->keytable_pool();
8489
tmp.tag = bthread_self_tag();
8590
bthread_attr_set_name(&tmp, "ProcessInputMessage");
@@ -90,5 +95,5 @@ void TcpTransport::QueueMessage(InputMessageClosure& input_msg, int* num_bthread
9095
ProcessInputMessage(to_run_msg);
9196
}
9297
}
93-
void TcpTransport::Debug(std::ostream &os) {}
94-
}
98+
99+
} // namespace brpc

src/brpc/tcp_transport.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@ class TcpTransport : public Transport {
3131
std::shared_ptr<AppConnect> Connect() override;
3232
int CutFromIOBuf(butil::IOBuf* buf) override;
3333
ssize_t CutFromIOBufList(butil::IOBuf** buf, size_t ndata) override;
34-
int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, const timespec duetime) override;
34+
int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, timespec duetime) override;
3535
void ProcessEvent(bthread_attr_t attr) override;
3636
void QueueMessage(InputMessageClosure& input_msg, int* num_bthread_created, bool last_msg) override;
37-
void Debug(std::ostream &os) override;
37+
void Debug(std::ostream &os) override {}
3838
};
39-
}
39+
} // namespace brpc
4040

4141
#endif //BRPC_TCP_TRANSPORT_H

src/brpc/transport.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class Transport {
4646
virtual std::shared_ptr<AppConnect> Connect() = 0;
4747
virtual int CutFromIOBuf(butil::IOBuf* buf) = 0;
4848
virtual ssize_t CutFromIOBufList(butil::IOBuf** buf, size_t ndata) = 0;
49-
virtual int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, const timespec duetime) = 0;
49+
virtual int WaitEpollOut(butil::atomic<int>* _epollout_butex, bool pollin, timespec duetime) = 0;
5050
virtual void ProcessEvent(bthread_attr_t attr) = 0;
5151
virtual void QueueMessage(InputMessageClosure& input_msg, int* num_bthread_created, bool last_msg) = 0;
5252
virtual void Debug(std::ostream &os) = 0;

src/brpc/transport_factory.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
#include "transport_factory.h"
18+
#include "brpc/transport_factory.h"
1919
#include "brpc/tcp_transport.h"
2020
#include "brpc/rdma_transport.h"
21+
2122
namespace brpc {
2223
int TransportFactory::ContextInitOrDie(SocketMode mode, bool serverOrNot, const void* _options) {
2324
if (mode == SOCKET_MODE_TCP) {
@@ -34,7 +35,7 @@ int TransportFactory::ContextInitOrDie(SocketMode mode, bool serverOrNot, const
3435
}
3536
}
3637

37-
std::shared_ptr<Transport> TransportFactory::CreateTransport(SocketMode mode) {
38+
std::unique_ptr<Transport> TransportFactory::CreateTransport(SocketMode mode) {
3839
if (mode == SOCKET_MODE_TCP) {
3940
return std::unique_ptr<TcpTransport>(new TcpTransport());
4041
}

0 commit comments

Comments
 (0)