Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/facade/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ add_library(dfly_facade conn_context.cc dragonfly_listener.cc dragonfly_connecti
memcache_parser.cc reply_builder.cc op_status.cc parsed_command.cc service_interface.cc
reply_capture.cc cmd_arg_parser.cc tls_helpers.cc socket_utils.cc)

target_compile_options(dfly_facade PRIVATE "-fcoroutines")


if (DF_USE_SSL)
set(TLS_LIB tls_lib)
target_compile_definitions(dfly_facade PRIVATE DFLY_USE_SSL)
Expand Down
125 changes: 55 additions & 70 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,8 @@ void Connection::AsyncOperations::operator()(ParsedCommand& cmd) {
DVLOG(2) << "Dispatching pipeline: " << cmd.Front();

++self->local_stats_.cmds;
self->service_->DispatchCommand(ParsedArgs{cmd}, &cmd);
self->service_->DispatchCommand(ParsedArgs{cmd}, &cmd,
ServiceInterface::AsyncPreference::ONLY_SYNC);

self->last_interaction_ = time(nullptr);
self->skip_next_squashing_ = false;
Expand Down Expand Up @@ -577,7 +578,7 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
static atomic_uint32_t next_id{1};

constexpr size_t kReqSz = sizeof(ParsedCommand);
static_assert(kReqSz <= 256);
// static_assert(kReqSz <= 256);
Copy link

Copilot AI Jan 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The static assertion checking ParsedCommand size is commented out. This should either be updated to reflect the new size (232) or removed with explanation, as size constraints may be important for memory layout.

Suggested change
// static_assert(kReqSz <= 256);
static_assert(kReqSz == 232,
"ParsedCommand size changed; review memory-layout assumptions and update this assert.");

Copilot uses AI. Check for mistakes.

// TODO: to move parser initialization to where we initialize the reply builder.
switch (protocol) {
Expand Down Expand Up @@ -1181,7 +1182,8 @@ Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) {

auto dispatch_sync = [this] {
FillBackedArgs(tmp_parse_args_, parsed_cmd_);
service_->DispatchCommand(ParsedArgs{*parsed_cmd_}, parsed_cmd_);
service_->DispatchCommand(ParsedArgs{*parsed_cmd_}, parsed_cmd_,
ServiceInterface::AsyncPreference::ONLY_SYNC);
};

auto dispatch_async = [this]() -> MessageHandle { return {FromArgs(tmp_parse_args_)}; };
Expand Down Expand Up @@ -2032,7 +2034,7 @@ bool Connection::ParseMCBatch() {
memcache_parser_->Reset();
auto client_error = [](string_view msg) { return absl::StrCat("CLIENT_ERROR ", msg); };

parsed_tail_->SetDeferredReply();
parsed_tail_->SetDeferredReply(true);
switch (result) {
case MemcacheParser::UNKNOWN_CMD:
parsed_tail_->SendSimpleString("ERROR");
Expand All @@ -2054,49 +2056,29 @@ bool Connection::ParseMCBatch() {

bool Connection::ExecuteMCBatch() {
// Execute sequentially all parsed commands.
while (parsed_to_execute_) {
auto* cmd = parsed_to_execute_;
bool is_head = (cmd == parsed_head_);

bool has_replied = false;

if (is_head) {
// Protocol parse errors create commands with already cached replies.
// Try sending payload now in case it's already set.
has_replied = cmd->SendPayload();
DVLOG(2) << "Maybe replying head: " << has_replied;
} else {
// We are not the head command, so we can not reply directly.
if (cmd->IsDeferredReply()) {
has_replied = true; // The error reply is filled by the parser.
} else {
cmd->SetDeferredReply();
}
}

if (!has_replied) {
service_->DispatchMC(cmd);

// If the reply was not deferred, then DispatchMC has surely replied.
has_replied = !cmd->IsDeferredReply();
DVLOG(2) << "Executed command, has_replied: " << has_replied;
}
parsed_to_execute_ = cmd->next;

// Only if commands have deferred replies we need to keep them in the parsed queue
// until they complete.
if (is_head && has_replied) {
// This is head and it replied to the client socket, so we can remove it from the parsed
// queue right away.
// This optimization makes the ReplyMCBatch call a no-op unless we actually run asynchronous
// commands with deferred replies.
parsed_head_ = parsed_to_execute_;
for (auto*& cmd = parsed_to_execute_; cmd; cmd = cmd->next) {
if (cmd->IsReady()) // in case parser error was set immediately
continue;

// If there are pending async commands, we require async
bool is_head = parsed_head_ == parsed_to_execute_;
auto async_pref = is_head ? ServiceInterface::AsyncPreference::PREFER_ASYNC
: ServiceInterface::AsyncPreference::REQUIRE_ASYNC;

DispatchResult result = service_->DispatchMC(cmd, async_pref);
VLOG(0) << int(result) << " -> " << int(async_pref);
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug VLOG statement at level 0 should be removed or changed to a higher level before production use.

Suggested change
VLOG(0) << int(result) << " -> " << int(async_pref);
DVLOG(1) << int(result) << " -> " << int(async_pref);

Copilot uses AI. Check for mistakes.
if (result == DispatchResult::WOULD_BLOCK)
break; // handle command next time, TODO: add flag to avoid polling

// If executed synchronously, remove from queue
if (!cmd->IsDeferredReply()) {
DCHECK_EQ(cmd, parsed_head_);
parsed_head_ = parsed_head_->next;
ReleaseParsedCommand(cmd, parsed_head_ != nullptr /* is_pipelined */);
}

if (reply_builder_->GetError()) {
if (reply_builder_->GetError())
return false;
}
}
if (parsed_head_ == nullptr)
parsed_tail_ = nullptr;
Expand All @@ -2109,20 +2091,15 @@ bool Connection::ReplyMCBatch() {
return true;
}

while (parsed_head_ != parsed_to_execute_) {
auto* cmd = parsed_head_;
if (!cmd->PollHeadForCompletion())
for (auto*& cmd = parsed_head_; cmd != parsed_to_execute_; cmd = cmd->next) {
if (!cmd->IsReady()) // can't reply yet, just break
break;

// This command finished processing and can be replied.
auto* next = cmd->next;
cmd->Reply();
ReleaseParsedCommand(cmd, cmd->next != parsed_to_execute_ /* is_pipelined */);

cmd->SendPayload();
ReleaseParsedCommand(cmd, next != parsed_to_execute_ /* is_pipelined */);
parsed_head_ = next;
if (reply_builder_->GetError()) {
if (reply_builder_->GetError())
return false;
}
}

if (parsed_head_ == nullptr)
Expand Down Expand Up @@ -2188,14 +2165,10 @@ void Connection::ReleaseParsedCommand(ParsedCommand* cmd, bool is_pipelined) {
void Connection::DestroyParsedQueue() {
while (parsed_head_ != nullptr) {
auto* cmd = parsed_head_;
stats_->dispatch_queue_bytes -= cmd->UsedMemory();
// stats_->dispatch_queue_bytes -= cmd->UsedMemory();
parsed_head_ = cmd->next;
Comment on lines +2168 to 2169
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory accounting for dispatch queue bytes is disabled. This will cause incorrect memory statistics and potential memory leak tracking issues.

Suggested change
// stats_->dispatch_queue_bytes -= cmd->UsedMemory();
parsed_head_ = cmd->next;
stats_->dispatch_queue_bytes -= cmd->UsedMemory();
parsed_head_ = cmd->next;
delete cmd;

Copilot uses AI. Check for mistakes.

if (cmd->MarkForDestruction()) { // whether async operation finished or not started
DVLOG(2) << "Deleting parsed command " << cmd;
delete cmd;
}
}

parsed_tail_ = nullptr;
parsed_cmd_q_len_ = 0;
delete parsed_cmd_;
Expand Down Expand Up @@ -2352,6 +2325,14 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {

ParserStatus parse_status = OK;

bool head_ready = false;
auto head_updater = [this, &head_ready]() {
VLOG(0) << "Waiter triggered";
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug VLOG statement at level 0 should be removed or changed to a higher level before production use.

Suggested change
VLOG(0) << "Waiter triggered";
DVLOG(2) << "Waiter triggered";

Copilot uses AI. Check for mistakes.
head_ready = true;
io_event_.notify();
};
util::fb2::detail::Waiter head_waiter{head_updater};

do {
HandleMigrateRequest();

Expand All @@ -2364,10 +2345,15 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
// The exception is when we use io_uring with multishot recv enabled, in which case
// we rely on the kernel to keep feeding us data until we multishot is disabled.
DoReadOnRecv(FiberSocketBase::RecvNotification{true});
io_event_.await([this]() {
return io_buf_.InputLen() > 0 || (parsed_head_ && parsed_head_->PollHeadForCompletion()) ||
io_ec_;
});

    // Refresh the head command's readiness; if it's not ready yet, subscribe to completion updates
if (parsed_head_ && parsed_head_ != parsed_to_execute_ && !head_ready)
head_ready |= parsed_head_->OnCompletion(&head_waiter);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please write explanation here what exactly we do here. It's gentle, so I prefer we have comments than going through low-level code to understand


VLOG(0) << "Rolling in with head_ready: " << head_ready;
Copy link

Copilot AI Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug VLOG statement at level 0 should be removed or changed to a higher level before production use.

Suggested change
VLOG(0) << "Rolling in with head_ready: " << head_ready;
VLOG(1) << "Rolling in with head_ready: " << head_ready;

Copilot uses AI. Check for mistakes.

io_event_.await(
[this, &head_ready]() { return io_buf_.InputLen() > 0 || head_ready || io_ec_; });
}

if (io_ec_) {
Expand All @@ -2378,6 +2364,12 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
phase_ = PROCESS;
bool is_iobuf_full = io_buf_.AppendLen() == 0;

if (reply_builder_->GetError()) // Abort on error
return reply_builder_->GetError();

if (exchange(head_ready, false)) // Proceed with replies
ReplyMCBatch();

if (io_buf_.InputLen() > 0) {
if (redis_parser_) {
parse_status = ParseRedis(max_busy_read_cycles_cached);
Expand All @@ -2387,13 +2379,6 @@ variant<error_code, Connection::ParserStatus> Connection::IoLoopV2() {
}
} else {
parse_status = NEED_MORE;
if (parsed_head_) {
ReplyMCBatch();
}
}

if (reply_builder_->GetError()) {
return reply_builder_->GetError();
}

if (parse_status == NEED_MORE) {
Expand Down
95 changes: 34 additions & 61 deletions src/facade/parsed_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "facade/parsed_command.h"

#include "base/logging.h"
#include "core/overloaded.h"
#include "facade/conn_context.h"
#include "facade/dragonfly_connection.h"
#include "facade/reply_builder.h"
Expand Down Expand Up @@ -47,11 +48,9 @@ string MCRender::RenderDeleted() const {
}

void ParsedCommand::ResetForReuse() {
allow_async_execution_ = false;
is_deferred_reply_ = false;
reply_payload_ = std::monostate{};
reply_ = std::monostate{};

state_.store(0, std::memory_order_relaxed);
offsets_.clear();
if (HeapMemory() > 1024) {
storage_.clear(); // also deallocates the heap.
Expand All @@ -63,8 +62,7 @@ void ParsedCommand::SendError(std::string_view str, std::string_view type) {
if (!is_deferred_reply_) {
rb_->SendError(str, type);
} else {
reply_payload_ = payload::make_error(str, type);
NotifyReplied();
reply_ = payload::make_error(str, type);
}
}

Expand All @@ -76,10 +74,9 @@ void ParsedCommand::SendError(facade::OpStatus status) {
rb_->SendError(StatusToMsg(status));
} else {
if (status == OpStatus::OK)
reply_payload_ = payload::SimpleString{"OK"};
reply_ = payload::SimpleString{"OK"};
else
reply_payload_ = payload::make_error(StatusToMsg(status));
NotifyReplied();
reply_ = payload::make_error(StatusToMsg(status));
}
}

Expand All @@ -93,75 +90,51 @@ void ParsedCommand::SendSimpleString(std::string_view str) {
if (!is_deferred_reply_) {
rb_->SendSimpleString(str);
} else {
reply_payload_ = payload::make_simple_or_noreply(str);
NotifyReplied();
}
}

void ParsedCommand::SendLong(long val) {
if (is_deferred_reply_) {
reply_payload_ = long(val);
NotifyReplied();
} else {
rb_->SendLong(val);
reply_ = payload::make_simple_or_noreply(str);
}
}

void ParsedCommand::SendNull() {
if (is_deferred_reply_) {
reply_payload_ = payload::Null{};
NotifyReplied();
reply_ = payload::Null{};
} else {
DCHECK(mc_cmd_ == nullptr); // RESP only
static_cast<RedisReplyBuilder*>(rb_)->SendNull();
}
}

bool ParsedCommand::SendPayload() {
if (is_deferred_reply_) {
CapturingReplyBuilder::Apply(std::move(reply_payload_), rb_);
reply_payload_ = {};
return true;
}
return false;
// Resolves a deferred command by storing an error reply.
// Precondition: the command is in deferred-reply mode (is_deferred_reply_ set),
// so the delegated SendError call stores the error instead of writing it
// directly to the reply builder.
void ParsedCommand::Resolve(ErrorReply&& error) {
  DCHECK(is_deferred_reply_);
  SendError(error);  // deferred path: captures the error as the stored reply
}

bool ParsedCommand::CheckDoneAndMarkHead() {
uint8_t state = state_.load(std::memory_order_acquire);

while ((state & ASYNC_REPLY_DONE) == 0) {
// If we marked it as head already, return false.
if (state & HEAD_REPLY) {
return false;
}

// Mark it as head. If succeeded (i.e ASYNC_REPLY_DONE is still not set), return false
if (state_.compare_exchange_weak(state, state | HEAD_REPLY, std::memory_order_acq_rel)) {
return false;
}
// Otherwise, retry with updated state.
}

// ASYNC_REPLY_DONE is set, return true.
return true;
bool ParsedCommand::IsReady() const {
dfly::Overloaded ov{[](const payload::Payload& pl) { return pl.index() > 0; },
[](const AsyncTask& task) { return task.blocker->IsCompleted(); }};
return visit(ov, reply_);
}

void ParsedCommand::NotifyReplied() {
// A synchronization point. We set ASYNC_REPLY_DONE to mark it's safe now to read the payload.
uint8_t prev_state = state_.fetch_or(ASYNC_REPLY_DONE, std::memory_order_acq_rel);

DVLOG(1) << "ParsedCommand::NotifyReplied with state " << unsigned(prev_state);
bool ParsedCommand::OnCompletion(util::fb2::detail::Waiter* waiter) {
dfly::Overloaded ov{[](const payload::Payload& pl) {
DCHECK(pl.index() > 0);
return true;
},
[waiter](const AsyncTask& task) {
task.blocker->OnCompletion(waiter);
return false;
}};
return visit(ov, reply_);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how the access to reply_ is synchronized between threads?
Before, with state_, only a single thread could access reply_payload.
now I see OnCompletion is called from the io thread but you also write into reply_ from the shard thread

Copy link
Contributor Author

@dranikpg dranikpg Jan 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I never write into reply_ from the shard thread; it's either assigned to the return value of the handler or set to the callback that will reply.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will need to see the SET command, I guess — specifically how it propagates the Stored reply from the shard thread.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it won't propagate from the shard thread, it will work via a replier

}

if (prev_state & DELETE_INTENT) {
delete this;
return;
}
// If it was marked as head already, notify the connection that the head is done.
if (prev_state & HEAD_REPLY) {
// TODO: this might crash as we currently do not wait for async commands on connection close.
DCHECK(conn_cntx_);
conn_cntx_->conn()->Notify();
}
void ParsedCommand::Reply() {
dfly::Overloaded ov{
[this](payload::Payload&& pl) { CapturingReplyBuilder::Apply(std::move(pl), rb_); },
[](AsyncTask&& task) {
DCHECK(task.blocker->IsCompleted());
while (!task.replier.done())
task.replier.resume();
}};
visit(ov, exchange(reply_, monostate{}));
}

} // namespace facade
Loading