Skip to content

Commit e43e064

Browse files
committed
chore: disk backpressure class utility
Signed-off-by: Kostas Kyrimis <[email protected]>
1 parent 7701d1d commit e43e064

File tree

5 files changed

+220
-3
lines changed

5 files changed

+220
-3
lines changed

src/facade/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ cxx_link(dfly_parser_lib base strings_lib)
33

44
add_library(dfly_facade conn_context.cc dragonfly_listener.cc dragonfly_connection.cc facade.cc
55
memcache_parser.cc reply_builder.cc op_status.cc service_interface.cc
6-
reply_capture.cc cmd_arg_parser.cc tls_helpers.cc socket_utils.cc)
6+
reply_capture.cc cmd_arg_parser.cc tls_helpers.cc socket_utils.cc
7+
disk_connection_backpressure.cc)
78

89
if (DF_USE_SSL)
910
set(TLS_LIB tls_lib)
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
2+
//
3+
// See LICENSE for licensing terms.
4+
//
5+
6+
#include "facade/disk_connection_backpressure.h"
7+
8+
#include <absl/strings/str_cat.h>
9+
10+
#include <string>
11+
12+
#include "base/flags.h"
13+
#include "base/logging.h"
14+
#include "facade/facade_types.h"
15+
#include "io/io.h"
16+
#include "util/fibers/uring_file.h"
17+
18+
using facade::operator""_MB;
19+
20+
ABSL_FLAG(std::string, disk_backpressure_folder, "/tmp/",
21+
"Folder to store "
22+
"disk backed connection backpressure");
23+
24+
ABSL_FLAG(size_t, disk_backpressure_file_max_bytes, 50_MB,
25+
"Maximum size of the backing file. When max size is reached, connection will "
26+
"stop offloading backpressure to disk and block on client read.");
27+
28+
ABSL_FLAG(size_t, disk_backpressure_load_size, 30,
29+
"How many items to load in dispatch queue from the disk backed file.");
30+
31+
namespace facade {
32+
33+
DiskBackedBackpressureQueue::DiskBackedBackpressureQueue(uint32_t conn_id)
34+
: max_backing_size_(absl::GetFlag(FLAGS_disk_backpressure_file_max_bytes)),
35+
max_queue_load_size_(absl::GetFlag(FLAGS_disk_backpressure_load_size)),
36+
id_(conn_id) {
37+
}
38+
39+
std::error_code DiskBackedBackpressureQueue::Init() {
40+
std::string backing_name = absl::StrCat(absl::GetFlag(FLAGS_disk_backpressure_folder), id_);
41+
{
42+
// Kernel transparently handles buffering via the page cache.
43+
auto res = util::fb2::OpenWrite(backing_name, {} /* overwrite mode + non direct io */);
44+
if (!res) {
45+
return res.error();
46+
}
47+
writer_.reset(*res);
48+
}
49+
50+
auto res = util::fb2::OpenRead(backing_name);
51+
if (!res) {
52+
return res.error();
53+
}
54+
reader_.reset(*res);
55+
56+
VLOG(3) << "Created backing for connection " << this << " " << backing_name;
57+
58+
return {};
59+
}
60+
61+
DiskBackedBackpressureQueue::~DiskBackedBackpressureQueue() {
62+
auto ec = writer_->Close();
63+
LOG_IF(WARNING, ec) << ec.message();
64+
ec = reader_->Close();
65+
LOG_IF(WARNING, ec) << ec.message();
66+
}
67+
68+
// Check if backing file is empty, i.e. backing file has 0 bytes.
69+
bool DiskBackedBackpressureQueue::Empty() const {
70+
return total_backing_bytes_ == 0;
71+
}
72+
73+
bool DiskBackedBackpressureQueue::HasEnoughBackingSpaceFor(size_t bytes) const {
74+
return (bytes + total_backing_bytes_) < max_backing_size_;
75+
}
76+
77+
size_t DiskBackedBackpressureQueue::TotalInMemoryBytes() const {
78+
return offsets_.size() * sizeof(ItemOffset);
79+
}
80+
81+
void DiskBackedBackpressureQueue::OffloadToBacking(std::string_view blob) {
82+
ItemOffset item;
83+
item.offset = next_offset_;
84+
item.total_bytes = blob.size();
85+
86+
// TODO we should truncate as the file grows. That way we never end up with large files
87+
// on disk.
88+
auto res = writer_->Write(blob);
89+
if (res) {
90+
VLOG(2) << "Failed to offload connection " << this << " backpressure with offset "
91+
<< item.offset << " of size " << item.total_bytes << " to backing with error: " << res;
92+
return;
93+
}
94+
95+
total_backing_bytes_ += blob.size();
96+
offsets_.push_back(item);
97+
next_offset_ += item.total_bytes;
98+
99+
VLOG(2) << "Offload connection " << this << " backpressure of " << item.total_bytes
100+
<< " bytes to disk at offset: " << item.offset;
101+
VLOG(3) << "Command offloaded: " << blob;
102+
}
103+
104+
void DiskBackedBackpressureQueue::LoadFromBacking(std::function<void(io::MutableBytes)> f) {
105+
std::string buffer;
106+
size_t up_to = max_queue_load_size_;
107+
108+
while (!offsets_.empty() && up_to--) {
109+
ItemOffset item = offsets_.front();
110+
111+
buffer.resize(item.total_bytes);
112+
113+
io::MutableBytes bytes{reinterpret_cast<uint8_t*>(buffer.data()), item.total_bytes};
114+
auto result = reader_->Read(item.offset, bytes);
115+
if (!result) {
116+
LOG(ERROR) << "Could not load item at offset " << item.offset << " of size "
117+
<< item.total_bytes << " from disk with error: " << result.error().value() << " "
118+
<< result.error().message();
119+
return;
120+
}
121+
122+
VLOG(2) << "Loaded item with offset " << item.offset << " of size " << item.total_bytes
123+
<< " for connection " << this;
124+
125+
f(bytes);
126+
127+
offsets_.pop_front();
128+
total_backing_bytes_ -= item.total_bytes;
129+
}
130+
}
131+
132+
} // namespace facade
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#pragma once
6+
7+
#include <util/fibers/uring_file.h>
8+
9+
#include <deque>
10+
#include <functional>
11+
#include <memory>
12+
#include <string_view>
13+
#include <system_error>
14+
15+
#include "io/io.h"
16+
17+
namespace facade {
18+
19+
class DiskBackedBackpressureQueue {
20+
public:
21+
explicit DiskBackedBackpressureQueue(uint32_t conn_id);
22+
~DiskBackedBackpressureQueue();
23+
24+
std::error_code Init();
25+
26+
// Check if we can offload bytes to backing file.
27+
bool HasEnoughBackingSpaceFor(size_t bytes) const;
28+
29+
// Total size of internal buffers/structures.
30+
size_t TotalInMemoryBytes() const;
31+
32+
void OffloadToBacking(std::string_view blob);
33+
34+
// For each item loaded from disk it calls f(item) to consume it.
35+
// Reads up to max_queue_load_size_ items on each call
36+
void LoadFromBacking(std::function<void(io::MutableBytes)> f);
37+
38+
// Check if backing file is empty, i.e. backing file has 0 bytes.
39+
bool Empty() const;
40+
41+
private:
42+
// File Reader/Writer
43+
std::unique_ptr<io::WriteFile> writer_;
44+
std::unique_ptr<io::ReadonlyFile> reader_;
45+
46+
// In memory backed file map
47+
struct ItemOffset {
48+
size_t offset = 0;
49+
size_t total_bytes = 0;
50+
};
51+
52+
std::deque<ItemOffset> offsets_;
53+
54+
size_t total_backing_bytes_ = 0;
55+
size_t next_offset_ = 0;
56+
57+
// Read only constants
58+
const size_t max_backing_size_ = 0;
59+
const size_t max_queue_load_size_ = 0;
60+
61+
// unique id for the file backed
62+
const size_t id_ = 0;
63+
};
64+
65+
} // namespace facade

src/facade/dragonfly_connection.cc

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022, DragonflyDB authors. All rights reserved.
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
22
//
33
// See LICENSE for licensing terms.
44
//
@@ -13,6 +13,7 @@
1313
#include <numeric>
1414
#include <variant>
1515

16+
#include "absl/flags/internal/flag.h"
1617
#include "base/cycle_clock.h"
1718
#include "base/flag_utils.h"
1819
#include "base/flags.h"
@@ -22,6 +23,7 @@
2223
#include "base/stl_util.h"
2324
#include "core/heap_size.h"
2425
#include "facade/conn_context.h"
26+
#include "facade/disk_connection_backpressure.h"
2527
#include "facade/dragonfly_listener.h"
2628
#include "facade/memcache_parser.h"
2729
#include "facade/redis_parser.h"
@@ -112,6 +114,9 @@ ABSL_FLAG(uint32_t, pipeline_wait_batch_usec, 0,
112114
"If non-zero, waits for this time for more I/O "
113115
" events to come for the connection in case there is only one command in the pipeline. ");
114116

117+
ABSL_FLAG(size_t, disk_backpressure_offload_watermark, 0,
118+
"Offload backpressure to disk when dispatch queue size crosses the watermark.");
119+
115120
using namespace util;
116121
using namespace std;
117122
using absl::GetFlag;
@@ -676,6 +681,16 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
676681
#endif
677682

678683
UpdateLibNameVerMap(lib_name_, lib_ver_, +1);
684+
685+
backpressure_to_disk_watermark_ = absl::GetFlag(FLAGS_disk_backpressure_offload_watermark);
686+
if (backpressure_to_disk_watermark_ > 0) {
687+
backing_queue_ = std::make_unique<DiskBackedBackpressureQueue>(id_);
688+
auto ec = backing_queue_->Init();
689+
if (ec) {
690+
LOG(ERROR) << "Error while initializing backpressure file " << ec.message();
691+
backing_queue_.reset();
692+
}
693+
}
679694
}
680695

681696
Connection::~Connection() {

src/facade/dragonfly_connection.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022, DragonflyDB authors. All rights reserved.
1+
// Copyright 2025, DragonflyDB authors. All rights reserved.
22
// See LICENSE for licensing terms.
33
//
44

@@ -45,6 +45,7 @@ class ConnectionContext;
4545
class RedisParser;
4646
class ServiceInterface;
4747
class SinkReplyBuilder;
48+
class DiskBackedBackpressureQueue;
4849

4950
// Connection represents an active connection for a client.
5051
//
@@ -503,6 +504,9 @@ class Connection : public util::Connection {
503504
};
504505
};
505506

507+
std::unique_ptr<DiskBackedBackpressureQueue> backing_queue_;
508+
size_t backpressure_to_disk_watermark_ = 0;
509+
506510
bool request_shutdown_ = false;
507511
};
508512

0 commit comments

Comments
 (0)