diff --git a/src/facade/CMakeLists.txt b/src/facade/CMakeLists.txt index 3cfc0b361231..22c100bdae7a 100644 --- a/src/facade/CMakeLists.txt +++ b/src/facade/CMakeLists.txt @@ -3,7 +3,8 @@ cxx_link(dfly_parser_lib base strings_lib) add_library(dfly_facade conn_context.cc dragonfly_listener.cc dragonfly_connection.cc facade.cc memcache_parser.cc reply_builder.cc op_status.cc service_interface.cc - reply_capture.cc cmd_arg_parser.cc tls_helpers.cc socket_utils.cc) + reply_capture.cc cmd_arg_parser.cc tls_helpers.cc socket_utils.cc + disk_backed_queue.cc) if (DF_USE_SSL) set(TLS_LIB tls_lib) @@ -20,6 +21,7 @@ cxx_test(memcache_parser_test dfly_facade LABELS DFLY) cxx_test(redis_parser_test facade_test LABELS DFLY) cxx_test(reply_builder_test facade_test LABELS DFLY) cxx_test(cmd_arg_parser_test facade_test LABELS DFLY) +cxx_test(disk_backed_queue_test facade_test LABELS DFLY) add_executable(ok_backend ok_main.cc) cxx_link(ok_backend dfly_facade) diff --git a/src/facade/disk_backed_queue.cc b/src/facade/disk_backed_queue.cc new file mode 100644 index 000000000000..bce606a8e2c9 --- /dev/null +++ b/src/facade/disk_backed_queue.cc @@ -0,0 +1,126 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// +// See LICENSE for licensing terms. +// + +#include "facade/disk_backed_queue.h" + +#include + +#include + +#include "base/flags.h" +#include "base/logging.h" +#include "facade/facade_types.h" +#include "util/fibers/uring_file.h" + +using facade::operator""_MB; + +ABSL_FLAG(std::string, disk_backpressure_folder, "/tmp/", + "Folder to store disk-backed connection backpressure"); + +ABSL_FLAG(size_t, disk_backpressure_file_max_bytes, 50_MB, + "Maximum size of the backing file. When max size is reached, connection will " + "stop offloading backpressure to disk and block on client read."); + +ABSL_FLAG(size_t, disk_backpressure_load_size, 30, + "How many items to load in dispatch queue from the disk-backed file."); + +namespace facade { + +DiskBackedQueue::DiskBackedQueue(uint32_t conn_id) + : max_backing_size_(absl::GetFlag(FLAGS_disk_backpressure_file_max_bytes)), + max_queue_load_size_(absl::GetFlag(FLAGS_disk_backpressure_load_size)), + id_(conn_id) { +} + +std::error_code DiskBackedQueue::Init() { + std::string backing_name = absl::StrCat(absl::GetFlag(FLAGS_disk_backpressure_folder), id_); + { + // Kernel transparently handles buffering via the page cache. + auto res = util::fb2::OpenWrite(backing_name, {} /* overwrite mode + non-direct io */); + if (!res) { + return res.error(); + } + writer_.reset(*res); + } + + auto res = util::fb2::OpenRead(backing_name); + if (!res) { + return res.error(); + } + reader_.reset(*res); + + VLOG(3) << "Created backing for connection " << this << " " << backing_name; + + return {}; +} + +DiskBackedQueue::~DiskBackedQueue() { +} + +std::error_code DiskBackedQueue::CloseWriter() { + if (writer_) { + auto ec = writer_->Close(); + LOG_IF(WARNING, ec) << ec.message(); + return ec; + } + return {}; +} + +std::error_code DiskBackedQueue::CloseReader() { + if (reader_) { + auto ec = reader_->Close(); + LOG_IF(WARNING, ec) << ec.message(); + return ec; + } + return {}; +} + +// Check if backing file is empty, i.e. backing file has 0 bytes. +bool DiskBackedQueue::Empty() const { + return total_backing_bytes_ == 0; +} + +bool DiskBackedQueue::HasEnoughBackingSpaceFor(size_t bytes) const { + return (bytes + total_backing_bytes_) < max_backing_size_; +} + +std::error_code DiskBackedQueue::Push(std::string_view blob) { + auto ec = writer_->Write(blob); + if (ec) { + VLOG(2) << "Failed to offload blob of size " << blob.size() << " to backing with error: " << ec; + return ec; + } + + total_backing_bytes_ += blob.size(); + + VLOG(2) << "Offload connection " << this << " backpressure of " << blob.size(); + VLOG(3) << "Command offloaded: " << blob; + return {}; +} + +std::error_code DiskBackedQueue::Pop(std::string* out) { + const size_t k_read_size = 4096; + const size_t to_read = std::min(k_read_size, total_backing_bytes_); + out->resize(to_read); + + io::MutableBytes bytes{reinterpret_cast(out->data()), to_read}; + auto result = reader_->Read(next_read_offset_, bytes); + if (!result) { + LOG(ERROR) << "Could not load item at offset " << next_read_offset_ << " of size " << to_read + << " from disk with error: " << result.error().value() << " " + << result.error().message(); + return result.error(); + } + + VLOG(2) << "Loaded item with offset " << next_read_offset_ << " of size " << to_read + << " for connection " << this; + + next_read_offset_ += bytes.size(); + total_backing_bytes_ -= bytes.size(); + + return {}; +} + +} // namespace facade diff --git a/src/facade/disk_backed_queue.h b/src/facade/disk_backed_queue.h new file mode 100644 index 000000000000..6fb71b5334c1 --- /dev/null +++ b/src/facade/disk_backed_queue.h @@ -0,0 +1,56 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +#include +#include +#include +#include + +namespace facade { + +class DiskBackedQueue { + public: + explicit DiskBackedQueue(uint32_t conn_id); + ~DiskBackedQueue(); + + std::error_code Init(); + + // Check if we can offload bytes to backing file. + bool HasEnoughBackingSpaceFor(size_t bytes) const; + + std::error_code Push(std::string_view blob); + + std::error_code Pop(std::string* out); + + // Check if backing file is empty, i.e. backing file has 0 bytes. + bool Empty() const; + + std::error_code CloseReader(); + std::error_code CloseWriter(); + + private: + // File Reader/Writer + std::unique_ptr writer_; + std::unique_ptr reader_; + + size_t total_backing_bytes_ = 0; + size_t total_backing_items_ = 0; + + size_t next_read_offset_ = 0; + + // Read only constants + const size_t max_backing_size_ = 0; + const size_t max_queue_load_size_ = 0; + + // same as connection id. Used to uniquely identify the backed file + const size_t id_ = 0; + + std::string buffer; +}; + +} // namespace facade diff --git a/src/facade/disk_backed_queue_test.cc b/src/facade/disk_backed_queue_test.cc new file mode 100644 index 000000000000..e976364cada5 --- /dev/null +++ b/src/facade/disk_backed_queue_test.cc @@ -0,0 +1,64 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "facade/disk_backed_queue.h" + +#include +#include + +#include +#include +#include + +#include "base/flags.h" +#include "base/gtest.h" +#include "base/logging.h" +#include "io/io.h" +#include "util/fibers/uring_proactor.h" + +namespace dfly { +namespace { + +using namespace facade; + +TEST(DiskBackedQueueTest, ReadWrite) { + auto proactor = std::make_unique(); + + auto pthread = std::thread{[ptr = proactor.get()] { + static_cast(ptr)->Init(0, 8); + ptr->Run(); + }}; + + proactor->Await([]() { + DiskBackedQueue backing(1 /* id */); + EXPECT_FALSE(backing.Init()); + + std::string commands; + for (size_t i = 0; i < 100; ++i) { + auto cmd = absl::StrCat("SET FOO", i, " BAR"); + EXPECT_FALSE(backing.Push(cmd)); + absl::StrAppend(&commands, cmd); + } + + std::string results; + while (!backing.Empty()) { + std::string res; + auto ec = backing.Pop(&res); + EXPECT_FALSE(ec); + absl::StrAppend(&results, res); + } + + EXPECT_EQ(results.size(), commands.size()); + EXPECT_EQ(results, commands); + + EXPECT_FALSE(backing.CloseReader()); + EXPECT_FALSE(backing.CloseWriter()); + }); + + proactor->Stop(); + pthread.join(); +} + +} // namespace +} // namespace dfly diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index a4724aa69d44..03b4288bfffd 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -1,4 +1,4 @@ -// Copyright 2022, DragonflyDB authors. All rights reserved. +// Copyright 2025, DragonflyDB authors. All rights reserved. // // See LICENSE for licensing terms. // @@ -22,6 +22,7 @@ #include "base/stl_util.h" #include "core/heap_size.h" #include "facade/conn_context.h" +#include "facade/disk_backed_queue.h" #include "facade/dragonfly_listener.h" #include "facade/memcache_parser.h" #include "facade/redis_parser.h" @@ -112,6 +113,9 @@ ABSL_FLAG(uint32_t, pipeline_wait_batch_usec, 0, "If non-zero, waits for this time for more I/O " " events to come for the connection in case there is only one command in the pipeline. "); +ABSL_FLAG(size_t, disk_queue_offload_watermark, 0, + "Offload backpressure to disk when dispatch queue size crosses the watermark."); + using namespace util; using namespace std; using absl::GetFlag; @@ -676,6 +680,17 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, #endif UpdateLibNameVerMap(lib_name_, lib_ver_, +1); + + backpressure_to_disk_watermark_ = absl::GetFlag(FLAGS_disk_queue_offload_watermark); + if (backpressure_to_disk_watermark_ > 0) { + backing_queue_ = std::make_unique(id_); + auto ec = backing_queue_->Init(); + if (ec) { + LOG(ERROR) << "Error initializing disk backpressure file for connection " << id_ << ": " + << ec.message(); + backing_queue_.reset(); + } + } } Connection::~Connection() { diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 19dfef93f717..546fe6a59321 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -1,4 +1,4 @@ -// Copyright 2022, DragonflyDB authors. All rights reserved. +// Copyright 2025, DragonflyDB authors. All rights reserved. // See LICENSE for licensing terms. // @@ -45,6 +45,7 @@ class ConnectionContext; class RedisParser; class ServiceInterface; class SinkReplyBuilder; +class DiskBackedQueue; // Connection represents an active connection for a client. // @@ -503,6 +504,9 @@ class Connection : public util::Connection { }; }; + std::unique_ptr backing_queue_; + size_t backpressure_to_disk_watermark_ = 0; + bool request_shutdown_ = false; };