From 607c9bcca200124745af6c0d9781f4db4b8d3a81 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Tue, 4 Nov 2025 22:16:39 +0200 Subject: [PATCH] chore: disk based backpressure for connections Signed-off-by: Kostas Kyrimis --- src/facade/dragonfly_connection.cc | 209 ++++++++++++++++++++++++++++- src/facade/dragonfly_connection.h | 77 +++++++++++ tests/dragonfly/connection_test.py | 38 ++++++ 3 files changed, 321 insertions(+), 3 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index a4724aa69d44..2a03304517ae 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -13,6 +13,7 @@ #include #include +#include "absl/strings/str_split.h" #include "base/cycle_clock.h" #include "base/flag_utils.h" #include "base/flags.h" @@ -27,7 +28,6 @@ #include "facade/redis_parser.h" #include "facade/service_interface.h" #include "facade/socket_utils.h" -#include "io/file.h" #include "util/fibers/fibers.h" #include "util/fibers/proactor_base.h" @@ -112,6 +112,16 @@ ABSL_FLAG(uint32_t, pipeline_wait_batch_usec, 0, "If non-zero, waits for this time for more I/O " " events to come for the connection in case there is only one command in the pipeline. "); +ABSL_FLAG(size_t, connection_disk_backpressure_watermark, 0, + "Offload dispach queue backpressure to disk when it crosses watermark. (0 to disable)"); + +ABSL_FLAG(size_t, connection_disk_backpressure_file_max_bytes, 50_MB, + "Maximum size of the backing file. When the watermark is reached, connection will " + "stop offloading backpressure to disk"); + +ABSL_FLAG(size_t, connection_disk_backpressure_load_size, 30, + "How many queue backpressure items to load from disk when dispatch queue is drained"); + using namespace util; using namespace std; using absl::GetFlag; @@ -434,6 +444,10 @@ size_t Connection::PipelineMessage::StorageCapacity() const { return storage.capacity() + args.capacity(); } +size_t Connection::PipelineMessage::StorageBytes() const { + return storage.size(); +} + size_t Connection::MessageHandle::UsedMemory() const { struct MessageSize { size_t operator()(const PubMessagePtr& msg) { @@ -676,6 +690,11 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, #endif UpdateLibNameVerMap(lib_name_, lib_ver_, +1); + + const size_t disk_watermark = absl::GetFlag(FLAGS_connection_disk_backpressure_watermark); + if (disk_watermark) { + backing_queue_ = std::make_unique(); + } } Connection::~Connection() { @@ -1162,8 +1181,12 @@ void Connection::ConnectionFlow() { void Connection::DispatchSingle(bool has_more, absl::FunctionRef invoke_cb, absl::FunctionRef cmd_msg_cb) { - bool optimize_for_async = has_more; QueueBackpressure& qbp = GetQueueBackpressure(); + if (OffloadBackpressureToDiskIfNeeded(cmd_msg_cb)) { + return; + } + + bool optimize_for_async = has_more; if (optimize_for_async && qbp.IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes, dispatch_q_.size())) { stats_->pipeline_throttle_count++; @@ -1683,12 +1706,23 @@ void Connection::AsyncFiber() { QueueBackpressure& qbp = GetQueueBackpressure(); while (!reply_builder_->GetError()) { DCHECK_EQ(socket()->proactor(), ProactorBase::me()); + + LoadBackpressureFromDiskIfNeeded(); + cnd_.wait(noop_lk, [this] { - return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch); + return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch) || + disk_backpressure_available_; }); + if (cc_->conn_closing) break; + if (disk_backpressure_available_) { + LoadBackpressureFromDiskIfNeeded(); + DCHECK(dispatch_q_.size() > 0); + disk_backpressure_available_ = false; + } + // We really want to have batching in the builder if possible. This is especially // critical in situations where Nagle's algorithm can introduce unwanted high // latencies. However we can only batch if we're sure that there are more commands @@ -2254,4 +2288,173 @@ void ResetStats() { io_req_size_hist->Clear(); } +bool Connection::OffloadBackpressureToDiskIfNeeded(absl::FunctionRef handle) { + // Offload only when dispatch_q_ crosses watermark or when backing queue already + // has pending items. + if (backing_queue_ && + ((dispatch_q_.size() > backing_queue_->Watermark()) || !backing_queue_->Empty())) { + auto ec = backing_queue_->Init(); + LOG_IF(ERROR, ec) << "Failed to init disk backed backpressure with error " << ec.message(); + + MessageHandle msg; + if (!ec) { + msg = handle(); + PipelineMessage* pmsg = std::get(msg.handle).get(); + if (backing_queue_->HasEnoughBackingSpace(pmsg)) { + backing_queue_->OffloadToBacking(pmsg); + if (dispatch_q_.size() == 0) { + disk_backpressure_available_ = true; + cnd_.notify_one(); + } + // Recycle message + QueueBackpressure& qbp = GetQueueBackpressure(); + if (stats_->pipeline_cmd_cache_bytes < qbp.pipeline_cache_limit) { + stats_->pipeline_cmd_cache_bytes += pmsg->StorageCapacity(); + pipeline_req_pool_.push_back( + std::move(std::get(msg.handle))); + } + // item offloaded to disk without errors, unblock connection fiber + return true; + } + LOG(WARNING) << "Disk backpressure file size limit reached. Could not offload backpressure."; + } + } + return false; +} + +void Connection::LoadBackpressureFromDiskIfNeeded() { + if (HasDiskBacked()) { + auto q_insert_cb = [this](io::MutableBytes bytes) { + PipelineMessagePtr ptr; + if (ptr = GetFromPipelinePool(); ptr) { + ptr->storage.resize(bytes.size()); + } else { + ptr = make_unique(1, 1); + ptr->storage.resize(bytes.size()); + } + + memcpy(ptr->storage.begin(), bytes.begin(), bytes.size()); + std::string_view read{reinterpret_cast(ptr->storage.data()), bytes.size()}; + ptr->args = absl::StrSplit(read, '\0', absl::SkipEmpty()); + + SendAsync({.handle = std::move(ptr)}); + }; + + backing_queue_->LoadFromDiskToQueue(q_insert_cb); + } +} + +size_t Connection::DiskBackedBackpressureQueue::unique_id = 0; + +Connection::DiskBackedBackpressureQueue::DiskBackedBackpressureQueue() + : max_backing_size_(absl::GetFlag(FLAGS_connection_disk_backpressure_file_max_bytes)), + max_queue_load_size_(absl::GetFlag(FLAGS_connection_disk_backpressure_load_size)), + watermark_(absl::GetFlag(FLAGS_connection_disk_backpressure_watermark)) { + id_ = ++unique_id; +} + +std::error_code Connection::DiskBackedBackpressureQueue::Init() { + if (init_) { + return {}; + } + + std::string backing_name = absl::StrCat("/tmp/backing_", id_); + { + // Kernel transparently handles buffering via the page cache. + auto res = util::fb2::OpenWrite(backing_name, {} /* overwrite mode + non direct io */); + if (!res) { + return res.error(); + } + writer_.reset(*res); + } + + auto res = util::fb2::OpenRead(backing_name); + if (!res) { + return res.error(); + } + reader_.reset(*res); + + VLOG(3) << "Created backing for connection " << this << " " << backing_name; + init_ = true; + + return {}; +} + +bool Connection::DiskBackedBackpressureQueue::Empty() const { + return total_backing_bytes_ == 0; +} + +bool Connection::DiskBackedBackpressureQueue::HasEnoughBackingSpace( + const Connection::PipelineMessage* msg) const { + return (msg->StorageBytes() + total_backing_bytes_) < max_backing_size_; +} + +size_t Connection::DiskBackedBackpressureQueue::TotalInMemoryBytes() const { + return offsets_.size() * sizeof(ItemOffset); +} + +void Connection::DiskBackedBackpressureQueue::OffloadToBacking( + const Connection::PipelineMessage* msg) { + ItemOffset item; + item.offset = next_offset_; + item.total_bytes = msg->FullCommand().size(); + + size_t start = absl::GetCurrentTimeNanos(); + + // TODO we should truncate as the file grows. That way we never end up with large files + // on disk. + auto res = writer_->Write(msg->FullCommand()); + if (res) { + VLOG(2) << "Failed to offload connection " << this << " backpressure with offset " + << item.offset << " of size " << item.total_bytes << " to backing with error: " << res; + return; + } + + // Only update for non error paths + size_t end = absl::GetCurrentTimeNanos(); + max_io_write_latency_ = std::max(max_io_write_latency_, (end - start)); + + total_backing_bytes_ += msg->FullCommand().size(); + offsets_.push_back(item); + next_offset_ += item.total_bytes; + + VLOG(2) << "Offload connection " << this << " backpressure of " << item.total_bytes + << " bytes to disk at offset: " << item.offset; + VLOG(3) << "Command offloaded: " << msg->FullCommand(); +} + +template void Connection::DiskBackedBackpressureQueue::LoadFromDiskToQueue(F f) { + std::string buffer; + size_t up_to = max_queue_load_size_; + + size_t start = absl::GetCurrentTimeNanos(); + + while (!offsets_.empty() && up_to--) { + ItemOffset item = offsets_.front(); + + buffer.resize(item.total_bytes); + + io::MutableBytes bytes{reinterpret_cast(buffer.data()), item.total_bytes}; + auto result = reader_->Read(item.offset, bytes); + if (!result) { + LOG(ERROR) << "Could not load item at offset " << item.offset << " of size " + << item.total_bytes << " from disk with error: " << result.error().value() << " " + << result.error().message(); + return; + } + + VLOG(2) << "Loaded item with offset " << item.offset << " of size " << item.total_bytes + << " for connection " << this; + + f(bytes); + + offsets_.pop_front(); + total_backing_bytes_ -= item.total_bytes; + } + + // Only update for non error paths + size_t end = absl::GetCurrentTimeNanos(); + max_io_read_latency_ = std::max(max_io_read_latency_, (end - start)); +} + } // namespace facade diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 19dfef93f717..a568bf6d9e3c 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -17,6 +17,7 @@ #include "facade/facade_types.h" #include "facade/memcache_parser.h" #include "facade/resp_expr.h" +#include "io/file.h" #include "io/io_buf.h" #include "util/connection.h" #include "util/fibers/fibers.h" @@ -90,6 +91,13 @@ class Connection : public util::Connection { size_t StorageCapacity() const; + // Used by file backed queue back pressure to reconstruct a PipelineMessage + size_t StorageBytes() const; + + std::string_view FullCommand() const { + return {storage.data(), storage.size()}; + } + // mi_stl_allocator uses mi heap internally. // The capacity is chosen so that we allocate a fully utilized (256 bytes) block. using StorageType = absl::InlinedVector; @@ -401,6 +409,13 @@ class Connection : public util::Connection { void ConfigureProvidedBuffer(); + bool HasDiskBacked() const { + return backing_queue_ && !backing_queue_->Empty(); + } + + bool OffloadBackpressureToDiskIfNeeded(absl::FunctionRef handle); + + void LoadBackpressureFromDiskIfNeeded(); // The read buffer with read data that needs to be parsed and processed. // For io_uring bundles we may have available_bytes larger than slice.size() // which means that there are more buffers available to read. @@ -504,6 +519,68 @@ class Connection : public util::Connection { }; bool request_shutdown_ = false; + + class DiskBackedBackpressureQueue { + public: + DiskBackedBackpressureQueue(); + + // Init on first call, no-op afterwards. + std::error_code Init(); + + // Check if backing file is empty, i.e. backing file has 0 bytes. + bool Empty() const; + + // Check if we can offload msg to backing file. + bool HasEnoughBackingSpace(const Connection::PipelineMessage* msg) const; + + // Total size of internal buffers/structures. + size_t TotalInMemoryBytes() const; + + void OffloadToBacking(const Connection::PipelineMessage* msg); + + // For each item loaded from disk it calls f(item) to consume it. + // Reads up to max_queue_load_size_ items on each call + template void LoadFromDiskToQueue(F f); + + size_t Watermark() const { + return watermark_; + } + + private: + static size_t unique_id; + + // File Reader/Writer + std::unique_ptr writer_; + std::unique_ptr reader_; + + // In memory backed file map + struct ItemOffset { + size_t offset = 0; + size_t total_bytes = 0; + }; + + std::deque offsets_; + + // unique id for the file backed + size_t id_ = 0; + + // stats + size_t total_backing_bytes_ = 0; + size_t next_offset_ = 0; + size_t max_io_read_latency_ = 0; + size_t max_io_write_latency_ = 0; + + // Read only constants + const size_t max_backing_size_ = 0; + const size_t max_queue_load_size_ = 0; + const size_t watermark_ = 0; + + // Idempotent init + bool init_ = false; + }; + + std::unique_ptr backing_queue_; + bool disk_backpressure_available_ = false; }; } // namespace facade diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index 67c7079bb403..3067ba667be4 100644 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -1501,3 +1501,41 @@ async def test_issue_5949_nil_bulk_string_crash(df_server: DflyInstance): client = df_server.client() await client.ping() assert await client.ping() == True + + +@dfly_args( + { + "connection_disk_backpressure_watermark": 10, + "connection_disk_backpressure_file_max_bytes": 10000000000, + } +) +async def test_send_disk_backpressure(df_server, async_client: aioredis.Redis): + reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port) + total_items_send = 5000 + total_per_item_cmds = 200 + total = total_items_send * total_per_item_cmds + + async def set_task(): + stride = 0 + for i in range(0, total_items_send): + pipe = "" + for j in range(0, total_per_item_cmds): + pipe = pipe + f"SET a{stride + j} foo\n" + stride = stride + total_per_item_cmds + writer.write(pipe.encode()) + await writer.drain() + + set = asyncio.create_task(set_task()) + + await set + logging.info("Offloaded all data to socket. Draining responses...") + # Drain the read buffer + while True: + try: + response = await asyncio.wait_for(reader.readline(), timeout=2) + except asyncio.TimeoutError: + break + + # Check that the offloaded backpressure was peroperly handled + res = await async_client.dbsize() + assert res == total