Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 142 additions & 2 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
#include <absl/time/time.h>

#include <numeric>
#include <utility>
#include <variant>

#include "absl/cleanup/cleanup.h"
#include "base/cycle_clock.h"
#include "base/flag_utils.h"
#include "base/flags.h"
Expand Down Expand Up @@ -112,6 +114,8 @@ ABSL_FLAG(uint32_t, pipeline_wait_batch_usec, 0,
"If non-zero, waits for this time for more I/O "
" events to come for the connection in case there is only one command in the pipeline. ");

ABSL_FLAG(bool, expiremental_io_loop_v2, false, "new io loop");

using namespace util;
using namespace std;
using absl::GetFlag;
Expand Down Expand Up @@ -1091,7 +1095,12 @@ void Connection::ConnectionFlow() {
// Main loop.
if (parse_status != ERROR && !ec) {
UpdateIoBufCapacity(io_buf_, stats_, [&]() { io_buf_.EnsureCapacity(64); });
auto res = IoLoop();
variant<error_code, Connection::ParserStatus> res;
if (GetFlag(FLAGS_expiremental_io_loop_v2)) {
res = IoLoopV2();
} else {
res = IoLoop();
}

if (holds_alternative<error_code>(res)) {
ec = get<error_code>(res);
Expand Down Expand Up @@ -1150,6 +1159,10 @@ void Connection::ConnectionFlow() {
}
}

if (GetFlag(FLAGS_expiremental_io_loop_v2)) {
socket_->ResetOnRecvHook();
}

if (ec && !FiberSocketBase::IsConnClosed(ec)) {
string conn_info = service_->GetContextInfo(cc_.get()).Format();
LOG_EVERY_T(WARNING, 1) << "Socket error for connection " << conn_info << " " << GetName()
Expand Down Expand Up @@ -1435,7 +1448,7 @@ error_code Connection::HandleRecvSocket() {
return {};
}

auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
variant<error_code, Connection::ParserStatus> Connection::IoLoop() {
error_code ec;
ParserStatus parse_status = OK;

Expand Down Expand Up @@ -2175,6 +2188,133 @@ bool Connection::WeakRef::operator==(const WeakRef& other) const {
return client_id_ == other.client_id_;
}

void Connection::DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n) {
absl::Cleanup on_exit([this]() { io_event_.notify(); });

if (std::holds_alternative<std::error_code>(n.read_result)) {
io_ec_ = std::get<std::error_code>(n.read_result);
return;
}

// EnableRecvMultishot path
// if (std::holds_alternative<io::MutableBytes>(n.read_result)) {
// auto buf = std::get<io::MutableBytes>(n.read_result);
// // TODO get rid of the copy
// io_buf_.WriteAndCommit(buf.data(), buf.size());
// return;
// }

if (std::holds_alternative<monostate>(n.read_result)) {
while (true) {
io::MutableBytes buf = io_buf_.AppendBuffer();
size_t buf_sz = buf.size();
int res = recv(socket_->native_handle(), buf.data(), buf.size(), 0);
if (res < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// we are done, there is no more socket data available to be read
return;
}
LOG(FATAL) << "Recv error: " << strerror(-res) << " errno " << errno;
return;
}
if (res == 0) {
io_ec_ = make_error_code(errc::connection_aborted);
return;
}
io_buf_.CommitWrite(res);
if (buf_sz == size_t(res)) {
return;
}
}
}

DCHECK(false) << "Sould not reach here";
}

variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
error_code ec;
ParserStatus parse_status = OK;

size_t max_iobfuf_len = GetFlag(FLAGS_max_client_iobuf_len);

auto* peer = socket_.get();
recv_buf_.res_len = 0;

// TODO EnableRecvMultishot on iourin ?

// Breaks with TLS. RegisterOnRecv is unimplemented.
peer->RegisterOnRecv([this](const FiberSocketBase::RecvNotification& n) {
DoReadOnRecv(n);
io_event_.notify();
});

do {
HandleMigrateRequest();

io_event_.await([this]() { return io_buf_.InputLen() > 0 || io_ec_; });

if (io_ec_) {
LOG_IF(WARNING, cntx()->replica_conn) << "async io error: " << ec;
return std::exchange(io_ec_, {});
}

phase_ = PROCESS;
bool is_iobuf_full = io_buf_.AppendLen() == 0;

if (redis_parser_) {
parse_status = ParseRedis(max_busy_read_cycles_cached);
} else {
DCHECK(memcache_parser_);
parse_status = ParseMemcache();
}

if (reply_builder_->GetError()) {
return reply_builder_->GetError();
}

if (parse_status == NEED_MORE) {
parse_status = OK;

size_t capacity = io_buf_.Capacity();
if (capacity < max_iobfuf_len) {
size_t parser_hint = 0;
if (redis_parser_)
parser_hint = redis_parser_->parselen_hint(); // Could be done for MC as well.

// If we got a partial request and we managed to parse its
// length, make sure we have space to store it instead of
// increasing space incrementally.
// (Note: The buffer object is only working in power-of-2 sizes,
// so there's no danger of accidental O(n^2) behavior.)
if (parser_hint > capacity) {
UpdateIoBufCapacity(io_buf_, stats_,
[&]() { io_buf_.Reserve(std::min(max_iobfuf_len, parser_hint)); });
}

// If we got a partial request because iobuf was full, grow it up to
// a reasonable limit to save on Recv() calls.
if (is_iobuf_full && capacity < max_iobfuf_len / 2) {
// Last io used most of the io_buf to the end.
UpdateIoBufCapacity(io_buf_, stats_, [&]() {
io_buf_.Reserve(capacity * 2); // Valid growth range.
});
}

if (io_buf_.AppendLen() == 0U) {
// it can happen with memcached but not for RedisParser, because RedisParser fully
// consumes the passed buffer
LOG_EVERY_T(WARNING, 10)
<< "Maximum io_buf length reached, consider to increase max_client_iobuf_len flag";
}
}
} else if (parse_status != OK) {
break;
}
} while (peer->IsOpen());

return parse_status;
}

void ResetStats() {
auto& cstats = tl_facade_stats->conn_stats;
cstats.pipelined_cmd_cnt = 0;
Expand Down
8 changes: 8 additions & 0 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "io/io_buf.h"
#include "util/connection.h"
#include "util/fibers/fibers.h"
#include "util/fibers/synchronization.h"
#include "util/http/http_handler.h"

typedef struct ssl_ctx_st SSL_CTX;
Expand Down Expand Up @@ -349,6 +350,10 @@ class Connection : public util::Connection {
// Main loop reading client messages and passing requests to dispatch queue.
std::variant<std::error_code, ParserStatus> IoLoop();

void DoReadOnRecv(const util::FiberSocketBase::RecvNotification& n);
// Main loop reading client messages and passing requests to dispatch queue.
std::variant<std::error_code, ParserStatus> IoLoopV2();

// Returns true if HTTP header is detected.
io::Result<bool> CheckForHttpProto();

Expand Down Expand Up @@ -427,6 +432,9 @@ class Connection : public util::Connection {
util::fb2::CondVarAny cnd_; // dispatch queue waker
util::fb2::Fiber async_fb_; // async fiber (if started)

std::error_code io_ec_;
util::fb2::EventCount io_event_;

uint64_t pending_pipeline_cmd_cnt_ = 0; // how many queued Redis async commands in dispatch_q
size_t pending_pipeline_bytes_ = 0; // how many bytes of the queued Redis async commands

Expand Down
Loading