Skip to content

Commit 607c9bc

Browse files
committed
chore: disk based backpressure for connections
Signed-off-by: Kostas Kyrimis <[email protected]>
1 parent 8ce10fe commit 607c9bc

File tree

3 files changed

+321
-3
lines changed

3 files changed

+321
-3
lines changed

src/facade/dragonfly_connection.cc

Lines changed: 206 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include <numeric>
1414
#include <variant>
1515

16+
#include "absl/strings/str_split.h"
1617
#include "base/cycle_clock.h"
1718
#include "base/flag_utils.h"
1819
#include "base/flags.h"
@@ -27,7 +28,6 @@
2728
#include "facade/redis_parser.h"
2829
#include "facade/service_interface.h"
2930
#include "facade/socket_utils.h"
30-
#include "io/file.h"
3131
#include "util/fibers/fibers.h"
3232
#include "util/fibers/proactor_base.h"
3333

@@ -112,6 +112,16 @@ ABSL_FLAG(uint32_t, pipeline_wait_batch_usec, 0,
112112
"If non-zero, waits for this time for more I/O "
113113
" events to come for the connection in case there is only one command in the pipeline. ");
114114

115+
ABSL_FLAG(size_t, connection_disk_backpressure_watermark, 0,
116+
"Offload dispach queue backpressure to disk when it crosses watermark. (0 to disable)");
117+
118+
ABSL_FLAG(size_t, connection_disk_backpressure_file_max_bytes, 50_MB,
119+
"Maximum size of the backing file. When the watermark is reached, connection will "
120+
"stop offloading backpressure to disk");
121+
122+
ABSL_FLAG(size_t, connection_disk_backpressure_load_size, 30,
123+
"How many queue backpressure items to load from disk when dispatch queue is drained");
124+
115125
using namespace util;
116126
using namespace std;
117127
using absl::GetFlag;
@@ -434,6 +444,10 @@ size_t Connection::PipelineMessage::StorageCapacity() const {
434444
return storage.capacity() + args.capacity();
435445
}
436446

447+
size_t Connection::PipelineMessage::StorageBytes() const {
448+
return storage.size();
449+
}
450+
437451
size_t Connection::MessageHandle::UsedMemory() const {
438452
struct MessageSize {
439453
size_t operator()(const PubMessagePtr& msg) {
@@ -676,6 +690,11 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
676690
#endif
677691

678692
UpdateLibNameVerMap(lib_name_, lib_ver_, +1);
693+
694+
const size_t disk_watermark = absl::GetFlag(FLAGS_connection_disk_backpressure_watermark);
695+
if (disk_watermark) {
696+
backing_queue_ = std::make_unique<DiskBackedBackpressureQueue>();
697+
}
679698
}
680699

681700
Connection::~Connection() {
@@ -1162,8 +1181,12 @@ void Connection::ConnectionFlow() {
11621181

11631182
void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
11641183
absl::FunctionRef<MessageHandle()> cmd_msg_cb) {
1165-
bool optimize_for_async = has_more;
11661184
QueueBackpressure& qbp = GetQueueBackpressure();
1185+
if (OffloadBackpressureToDiskIfNeeded(cmd_msg_cb)) {
1186+
return;
1187+
}
1188+
1189+
bool optimize_for_async = has_more;
11671190
if (optimize_for_async &&
11681191
qbp.IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes, dispatch_q_.size())) {
11691192
stats_->pipeline_throttle_count++;
@@ -1683,12 +1706,23 @@ void Connection::AsyncFiber() {
16831706
QueueBackpressure& qbp = GetQueueBackpressure();
16841707
while (!reply_builder_->GetError()) {
16851708
DCHECK_EQ(socket()->proactor(), ProactorBase::me());
1709+
1710+
LoadBackpressureFromDiskIfNeeded();
1711+
16861712
cnd_.wait(noop_lk, [this] {
1687-
return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch);
1713+
return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch) ||
1714+
disk_backpressure_available_;
16881715
});
1716+
16891717
if (cc_->conn_closing)
16901718
break;
16911719

1720+
if (disk_backpressure_available_) {
1721+
LoadBackpressureFromDiskIfNeeded();
1722+
DCHECK(dispatch_q_.size() > 0);
1723+
disk_backpressure_available_ = false;
1724+
}
1725+
16921726
// We really want to have batching in the builder if possible. This is especially
16931727
// critical in situations where Nagle's algorithm can introduce unwanted high
16941728
// latencies. However we can only batch if we're sure that there are more commands
@@ -2254,4 +2288,173 @@ void ResetStats() {
22542288
io_req_size_hist->Clear();
22552289
}
22562290

2291+
// Tries to park the next pipelined command in the disk backed queue instead of dispatch_q_.
//
// `handle` materializes the command as a MessageHandle. It is invoked only after the backing
// queue has been initialized successfully.
//
// Returns true when the command was written to the backing file; the caller must then not
// dispatch it again. Returns false when disk backpressure is disabled, is not needed, or the
// command could not be offloaded (init failure, file size limit reached, write failure).
//
// NOTE(review): when this returns false while the backing queue still holds pending items,
// the caller's fallback path may enqueue this command ahead of the ones on disk - confirm
// that this ordering is acceptable.
bool Connection::OffloadBackpressureToDiskIfNeeded(absl::FunctionRef<MessageHandle()> handle) {
  if (!backing_queue_) {
    return false;
  }

  // Offload only when dispatch_q_ crosses watermark or when backing queue already
  // has pending items.
  const bool over_watermark = dispatch_q_.size() > backing_queue_->Watermark();
  if (!over_watermark && backing_queue_->Empty()) {
    return false;
  }

  if (auto ec = backing_queue_->Init(); ec) {
    LOG(ERROR) << "Failed to init disk backed backpressure with error " << ec.message();
    return false;
  }

  MessageHandle msg = handle();
  auto& owned_msg = std::get<Connection::PipelineMessagePtr>(msg.handle);
  PipelineMessage* pmsg = owned_msg.get();

  if (!backing_queue_->HasEnoughBackingSpace(pmsg)) {
    LOG(WARNING) << "Disk backpressure file size limit reached. Could not offload backpressure.";
    return false;
  }

  // OffloadToBacking() returns void, so a failed write is detected through its bookkeeping:
  // an offset entry is recorded only when the write succeeded. Without this check a failed
  // write would be reported as success and the command would be silently dropped.
  const size_t tracked_before = backing_queue_->TotalInMemoryBytes();
  backing_queue_->OffloadToBacking(pmsg);
  if (backing_queue_->TotalInMemoryBytes() == tracked_before) {
    LOG(ERROR) << "Failed to write backpressure item to disk. Could not offload backpressure.";
    return false;
  }

  // Nothing is queued in memory, so wake the consumer to pull the item back from disk.
  if (dispatch_q_.empty()) {
    disk_backpressure_available_ = true;
    cnd_.notify_one();
  }

  // Recycle message
  QueueBackpressure& qbp = GetQueueBackpressure();
  if (stats_->pipeline_cmd_cache_bytes < qbp.pipeline_cache_limit) {
    stats_->pipeline_cmd_cache_bytes += pmsg->StorageCapacity();
    pipeline_req_pool_.push_back(std::move(owned_msg));
  }

  // item offloaded to disk without errors, unblock connection fiber
  return true;
}
2324+
2325+
void Connection::LoadBackpressureFromDiskIfNeeded() {
2326+
if (HasDiskBacked()) {
2327+
auto q_insert_cb = [this](io::MutableBytes bytes) {
2328+
PipelineMessagePtr ptr;
2329+
if (ptr = GetFromPipelinePool(); ptr) {
2330+
ptr->storage.resize(bytes.size());
2331+
} else {
2332+
ptr = make_unique<PipelineMessage>(1, 1);
2333+
ptr->storage.resize(bytes.size());
2334+
}
2335+
2336+
memcpy(ptr->storage.begin(), bytes.begin(), bytes.size());
2337+
std::string_view read{reinterpret_cast<char*>(ptr->storage.data()), bytes.size()};
2338+
ptr->args = absl::StrSplit(read, '\0', absl::SkipEmpty());
2339+
2340+
SendAsync({.handle = std::move(ptr)});
2341+
};
2342+
2343+
backing_queue_->LoadFromDiskToQueue(q_insert_cb);
2344+
}
2345+
}
2346+
2347+
size_t Connection::DiskBackedBackpressureQueue::unique_id = 0;
2348+
2349+
Connection::DiskBackedBackpressureQueue::DiskBackedBackpressureQueue()
2350+
: max_backing_size_(absl::GetFlag(FLAGS_connection_disk_backpressure_file_max_bytes)),
2351+
max_queue_load_size_(absl::GetFlag(FLAGS_connection_disk_backpressure_load_size)),
2352+
watermark_(absl::GetFlag(FLAGS_connection_disk_backpressure_watermark)) {
2353+
id_ = ++unique_id;
2354+
}
2355+
2356+
std::error_code Connection::DiskBackedBackpressureQueue::Init() {
2357+
if (init_) {
2358+
return {};
2359+
}
2360+
2361+
std::string backing_name = absl::StrCat("/tmp/backing_", id_);
2362+
{
2363+
// Kernel transparently handles buffering via the page cache.
2364+
auto res = util::fb2::OpenWrite(backing_name, {} /* overwrite mode + non direct io */);
2365+
if (!res) {
2366+
return res.error();
2367+
}
2368+
writer_.reset(*res);
2369+
}
2370+
2371+
auto res = util::fb2::OpenRead(backing_name);
2372+
if (!res) {
2373+
return res.error();
2374+
}
2375+
reader_.reset(*res);
2376+
2377+
VLOG(3) << "Created backing for connection " << this << " " << backing_name;
2378+
init_ = true;
2379+
2380+
return {};
2381+
}
2382+
2383+
bool Connection::DiskBackedBackpressureQueue::Empty() const {
2384+
return total_backing_bytes_ == 0;
2385+
}
2386+
2387+
bool Connection::DiskBackedBackpressureQueue::HasEnoughBackingSpace(
2388+
const Connection::PipelineMessage* msg) const {
2389+
return (msg->StorageBytes() + total_backing_bytes_) < max_backing_size_;
2390+
}
2391+
2392+
size_t Connection::DiskBackedBackpressureQueue::TotalInMemoryBytes() const {
2393+
return offsets_.size() * sizeof(ItemOffset);
2394+
}
2395+
2396+
void Connection::DiskBackedBackpressureQueue::OffloadToBacking(
2397+
const Connection::PipelineMessage* msg) {
2398+
ItemOffset item;
2399+
item.offset = next_offset_;
2400+
item.total_bytes = msg->FullCommand().size();
2401+
2402+
size_t start = absl::GetCurrentTimeNanos();
2403+
2404+
// TODO we should truncate as the file grows. That way we never end up with large files
2405+
// on disk.
2406+
auto res = writer_->Write(msg->FullCommand());
2407+
if (res) {
2408+
VLOG(2) << "Failed to offload connection " << this << " backpressure with offset "
2409+
<< item.offset << " of size " << item.total_bytes << " to backing with error: " << res;
2410+
return;
2411+
}
2412+
2413+
// Only update for non error paths
2414+
size_t end = absl::GetCurrentTimeNanos();
2415+
max_io_write_latency_ = std::max(max_io_write_latency_, (end - start));
2416+
2417+
total_backing_bytes_ += msg->FullCommand().size();
2418+
offsets_.push_back(item);
2419+
next_offset_ += item.total_bytes;
2420+
2421+
VLOG(2) << "Offload connection " << this << " backpressure of " << item.total_bytes
2422+
<< " bytes to disk at offset: " << item.offset;
2423+
VLOG(3) << "Command offloaded: " << msg->FullCommand();
2424+
}
2425+
2426+
template <typename F> void Connection::DiskBackedBackpressureQueue::LoadFromDiskToQueue(F f) {
2427+
std::string buffer;
2428+
size_t up_to = max_queue_load_size_;
2429+
2430+
size_t start = absl::GetCurrentTimeNanos();
2431+
2432+
while (!offsets_.empty() && up_to--) {
2433+
ItemOffset item = offsets_.front();
2434+
2435+
buffer.resize(item.total_bytes);
2436+
2437+
io::MutableBytes bytes{reinterpret_cast<uint8_t*>(buffer.data()), item.total_bytes};
2438+
auto result = reader_->Read(item.offset, bytes);
2439+
if (!result) {
2440+
LOG(ERROR) << "Could not load item at offset " << item.offset << " of size "
2441+
<< item.total_bytes << " from disk with error: " << result.error().value() << " "
2442+
<< result.error().message();
2443+
return;
2444+
}
2445+
2446+
VLOG(2) << "Loaded item with offset " << item.offset << " of size " << item.total_bytes
2447+
<< " for connection " << this;
2448+
2449+
f(bytes);
2450+
2451+
offsets_.pop_front();
2452+
total_backing_bytes_ -= item.total_bytes;
2453+
}
2454+
2455+
// Only update for non error paths
2456+
size_t end = absl::GetCurrentTimeNanos();
2457+
max_io_read_latency_ = std::max(max_io_read_latency_, (end - start));
2458+
}
2459+
22572460
} // namespace facade

src/facade/dragonfly_connection.h

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "facade/facade_types.h"
1818
#include "facade/memcache_parser.h"
1919
#include "facade/resp_expr.h"
20+
#include "io/file.h"
2021
#include "io/io_buf.h"
2122
#include "util/connection.h"
2223
#include "util/fibers/fibers.h"
@@ -90,6 +91,13 @@ class Connection : public util::Connection {
9091

9192
size_t StorageCapacity() const;
9293

94+
// Used by file backed queue back pressure to reconstruct a PipelineMessage
95+
size_t StorageBytes() const;
96+
97+
std::string_view FullCommand() const {
98+
return {storage.data(), storage.size()};
99+
}
100+
93101
// mi_stl_allocator uses mi heap internally.
94102
// The capacity is chosen so that we allocate a fully utilized (256 bytes) block.
95103
using StorageType = absl::InlinedVector<char, kReqStorageSize>;
@@ -401,6 +409,13 @@ class Connection : public util::Connection {
401409

402410
void ConfigureProvidedBuffer();
403411

412+
bool HasDiskBacked() const {
413+
return backing_queue_ && !backing_queue_->Empty();
414+
}
415+
416+
bool OffloadBackpressureToDiskIfNeeded(absl::FunctionRef<MessageHandle()> handle);
417+
418+
void LoadBackpressureFromDiskIfNeeded();
404419
// The read buffer with read data that needs to be parsed and processed.
405420
// For io_uring bundles we may have available_bytes larger than slice.size()
406421
// which means that there are more buffers available to read.
@@ -504,6 +519,68 @@ class Connection : public util::Connection {
504519
};
505520

506521
bool request_shutdown_ = false;
522+
523+
class DiskBackedBackpressureQueue {
524+
public:
525+
DiskBackedBackpressureQueue();
526+
527+
// Init on first call, no-op afterwards.
528+
std::error_code Init();
529+
530+
// Check if backing file is empty, i.e. backing file has 0 bytes.
531+
bool Empty() const;
532+
533+
// Check if we can offload msg to backing file.
534+
bool HasEnoughBackingSpace(const Connection::PipelineMessage* msg) const;
535+
536+
// Total size of internal buffers/structures.
537+
size_t TotalInMemoryBytes() const;
538+
539+
void OffloadToBacking(const Connection::PipelineMessage* msg);
540+
541+
// For each item loaded from disk it calls f(item) to consume it.
542+
// Reads up to max_queue_load_size_ items on each call
543+
template <typename F> void LoadFromDiskToQueue(F f);
544+
545+
size_t Watermark() const {
546+
return watermark_;
547+
}
548+
549+
private:
550+
static size_t unique_id;
551+
552+
// File Reader/Writer
553+
std::unique_ptr<io::WriteFile> writer_;
554+
std::unique_ptr<io::ReadonlyFile> reader_;
555+
556+
// In memory backed file map
557+
struct ItemOffset {
558+
size_t offset = 0;
559+
size_t total_bytes = 0;
560+
};
561+
562+
std::deque<ItemOffset> offsets_;
563+
564+
// unique id for the file backed
565+
size_t id_ = 0;
566+
567+
// stats
568+
size_t total_backing_bytes_ = 0;
569+
size_t next_offset_ = 0;
570+
size_t max_io_read_latency_ = 0;
571+
size_t max_io_write_latency_ = 0;
572+
573+
// Read only constants
574+
const size_t max_backing_size_ = 0;
575+
const size_t max_queue_load_size_ = 0;
576+
const size_t watermark_ = 0;
577+
578+
// Idempotent init
579+
bool init_ = false;
580+
};
581+
582+
std::unique_ptr<DiskBackedBackpressureQueue> backing_queue_;
583+
bool disk_backpressure_available_ = false;
507584
};
508585

509586
} // namespace facade

0 commit comments

Comments
 (0)