Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/facade/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ cxx_link(dfly_parser_lib base strings_lib)

add_library(dfly_facade conn_context.cc dragonfly_listener.cc dragonfly_connection.cc facade.cc
memcache_parser.cc reply_builder.cc op_status.cc service_interface.cc
reply_capture.cc cmd_arg_parser.cc tls_helpers.cc socket_utils.cc)
reply_capture.cc cmd_arg_parser.cc tls_helpers.cc socket_utils.cc
disk_backed_queue.cc)

if (DF_USE_SSL)
set(TLS_LIB tls_lib)
Expand All @@ -20,6 +21,7 @@ cxx_test(memcache_parser_test dfly_facade LABELS DFLY)
cxx_test(redis_parser_test facade_test LABELS DFLY)
cxx_test(reply_builder_test facade_test LABELS DFLY)
cxx_test(cmd_arg_parser_test facade_test LABELS DFLY)
cxx_test(disk_backed_queue_test facade_test LABELS DFLY)

add_executable(ok_backend ok_main.cc)
cxx_link(ok_backend dfly_facade)
Expand Down
154 changes: 154 additions & 0 deletions src/facade/disk_backed_queue.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright 2025, DragonflyDB authors. All rights reserved.
//
// See LICENSE for licensing terms.
//

#include "facade/disk_backed_queue.h"

#include <absl/strings/str_cat.h>

#include <string>

#include "base/flags.h"
#include "base/logging.h"
#include "facade/facade_types.h"
#include "io/io.h"
#include "util/fibers/uring_file.h"

using facade::operator""_MB;

ABSL_FLAG(std::string, disk_backpressure_folder, "/tmp/",
"Folder to store disk-backed connection backpressure");

ABSL_FLAG(size_t, disk_backpressure_file_max_bytes, 50_MB,
"Maximum size of the backing file. When max size is reached, connection will "
"stop offloading backpressure to disk and block on client read.");

ABSL_FLAG(size_t, disk_backpressure_load_size, 30,
"How many items to load in dispatch queue from the disk-backed file.");

namespace facade {

DiskBackedQueue::DiskBackedQueue(uint32_t conn_id)
: max_backing_size_(absl::GetFlag(FLAGS_disk_backpressure_file_max_bytes)),
max_queue_load_size_(absl::GetFlag(FLAGS_disk_backpressure_load_size)),
id_(conn_id) {
}

std::error_code DiskBackedQueue::Init() {
std::string backing_name = absl::StrCat(absl::GetFlag(FLAGS_disk_backpressure_folder), id_);
{
// Kernel transparently handles buffering via the page cache.
auto res = util::fb2::OpenWrite(backing_name, {} /* overwrite mode + non-direct io */);
if (!res) {
return res.error();
}
writer_.reset(*res);
}

auto res = util::fb2::OpenRead(backing_name);
if (!res) {
return res.error();
}
reader_.reset(*res);

VLOG(3) << "Created backing for connection " << this << " " << backing_name;

return {};
}

DiskBackedQueue::~DiskBackedQueue() {
}

std::error_code DiskBackedQueue::CloseWriter() {
if (writer_) {
auto ec = writer_->Close();
LOG_IF(WARNING, ec) << ec.message();
return ec;
}
return {};
}

std::error_code DiskBackedQueue::CloseReader() {
if (reader_) {
auto ec = reader_->Close();
LOG_IF(WARNING, ec) << ec.message();
return ec;
}
return {};
}

// Check if backing file is empty, i.e. backing file has 0 bytes.
bool DiskBackedQueue::Empty() const {
return total_backing_bytes_ == 0;
}

bool DiskBackedQueue::HasEnoughBackingSpaceFor(size_t bytes) const {
return (bytes + total_backing_bytes_) < max_backing_size_;
}

std::error_code DiskBackedQueue::Push(std::string_view blob) {
// TODO we should truncate as the file grows. That way we never end up with large files
// on disk.
uint32_t sz = blob.size();
// We serialize the string as is and we prefix with 4 bytes denoting its size. The layout is:
// 4bytes(str size) + followed by blob.size() bytes
iovec offset_data[2]{{&sz, sizeof(uint32_t)}, {const_cast<char*>(blob.data()), blob.size()}};

auto ec = writer_->Write(offset_data, 2);
if (ec) {
VLOG(2) << "Failed to offload blob of size " << sz << " to backing with error: " << ec;
return ec;
}

total_backing_bytes_ += blob.size();
++total_backing_items_;

if (next_item_total_bytes_ == 0) {
next_item_total_bytes_ = blob.size();
}

VLOG(2) << "Offload connection " << this << " backpressure of " << blob.size();
VLOG(3) << "Command offloaded: " << blob;
return {};
}

std::error_code DiskBackedQueue::Pop(std::string* out) {
// We read the next item and (if there are more) we also prefetch the next item's size.
uint32_t read_sz = next_item_total_bytes_ + (total_backing_items_ > 1 ? sizeof(uint32_t) : 0);
buffer.resize(read_sz);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that you use buffer here, parallel pull and pop are not allowed. But either way, can't we use out directly to read into?


io::MutableBytes bytes{reinterpret_cast<uint8_t*>(buffer.data()), read_sz};
auto result = reader_->Read(next_read_offset_, bytes);
if (!result) {
LOG(ERROR) << "Could not load item at offset " << next_read_offset_ << " of size " << read_sz
<< " from disk with error: " << result.error().value() << " "
<< result.error().message();
return result.error();
}

VLOG(2) << "Loaded item with offset " << next_read_offset_ << " of size " << read_sz
<< " for connection " << this;

next_read_offset_ += bytes.size();

if (total_backing_items_ > 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we push and pop at the same time? The variable might have changed

auto buf = bytes.subspan(bytes.size() - sizeof(uint32_t));
uint32_t val = ((uint32_t)buf[0]) | ((uint32_t)buf[1] << 8) | ((uint32_t)buf[2] << 16) |
((uint32_t)buf[3] << 24);
bytes = bytes.subspan(0, next_item_total_bytes_);
next_item_total_bytes_ = val;
} else {
next_item_total_bytes_ = 0;
}

std::string read(reinterpret_cast<const char*>(bytes.data()), bytes.size());
*out = std::move(read);

total_backing_bytes_ -= next_item_total_bytes_;
--total_backing_items_;

return {};
}

} // namespace facade
60 changes: 60 additions & 0 deletions src/facade/disk_backed_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2025, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <util/fibers/uring_file.h>

#include <deque>
#include <functional>
#include <memory>
#include <string_view>
#include <system_error>

#include "io/io.h"

namespace facade {

class DiskBackedQueue {
public:
explicit DiskBackedQueue(uint32_t conn_id);
~DiskBackedQueue();

std::error_code Init();

// Check if we can offload bytes to backing file.
bool HasEnoughBackingSpaceFor(size_t bytes) const;

std::error_code Push(std::string_view blob);

std::error_code Pop(std::string* out);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not a good interface now - as you do not pop an element. You will usually have a destination buffer which you will want to fill till the end of its capacity or less. Similarly to socket Recv() interface.


// Check if backing file is empty, i.e. backing file has 0 bytes.
bool Empty() const;

std::error_code CloseReader();
Copy link
Collaborator

@romange romange Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to control Reader and Writer independently?
Seems that if Empty(), you just want to shut the queue altogether

std::error_code CloseWriter();

private:
// File Reader/Writer
std::unique_ptr<io::WriteFile> writer_;
std::unique_ptr<io::ReadonlyFile> reader_;

size_t total_backing_bytes_ = 0;
size_t total_backing_items_ = 0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

total_backing_items_ is redundant now


size_t next_read_offset_ = 4;
size_t next_item_total_bytes_ = 0;

// Read only constants
const size_t max_backing_size_ = 0;
const size_t max_queue_load_size_ = 0;

// same as connection id. Used to uniquely identify the backed file
const size_t id_ = 0;

std::string buffer;
};

} // namespace facade
66 changes: 66 additions & 0 deletions src/facade/disk_backed_queue_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2025, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "facade/disk_backed_queue.h"

#include <absl/strings/str_cat.h>
#include <gmock/gmock.h>

#include <string>
#include <thread>
#include <vector>

#include "base/flags.h"
#include "base/gtest.h"
#include "base/logging.h"
#include "io/io.h"
#include "util/fibers/uring_proactor.h"

namespace dfly {
namespace {

using namespace facade;

TEST(DiskBackedQueueTest, ReadWrite) {
auto proactor = std::make_unique<util::fb2::UringProactor>();

auto pthread = std::thread{[ptr = proactor.get()] {
static_cast<util::fb2::UringProactor*>(ptr)->Init(0, 8);
ptr->Run();
}};

proactor->Await([]() {
DiskBackedQueue backing(1 /* id */);
EXPECT_FALSE(backing.Init());

std::vector<std::string> commands;
for (size_t i = 0; i < 100; ++i) {
commands.push_back(absl::StrCat("SET FOO", i, " BAR"));
auto ec = backing.Push(commands.back());
EXPECT_FALSE(ec);
}

std::vector<std::string> results;
for (size_t i = 0; i < 100; ++i) {
std::string res;
auto ec = backing.Pop(&res);
EXPECT_FALSE(ec);
results.push_back(std::move(res));
}

EXPECT_EQ(results.size(), commands.size());

for (size_t i = 0; i < results.size(); ++i) {
EXPECT_EQ(results[i], commands[i]);
}
EXPECT_FALSE(backing.CloseReader());
EXPECT_FALSE(backing.CloseWriter());
});

proactor->Stop();
pthread.join();
}

} // namespace
} // namespace dfly
17 changes: 16 additions & 1 deletion src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// Copyright 2025, DragonflyDB authors. All rights reserved.
//
// See LICENSE for licensing terms.
//
Expand All @@ -22,6 +22,7 @@
#include "base/stl_util.h"
#include "core/heap_size.h"
#include "facade/conn_context.h"
#include "facade/disk_backed_queue.h"
#include "facade/dragonfly_listener.h"
#include "facade/memcache_parser.h"
#include "facade/redis_parser.h"
Expand Down Expand Up @@ -112,6 +113,9 @@ ABSL_FLAG(uint32_t, pipeline_wait_batch_usec, 0,
"If non-zero, waits for this time for more I/O "
" events to come for the connection in case there is only one command in the pipeline. ");

ABSL_FLAG(size_t, disk_queue_offload_watermark, 0,
"Offload backpressure to disk when dispatch queue size crosses the watermark.");

using namespace util;
using namespace std;
using absl::GetFlag;
Expand Down Expand Up @@ -676,6 +680,17 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
#endif

UpdateLibNameVerMap(lib_name_, lib_ver_, +1);

backpressure_to_disk_watermark_ = absl::GetFlag(FLAGS_disk_queue_offload_watermark);
if (backpressure_to_disk_watermark_ > 0) {
backing_queue_ = std::make_unique<DiskBackedQueue>(id_);
auto ec = backing_queue_->Init();
if (ec) {
LOG(ERROR) << "Error initializing disk backpressure file for connection " << id_ << ": "
<< ec.message();
backing_queue_.reset();
}
}
}

Connection::~Connection() {
Expand Down
6 changes: 5 additions & 1 deletion src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// Copyright 2025, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

Expand Down Expand Up @@ -45,6 +45,7 @@ class ConnectionContext;
class RedisParser;
class ServiceInterface;
class SinkReplyBuilder;
class DiskBackedQueue;

// Connection represents an active connection for a client.
//
Expand Down Expand Up @@ -503,6 +504,9 @@ class Connection : public util::Connection {
};
};

std::unique_ptr<DiskBackedQueue> backing_queue_;
size_t backpressure_to_disk_watermark_ = 0;

bool request_shutdown_ = false;
};

Expand Down
Loading