Skip to content

Commit 2bf70e3

Browse files
committed
[BP] MB-68753: Ensure new DcpConsumer is eligible for notifications
The DcpConsumer is contructed and leaving the parent member paused as false. This means that ConnManager::run will ignore the connection and that any local scheduleNotify will be a no-op. This issue is behind the intermitent failure seen in MB-67544 This appears to work because we maybe relying upon step getting many callbacks, and in the case where the connection uses its timeslice no more callbacks until a notification is requested (which never happens because of the paused=false issue). Change-Id: I7357524b08d3e581b85c20b6ea76d0de4c58623f Reviewed-on: https://review.couchbase.org/c/kv_engine/+/234655 Tested-by: Jim Walker <[email protected]> Well-Formed: Restriction Checker Reviewed-by: Faizan Alam <[email protected]>
1 parent 36a0f4c commit 2bf70e3

File tree

5 files changed

+17
-4
lines changed

5 files changed

+17
-4
lines changed

engines/ep/src/dcp/consumer.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,10 @@ DcpConsumer::DcpConsumer(EventuallyPersistentEngine& engine,
212212
// Enable ChangeStreams on this connection
213213
changeStreamsNegotiation.state =
214214
BlockingDcpControlNegotiation::State::PendingRequest;
215+
216+
// MB-68753: Pause the consumer so subsequent scheduleNotify will wake the
217+
// consumer and the connection will get callbacks from ConnManager::run
218+
pause(PausedReason::ReadyListEmpty);
215219
}
216220

217221
DcpConsumer::~DcpConsumer() {
@@ -351,6 +355,9 @@ cb::engine_errc DcpConsumer::addStream(uint32_t opaque,
351355
opaqueMap_[new_opaque] = std::make_pair(opaque, vbucket);
352356
pendingAddStream = false;
353357

358+
// A DcpStreamRequest should be ready for transmission
359+
scheduleNotify();
360+
354361
return cb::engine_errc::success;
355362
}
356363

@@ -1244,6 +1251,7 @@ void DcpConsumer::addStats(const AddStatFn& add_stat, const CookieIface* c) {
12441251
c);
12451252

12461253
addStat("synchronous_replication", isSyncReplicationEnabled(), add_stat, c);
1254+
addStat("pending_add_stream", pendingAddStream, add_stat, c);
12471255
}
12481256

12491257
void DcpConsumer::aggregateQueueStats(ConnCounter& aggregator) const {

engines/ep/src/dcp/consumer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,7 @@ class DcpConsumer : public ConnHandler,
552552
const std::chrono::seconds dcpNoopTxInterval;
553553

554554
// Step can't start sending packets until we've received add stream
555-
bool pendingAddStream = true;
555+
cb::RelaxedAtomic<bool> pendingAddStream = true;
556556

557557
bool pendingEnableNoop;
558558
bool pendingSendNoopInterval;

engines/ep/src/dcp/passive_stream.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ PassiveStream::PassiveStream(EventuallyPersistentEngine* e,
7070
std::lock_guard<std::mutex> lh(streamMutex);
7171
streamRequest_UNLOCKED(vb_uuid);
7272
itemsReady.store(true);
73+
// Cannot call scheduleNotify here as we're not in the readyStreamsVBQueue.
74+
// The addStream path will call scheduleNotify as at that point this new
75+
// object will be in the readyStreamsVBQueue and can be picked up by step.
7376
}
7477

7578
PassiveStream::~PassiveStream() {

engines/ep/tests/module_tests/dcp_single_threaded_test.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "vbucket.h"
2626

2727
#include <programs/engine_testapp/mock_cookie.h>
28+
#include <programs/engine_testapp/mock_server.h>
2829
#include <spdlog/spdlog.h>
2930

3031
class STDcpTest : public STParameterizedBucketTest {
@@ -496,6 +497,7 @@ void STDcpTest::processConsumerMutationsNearThreshold(bool beyondThreshold) {
496497
auto& mockConnMap = static_cast<MockDcpConnMap&>(connMap);
497498
auto consumer =
498499
std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
500+
EXPECT_TRUE(consumer->isPaused());
499501
mockConnMap.addConn(cookie, consumer);
500502

501503
/* Replica vbucket */
@@ -570,9 +572,6 @@ void STDcpTest::processConsumerMutationsNearThreshold(bool beyondThreshold) {
570572
task here */
571573
EXPECT_EQ(stop_processing, consumer->processBufferedItems());
572574

573-
/* Expect the connection to be notified */
574-
EXPECT_FALSE(consumer->isPaused());
575-
576575
/* Expect disconnect signal in Ephemeral with "fail_new_data" policy */
577576
EXPECT_EQ(cb::engine_errc::disconnect, consumer->step(producers));
578577
} else {

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2648,6 +2648,9 @@ TEST_P(FlowControlTest, NotifyConsumerOnlyIfFlowControlEnabled) {
26482648
// Setup the stream
26492649
ASSERT_EQ(cb::engine_errc::success,
26502650
consumer->addStream(opaque, vbid, 0 /*flags*/));
2651+
// Test for addStream notify because this queues an outbound message
2652+
ASSERT_TRUE(mock_cookie_notified(cookie_to_mock_cookie(cookie)));
2653+
mock_waitfor_cookie(cookie_to_mock_cookie(cookie));
26512654
opaque += 1;
26522655
ASSERT_EQ(cb::engine_errc::success,
26532656
consumer->snapshotMarker(opaque,

0 commit comments

Comments
 (0)