Skip to content

Commit aa9ce8e

Browse files
committed
chore: disk based backpressure for connections
Signed-off-by: Kostas Kyrimis <[email protected]>
1 parent 8ce10fe commit aa9ce8e

File tree

3 files changed

+319
-3
lines changed

3 files changed

+319
-3
lines changed

src/facade/dragonfly_connection.cc

Lines changed: 208 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
#include <numeric>
1414
#include <variant>
1515

16+
#include "absl/strings/str_split.h"
17+
#include "absl/time/clock.h"
1618
#include "base/cycle_clock.h"
1719
#include "base/flag_utils.h"
1820
#include "base/flags.h"
@@ -27,7 +29,7 @@
2729
#include "facade/redis_parser.h"
2830
#include "facade/service_interface.h"
2931
#include "facade/socket_utils.h"
30-
#include "io/file.h"
32+
#include "io/io.h"
3133
#include "util/fibers/fibers.h"
3234
#include "util/fibers/proactor_base.h"
3335

@@ -112,6 +114,16 @@ ABSL_FLAG(uint32_t, pipeline_wait_batch_usec, 0,
112114
"If non-zero, waits for this time for more I/O "
113115
" events to come for the connection in case there is only one command in the pipeline. ");
114116

117+
ABSL_FLAG(size_t, connection_disk_backpressure_watermark, 0,
118+
"Offload dispach queue backpressure to disk when it crosses watermark. (0 to disable)");
119+
120+
ABSL_FLAG(size_t, connection_disk_backpressure_file_max_bytes, 50_MB,
121+
"Maximum size of the backing file. When the watermark is reached, connection will "
122+
"stop offloading backpressure to disk");
123+
124+
ABSL_FLAG(size_t, connection_disk_backpressure_load_size, 30,
125+
"How many queue backpressure items to load from disk when dispatch queue is drained");
126+
115127
using namespace util;
116128
using namespace std;
117129
using absl::GetFlag;
@@ -434,6 +446,10 @@ size_t Connection::PipelineMessage::StorageCapacity() const {
434446
return storage.capacity() + args.capacity();
435447
}
436448

449+
size_t Connection::PipelineMessage::StorageBytes() const {
450+
return storage.size();
451+
}
452+
437453
size_t Connection::MessageHandle::UsedMemory() const {
438454
struct MessageSize {
439455
size_t operator()(const PubMessagePtr& msg) {
@@ -676,6 +692,11 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
676692
#endif
677693

678694
UpdateLibNameVerMap(lib_name_, lib_ver_, +1);
695+
696+
const size_t disk_watermark = absl::GetFlag(FLAGS_connection_disk_backpressure_watermark);
697+
if (disk_watermark) {
698+
backing_queue_ = std::make_unique<DiskBackedBackpressureQueue>();
699+
}
679700
}
680701

681702
Connection::~Connection() {
@@ -1162,8 +1183,12 @@ void Connection::ConnectionFlow() {
11621183

11631184
void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
11641185
absl::FunctionRef<MessageHandle()> cmd_msg_cb) {
1165-
bool optimize_for_async = has_more;
11661186
QueueBackpressure& qbp = GetQueueBackpressure();
1187+
if (OffloadBackpressureToDiskIfNeeded(cmd_msg_cb)) {
1188+
return;
1189+
}
1190+
1191+
bool optimize_for_async = has_more;
11671192
if (optimize_for_async &&
11681193
qbp.IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes, dispatch_q_.size())) {
11691194
stats_->pipeline_throttle_count++;
@@ -1683,12 +1708,23 @@ void Connection::AsyncFiber() {
16831708
QueueBackpressure& qbp = GetQueueBackpressure();
16841709
while (!reply_builder_->GetError()) {
16851710
DCHECK_EQ(socket()->proactor(), ProactorBase::me());
1711+
1712+
LoadBackpressureFromDiskIfNeeded();
1713+
16861714
cnd_.wait(noop_lk, [this] {
1687-
return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch);
1715+
return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch) ||
1716+
disk_backpressure_available_;
16881717
});
1718+
16891719
if (cc_->conn_closing)
16901720
break;
16911721

1722+
if (disk_backpressure_available_) {
1723+
LoadBackpressureFromDiskIfNeeded();
1724+
DCHECK(dispatch_q_.size() > 0);
1725+
disk_backpressure_available_ = false;
1726+
}
1727+
16921728
// We really want to have batching in the builder if possible. This is especially
16931729
// critical in situations where Nagle's algorithm can introduce unwanted high
16941730
// latencies. However we can only batch if we're sure that there are more commands
@@ -2254,4 +2290,173 @@ void ResetStats() {
22542290
io_req_size_hist->Clear();
22552291
}
22562292

2293+
// Tries to divert the next pipeline message to the disk-backed queue.
//
// Offloading is attempted only when a backing queue exists and either the
// dispatch queue is longer than the watermark or the backing queue already
// holds pending items. `handle` is invoked to materialize the message only
// once the backing queue has been initialized successfully.
//
// Returns true when the message was handed to the backing queue (the caller
// must not dispatch it again), false when the caller should proceed with the
// regular dispatch path.
bool Connection::OffloadBackpressureToDiskIfNeeded(absl::FunctionRef<MessageHandle()> handle) {
  if (!backing_queue_) {
    return false;
  }

  const bool over_watermark = dispatch_q_.size() > backing_queue_->Watermark();
  if (!over_watermark && backing_queue_->Empty()) {
    return false;
  }

  const auto init_ec = backing_queue_->Init();
  LOG_IF(ERROR, init_ec) << "Failed to init disk backed backpressure with error "
                         << init_ec.message();
  if (init_ec) {
    return false;
  }

  MessageHandle msg = handle();
  auto& owned_msg = std::get<Connection::PipelineMessagePtr>(msg.handle);
  PipelineMessage* pmsg = owned_msg.get();

  if (!backing_queue_->HasEnoughBackingSpace(pmsg)) {
    LOG(WARNING) << "Disk backpressure file size limit reached. Could not offload backpressure.";
    return false;
  }

  // NOTE(review): OffloadToBacking() returns void, so a failed write is not
  // visible here and this function still reports success.
  backing_queue_->OffloadToBacking(pmsg);

  // With nothing queued in memory the async fiber may be parked on cnd_;
  // flag that disk data is available and wake it.
  if (dispatch_q_.empty()) {
    disk_backpressure_available_ = true;
    cnd_.notify_one();
  }

  // Return the message object to the pool while the cache is under its limit.
  QueueBackpressure& qbp = GetQueueBackpressure();
  if (stats_->pipeline_cmd_cache_bytes < qbp.pipeline_cache_limit) {
    stats_->pipeline_cmd_cache_bytes += pmsg->StorageCapacity();
    pipeline_req_pool_.push_back(std::move(owned_msg));
  }

  return true;
}
2326+
2327+
void Connection::LoadBackpressureFromDiskIfNeeded() {
2328+
if (HasDiskBacked()) {
2329+
auto q_insert_cb = [this](io::MutableBytes bytes) {
2330+
PipelineMessagePtr ptr;
2331+
if (ptr = GetFromPipelinePool(); ptr) {
2332+
ptr->storage.resize(bytes.size());
2333+
} else {
2334+
ptr = make_unique<PipelineMessage>(1, 1);
2335+
ptr->storage.resize(bytes.size());
2336+
}
2337+
2338+
memcpy(ptr->storage.begin(), bytes.begin(), bytes.size());
2339+
std::string_view read{reinterpret_cast<char*>(ptr->storage.data()), bytes.size()};
2340+
ptr->args = absl::StrSplit(read, '\0', absl::SkipEmpty());
2341+
2342+
SendAsync({.handle = std::move(ptr)});
2343+
};
2344+
2345+
backing_queue_->LoadFromDiskToQueue(q_insert_cb);
2346+
}
2347+
}
2348+
2349+
size_t Connection::DiskBackedBackpressureQueue::unique_id = 0;
2350+
2351+
Connection::DiskBackedBackpressureQueue::DiskBackedBackpressureQueue()
2352+
: max_backing_size_(absl::GetFlag(FLAGS_connection_disk_backpressure_file_max_bytes)),
2353+
max_queue_load_size_(absl::GetFlag(FLAGS_connection_disk_backpressure_load_size)),
2354+
watermark_(absl::GetFlag(FLAGS_connection_disk_backpressure_watermark)) {
2355+
id_ = ++unique_id;
2356+
}
2357+
2358+
std::error_code Connection::DiskBackedBackpressureQueue::Init() {
2359+
if (init_) {
2360+
return {};
2361+
}
2362+
2363+
std::string backing_name = absl::StrCat("/tmp/backing_", id_);
2364+
{
2365+
// Kernel transparently handles buffering via the page cache.
2366+
auto res = util::fb2::OpenWrite(backing_name, {} /* overwrite mode + non direct io */);
2367+
if (!res) {
2368+
return res.error();
2369+
}
2370+
writer_.reset(*res);
2371+
}
2372+
2373+
auto res = util::fb2::OpenRead(backing_name);
2374+
if (!res) {
2375+
return res.error();
2376+
}
2377+
reader_.reset(*res);
2378+
2379+
VLOG(3) << "Created backing for connection " << this << " " << backing_name;
2380+
init_ = true;
2381+
2382+
return {};
2383+
}
2384+
2385+
bool Connection::DiskBackedBackpressureQueue::Empty() const {
2386+
return total_backing_bytes_ == 0;
2387+
}
2388+
2389+
bool Connection::DiskBackedBackpressureQueue::HasEnoughBackingSpace(
2390+
const Connection::PipelineMessage* msg) const {
2391+
return (msg->StorageBytes() + total_backing_bytes_) < max_backing_size_;
2392+
}
2393+
2394+
size_t Connection::DiskBackedBackpressureQueue::TotalInMemoryBytes() const {
2395+
return offsets_.size() * sizeof(ItemOffset);
2396+
}
2397+
2398+
void Connection::DiskBackedBackpressureQueue::OffloadToBacking(
2399+
const Connection::PipelineMessage* msg) {
2400+
ItemOffset item;
2401+
item.offset = next_offset_;
2402+
item.total_bytes = msg->FullCommand().size();
2403+
2404+
size_t start = absl::GetCurrentTimeNanos();
2405+
2406+
// Only update for non error paths
2407+
size_t end = absl::GetCurrentTimeNanos();
2408+
max_io_write_latency_ = std::max(max_io_write_latency_, (end - start));
2409+
2410+
// TODO we should truncate as the file grows. That way we never end up with large files
2411+
// on disk.
2412+
auto res = writer_->Write(msg->FullCommand());
2413+
if (res) {
2414+
VLOG(2) << "Failed to offload connection " << this << " backpressure with offset "
2415+
<< item.offset << " of size " << item.total_bytes << " to backing with error: " << res;
2416+
return;
2417+
}
2418+
2419+
total_backing_bytes_ += msg->FullCommand().size();
2420+
offsets_.push_back(item);
2421+
next_offset_ += item.total_bytes;
2422+
2423+
VLOG(2) << "Offload connection " << this << " backpressure of " << item.total_bytes
2424+
<< " bytes to disk at offset: " << item.offset;
2425+
VLOG(3) << "Command offloaded: " << msg->FullCommand();
2426+
}
2427+
2428+
template <typename F> void Connection::DiskBackedBackpressureQueue::LoadFromDiskToQueue(F f) {
2429+
std::string buffer;
2430+
size_t up_to = max_queue_load_size_;
2431+
2432+
size_t start = absl::GetCurrentTimeNanos();
2433+
2434+
while (!offsets_.empty() && up_to--) {
2435+
ItemOffset item = offsets_.front();
2436+
2437+
buffer.resize(item.total_bytes);
2438+
2439+
io::MutableBytes bytes{reinterpret_cast<uint8_t*>(buffer.data()), item.total_bytes};
2440+
auto result = reader_->Read(item.offset, bytes);
2441+
if (!result) {
2442+
LOG(ERROR) << "Could not load item at offset " << item.offset << " of size "
2443+
<< item.total_bytes << " from disk with error: " << result.error().value() << " "
2444+
<< result.error().message();
2445+
return;
2446+
}
2447+
2448+
VLOG(2) << "Loaded item with offset " << item.offset << " of size " << item.total_bytes
2449+
<< " for connection " << this;
2450+
2451+
f(bytes);
2452+
2453+
offsets_.pop_front();
2454+
total_backing_bytes_ -= item.total_bytes;
2455+
}
2456+
2457+
// Only update for non error paths
2458+
size_t end = absl::GetCurrentTimeNanos();
2459+
max_io_read_latency_ = std::max(max_io_read_latency_, (end - start));
2460+
}
2461+
22572462
} // namespace facade

src/facade/dragonfly_connection.h

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "facade/facade_types.h"
1818
#include "facade/memcache_parser.h"
1919
#include "facade/resp_expr.h"
20+
#include "io/file.h"
2021
#include "io/io_buf.h"
2122
#include "util/connection.h"
2223
#include "util/fibers/fibers.h"
@@ -90,6 +91,13 @@ class Connection : public util::Connection {
9091

9192
size_t StorageCapacity() const;
9293

94+
// Used by file backed queue back pressure to reconstruct a PipelineMessage
95+
size_t StorageBytes() const;
96+
97+
std::string_view FullCommand() const {
98+
return {storage.data(), storage.size()};
99+
}
100+
93101
// mi_stl_allocator uses mi heap internally.
94102
// The capacity is chosen so that we allocate a fully utilized (256 bytes) block.
95103
using StorageType = absl::InlinedVector<char, kReqStorageSize>;
@@ -401,6 +409,13 @@ class Connection : public util::Connection {
401409

402410
void ConfigureProvidedBuffer();
403411

412+
bool HasDiskBacked() const {
413+
return backing_queue_ && !backing_queue_->Empty();
414+
}
415+
416+
bool OffloadBackpressureToDiskIfNeeded(absl::FunctionRef<MessageHandle()> handle);
417+
418+
void LoadBackpressureFromDiskIfNeeded();
404419
// The read buffer with read data that needs to be parsed and processed.
405420
// For io_uring bundles we may have available_bytes larger than slice.size()
406421
// which means that there are more buffers available to read.
@@ -504,6 +519,64 @@ class Connection : public util::Connection {
504519
};
505520

506521
bool request_shutdown_ = false;
522+
523+
class DiskBackedBackpressureQueue {
524+
public:
525+
DiskBackedBackpressureQueue();
526+
527+
// Init on first call, no-op afterwards.
528+
std::error_code Init();
529+
530+
// Check if backing file is empty, i.e. backing file has 0 bytes.
531+
bool Empty() const;
532+
533+
// Check if we can offload msg to backing file.
534+
bool HasEnoughBackingSpace(const Connection::PipelineMessage* msg) const;
535+
536+
// Total size of internal buffers/structures.
537+
size_t TotalInMemoryBytes() const;
538+
539+
void OffloadToBacking(const Connection::PipelineMessage* msg);
540+
541+
// For each item loaded from disk it calls f(item) to consume it.
542+
// Reads up to max_queue_load_size_ items on each call
543+
template <typename F> void LoadFromDiskToQueue(F f);
544+
545+
size_t Watermark() const {
546+
return watermark_;
547+
}
548+
549+
private:
550+
static size_t unique_id;
551+
std::string filename_;
552+
553+
std::unique_ptr<io::WriteFile> writer_;
554+
std::unique_ptr<io::ReadonlyFile> reader_;
555+
556+
struct ItemOffset {
557+
size_t offset = 0;
558+
size_t total_bytes = 0;
559+
};
560+
561+
std::deque<ItemOffset> offsets_;
562+
563+
// stats
564+
size_t total_backing_bytes_ = 0;
565+
size_t id_ = 0;
566+
size_t next_offset_ = 0;
567+
size_t max_io_read_latency_ = 0;
568+
size_t max_io_write_latency_ = 0;
569+
570+
// Read only constants
571+
const size_t max_backing_size_ = 0;
572+
const size_t max_queue_load_size_ = 0;
573+
const size_t watermark_ = 0;
574+
575+
bool init_ = false;
576+
};
577+
578+
std::unique_ptr<DiskBackedBackpressureQueue> backing_queue_;
579+
bool disk_backpressure_available_ = false;
507580
};
508581

509582
} // namespace facade

0 commit comments

Comments
 (0)