209 changes: 206 additions & 3 deletions src/facade/dragonfly_connection.cc
@@ -13,6 +13,7 @@
#include <numeric>
#include <variant>

#include "absl/strings/str_split.h"
#include "base/cycle_clock.h"
#include "base/flag_utils.h"
#include "base/flags.h"
@@ -27,7 +28,6 @@
#include "facade/redis_parser.h"
#include "facade/service_interface.h"
#include "facade/socket_utils.h"
#include "io/file.h"
#include "util/fibers/fibers.h"
#include "util/fibers/proactor_base.h"

@@ -112,6 +112,16 @@ ABSL_FLAG(uint32_t, pipeline_wait_batch_usec, 0,
"If non-zero, waits for this time for more I/O "
" events to come for the connection in case there is only one command in the pipeline. ");

ABSL_FLAG(size_t, connection_disk_backpressure_watermark, 0,
Collaborator:
what is the unit here? what crosses the watermark?

Contributor Author:
dispatch_q_ > watermark

Will edit the comment/description accordingly

"Offload dispach queue backpressure to disk when it crosses watermark. (0 to disable)");

ABSL_FLAG(size_t, connection_disk_backpressure_file_max_bytes, 50_MB,
"Maximum size of the backing file. When this limit is reached, the connection will "
"stop offloading backpressure to disk");

ABSL_FLAG(size_t, connection_disk_backpressure_load_size, 30,
Contributor Author:
30 is low IMO, any preference for default value?

"How many queue backpressure items to load from disk when dispatch queue is drained");

using namespace util;
using namespace std;
using absl::GetFlag;
@@ -434,6 +444,10 @@ size_t Connection::PipelineMessage::StorageCapacity() const {
return storage.capacity() + args.capacity();
}

size_t Connection::PipelineMessage::StorageBytes() const {
return storage.size();
}

size_t Connection::MessageHandle::UsedMemory() const {
struct MessageSize {
size_t operator()(const PubMessagePtr& msg) {
@@ -676,6 +690,11 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
#endif

UpdateLibNameVerMap(lib_name_, lib_ver_, +1);

const size_t disk_watermark = absl::GetFlag(FLAGS_connection_disk_backpressure_watermark);
if (disk_watermark) {
backing_queue_ = std::make_unique<DiskBackedBackpressureQueue>();
}
}

Connection::~Connection() {
@@ -1162,8 +1181,12 @@ void Connection::ConnectionFlow() {

void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
absl::FunctionRef<MessageHandle()> cmd_msg_cb) {
bool optimize_for_async = has_more;
QueueBackpressure& qbp = GetQueueBackpressure();
if (OffloadBackpressureToDiskIfNeeded(cmd_msg_cb)) {
return;
}

bool optimize_for_async = has_more;
if (optimize_for_async &&
qbp.IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes, dispatch_q_.size())) {
stats_->pipeline_throttle_count++;
@@ -1683,12 +1706,23 @@ void Connection::AsyncFiber() {
QueueBackpressure& qbp = GetQueueBackpressure();
while (!reply_builder_->GetError()) {
DCHECK_EQ(socket()->proactor(), ProactorBase::me());

LoadBackpressureFromDiskIfNeeded();

cnd_.wait(noop_lk, [this] {
return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch);
return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch) ||
disk_backpressure_available_;
});

if (cc_->conn_closing)
break;

if (disk_backpressure_available_) {
LoadBackpressureFromDiskIfNeeded();
DCHECK(dispatch_q_.size() > 0);
disk_backpressure_available_ = false;
}

// We really want to have batching in the builder if possible. This is especially
// critical in situations where Nagle's algorithm can introduce unwanted high
// latencies. However we can only batch if we're sure that there are more commands
@@ -2254,4 +2288,173 @@ void ResetStats() {
io_req_size_hist->Clear();
}

bool Connection::OffloadBackpressureToDiskIfNeeded(absl::FunctionRef<MessageHandle()> handle) {
// Offload only when dispatch_q_ crosses watermark or when backing queue already
// has pending items.
if (backing_queue_ &&
((dispatch_q_.size() > backing_queue_->Watermark()) || !backing_queue_->Empty())) {
Collaborator:
i prefer that DiskOffloadThreshold() would be in QueueBackpressure rather than in backing_queue_, this way you do not need to touch the backing_queue_ object during the happy path.
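A minimal sketch of that suggestion, assuming a hypothetical disk_offload_threshold member on QueueBackpressure and a hypothetical has_disk_backlog_ bool on the connection that is flipped whenever items are offloaded to or drained from disk:

bool Connection::ShouldOffloadToDisk(const QueueBackpressure& qbp) const {
  if (qbp.disk_offload_threshold == 0)
    return false;  // feature disabled
  // Happy path is a single comparison; backing_queue_ is never dereferenced.
  return dispatch_q_.size() > qbp.disk_offload_threshold || has_disk_backlog_;
}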

auto ec = backing_queue_->Init();
Collaborator:
have not looked at Init, but i prefer to have explicit initialization of resources at the beginning of the flow (where you create backing_queue_) rather than doing it lazily

LOG_IF(ERROR, ec) << "Failed to init disk backed backpressure with error " << ec.message();

MessageHandle msg;
if (!ec) {
msg = handle();
PipelineMessage* pmsg = std::get<Connection::PipelineMessagePtr>(msg.handle).get();
if (backing_queue_->HasEnoughBackingSpace(pmsg)) {
backing_queue_->OffloadToBacking(pmsg);
if (dispatch_q_.size() == 0) {
disk_backpressure_available_ = true;
cnd_.notify_one();
}
// Recycle message
QueueBackpressure& qbp = GetQueueBackpressure();
if (stats_->pipeline_cmd_cache_bytes < qbp.pipeline_cache_limit) {
stats_->pipeline_cmd_cache_bytes += pmsg->StorageCapacity();
pipeline_req_pool_.push_back(
std::move(std::get<Connection::PipelineMessagePtr>(msg.handle)));
}
// item offloaded to disk without errors, unblock connection fiber
return true;
}
LOG(WARNING) << "Disk backpressure file size limit reached. Could not offload backpressure.";
}
}
return false;
}

void Connection::LoadBackpressureFromDiskIfNeeded() {
if (HasDiskBacked()) {
auto q_insert_cb = [this](io::MutableBytes bytes) {
PipelineMessagePtr ptr;
if (ptr = GetFromPipelinePool(); ptr) {
ptr->storage.resize(bytes.size());
} else {
ptr = make_unique<PipelineMessage>(1, 1);
ptr->storage.resize(bytes.size());
}

memcpy(ptr->storage.begin(), bytes.begin(), bytes.size());
std::string_view read{reinterpret_cast<char*>(ptr->storage.data()), bytes.size()};
ptr->args = absl::StrSplit(read, '\0', absl::SkipEmpty());
Collaborator:
what does it mean? we do not support binary strings ?

Contributor Author (@kostasrim, Nov 6, 2025):
It's serialized and deserialized as is: when we parse the command from the socket we create a PipelineMsg. Within that, there is:

  1. a flat storage which is the whole command + args separated by \0
  2. a view (span) that can iterate over each argument (cmd + args)

(see the FromArgs{} function)

When we offload this, we offload the flat blob storage as is to disk. Loading is as easy as reading the bytes directly back into a PipelinePtr. The StrSplit is a fancy way to build the (2) view of the flat storage.

Contributor Author:
This is 1-1 with FromArgs, and since the object layout was already flat -- it was easy to write it to disk as is.
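For readers following along, a short sketch of the layout being described (illustrative only; it mirrors the StrSplit call in LoadBackpressureFromDiskIfNeeded rather than the exact FromArgs code; str_split.h is already included at the top of this file):

// "set foo bar" is kept as one flat blob plus a view into it:
//
//   storage: s e t \0 f o o \0 b a r \0
//   args:    {"set", "foo", "bar"}  // string_views into storage
std::vector<std::string_view> BuildArgView(std::string_view flat_storage) {
  // Rebuild the argument view from the '\0'-separated blob.
  return absl::StrSplit(flat_storage, '\0', absl::SkipEmpty());
}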

Collaborator:
yeah, writing it to disk is easy but I think your conclusion is wrong, and you can not load binary strings that contain \0 inside using split(). you will need to serialize argument sizes to make it correct.

In fact, I would like to challenge your design choices:

  1. the pipeline is offloaded after the parsing.
  2. this means you need to serialize structured data; we already saw you keep offsets to track the blob sizes, for example inside your disk queue.
  3. but you also need to parse back these argument sets. it requires more complicated serialization to do it properly.

Instead you could offload raw socket data before parsing:

  1. you save CPU on parsing data that you later offload
  2. you do not need to store blob sizes, so your on-disk queue is simpler
  3. there is no on-disk format at all - you do not need to worry about serializing command arguments properly.

So your file becomes a full substitute for the socket until it depletes.

Contributor Author (@kostasrim, Nov 7, 2025):
> yeah, writing it to disk is easy but I think your conclusion is wrong

I am not sure I understand why \0 is a problem here. For example, a command set foo bar is parsed and stored as set\0foo\0bar\0 both in memory and on disk. Using split() after loading this to build a view of the command arguments (e.g. {set, foo, bar}) is perfectly valid in that context. Am I missing something?

> Instead you could offload raw socket data before parsing:

My concern with this would be that parsing should now also be a part of AsyncFb. Furthermore, parsing a raw socket blob that was written to disk needs to handle cases like INPUT_PENDING, which introduces additional preemption points to the async fiber beyond a) blocking on queue backpressure and b) blocking on loading the next batch to execute from disk. For example:

async_fb -> blocks on send because the client is not reading, dispatch queue grows
conn_fb -> reads from socket, starts offloading to the disk queue
client -> stops sending commands
conn_fb -> blocks on Connection::HandleRecvSocket
async_fb -> drains the dispatch_queue

We need to load the blob from disk, parse it and place it in the dispatch_q. conn_fiber is blocked on socket io. There are two options:

  1. coordinate and somehow unblock HandleRecvSocket such that ConnFiber can do a disk read + parsing, construct the msg and send it to the dispatch_q.
  2. read from disk and do the parsing inside AsyncFiber.

If we choose (2), and since ConnFb offloads unparsed data, AsyncFb can now parse half entries (because of INPUT_PENDING). So if the last entry written to disk is a half entry (because the other half will arrive soon on the socket -- in other words, we just drained the disk backpressure):

  1. AsyncFb needs to read more to finish parsing -> preempts
  2. ConnFb reads from socket -> writes it to backing.

So now ConnFb must notify the AsyncFb to continue parsing.

This is something that can be handled, but nevertheless it seems that the parsing responsibilities of the ConnFb now leak into the AsyncFb. What is more, the complicated parsing stages (like INPUT_PENDING) must also be handled by AsyncFb.

> you save CPU on parsing data that you later offload

I am slightly confused over this as well. Didn't you suggest not to use the uring api at all, which implies a full proactor block on that thread on io? Also, assuming that the client will eventually read everything, won't we parse the data anyway once? What we offload is what we will eventually load, and this is already parsed (apart from the split, which just builds a view). So we don't actually save anything here -- we just delegate the parsing to a later time. What I mean here:

parse -> write to disk size(data) + parsed -> parse 2 -> write to disk size(data 2)
parse 2 times, writes 2 times
vs
write to disk 1 -> write to disk 2 -> parse 1 -> parse 2
parse 2 times, writes 2 times

All in all, I am not against your solution, I just want to understand it a little bit more, because the tradeoff here seems to be:

prefix the parsed command with its size (and now we don't need extra memory) before writing to disk -> self-contained change within DiskBackpressure

vs

write blobs -> requires parsing within AsyncFb

Collaborator:
@kostasrim with set foo bar there is no problem. the problem is with binary strings that you send with the RESP protocol like *3\r\n$3\r\nSET\r\n$3\r\nf\0\0\r\n$3\r\nb\0\0\r\n, or in other words where we send set f\0\0 b\0\0. And it's trivial to test and reproduce the problem.

regarding your second point - no, AsyncFb should not handle parsing, ConnFb should do it. And I agree that HandleRecvSocket is problematic, but lets first understand what SHOULD be done and then discuss how we do it.
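The failure mode is easy to demonstrate in isolation (a hypothetical snippet, not part of the PR; needs <string_view>, <vector> and absl/strings/str_split.h):

void DemoNulByteCorruption() {
  // Flat storage for `set f\0\0 b\0\0` -- the args themselves contain NULs,
  // laid out with the same '\0' separators as PipelineMessage::storage.
  constexpr char kFlat[] = "set\0f\0\0\0b\0\0\0";
  std::string_view flat{kFlat, sizeof(kFlat) - 1};

  std::vector<std::string_view> args =
      absl::StrSplit(flat, '\0', absl::SkipEmpty());
  // args == {"set", "f", "b"}: the embedded NULs are indistinguishable from
  // separators, so the original three arguments cannot be reconstructed.
}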

Contributor Author:
Oh interesting, so set f\0\0 is a valid command. I now understand what you were talking about.

Ok let's sync internally.
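For reference, the "serialize argument sizes" alternative mentioned above could look roughly like this length-prefixed framing (a sketch, not the PR's code; needs <cstdint>, <cstring>, <string>, <string_view>, <vector>):

// Each argument is written as <u32 size><bytes>, so NUL bytes in the
// payload are unambiguous.
std::string SerializeArgs(const std::vector<std::string_view>& args) {
  std::string out;
  for (std::string_view arg : args) {
    uint32_t size = static_cast<uint32_t>(arg.size());
    out.append(reinterpret_cast<const char*>(&size), sizeof(size));
    out.append(arg.data(), arg.size());
  }
  return out;
}

std::vector<std::string> DeserializeArgs(std::string_view blob) {
  std::vector<std::string> args;
  while (blob.size() >= sizeof(uint32_t)) {
    uint32_t size;
    std::memcpy(&size, blob.data(), sizeof(size));
    blob.remove_prefix(sizeof(size));
    if (size > blob.size())
      break;  // truncated entry; a real implementation would surface an error
    args.emplace_back(blob.substr(0, size));
    blob.remove_prefix(size);
  }
  return args;
}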


SendAsync({.handle = std::move(ptr)});
};

backing_queue_->LoadFromDiskToQueue(q_insert_cb);
}
}

size_t Connection::DiskBackedBackpressureQueue::unique_id = 0;

Connection::DiskBackedBackpressureQueue::DiskBackedBackpressureQueue()
: max_backing_size_(absl::GetFlag(FLAGS_connection_disk_backpressure_file_max_bytes)),
max_queue_load_size_(absl::GetFlag(FLAGS_connection_disk_backpressure_load_size)),
watermark_(absl::GetFlag(FLAGS_connection_disk_backpressure_watermark)) {
id_ = ++unique_id;
Collaborator:
this is buggy as DiskBackedBackpressureQueue is initialized in multiple threads.

Contributor Author (@kostasrim, Nov 6, 2025):
yeah it's a data race indeed. I wanted a unique identifier. will fix
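One possible fix, sketched here (not the PR's final code): make the counter atomic so concurrent connection threads still get distinct ids (needs <atomic>).

// In the header:  static std::atomic<size_t> unique_id;
std::atomic<size_t> Connection::DiskBackedBackpressureQueue::unique_id{0};

// In the constructor:
id_ = unique_id.fetch_add(1, std::memory_order_relaxed) + 1;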

}

std::error_code Connection::DiskBackedBackpressureQueue::Init() {
if (init_) {
return {};
}

std::string backing_name = absl::StrCat("/tmp/backing_", id_);
Contributor Author:
Shall we introduce a flag for those backing files, or is /tmp/ enough?

Collaborator:
we should provide a configurable prefix, /tmp/ is not enough.
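Such a prefix could be exposed as a flag; a sketch with a hypothetical flag name:

ABSL_FLAG(std::string, connection_disk_backpressure_dir, "/tmp",
          "Directory that holds connection backpressure backing files");

// Then in Init():
std::string backing_name = absl::StrCat(
    absl::GetFlag(FLAGS_connection_disk_backpressure_dir), "/backing_", id_);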

{
// Kernel transparently handles buffering via the page cache.
auto res = util::fb2::OpenWrite(backing_name, {} /* overwrite mode + non direct io */);
if (!res) {
return res.error();
}
writer_.reset(*res);
}

auto res = util::fb2::OpenRead(backing_name);
if (!res) {
return res.error();
}
reader_.reset(*res);

VLOG(3) << "Created backing for connection " << this << " " << backing_name;
init_ = true;

return {};
}

bool Connection::DiskBackedBackpressureQueue::Empty() const {
return total_backing_bytes_ == 0;
}

bool Connection::DiskBackedBackpressureQueue::HasEnoughBackingSpace(
const Connection::PipelineMessage* msg) const {
return (msg->StorageBytes() + total_backing_bytes_) < max_backing_size_;
}

size_t Connection::DiskBackedBackpressureQueue::TotalInMemoryBytes() const {
return offsets_.size() * sizeof(ItemOffset);
}

void Connection::DiskBackedBackpressureQueue::OffloadToBacking(
const Connection::PipelineMessage* msg) {
ItemOffset item;
item.offset = next_offset_;
item.total_bytes = msg->FullCommand().size();

size_t start = absl::GetCurrentTimeNanos();

// TODO we should truncate as the file grows. That way we never end up with large files
// on disk.
auto res = writer_->Write(msg->FullCommand());
if (res) {
VLOG(2) << "Failed to offload connection " << this << " backpressure with offset "
<< item.offset << " of size " << item.total_bytes << " to backing with error: " << res;
return;
}

// Only update for non error paths
size_t end = absl::GetCurrentTimeNanos();
max_io_write_latency_ = std::max(max_io_write_latency_, (end - start));

total_backing_bytes_ += msg->FullCommand().size();
offsets_.push_back(item);
next_offset_ += item.total_bytes;

VLOG(2) << "Offload connection " << this << " backpressure of " << item.total_bytes
<< " bytes to disk at offset: " << item.offset;
VLOG(3) << "Command offloaded: " << msg->FullCommand();
}

template <typename F> void Connection::DiskBackedBackpressureQueue::LoadFromDiskToQueue(F f) {
Collaborator:
template is not justified here. please use std::function instead.

Contributor Author:
I am happy to use a function here.

> template is not justified here.

For completeness, this is not how I think about it. std::function allocates dynamically, while a lambda argument deduced from a function template parameter does not. Using the most generic form (std::function) in a non-generic context (a plain function) is overkill here.
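A possible middle ground, given that the PR already uses absl::FunctionRef in DispatchSingle: a non-owning function reference avoids std::function's potential heap allocation while still letting the method be declared as a regular (non-template) member function. A sketch (needs absl/functional/function_ref.h):

// In dragonfly_connection.h:
void LoadFromDiskToQueue(absl::FunctionRef<void(io::MutableBytes)> f);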

std::string buffer;
size_t up_to = max_queue_load_size_;

size_t start = absl::GetCurrentTimeNanos();

while (!offsets_.empty() && up_to--) {
ItemOffset item = offsets_.front();

buffer.resize(item.total_bytes);

io::MutableBytes bytes{reinterpret_cast<uint8_t*>(buffer.data()), item.total_bytes};
auto result = reader_->Read(item.offset, bytes);
if (!result) {
LOG(ERROR) << "Could not load item at offset " << item.offset << " of size "
<< item.total_bytes << " from disk with error: " << result.error().value() << " "
<< result.error().message();
return;
}

VLOG(2) << "Loaded item with offset " << item.offset << " of size " << item.total_bytes
<< " for connection " << this;

f(bytes);

offsets_.pop_front();
total_backing_bytes_ -= item.total_bytes;
}

// Only update for non error paths
size_t end = absl::GetCurrentTimeNanos();
max_io_read_latency_ = std::max(max_io_read_latency_, (end - start));
}

} // namespace facade
77 changes: 77 additions & 0 deletions src/facade/dragonfly_connection.h
@@ -17,6 +17,7 @@
#include "facade/facade_types.h"
#include "facade/memcache_parser.h"
#include "facade/resp_expr.h"
#include "io/file.h"
#include "io/io_buf.h"
#include "util/connection.h"
#include "util/fibers/fibers.h"
@@ -90,6 +91,13 @@ class Connection : public util::Connection {

size_t StorageCapacity() const;

// Used by the file-backed queue backpressure to reconstruct a PipelineMessage
size_t StorageBytes() const;

std::string_view FullCommand() const {
return {storage.data(), storage.size()};
}

// mi_stl_allocator uses mi heap internally.
// The capacity is chosen so that we allocate a fully utilized (256 bytes) block.
using StorageType = absl::InlinedVector<char, kReqStorageSize>;
@@ -401,6 +409,13 @@

void ConfigureProvidedBuffer();

bool HasDiskBacked() const {
return backing_queue_ && !backing_queue_->Empty();
}

bool OffloadBackpressureToDiskIfNeeded(absl::FunctionRef<MessageHandle()> handle);

void LoadBackpressureFromDiskIfNeeded();

// The read buffer with read data that needs to be parsed and processed.
// For io_uring bundles we may have available_bytes larger than slice.size()
// which means that there are more buffers available to read.
@@ -504,6 +519,68 @@
};

bool request_shutdown_ = false;

class DiskBackedBackpressureQueue {
Collaborator (@romange, Nov 6, 2025):
the pr is large, lets do the usual: split it in self contained chunks. specifically, this should reside in dedicated files.

public:
DiskBackedBackpressureQueue();

// Init on first call, no-op afterwards.
std::error_code Init();

// Check if backing file is empty, i.e. backing file has 0 bytes.
bool Empty() const;

// Check if we can offload msg to backing file.
bool HasEnoughBackingSpace(const Connection::PipelineMessage* msg) const;

// Total size of internal buffers/structures.
size_t TotalInMemoryBytes() const;

void OffloadToBacking(const Connection::PipelineMessage* msg);

// For each item loaded from disk it calls f(item) to consume it.
// Reads up to max_queue_load_size_ items on each call
template <typename F> void LoadFromDiskToQueue(F f);

size_t Watermark() const {
return watermark_;
}

private:
static size_t unique_id;

// File Reader/Writer
std::unique_ptr<io::WriteFile> writer_;
std::unique_ptr<io::ReadonlyFile> reader_;

// In memory backed file map
struct ItemOffset {
size_t offset = 0;
size_t total_bytes = 0;
};

std::deque<ItemOffset> offsets_;

// unique id for the file backed
size_t id_ = 0;

// stats
size_t total_backing_bytes_ = 0;
size_t next_offset_ = 0;
size_t max_io_read_latency_ = 0;
size_t max_io_write_latency_ = 0;

// Read only constants
const size_t max_backing_size_ = 0;
const size_t max_queue_load_size_ = 0;
const size_t watermark_ = 0;

// Idempotent init
bool init_ = false;
};

std::unique_ptr<DiskBackedBackpressureQueue> backing_queue_;
bool disk_backpressure_available_ = false;
};

} // namespace facade