Skip to content

Commit 38cdb0f

Browse files
authored
chore(facade): Handle Socket::Recv() returning 0 explicitly (#6066)
Currently helio translates 0 in Socket:Recv to ECONNABORTED. This works logically well unless we want to differrentiate between properly closed sockets (result=0) and those that were closed due to an error. This PR starts handling result=0 states for sockets even though helio does not yet return them. Also clean up leftovers from the provided buffers.
1 parent 6d9c325 commit 38cdb0f

File tree

3 files changed

+49
-65
lines changed

3 files changed

+49
-65
lines changed

src/facade/dragonfly_connection.cc

Lines changed: 36 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1037,6 +1037,11 @@ io::Result<bool> Connection::CheckForHttpProto() {
10371037
if (!recv_sz) {
10381038
return make_unexpected(recv_sz.error());
10391039
}
1040+
if (recv_sz == 0) {
1041+
// Peer closed connection.
1042+
return false;
1043+
}
1044+
10401045
io_buf_.CommitWrite(*recv_sz);
10411046
string_view ib = ToSV(io_buf_.InputBuffer());
10421047
if (ib.size() >= 2 && ib[0] == 22 && ib[1] == 3) {
@@ -1139,13 +1144,12 @@ void Connection::ConnectionFlow() {
11391144
// to reproduce: nc localhost 6379 and then run invalid sequence: *1 <enter> *1 <enter>
11401145
error_code ec2 = socket_->Shutdown(SHUT_WR);
11411146
LOG_IF(WARNING, ec2) << "Could not shutdown socket " << ec2;
1142-
if (!ec2) {
1143-
while (true) {
1144-
// Discard any received data.
1145-
io_buf_.Clear();
1146-
if (!socket_->Recv(io_buf_.AppendBuffer())) {
1147-
break;
1148-
}
1147+
while (!ec2) {
1148+
// Discard any received data.
1149+
io_buf_.Clear();
1150+
auto recv_sz = socket_->Recv(io_buf_.AppendBuffer());
1151+
if (!recv_sz || *recv_sz == 0) {
1152+
break; // Peer closed connection.
11491153
}
11501154
}
11511155
}
@@ -1220,15 +1224,9 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) {
12201224

12211225
auto dispatch_async = [this]() -> MessageHandle { return {FromArgs(tmp_parse_args_)}; };
12221226

1223-
ReadBuffer read_buffer;
1224-
read_buffer.slice = io_buf_.InputBuffer();
1225-
read_buffer.available_bytes = io_buf_.InputLen();
1226-
1227+
io::Bytes read_buffer = io_buf_.InputBuffer();
12271228
do {
1228-
if (read_buffer.ShouldAdvance()) { // can happen only with io_uring/bundles
1229-
read_buffer.slice = NextBundleBuffer(read_buffer.available_bytes);
1230-
}
1231-
result = redis_parser_->Parse(read_buffer.slice, &consumed, &tmp_parse_args_);
1229+
result = redis_parser_->Parse(read_buffer, &consumed, &tmp_parse_args_);
12321230
request_consumed_bytes_ += consumed;
12331231
if (result == RedisParser::OK && !tmp_parse_args_.empty()) {
12341232
// If we get a non-STRING type (e.g., NIL, ARRAY), it's a protocol error.
@@ -1245,7 +1243,7 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) {
12451243
if (io_req_size_hist)
12461244
io_req_size_hist->Add(request_consumed_bytes_);
12471245
request_consumed_bytes_ = 0;
1248-
bool has_more = consumed < read_buffer.available_bytes;
1246+
bool has_more = consumed < read_buffer.size();
12491247

12501248
if (tl_traffic_logger.log_file && IsMain() /* log only on the main interface */) {
12511249
LogTraffic(id_, has_more, absl::MakeSpan(tmp_parse_args_),
@@ -1257,9 +1255,9 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) {
12571255
if (result != RedisParser::OK && result != RedisParser::INPUT_PENDING) {
12581256
// We do not expect that a replica sends an invalid command so we log if it happens.
12591257
LOG_IF(WARNING, cntx()->replica_conn)
1260-
<< "Redis parser error: " << result << " during parse: " << ToSV(read_buffer.slice);
1258+
<< "Redis parser error: " << result << " during parse: " << ToSV(read_buffer);
12611259
}
1262-
read_buffer.Consume(consumed);
1260+
read_buffer.remove_prefix(consumed);
12631261

12641262
// We must yield from time to time to allow other fibers to run.
12651263
// Specifically, if a client sends a huge chunk of data resulting in a very long pipeline,
@@ -1268,8 +1266,7 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) {
12681266
stats_->num_read_yields++;
12691267
ThisFiber::Yield();
12701268
}
1271-
} while (RedisParser::OK == result && read_buffer.available_bytes > 0 &&
1272-
!reply_builder_->GetError());
1269+
} while (RedisParser::OK == result && read_buffer.size() > 0 && !reply_builder_->GetError());
12731270

12741271
io_buf_.ConsumeInput(io_buf_.InputLen());
12751272

@@ -1278,7 +1275,7 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) {
12781275
return OK;
12791276

12801277
if (result == RedisParser::INPUT_PENDING) {
1281-
DCHECK_EQ(read_buffer.available_bytes, 0u);
1278+
DCHECK_EQ(read_buffer.size(), 0u);
12821279

12831280
return NEED_MORE;
12841281
}
@@ -1411,28 +1408,26 @@ void Connection::HandleMigrateRequest() {
14111408
}
14121409
}
14131410

1414-
error_code Connection::HandleRecvSocket() {
1411+
io::Result<size_t> Connection::HandleRecvSocket() {
14151412
phase_ = READ_SOCKET;
14161413

14171414
io::MutableBytes append_buf = io_buf_.AppendBuffer();
14181415
DCHECK(!append_buf.empty());
14191416
::io::Result<size_t> recv_sz = socket_->Recv(append_buf);
14201417
last_interaction_ = time(nullptr);
14211418

1422-
if (!recv_sz) {
1423-
return recv_sz.error();
1424-
}
1425-
1426-
size_t commit_sz = *recv_sz;
1427-
io_buf_.CommitWrite(commit_sz);
1428-
1429-
stats_->io_read_bytes += commit_sz;
1430-
local_stats_.net_bytes_in += commit_sz;
1419+
// In case the socket was closed orderly, we get 0 bytes read.
1420+
if (recv_sz && *recv_sz) {
1421+
size_t commit_sz = *recv_sz;
1422+
io_buf_.CommitWrite(commit_sz);
14311423

1432-
++stats_->io_read_cnt;
1433-
++local_stats_.read_cnt;
1424+
stats_->io_read_bytes += commit_sz;
1425+
local_stats_.net_bytes_in += commit_sz;
14341426

1435-
return {};
1427+
++stats_->io_read_cnt;
1428+
++local_stats_.read_cnt;
1429+
}
1430+
return recv_sz;
14361431
}
14371432

14381433
auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
@@ -1446,10 +1441,13 @@ auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
14461441

14471442
do {
14481443
HandleMigrateRequest();
1449-
ec = HandleRecvSocket();
1450-
if (ec) {
1451-
LOG_IF(WARNING, cntx()->replica_conn) << "HandleRecvSocket() error: " << ec;
1452-
return ec;
1444+
auto recv_sz = HandleRecvSocket();
1445+
if (!recv_sz) {
1446+
LOG_IF(WARNING, cntx()->replica_conn) << "HandleRecvSocket() error: " << recv_sz.error();
1447+
return recv_sz.error();
1448+
}
1449+
if (*recv_sz == 0) {
1450+
break;
14531451
}
14541452

14551453
phase_ = PROCESS;
@@ -2065,19 +2063,6 @@ void Connection::BreakOnce(uint32_t ev_mask) {
20652063
}
20662064
}
20672065

2068-
io::Bytes Connection::NextBundleBuffer(size_t total_len) {
2069-
++recv_buf_.buf_pos;
2070-
io::Bytes res;
2071-
#ifdef __linux__
2072-
fb2::UringProactor* up = static_cast<fb2::UringProactor*>(socket_->proactor());
2073-
recv_buf_.buf_id = up->GetBufIdByPos(kRecvSockGid, recv_buf_.buf_pos);
2074-
uint8_t* src = up->GetBufRingPtr(kRecvSockGid, recv_buf_.buf_id);
2075-
res = {src, std::min<size_t>(kRecvBufSize, total_len)};
2076-
#endif
2077-
2078-
return res;
2079-
}
2080-
20812066
void Connection::IncrNumConns() {
20822067
if (IsMainOrMemcache())
20832068
++stats_->num_conns_main;

src/facade/dragonfly_connection.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ class Connection : public util::Connection {
382382
PipelineMessagePtr GetFromPipelinePool();
383383

384384
void HandleMigrateRequest();
385-
std::error_code HandleRecvSocket();
385+
io::Result<size_t> HandleRecvSocket();
386386

387387
bool ShouldEndAsyncFiber(const MessageHandle& msg);
388388

@@ -406,18 +406,12 @@ class Connection : public util::Connection {
406406
size_t available_bytes;
407407
io::Bytes slice;
408408

409-
bool ShouldAdvance() const {
410-
return slice.empty();
411-
}
412-
413409
void Consume(size_t len) {
414410
available_bytes -= len;
415411
slice.remove_prefix(len);
416412
}
417413
};
418414

419-
io::Bytes NextBundleBuffer(size_t total_len);
420-
421415
void IncrNumConns();
422416
void DecrNumConns();
423417

src/server/protocol_client.cc

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,11 @@ error_code ProtocolClient::Recv(FiberSocketBase* input, base::IoBuf* dest) {
6161
return exp_size.error();
6262
}
6363

64+
if (*exp_size == 0) {
65+
VLOG(1) << "Connection closed by peer";
66+
return make_error_code(errc::connection_aborted);
67+
}
68+
6469
TouchIoTime();
6570

6671
dest->CommitWrite(*exp_size);
@@ -72,21 +77,21 @@ std::string ProtocolClient::ServerContext::Description() const {
7277
}
7378

7479
void ValidateClientTlsFlags() {
75-
if (!absl::GetFlag(FLAGS_tls_replication)) {
80+
if (!GetFlag(FLAGS_tls_replication)) {
7681
return;
7782
}
7883

7984
bool has_auth = false;
8085

81-
if (!absl::GetFlag(FLAGS_tls_key_file).empty()) {
82-
if (absl::GetFlag(FLAGS_tls_cert_file).empty()) {
86+
if (!GetFlag(FLAGS_tls_key_file).empty()) {
87+
if (GetFlag(FLAGS_tls_cert_file).empty()) {
8388
LOG(ERROR) << "tls_cert_file flag should be set";
8489
exit(1);
8590
}
8691
has_auth = true;
8792
}
8893

89-
if (!absl::GetFlag(FLAGS_masterauth).empty())
94+
if (!GetFlag(FLAGS_masterauth).empty())
9095
has_auth = true;
9196

9297
if (!has_auth) {
@@ -97,7 +102,7 @@ void ValidateClientTlsFlags() {
97102

98103
void ProtocolClient::MaybeInitSslCtx() {
99104
#ifdef DFLY_USE_SSL
100-
if (absl::GetFlag(FLAGS_tls_replication)) {
105+
if (GetFlag(FLAGS_tls_replication)) {
101106
ssl_ctx_ = CreateSslCntx(facade::TlsContextRole::CLIENT);
102107
}
103108
#endif
@@ -207,8 +212,8 @@ error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_time
207212

208213
// CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
209214

210-
auto masterauth = absl::GetFlag(FLAGS_masterauth);
211-
auto masteruser = absl::GetFlag(FLAGS_masteruser);
215+
auto masterauth = GetFlag(FLAGS_masterauth);
216+
auto masteruser = GetFlag(FLAGS_masteruser);
212217
ResetParser(RedisParser::Mode::CLIENT);
213218
if (!masterauth.empty()) {
214219
auto cmd = masteruser.empty() ? StrCat("AUTH ", masterauth)

0 commit comments

Comments
 (0)