Skip to content

Commit adc89c7

Browse files
authored
chore: Adding a mpsc intrusive queue based on Vyukov's design (dragonflydb#562)
feat(server): Speed up rdb load by using object pool for parsing objects. 1. Add a mpsc intrusive queue based on Vyukov's design. 2. Use it as a object pool when we pull from the queue to reuse the existing object and push into it in order to return the object back to the pool. Signed-off-by: Roman Gershman <[email protected]> Signed-off-by: Roman Gershman <[email protected]>
1 parent d4cad11 commit adc89c7

File tree

3 files changed

+109
-4
lines changed

3 files changed

+109
-4
lines changed

src/core/mpsc_intrusive_queue.h

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright 2022, Roman Gershman. All rights reserved.
2+
// See LICENSE for licensing terms.
3+
//
4+
5+
#pragma once
6+
7+
#include <atomic>
8+
#include <cstddef>
9+
10+
// TODO: to move to helio
11+
12+
namespace dfly {
13+
namespace detail {
14+
15+
// a MPSC queue where multiple threads push and a single thread pops.
16+
//
17+
// Requires global functions for T:
18+
//
19+
// T* MPSC_intrusive_load_next(const T& src)
20+
// void MPSC_intrusive_store_next(T* next, T* dest);
21+
// based on the design from here:
22+
// https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
23+
template <typename T> class MPSCIntrusiveQueue {
24+
private:
25+
static constexpr size_t cache_alignment = 64;
26+
static constexpr size_t cacheline_length = 64;
27+
28+
alignas(cache_alignment) typename std::aligned_storage<sizeof(T), alignof(T)>::type storage_{};
29+
T* dummy_;
30+
alignas(cache_alignment) std::atomic<T*> head_;
31+
alignas(cache_alignment) T* tail_;
32+
char pad_[cacheline_length];
33+
34+
public:
35+
MPSCIntrusiveQueue()
36+
: dummy_{reinterpret_cast<T*>(std::addressof(storage_))}, head_{dummy_}, tail_{dummy_} {
37+
MPSC_intrusive_store_next(dummy_, nullptr);
38+
}
39+
40+
MPSCIntrusiveQueue(MPSCIntrusiveQueue const&) = delete;
41+
MPSCIntrusiveQueue& operator=(MPSCIntrusiveQueue const&) = delete;
42+
43+
void Push(T* ctx) noexcept {
44+
// ctx becomes a new head.
45+
MPSC_intrusive_store_next(ctx, nullptr);
46+
T* prev = head_.exchange(ctx, std::memory_order_acq_rel);
47+
MPSC_intrusive_store_next(prev, ctx);
48+
}
49+
50+
T* Pop() noexcept;
51+
};
52+
53+
template <typename T> T* MPSCIntrusiveQueue<T>::Pop() noexcept {
54+
T* tail = tail_;
55+
56+
// tail->next_.load(std::memory_order_acquire);
57+
T* next = MPSC_intrusive_load_next(*tail);
58+
if (dummy_ == tail) {
59+
if (nullptr == next) {
60+
// empty
61+
return nullptr;
62+
}
63+
tail_ = next;
64+
tail = next;
65+
next = MPSC_intrusive_load_next(*next);
66+
}
67+
68+
if (nullptr != next) {
69+
// non-empty
70+
tail_ = next;
71+
return tail;
72+
}
73+
74+
T* head = head_.load(std::memory_order_acquire);
75+
if (tail != head) {
76+
// non-empty, retry is in order: we are in the middle of push.
77+
return nullptr;
78+
}
79+
80+
Push(dummy_);
81+
82+
next = MPSC_intrusive_load_next(*tail);
83+
if (nullptr != next) {
84+
tail_ = next;
85+
return tail;
86+
}
87+
88+
// non-empty, retry is in order: we are still adding.
89+
return nullptr;
90+
}
91+
92+
} // namespace detail
93+
} // namespace dfly

src/server/rdb_load.cc

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1644,6 +1644,12 @@ RdbLoader::RdbLoader(ScriptMgr* script_mgr) : script_mgr_(script_mgr) {
16441644
}
16451645

16461646
RdbLoader::~RdbLoader() {
1647+
while (true) {
1648+
Item* item = item_queue_.Pop();
1649+
if (item == nullptr)
1650+
break;
1651+
delete item;
1652+
}
16471653
}
16481654

16491655
error_code RdbLoader::Load(io::Source* src) {
@@ -2058,7 +2064,7 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
20582064
}
20592065

20602066
for (auto* item : ib) {
2061-
delete item;
2067+
item_queue_.Push(item);
20622068
}
20632069
}
20642070

@@ -2068,14 +2074,18 @@ void RdbLoader::ResizeDb(size_t key_num, size_t expire_num) {
20682074
}
20692075

20702076
error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
2071-
Item* item = new Item;
2077+
// We return the item in LoadItemsBuffer.
2078+
Item* item = item_queue_.Pop();
2079+
2080+
if (item == nullptr) {
2081+
item = new Item;
2082+
}
20722083

20732084
// Read key
2074-
// We free item in LoadItemsBuffer.
20752085
SET_OR_RETURN(ReadKey(), item->key);
20762086

2087+
// Read value
20772088
error_code ec = ReadObj(type, &item->val);
2078-
20792089
if (ec) {
20802090
VLOG(1) << "ReadObj error " << ec << " for key " << item->key;
20812091
return ec;

src/server/rdb_load.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ extern "C" {
1212

1313
#include "base/io_buf.h"
1414
#include "base/pod_array.h"
15+
#include "core/mpsc_intrusive_queue.h"
1516
#include "io/io.h"
1617
#include "server/common.h"
1718

@@ -217,6 +218,7 @@ class RdbLoader : protected RdbLoaderBase {
217218

218219
// Callback when receiving RDB_OPCODE_FULLSYNC_END
219220
std::function<void()> full_sync_cut_cb;
221+
detail::MPSCIntrusiveQueue<Item> item_queue_;
220222
};
221223

222224
} // namespace dfly

0 commit comments

Comments
 (0)