Skip to content

Commit 4e8f55f

Browse files
jimwwalkerdaverigby
authored andcommitted
MB-33773: Don't take a reference to a buffered item
Taking a reference items in the buffer.messages structure is not safe as a closeStream will clear the messages, making the reference invalid. To fix the problem no reference is taken, instead we use std::move and whilst the queue is locked, move the DcpResponse* to the task's scope, now clearing the queue can't delete the object. Other changes are made to fit with this move, such as moving back on a OOM failure. Testing covers the original fault and exercises a few related code paths to be try and assert that under these situations we don't end up with a flowControl leak. Change-Id: Icfe6fc42ac2be0d2a76b42fa4a533f615d065194 Reviewed-on: http://review.couchbase.org/107961 Well-Formed: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 454186c commit 4e8f55f

File tree

5 files changed

+215
-11
lines changed

5 files changed

+215
-11
lines changed

engines/ep/src/dcp/flow-control.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ class FlowControl {
5353

5454
void addStats(ADD_STAT add_stat, const void *c);
5555

56+
uint64_t getFreedBytes() const {
57+
return freedBytes.load();
58+
}
59+
5660
private:
5761
void setBufSizeWithinBounds(size_t &bufSize);
5862

engines/ep/src/dcp/stream.cc

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2355,7 +2355,7 @@ process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed
23552355
// That is because the front-end thread checks if buffer.empty() for
23562356
// deciding if it's time to start again processing new incoming
23572357
// mutations. That happens in PassiveStream::messageReceived.
2358-
auto& response = buffer.front(lh);
2358+
std::unique_ptr<DcpResponse> response = std::move(buffer.front(lh));
23592359

23602360
// Release bufMutex whilst we attempt to process the message
23612361
// a lock inversion exists with connManager if we hold this.
@@ -2416,16 +2416,25 @@ process_items_error_t PassiveStream::processBufferedMessages(uint32_t& processed
24162416
// run.
24172417
// Note:
24182418
// 1) no need to re-acquire bufMutex here
2419-
// 2) we have not removed the message from the buffer yet
2419+
// 2) we have not removed the unique_ptr from the buffer yet, but
2420+
// we must give the item back to the buffer queue
24202421
if (failed && isActive()) {
2422+
lh.lock();
2423+
// isActive should be false if the queue was emptied, but check
2424+
// anyway so we're more robust against any future code changes to
2425+
// isActive and closeStream
2426+
if (!buffer.messages.empty()) {
2427+
buffer.front(lh) = std::move(response);
2428+
}
2429+
lh.unlock();
24212430
break;
24222431
}
24232432

24242433
// At this point we have processed the message successfully,
24252434
// then we can remove it from the buffer.
24262435
// Note: we need to re-acquire bufMutex to update the buffer safely
24272436
lh.lock();
2428-
buffer.pop_front(lh);
2437+
buffer.pop_front(lh, message_bytes);
24292438

24302439
count++;
24312440
if (ret != ENGINE_ERANGE) {

engines/ep/src/dcp/stream.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -838,11 +838,12 @@ class PassiveStream : public Stream {
838838
/*
839839
* Caller must of locked bufMutex and pass as lh (not asserted)
840840
*/
841-
std::unique_ptr<DcpResponse> pop_front(std::unique_lock<std::mutex>& lh) {
842-
std::unique_ptr<DcpResponse> rval(std::move(messages.front()));
841+
void pop_front(std::unique_lock<std::mutex>& lh, size_t bytesPopped) {
842+
if (messages.empty()) {
843+
return;
844+
}
843845
messages.pop_front();
844-
bytes -= rval->getMessageSize();
845-
return rval;
846+
bytes -= bytesPopped;
846847
}
847848

848849
/*

engines/ep/tests/mock/mock_dcp_consumer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ class MockDcpConsumer: public DcpConsumer {
5959
return producerIsVersion5orHigher;
6060
}
6161

62+
FlowControl& getFlowControl() {
63+
return flowControl;
64+
}
65+
6266
/*
6367
* Creates a PassiveStream.
6468
* @return a SingleThreadedRCPtr to the newly created MockPassiveStream.

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 190 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include "failover-table.h"
4343
#include "memory_tracker.h"
4444
#include "objectregistry.h"
45+
#include "replicationthrottle.h"
4546
#include "test_helpers.h"
4647
#include "thread_gate.h"
4748

@@ -3571,6 +3572,14 @@ class SingleThreadedStreamTest : public SingleThreadedEPBucketTest {
35713572
config_string += "max_size=104857600;replication_throttle_threshold=4";
35723573
SingleThreadedEPBucketTest::SetUp();
35733574
}
3575+
3576+
enum class mb_33773Mode {
3577+
closeStreamOnTask,
3578+
closeStreamBeforeTask,
3579+
noMemory,
3580+
noMemoryAndClosed
3581+
};
3582+
void mb_33773(mb_33773Mode mode);
35743583
};
35753584

35763585
/*
@@ -3707,7 +3716,6 @@ TEST_F(SingleThreadedStreamTest, MB31410) {
37073716
std::function<void()> hook = [&tg,
37083717
passiveStream,
37093718
&isFirstRun,
3710-
seqno,
37113719
nextFrontEndSeqno,
37123720
&sync]() {
37133721
// If the test succeeds (i.e., the frontEndTask above sees
@@ -3758,9 +3766,7 @@ TEST_F(SingleThreadedStreamTest, MB31410) {
37583766
if (numBufferedItems == 2) {
37593767
auto& bufferedMessages = passiveStream->getBufferMessages();
37603768
auto* dcpResponse = bufferedMessages.at(0).get();
3761-
EXPECT_EQ(seqno,
3762-
*dynamic_cast<MutationResponse&>(*dcpResponse)
3763-
.getBySeqno());
3769+
EXPECT_EQ(nullptr, dcpResponse);
37643770
dcpResponse = bufferedMessages.at(1).get();
37653771
EXPECT_EQ(nextFrontEndSeqno,
37663772
*dynamic_cast<MutationResponse&>(*dcpResponse)
@@ -3805,3 +3811,183 @@ TEST_F(SingleThreadedStreamTest, MB31410) {
38053811
// Cleanup
38063812
ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(opaque, vbid));
38073813
}
3814+
3815+
// Main test code for MB-33773, see TEST_F for details of each mode.
3816+
// The test generally forces the consumer to buffer mutations and then
3817+
// interleaves various operations using ProcessBufferedMessages_postFront_Hook
3818+
void SingleThreadedStreamTest::mb_33773(
3819+
SingleThreadedStreamTest::mb_33773Mode mode) {
3820+
setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
3821+
3822+
auto consumer =
3823+
std::make_shared<MockDcpConsumer>(*engine, cookie, "MB_33773");
3824+
3825+
uint32_t opaque = 0;
3826+
3827+
ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(opaque, vbid, 0 /*flags*/));
3828+
opaque++;
3829+
3830+
auto* passiveStream = static_cast<MockPassiveStream*>(
3831+
(consumer->getVbucketStream(vbid)).get());
3832+
ASSERT_TRUE(passiveStream->isActive());
3833+
3834+
const uint64_t snapStart = 1;
3835+
const uint64_t snapEnd = 100;
3836+
3837+
// The consumer receives the snapshot-marker
3838+
SnapshotMarker snapshotMarker(opaque,
3839+
vbid,
3840+
snapStart,
3841+
snapEnd,
3842+
dcp_marker_flag_t::MARKER_FLAG_MEMORY);
3843+
passiveStream->processMarker(&snapshotMarker);
3844+
3845+
// This code is tricking the replication throttle into returning pause so
3846+
// that the mutation's are buffered.
3847+
engine->getReplicationThrottle().adjustWriteQueueCap(0);
3848+
const size_t size = engine->getEpStats().getMaxDataSize();
3849+
engine->getEpStats().setMaxDataSize(1);
3850+
ASSERT_EQ(ReplicationThrottle::Status::Pause,
3851+
engine->getReplicationThrottle().getStatus());
3852+
3853+
// Push mutations
3854+
EXPECT_EQ(0, passiveStream->getNumBufferItems());
3855+
for (size_t seqno = snapStart; seqno < snapEnd; seqno++) {
3856+
EXPECT_EQ(ENGINE_SUCCESS,
3857+
consumer->mutation(
3858+
opaque,
3859+
makeStoredDocKey("k" + std::to_string(seqno)),
3860+
{},
3861+
0,
3862+
0,
3863+
0,
3864+
vbid,
3865+
0,
3866+
seqno,
3867+
0,
3868+
0,
3869+
0,
3870+
{},
3871+
0));
3872+
}
3873+
// and check they were buffered.
3874+
ASSERT_EQ(snapEnd - snapStart, passiveStream->getNumBufferItems());
3875+
engine->getEpStats().setMaxDataSize(size); // undo the quota adjustment
3876+
3877+
// We expect flowcontrol bytes to increase when the buffered items are
3878+
// discarded.
3879+
auto bytes = consumer->getFlowControl().getFreedBytes();
3880+
auto backoffs = consumer->getNumBackoffs();
3881+
size_t flowControlBytesFreed = 0; // this is used for one test only
3882+
switch (mode) {
3883+
case mb_33773Mode::closeStreamOnTask:{
3884+
// Create and set a hook that will call setDead, the hook executes
3885+
// just after an item has been taken from the buffer
3886+
std::function<void()> hook = [this, consumer]() {
3887+
consumer->closeStreamDueToVbStateChange(vbid, vbucket_state_active);
3888+
};
3889+
passiveStream->setProcessBufferedMessages_postFront_Hook(hook);
3890+
break;
3891+
}
3892+
case mb_33773Mode::closeStreamBeforeTask:
3893+
consumer->closeStreamDueToVbStateChange(vbid, vbucket_state_active);
3894+
break;
3895+
case mb_33773Mode::noMemory: {
3896+
// Fudge memory again so the task has to rebuffer the messages
3897+
std::function<void()> hook = [this]() {
3898+
engine->getEpStats().setMaxDataSize(1);
3899+
};
3900+
passiveStream->setProcessBufferedMessages_postFront_Hook(hook);
3901+
break;
3902+
}
3903+
case mb_33773Mode::noMemoryAndClosed: {
3904+
// This hook will force quota to 1 so the processing fails.
3905+
// But also closes the stream so that the messages queue is emptied.
3906+
// We are testing that the item we've moved out of the queue is still
3907+
// accounted in flow-control
3908+
std::function<void()> hook = [this,
3909+
consumer,
3910+
&flowControlBytesFreed]() {
3911+
engine->getEpStats().setMaxDataSize(1);
3912+
consumer->closeStreamDueToVbStateChange(vbid, vbucket_state_active);
3913+
// Capture flow control freed bytes which should now include all
3914+
// buffered messages, except one (which was moved)
3915+
flowControlBytesFreed = consumer->getFlowControl().getFreedBytes();
3916+
};
3917+
passiveStream->setProcessBufferedMessages_postFront_Hook(hook);
3918+
break;
3919+
}
3920+
}
3921+
3922+
// Run the NonIO task. Without any fix (and in the interleaved test) the
3923+
// task will grab a reference to an object which will be freed as a side
3924+
// affect of calling closeStream. Crash/ASAN failure will occur.
3925+
auto& nonIo = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
3926+
runNextTask(nonIo);
3927+
3928+
switch (mode) {
3929+
case mb_33773Mode::closeStreamOnTask:
3930+
case mb_33773Mode::closeStreamBeforeTask:
3931+
// Expect that after running the task, which closed the stream via the
3932+
// hook flow control freed increased to reflect the buffered items which
3933+
// were discarded,
3934+
EXPECT_GT(consumer->getFlowControl().getFreedBytes(), bytes);
3935+
return;
3936+
case mb_33773Mode::noMemory: {
3937+
std::function<void()> hook = [] {};
3938+
passiveStream->setProcessBufferedMessages_postFront_Hook(hook);
3939+
// fall through to next case
3940+
}
3941+
case mb_33773Mode::noMemoryAndClosed: {
3942+
// Undo memory fudge for the rest of the test
3943+
engine->getEpStats().setMaxDataSize(size);
3944+
break;
3945+
}
3946+
}
3947+
3948+
// NOTE: Only the noMemory test runs from here
3949+
3950+
// backoffs should of increased
3951+
EXPECT_GT(consumer->getNumBackoffs(), backoffs);
3952+
3953+
if (mode == mb_33773Mode::noMemoryAndClosed) {
3954+
// Check the hook updated this counter
3955+
EXPECT_NE(0, flowControlBytesFreed);
3956+
// And check that consumer flow control is even bigger now
3957+
EXPECT_GT(consumer->getFlowControl().getFreedBytes(),
3958+
flowControlBytesFreed);
3959+
} else {
3960+
// The items are still buffered
3961+
EXPECT_EQ(snapEnd - snapStart, passiveStream->getNumBufferItems());
3962+
// Run task again, it should of re-sheduled itself
3963+
runNextTask(nonIo);
3964+
// and all items now gone
3965+
EXPECT_EQ(0, passiveStream->getNumBufferItems());
3966+
}
3967+
}
3968+
3969+
// Do mb33773 with the close stream interleaved into the processBufferedMessages
3970+
// This is more reflective of the actual MB as this case would result in a fault
3971+
TEST_F(SingleThreadedStreamTest, MB_33773_interleaved) {
3972+
mb_33773(mb_33773Mode::closeStreamOnTask);
3973+
}
3974+
3975+
// Do mb33773 with the close stream before processBufferedMessages. This is
3976+
// checking that flow-control is updated with the fix in place
3977+
TEST_F(SingleThreadedStreamTest, MB_33773) {
3978+
mb_33773(mb_33773Mode::closeStreamBeforeTask);
3979+
}
3980+
3981+
// Test more of the changes in mb33773, this mode makes the processing fail
3982+
// because there's not enough memory, this makes us exercise the code that swaps
3983+
// a reponse back into the deque
3984+
TEST_F(SingleThreadedStreamTest, MB_33773_oom) {
3985+
mb_33773(mb_33773Mode::noMemory);
3986+
}
3987+
3988+
// Test more of the changes in mb33773, this mode makes the processing fail
3989+
// because there's not enough memory, this makes us exercise the code that swaps
3990+
// a reponse back into the deque
3991+
TEST_F(SingleThreadedStreamTest, MB_33773_oom_close) {
3992+
mb_33773(mb_33773Mode::noMemoryAndClosed);
3993+
}

0 commit comments

Comments
 (0)