Commit 4ab0048

feat core: implement WaitWake primitive and PullPin task queue
The new scheduler scales better and consumes less CPU:

```
------------------------------------------------------------------------------------
Benchmark                                      Time          CPU   Iterations
------------------------------------------------------------------------------------
EngineTaskYieldSingleThreadDefault/1         316 ns       316 ns      2220351
EngineTaskYieldSingleThreadDefault/2         630 ns       630 ns      1112507
EngineTaskYieldSingleThreadDefault/4        1264 ns      1263 ns       558502
EngineTaskYieldSingleThreadDefault/8        2526 ns      2524 ns       278501
EngineTaskYieldSingleThreadDefault/16       5072 ns      5072 ns       134894
EngineTaskYieldSingleThreadDefault/32      10165 ns     10163 ns        67798
EngineTaskYieldSingleThreadDefault/64      20403 ns     20362 ns        34170
EngineTaskYieldSingleThreadDefault/128     41798 ns     41788 ns        16875
EngineTaskYieldSingleThreadPullPin/1         327 ns       327 ns      2132918
EngineTaskYieldSingleThreadPullPin/2         614 ns       614 ns      1155496
EngineTaskYieldSingleThreadPullPin/4        1171 ns      1171 ns       599756
EngineTaskYieldSingleThreadPullPin/8        2320 ns      2319 ns       299938
EngineTaskYieldSingleThreadPullPin/16       4680 ns      4680 ns       148972
EngineTaskYieldSingleThreadPullPin/32       9371 ns      9369 ns        74429
EngineTaskYieldSingleThreadPullPin/64      18824 ns     18823 ns        37335
EngineTaskYieldSingleThreadPullPin/128     38295 ns     38275 ns        17963
EngineTaskYieldMultipleThreadsDefault/1      317 ns       317 ns      2213657 yields=3.15859M/s yields/thread=3.15859M/s
EngineTaskYieldMultipleThreadsDefault/2      494 ns       494 ns      1389906 yields=4.26975M/s yields/thread=2.13487M/s
EngineTaskYieldMultipleThreadsDefault/4      713 ns       713 ns       926177 yields=5.65114M/s yields/thread=1.41278M/s
EngineTaskYieldMultipleThreadsDefault/8     1975 ns      1975 ns       354494 yields=4.06994M/s yields/thread=508.742k/s
EngineTaskYieldMultipleThreadsDefault/16    3825 ns      2300 ns       384028 yields=6.99766M/s yields/thread=437.354k/s
EngineTaskYieldMultipleThreadsDefault/32    7472 ns      7366 ns       306527 yields=4.32357M/s yields/thread=135.112k/s
EngineTaskYieldMultipleThreadsDefault/64    1361 ns      1360 ns       481744 yields=4.40304M/s yields/thread=733.84k/s
EngineTaskYieldMultipleThreadsDefault/128   2975 ns      2968 ns       236147 yields=4.22007M/s yields/thread=351.672k/s
EngineTaskYieldMultipleThreadsPullPin/1      330 ns       330 ns      2140147 yields=3.02931M/s yields/thread=3.02931M/s
EngineTaskYieldMultipleThreadsPullPin/2      430 ns       430 ns      1610118 yields=4.23787M/s yields/thread=2.11894M/s
EngineTaskYieldMultipleThreadsPullPin/4      519 ns       519 ns      1309407 yields=7.76925M/s yields/thread=1.94231M/s
EngineTaskYieldMultipleThreadsPullPin/8     1321 ns      1321 ns       917785 yields=13.534M/s yields/thread=1.69175M/s
EngineTaskYieldMultipleThreadsPullPin/16     818 ns       813 ns      1033331 yields=22.6134M/s yields/thread=1.41334M/s
EngineTaskYieldMultipleThreadsPullPin/32    1050 ns       782 ns       831137 yields=37.919M/s yields/thread=1.18497M/s
EngineTaskYieldMultipleThreadsPullPin/64     952 ns       947 ns      1339093 yields=9.96169M/s yields/thread=1.66028M/s
EngineTaskYieldMultipleThreadsPullPin/128    825 ns       824 ns      1292218 yields=20.8307M/s yields/thread=1.7359M/s
AsyncComparisonsStdThread                  30743 ns     18147 ns        35116
AsyncComparisonsCoroDefault/1                843 ns       842 ns       844449
AsyncComparisonsCoroDefault/2               3642 ns      3113 ns       223740
AsyncComparisonsCoroDefault/4               3245 ns      2441 ns       291053
AsyncComparisonsCoroDefault/8               2958 ns      1344 ns       522948
AsyncComparisonsCoroDefault/16              3686 ns       793 ns       885894
AsyncComparisonsCoroDefault/32              4567 ns       437 ns      1000000
AsyncComparisonsCoroPullPin/1               1444 ns      1444 ns       481150
AsyncComparisonsCoroPullPin/2               3104 ns      2570 ns       257570
AsyncComparisonsCoroPullPin/4               3781 ns      3227 ns       207185
AsyncComparisonsCoroPullPin/8               3826 ns      3161 ns       217770
AsyncComparisonsCoroPullPin/16              4259 ns      3487 ns       190850
AsyncComparisonsCoroPullPin/32              4419 ns      3898 ns       171662
AsyncComparisonsCoroSpannedDefault/1        1173 ns      1173 ns       582349
AsyncComparisonsCoroSpannedDefault/2        4784 ns      3827 ns       189878
AsyncComparisonsCoroSpannedDefault/4        4720 ns      3270 ns       250590
AsyncComparisonsCoroSpannedDefault/8        5012 ns      1910 ns       386672
AsyncComparisonsCoroSpannedDefault/16       5493 ns       982 ns       678369
AsyncComparisonsCoroSpannedDefault/32       6441 ns       534 ns      1301088
AsyncComparisonsCoroSpannedPullPin/1        1799 ns      1799 ns       387044
AsyncComparisonsCoroSpannedPullPin/2        4226 ns      3645 ns       193038
AsyncComparisonsCoroSpannedPullPin/4        5046 ns      4235 ns       121739
AsyncComparisonsCoroSpannedPullPin/8        4495 ns      3727 ns       169162
AsyncComparisonsCoroSpannedPullPin/16       4245 ns      3730 ns       168192
AsyncComparisonsCoroSpannedPullPin/32       5841 ns      4656 ns       113131
```

Measured on a 12-CPU system.

However, latencies are expected to increase.

Tests: tested by CI; disabled by default, so production is not affected
commit_hash:84475a55ea59b18326528e3ac10efdc3a322fece
1 parent 70109a8 commit 4ab0048

14 files changed, +818 -43 lines changed

.mapping.json

Lines changed: 6 additions & 0 deletions
@@ -1378,6 +1378,8 @@
     "core/src/concurrent/impl/striped_read_indicator.cpp":"taxi/uservices/userver/core/src/concurrent/impl/striped_read_indicator.cpp",
     "core/src/concurrent/impl/striped_read_indicator_benchmark.cpp":"taxi/uservices/userver/core/src/concurrent/impl/striped_read_indicator_benchmark.cpp",
     "core/src/concurrent/impl/striped_read_indicator_test.cpp":"taxi/uservices/userver/core/src/concurrent/impl/striped_read_indicator_test.cpp",
+    "core/src/concurrent/impl/wait_wake.hpp":"taxi/uservices/userver/core/src/concurrent/impl/wait_wake.hpp",
+    "core/src/concurrent/impl/wait_wake_test.cpp":"taxi/uservices/userver/core/src/concurrent/impl/wait_wake_test.cpp",
     "core/src/concurrent/intrusive_walkable_pool.hpp":"taxi/uservices/userver/core/src/concurrent/intrusive_walkable_pool.hpp",
     "core/src/concurrent/intrusive_walkable_pool_benchmark.cpp":"taxi/uservices/userver/core/src/concurrent/intrusive_walkable_pool_benchmark.cpp",
     "core/src/concurrent/intrusive_walkable_pool_test.cpp":"taxi/uservices/userver/core/src/concurrent/intrusive_walkable_pool_test.cpp",
@@ -1662,11 +1664,15 @@
     "core/src/engine/task/task_processor_test.cpp":"taxi/uservices/userver/core/src/engine/task/task_processor_test.cpp",
     "core/src/engine/task/task_queue.cpp":"taxi/uservices/userver/core/src/engine/task/task_queue.cpp",
     "core/src/engine/task/task_queue.hpp":"taxi/uservices/userver/core/src/engine/task/task_queue.hpp",
+    "core/src/engine/task/task_queue_pull_pin.cpp":"taxi/uservices/userver/core/src/engine/task/task_queue_pull_pin.cpp",
+    "core/src/engine/task/task_queue_pull_pin.hpp":"taxi/uservices/userver/core/src/engine/task/task_queue_pull_pin.hpp",
+    "core/src/engine/task/task_queue_pull_pin_test.cpp":"taxi/uservices/userver/core/src/engine/task/task_queue_pull_pin_test.cpp",
     "core/src/engine/task/task_queue_tsan.cpp":"taxi/uservices/userver/core/src/engine/task/task_queue_tsan.cpp",
     "core/src/engine/task/task_queue_tsan.hpp":"taxi/uservices/userver/core/src/engine/task/task_queue_tsan.hpp",
     "core/src/engine/task/task_queue_tsan_test.cpp":"taxi/uservices/userver/core/src/engine/task/task_queue_tsan_test.cpp",
     "core/src/engine/task/task_test.cpp":"taxi/uservices/userver/core/src/engine/task/task_test.cpp",
     "core/src/engine/task/task_with_result_test.cpp":"taxi/uservices/userver/core/src/engine/task/task_with_result_test.cpp",
+    "core/src/engine/task/thread_id_test.hpp":"taxi/uservices/userver/core/src/engine/task/thread_id_test.hpp",
     "core/src/engine/task/thread_started_hook_test.cpp":"taxi/uservices/userver/core/src/engine/task/thread_started_hook_test.cpp",
     "core/src/engine/task/work_stealing_queue/consumer.cpp":"taxi/uservices/userver/core/src/engine/task/work_stealing_queue/consumer.cpp",
     "core/src/engine/task/work_stealing_queue/consumer.hpp":"taxi/uservices/userver/core/src/engine/task/work_stealing_queue/consumer.hpp",

core/include/userver/engine/task_queue_type.hpp

Lines changed: 2 additions & 0 deletions
@@ -13,6 +13,8 @@ namespace engine {
 enum class TaskQueueType {
     kGlobalTaskQueue,        /// < Global `moodycamel` queue from which each thread gets tasks
     kWorkStealingTaskQueue,  /// < Global+thread-specific queues with interqueues work stealing (experimental queue)
+    kPullPinTaskQueue,       /// < Global+thread-specific queues. Each task gets pinned to a thread-specific queue and is
+                             /// executed only in that thread (experimental queue)
     kTSanTaskQueue,  /// < Queue for TSan runs. Each task gets pinned to a thread-specific queue and is executed only in
                      /// that thread (experimental queue). Thread Sanitizer runs are automatically switched to this
                      /// queue
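The comment on `kPullPinTaskQueue` describes a global queue plus thread-specific queues, with each task pinned to one thread. The toy model below is only a single-threaded sketch of that idea and is not the code from `task_queue_pull_pin.cpp`, which is not shown on this page. `ToyTask` and `ToyPullPinQueue` are hypothetical names, and the "local queue first, then global queue" pop order is an assumption made for illustration.

```cpp
#include <cstddef>
#include <deque>
#include <optional>
#include <vector>

// Hypothetical task record: remembers which worker it was pinned to, if any.
struct ToyTask {
    int id;
    std::optional<std::size_t> pinned_worker;  // empty until the first pull
};

// Hypothetical single-threaded model of "pull from global, then pin".
class ToyPullPinQueue {
public:
    explicit ToyPullPinQueue(std::size_t workers) : local_(workers) {}

    // A pinned task goes back to its worker's queue; a new task goes to the global queue.
    void Push(const ToyTask& task) {
        if (task.pinned_worker) {
            local_[*task.pinned_worker].push_back(task);
        } else {
            global_.push_back(task);
        }
    }

    // Assumption: a worker serves its own queue first, then pulls from the
    // global queue and pins whatever it pulled to itself.
    std::optional<ToyTask> Pop(std::size_t worker) {
        auto& local = local_[worker];
        if (!local.empty()) {
            ToyTask task = local.front();
            local.pop_front();
            return task;
        }
        if (global_.empty()) {
            return std::nullopt;
        }
        ToyTask task = global_.front();
        global_.pop_front();
        task.pinned_worker = worker;
        return task;
    }

private:
    std::deque<ToyTask> global_;
    std::vector<std::deque<ToyTask>> local_;
};
```

In this model a task pulled by worker 1 and pushed again (for example after a yield) is visible only to worker 1, which is the property the enum comment calls pinning.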
core/src/concurrent/impl/wait_wake.hpp

Lines changed: 112 additions & 0 deletions
@@ -0,0 +1,112 @@
+#pragma once
+
+#include <condition_variable>
+#include <cstddef>
+#include <cstdint>
+#include <limits>  // also used by the condition-variable fallback
+#include <mutex>
+#include <utility>
+
+#include <userver/utils/assert.hpp>
+
+#if __has_include(<linux/futex.h>)
+
+#include <linux/futex.h> /* Definition of FUTEX_* constants */
+#include <sys/syscall.h> /* Definition of SYS_* constants */
+#include <unistd.h>
+
+#include <cerrno>
+
+#endif
+
+USERVER_NAMESPACE_BEGIN
+
+namespace concurrent::impl {
+
+// Portable fallback: every wakeup wakes all waiters and reports the maximum count
+class WaitWakeCondvar final {
+public:
+    WaitWakeCondvar() = default;
+
+    std::size_t WakeupAll() {
+        {
+            // Taking and releasing the mutex orders this wakeup after a waiter's predicate check
+            const std::lock_guard guard{mutex_};
+        }
+        condvar_.notify_all();
+        return std::numeric_limits<std::size_t>::max();
+    }
+
+    std::size_t WakeupSome(int /*wakeup_at_most*/) {
+        WakeupAll();
+        return std::numeric_limits<std::size_t>::max();
+    }
+
+    // May wake up more than one waiter due to internal limitations
+    std::size_t WakeupByIndex(std::size_t /*index*/) {
+        WakeupAll();
+        return std::numeric_limits<std::size_t>::max();
+    }
+
+    template <class Predicate>
+    void WaitByIndex(std::size_t /*index*/, Predicate pred) {
+        std::unique_lock lock{mutex_};
+        condvar_.wait(lock, std::move(pred));
+    }
+
+private:
+    std::mutex mutex_{};
+    std::condition_variable condvar_{};
+};
+
+#if __has_include(<linux/futex.h>)
+
+class WaitWakeFutex final {
+public:
+    WaitWakeFutex() = default;
+
+    std::size_t WakeupAll() { return WakeupByBitmask(FUTEX_BITSET_MATCH_ANY, kAllBitsetWaiters); }
+
+    // Returns number of woken up waiters that were sleeping in OS (may return less than actually woke up)
+    std::size_t WakeupSome(int wakeup_at_most) { return WakeupByBitmask(FUTEX_BITSET_MATCH_ANY, wakeup_at_most); }
+
+    // May wake up more than one waiter due to internal limitations (may return less than actually woke up)
+    std::size_t WakeupByIndex(std::size_t index) { return WakeupByBitmask(IndexToBitmask(index), kAllBitsetWaiters); }
+
+    template <class Predicate>
+    void WaitByIndex(std::size_t index, Predicate pred) {
+        const auto bitmask = IndexToBitmask(index);
+
+        for (;;) {
+            // Snapshot the generation before checking the predicate, so that a wakeup issued
+            // in between makes FUTEX_WAIT_BITSET return immediately instead of sleeping
+            std::uint32_t snapshot;  // NOLINT(cppcoreguidelines-init-variables)
+            __atomic_load(&generation_, &snapshot, __ATOMIC_SEQ_CST);
+            if (pred()) {
+                break;
+            }
+
+            const auto ret = syscall(SYS_futex, &generation_, FUTEX_WAIT_BITSET, snapshot, NULL, NULL, bitmask);
+            // EINTR is reported when the wait is interrupted by a signal; just recheck the predicate
+            UINVARIANT(
+                ret != -1 || errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR,
+                "Failure in futex(FUTEX_WAIT_BITSET)"
+            );
+        }
+    }
+
+private:
+    static constexpr int kAllBitsetWaiters = std::numeric_limits<int>::max();
+    static constexpr int IndexToBitmask(std::size_t index) { return static_cast<int>(1 << (index % kBitsInBitset)); }
+
+    std::size_t WakeupByBitmask(int bitmask, int wakeup_at_most) {
+        __atomic_add_fetch(&generation_, 1, __ATOMIC_SEQ_CST);
+
+        const auto ret = syscall(SYS_futex, &generation_, FUTEX_WAKE_BITSET, wakeup_at_most, NULL, NULL, bitmask);
+        UINVARIANT(ret != -1 || errno == EAGAIN || errno == EWOULDBLOCK, "Failure in futex(FUTEX_WAKE_BITSET)");
+        return ret;
+    }
+
+    static constexpr int kBitsInBitset = 32;
+
+    std::uint32_t generation_{0};
+};
+
+using WaitWake = WaitWakeFutex;
+
+#else
+
+using WaitWake = WaitWakeCondvar;
+
+#endif
+
+}  // namespace concurrent::impl
+
+USERVER_NAMESPACE_END
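`WaitWakeFutex::WaitByIndex` reads the generation counter before evaluating the predicate and sleeps only if the counter still holds that snapshot, which is what keeps a wakeup from being lost between the check and the sleep. The sketch below reproduces that protocol with a mutex and a condition variable standing in for the futex "compare and sleep" step, so it can run on any platform. `GenerationWaiter` and `DemoHandoff` are hypothetical names used only for this illustration; they are not part of the commit.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

// Hypothetical illustration class: generation-counter wait/wake without futex.
class GenerationWaiter {
public:
    // Blocks until pred() returns true; pred is rechecked after every wakeup.
    template <class Predicate>
    void Wait(Predicate pred) {
        for (;;) {
            // Step 1: snapshot the generation BEFORE checking the predicate.
            const std::uint32_t snapshot = generation_.load();
            if (pred()) {
                return;
            }
            // Step 2: sleep only while the generation still equals the snapshot.
            // If WakeAll() ran after step 1, the wait returns immediately.
            std::unique_lock<std::mutex> lock{mutex_};
            condvar_.wait(lock, [&] { return generation_.load() != snapshot; });
        }
    }

    // Bumps the generation and wakes every sleeper. Returns the new generation.
    std::uint32_t WakeAll() {
        std::uint32_t next = 0;
        {
            const std::lock_guard<std::mutex> guard{mutex_};
            next = ++generation_;
        }
        condvar_.notify_all();
        return next;
    }

private:
    std::mutex mutex_;
    std::condition_variable condvar_;
    std::atomic<std::uint32_t> generation_{0};
};

// Usage: the consumer finishes no matter how the two threads interleave.
inline bool DemoHandoff() {
    GenerationWaiter waiter;
    std::atomic<bool> ready{false};
    std::thread consumer{[&] { waiter.Wait([&] { return ready.load(); }); }};
    ready.store(true);  // publish the state first
    waiter.WakeAll();   // then bump the generation and wake sleepers
    consumer.join();
    return ready.load();
}
```

The caller must publish its state change before calling `WakeAll()`, in the same order the tests use (`state = 2;` followed by `ww.WakeupAll();`).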
core/src/concurrent/impl/wait_wake_test.cpp

Lines changed: 174 additions & 0 deletions
@@ -0,0 +1,174 @@
+#include <concurrent/impl/wait_wake.hpp>
+
+#include <atomic>
+#include <chrono>
+#include <cstddef>
+#include <thread>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+USERVER_NAMESPACE_BEGIN
+
+constexpr std::size_t kTestThreadsCount = 100;
+
+void GiveTimeToEnterSysCall() { std::this_thread::sleep_for(std::chrono::milliseconds(4)); }
+
+TEST(WaitWake, SingleThread) {
+    concurrent::impl::WaitWake ww;
+    std::atomic<int> state = 0;
+    std::atomic<bool> predicate_was_called = false;
+
+    std::thread t{[&ww, &state, &predicate_was_called]() {
+        ww.WaitByIndex(0, [&state, &predicate_was_called]() {
+            if (!predicate_was_called.exchange(true)) {
+                ++state;
+            }
+            return state.load() == 2;
+        });
+        EXPECT_EQ(state.load(), 2);
+        ++state;
+    }};
+
+    while (state != 1) {
+        std::this_thread::yield();
+    }
+    EXPECT_TRUE(predicate_was_called);
+    state = 2;
+    ww.WakeupAll();
+
+    t.join();
+    EXPECT_EQ(state.load(), 3);
+}
+
+TEST(WaitWake, WakeupTriggersPredicateRecheck) {
+    concurrent::impl::WaitWake ww;
+    std::atomic<int> state = 0;
+    std::thread t{[&ww, &state]() {
+        ww.WaitByIndex(0, [&state]() {
+            ++state;
+            return state.load() >= 2;
+        });
+    }};
+
+    while (state != 1) {
+        std::this_thread::yield();
+    }
+
+    // Make sure that the wakeup is not lost
+    const auto woken_up = ww.WakeupAll();
+
+#if __has_include(<linux/futex.h>)
+    // Other thread may not have entered the OS sleep yet, but must wake up anyway.
+    // Only the futex implementation reports a real count; the condition variable
+    // fallback always returns the maximum std::size_t value.
+    EXPECT_LE(woken_up, 1);
+#else
+    static_cast<void>(woken_up);
+#endif
+
+    t.join();
+    EXPECT_GE(state.load(), 2);
+}
+
+TEST(WaitWake, MultipleThreads) {
+    concurrent::impl::WaitWake ww;
+    std::atomic<int> state = 0;
+
+    std::vector<std::thread> threads;
+    threads.reserve(kTestThreadsCount);
+    for (std::size_t i = 0; i < kTestThreadsCount; ++i) {
+        threads.emplace_back([&ww, &state]() {
+            ++state;
+            ww.WaitByIndex(0, [&state]() { return state.load() == kTestThreadsCount + 1; });
+            EXPECT_EQ(state.load(), kTestThreadsCount + 1);
+        });
+    }
+
+    while (state != kTestThreadsCount) {
+        std::this_thread::yield();
+    }
+
+    ++state;
+    ww.WakeupAll();
+    for (auto& t : threads) {
+        t.join();
+    }
+}
+
+#if __has_include(<linux/futex.h>)
+
+TEST(WaitWake, FutexWakeupByIndex) {
+    concurrent::impl::WaitWake ww;
+    std::atomic<std::size_t> state = 0;
+
+    std::vector<std::thread> threads;
+    threads.reserve(kTestThreadsCount);
+    for (std::size_t i = 0; i < kTestThreadsCount; ++i) {
+        threads.emplace_back([i, &ww, &state]() {
+            ++state;
+            ww.WaitByIndex(i, [&state]() { return state.load() >= kTestThreadsCount + 1; });
+            ++state;
+        });
+    }
+
+    while (state != kTestThreadsCount) {
+        std::this_thread::yield();
+    }
+    GiveTimeToEnterSysCall();
+
+    ++state;
+    const auto woken_up = ww.WakeupByIndex(0);
+
+    // WaitWake distinguishes 32 indexes, but some waiters may not have entered the OS sleep yet, or some
+    // spurious wakeups could happen. Just checking that not all of the waiters were woken up
+    EXPECT_LE(woken_up, kTestThreadsCount / 2);
+    EXPECT_GE(woken_up, 1);  // not guaranteed, but holds due to GiveTimeToEnterSysCall() and huge kTestThreadsCount
+
+    // Give some of the affected waiters enough time to wake up
+    do {
+        std::this_thread::sleep_for(std::chrono::milliseconds(4));
+    } while (state == kTestThreadsCount + 1);
+    EXPECT_LT(state.load(), kTestThreadsCount * 2);
+
+    for (std::size_t i = 1; i < kTestThreadsCount; ++i) {
+        ww.WakeupByIndex(i);
+    }
+
+    for (auto& t : threads) {
+        t.join();
+    }
+}
+
+TEST(WaitWake, FutexWakeupSome) {
+    concurrent::impl::WaitWake ww;
+    std::atomic<std::size_t> state = 0;
+
+    std::vector<std::thread> threads;
+    threads.reserve(kTestThreadsCount);
+    for (std::size_t i = 0; i < kTestThreadsCount; ++i) {
+        threads.emplace_back([i, &ww, &state]() {
+            ++state;
+            ww.WaitByIndex(i, [&state]() { return state.load() >= kTestThreadsCount + 1; });
+            ++state;
+        });
+    }
+
+    while (state != kTestThreadsCount) {
+        std::this_thread::yield();
+    }
+    GiveTimeToEnterSysCall();
+
+    ++state;
+    const int half_of_waiters{kTestThreadsCount / 2};
+    const int woken_up = ww.WakeupSome(half_of_waiters);
+
+    // Some waiters may not have entered the OS sleep yet, or some spurious wakeups could happen.
+    // Just checking that not all of the waiters were woken up
+    EXPECT_LE(woken_up, half_of_waiters);
+    EXPECT_GE(woken_up, 1);  // not guaranteed, but holds due to GiveTimeToEnterSysCall() and huge kTestThreadsCount
+
+    // Give all of the affected waiters enough time to wake up
+    do {
+        std::this_thread::sleep_for(std::chrono::milliseconds(4));
+    } while (state < kTestThreadsCount + half_of_waiters + 1);
+
+    ww.WakeupSome(kTestThreadsCount - half_of_waiters);
+    for (auto& t : threads) {
+        t.join();
+    }
+}
+
+#endif
+
+USERVER_NAMESPACE_END
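The `FutexWakeupByIndex` test only asserts that "not all" waiters wake up because `IndexToBitmask` folds every index into one of 32 bits. The sketch below restates that mapping with an unsigned result type to show which indexes collide. `ShareWakeupBit` and `WaitersSharingBit` are hypothetical helpers added for illustration; only the `index % 32` rule comes from the header.

```cpp
#include <cstddef>
#include <cstdint>

constexpr std::size_t kBitsInBitset = 32;

// Same rule as in the header: index modulo 32 selects one bit of the futex bitset.
constexpr std::uint32_t IndexToBitmask(std::size_t index) {
    return std::uint32_t{1} << (index % kBitsInBitset);
}

// Hypothetical helper: true if waking one index may also wake the other.
constexpr bool ShareWakeupBit(std::size_t a, std::size_t b) {
    return IndexToBitmask(a) == IndexToBitmask(b);
}

// Hypothetical helper: how many indexes in [0, total) map to the same bit as `index`.
inline std::size_t WaitersSharingBit(std::size_t index, std::size_t total) {
    std::size_t count = 0;
    for (std::size_t i = 0; i < total; ++i) {
        if (ShareWakeupBit(i, index)) {
            ++count;
        }
    }
    return count;
}
```

With the 100 waiters used in the test, indexes 0, 32, 64 and 96 all map to bit 0, so `WakeupByIndex(0)` can wake up to four sleeping threads, which is well below the `kTestThreadsCount / 2` bound the test checks.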
