Skip to content

Commit e61f96e

Browse files
committed
fix concurrent: fix PopNoblock sometimes not giving out last item
`PopNoblock` would sometimes not give out the last remaining item. This happened due to a misunderstanding of the semantics of `TryPopWeak`.

The bug surfaced in the following test:

```
[ RUN      ] MpscQueue.MultiProducer
userver/core/src/concurrent/mpsc_queue_test.cpp:232: Failure
Value of: consumer.PopNoblock(value_2)
  Actual: false
Expected: true
[  FAILED  ] MpscQueue.MultiProducer (14 ms)
```

commit_hash:e0a4d77e412d434ff2e592af92bf50cfb4941bc5
1 parent a7f4bf4 commit e61f96e

File tree

3 files changed

+33
-17
lines changed

3 files changed

+33
-17
lines changed

core/include/userver/concurrent/impl/intrusive_mpsc_queue.hpp

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ class IntrusiveMpscQueueImpl final {
2929
using NodePtr = SinglyLinkedBaseHook*;
3030
using NodeRef = utils::NotNull<NodePtr>;
3131

32+
enum class PopMode {
33+
// See TryPopBlocking.
34+
kRarelyBlocking,
35+
// See TryPopWeak.
36+
kWeak,
37+
};
38+
3239
constexpr IntrusiveMpscQueueImpl() = default;
3340

3441
IntrusiveMpscQueueImpl(IntrusiveMpscQueueImpl&&) = delete;
@@ -53,7 +60,7 @@ class IntrusiveMpscQueueImpl final {
5360
// Can only be called from one thread at a time.
5461
NodePtr TryPopBlocking() noexcept;
5562

56-
// Returns the oldest pushed not, or `nullptr` if the queue "seems to be"
63+
// Returns the oldest pushed node, or `nullptr` if the queue "seems to be"
5764
// empty. Can only be called from one thread at a time.
5865
// Unlike TryPopBlocking, never blocks, but this comes at a cost: it might
5966
// not return an item that has been completely pushed (happens-before).
@@ -64,6 +71,9 @@ class IntrusiveMpscQueueImpl final {
6471
// responsible for pushing both items, and the other producer walks away,
6572
// and for them the push operation is essentially performed asynchronously.
6673
//
74+
// In addition to that, the last remaining item in the queue may not be
75+
// returned if the push of a second item has started.
76+
//
6777
// If the producers always notify the consumer after pushing,
6878
// then TryPopWeak is enough: the consumer will be notified of all
6979
// the pushed items by some of the producers.
@@ -74,16 +84,12 @@ class IntrusiveMpscQueueImpl final {
7484
// upon the notification.
7585
NodePtr TryPopWeak() noexcept;
7686

77-
private:
78-
enum class PopMode {
79-
kRarelyBlocking,
80-
kWeak,
81-
};
87+
// See TryPopBlocking and TryPopWeak.
88+
NodePtr TryPop(PopMode pop_mode) noexcept;
8289

90+
private:
8391
static std::atomic<NodePtr>& GetNext(NodeRef node) noexcept;
8492

85-
NodePtr DoTryPop(PopMode) noexcept;
86-
8793
// This node is put into the queue when it would otherwise be empty.
8894
SinglyLinkedBaseHook stub_;
8995
// Points to the oldest node not yet popped by the consumer,
@@ -109,6 +115,8 @@ class IntrusiveMpscQueue final {
109115

110116
T* TryPopWeak() noexcept { return static_cast<T*>(impl_.TryPopWeak()); }
111117

118+
T* TryPop(IntrusiveMpscQueueImpl::PopMode pop_mode) noexcept { return static_cast<T*>(impl_.TryPop(pop_mode)); }
119+
112120
private:
113121
IntrusiveMpscQueueImpl impl_;
114122
};

core/include/userver/concurrent/mpsc_queue.hpp

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ class MpscQueue final : public std::enable_shared_from_this<MpscQueue<T>> {
125125

126126
bool Pop(ConsumerToken&, T&, engine::Deadline);
127127
bool PopNoblock(ConsumerToken&, T&);
128-
bool DoPop(ConsumerToken&, T&);
128+
bool DoPop(ConsumerToken&, T&, impl::IntrusiveMpscQueueImpl::PopMode);
129129

130130
void MarkConsumerIsDead();
131131
void MarkProducerIsDead();
@@ -224,14 +224,16 @@ template <typename T>
224224
bool MpscQueue<T>::Pop(ConsumerToken& token, T& value, engine::Deadline deadline) {
225225
bool no_more_producers = false;
226226
const bool success = nonempty_event_.WaitUntil(deadline, [&] {
227-
if (DoPop(token, value)) {
227+
// kWeak is OK here, because if there is another push operation in progress,
228+
// they will notify us after pushing.
229+
if (DoPop(token, value, impl::IntrusiveMpscQueueImpl::PopMode::kWeak)) {
228230
return true;
229231
}
230232
if (NoMoreProducers()) {
231233
// Producer might have pushed something in queue between .pop()
232234
// and !producer_is_created_and_dead_ check. Check twice to avoid
233235
// TOCTOU.
234-
if (!DoPop(token, value)) {
236+
if (!DoPop(token, value, impl::IntrusiveMpscQueueImpl::PopMode::kRarelyBlocking)) {
235237
no_more_producers = true;
236238
}
237239
return true;
@@ -243,12 +245,18 @@ bool MpscQueue<T>::Pop(ConsumerToken& token, T& value, engine::Deadline deadline
243245

244246
template <typename T>
245247
bool MpscQueue<T>::PopNoblock(ConsumerToken& token, T& value) {
246-
return DoPop(token, value);
248+
// kRarelyBlocking is required here, because with kWeak we sometimes would miss an item if another push
249+
// is in progress, and there is no guarantee that the user will retry PopNoblock.
250+
//
251+
// If there was a high-level consumer API that is not affected by kWeak (e.g. some batching API),
252+
// then it could be used there.
253+
// As it stands, it would be too bug-prone to provide weak guarantees in PopNoblock.
254+
return DoPop(token, value, impl::IntrusiveMpscQueueImpl::PopMode::kRarelyBlocking);
247255
}
248256

249257
template <typename T>
250-
bool MpscQueue<T>::DoPop(ConsumerToken& /*unused*/, T& value) {
251-
if (const auto node = std::unique_ptr<Node>{queue_.TryPopWeak()}) {
258+
bool MpscQueue<T>::DoPop(ConsumerToken& /*unused*/, T& value, impl::IntrusiveMpscQueueImpl::PopMode pop_mode) {
259+
if (const auto node = std::unique_ptr<Node>{queue_.TryPop(pop_mode)}) {
252260
value = std::move(node->value);
253261

254262
--size_;

core/src/concurrent/impl/intrusive_mpsc_queue.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,12 @@ bool IntrusiveMpscQueueImpl::PushIfEmpty(NodeRef node) noexcept {
5454
}
5555

5656
IntrusiveMpscQueueImpl::NodePtr IntrusiveMpscQueueImpl::TryPopBlocking() noexcept {
57-
return DoTryPop(PopMode::kRarelyBlocking);
57+
return TryPop(PopMode::kRarelyBlocking);
5858
}
5959

60-
IntrusiveMpscQueueImpl::NodePtr IntrusiveMpscQueueImpl::TryPopWeak() noexcept { return DoTryPop(PopMode::kWeak); }
60+
IntrusiveMpscQueueImpl::NodePtr IntrusiveMpscQueueImpl::TryPopWeak() noexcept { return TryPop(PopMode::kWeak); }
6161

62-
IntrusiveMpscQueueImpl::NodePtr IntrusiveMpscQueueImpl::DoTryPop(PopMode mode) noexcept {
62+
IntrusiveMpscQueueImpl::NodePtr IntrusiveMpscQueueImpl::TryPop(PopMode mode) noexcept {
6363
UASSERT_MSG(!is_consuming_.exchange(true), "Multiple concurrent consumers detected");
6464
const utils::FastScopeGuard guard([this]() noexcept {
6565
UASSERT_MSG(is_consuming_.exchange(false), "Multiple concurrent consumers detected");

0 commit comments

Comments
 (0)