Skip to content

Commit f6bf85c

Browse files
author
Gerrit Code Review
committed
Merge "MB-68753: Merge commit '2bf70e3a2' into 'couchbase/trinity'" into trinity
2 parents 9a76d5e + aa574eb commit f6bf85c

File tree

5 files changed

+17
-4
lines changed

5 files changed

+17
-4
lines changed

engines/ep/src/dcp/consumer.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,10 @@ DcpConsumer::DcpConsumer(EventuallyPersistentEngine& engine,
206206
// Enable ChangeStreams on this connection
207207
changeStreamsNegotiation.state =
208208
BlockingDcpControlNegotiation::State::PendingRequest;
209+
210+
// MB-68753: Pause the consumer so subsequent scheduleNotify will wake the
211+
// consumer and the connection will get callbacks from ConnManager::run
212+
pause(PausedReason::ReadyListEmpty);
209213
}
210214

211215
DcpConsumer::~DcpConsumer() {
@@ -345,6 +349,9 @@ cb::engine_errc DcpConsumer::addStream(uint32_t opaque,
345349
opaqueMap_[new_opaque] = std::make_pair(opaque, vbucket);
346350
pendingAddStream = false;
347351

352+
// A DcpStreamRequest should be ready for transmission
353+
scheduleNotify();
354+
348355
return cb::engine_errc::success;
349356
}
350357

@@ -1276,6 +1283,7 @@ void DcpConsumer::addStats(const AddStatFn& add_stat, CookieIface& c) {
12761283
c);
12771284

12781285
addStat("synchronous_replication", isSyncReplicationEnabled(), add_stat, c);
1286+
addStat("pending_add_stream", pendingAddStream, add_stat, c);
12791287
}
12801288

12811289
void DcpConsumer::addStreamStats(const AddStatFn& add_stat,

engines/ep/src/dcp/consumer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ class DcpConsumer : public ConnHandler,
570570
std::chrono::duration<float> dcpNoopTxInterval;
571571

572572
// Step can't start sending packets until we've received add stream
573-
bool pendingAddStream = true;
573+
cb::RelaxedAtomic<bool> pendingAddStream = true;
574574

575575
bool pendingEnableNoop;
576576
NoopIntervalNegotiation noopIntervalNegotiation;

engines/ep/src/dcp/passive_stream.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ PassiveStream::PassiveStream(EventuallyPersistentEngine* e,
6868
std::lock_guard<std::mutex> lh(streamMutex);
6969
streamRequest_UNLOCKED(vb_uuid);
7070
itemsReady.store(true);
71+
// Cannot call scheduleNotify here as we're not in the readyStreamsVBQueue.
72+
// The addStream path will call scheduleNotify as at that point this new
73+
// object will be in the readyStreamsVBQueue and can be picked up by step.
7174
}
7275

7376
PassiveStream::~PassiveStream() {

engines/ep/tests/module_tests/dcp_single_threaded_test.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "vbucket.h"
2626

2727
#include <programs/engine_testapp/mock_cookie.h>
28+
#include <programs/engine_testapp/mock_server.h>
2829
#include <spdlog/spdlog.h>
2930

3031
class STDcpTest : public STParameterizedBucketTest {
@@ -584,6 +585,7 @@ TEST_P(STDcpTest, ProcessUnackedBytesAtReplicationOOM) {
584585
auto& mockConnMap = static_cast<MockDcpConnMap&>(connMap);
585586
auto consumer =
586587
std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
588+
EXPECT_TRUE(consumer->isPaused());
587589
mockConnMap.addConn(cookie, consumer);
588590

589591
// Replica vbucket
@@ -642,9 +644,6 @@ TEST_P(STDcpTest, ProcessUnackedBytesAtReplicationOOM) {
642644
EXPECT_EQ(0, vb.getHighSeqno());
643645
EXPECT_EQ(0, unackedBytes);
644646

645-
// Expect the connection to be notified
646-
EXPECT_FALSE(consumer->isPaused());
647-
648647
// Simulate the DcpConsumerTask - Nothing to process, we are just
649648
// disconnecting
650649
EXPECT_EQ(all_processed, consumer->processUnackedBytes());

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2776,6 +2776,9 @@ void FlowControlTestBase::testNotifyConsumerOnlyIfFlowControlEnabled(
27762776
// Setup the stream
27772777
ASSERT_EQ(cb::engine_errc::success,
27782778
consumer->addStream(opaque, vbid, 0 /*flags*/));
2779+
// Test for addStream notify because this queues an outbound message
2780+
ASSERT_TRUE(mock_cookie_notified(cookie_to_mock_cookie(cookie)));
2781+
mock_waitfor_cookie(cookie_to_mock_cookie(cookie));
27792782
opaque += 1;
27802783
ASSERT_EQ(cb::engine_errc::success,
27812784
consumer->snapshotMarker(opaque,

0 commit comments

Comments
 (0)