Skip to content

Commit 4df733f

Browse files
trondn authored and daverigby committed
Create DCP consumer / producer without holding the lock
Previously we grabbed a write lock on the underlying map and checked whether the cookie was already registered (which would _ALWAYS_ fail, as the core won't allow DCP open on a connection which is already marked as DCP). After that we would check whether a connection with the same logical name existed before creating the object and inserting it into the map.

We only need the lock while we check for the name and insert the new stream.

Remove the code that checks for an existing consumer/producer for the cookie, and the unit tests for that check (DcpTest::CantDcpOpenTwice already verifies that a second DCP open is refused by the packet validators and never reaches the underlying engine).

Change-Id: I70a1f4a3217baa07405b2156d2bb84d3931c8c11
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/143688
Tested-by: Build Bot <[email protected]>
Reviewed-by: Dave Rigby <[email protected]>
1 parent 8aa650c commit 4df733f

File tree

3 files changed

+27
-174
lines changed

3 files changed

+27
-174
lines changed

engines/ep/src/dcp/dcpconnmap.cc

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -75,37 +75,26 @@ DcpConsumer* DcpConnMap::newConsumer(const void* cookie,
7575
std::string conn_name("eq_dcpq:");
7676
conn_name.append(name);
7777

78-
auto handle = connStore->getCookieToConnectionMapHandle();
78+
auto consumer = makeConsumer(engine, cookie, conn_name, consumerName);
79+
EP_LOG_DEBUG("{} Connection created", consumer->logHeader());
80+
auto* rawPtr = consumer.get();
7981

80-
const auto& connForCookie = handle->findConnHandlerByCookie(cookie);
81-
if (connForCookie) {
82-
connForCookie->setDisconnect();
83-
EP_LOG_INFO(
84-
"Failed to create Dcp Consumer because connection "
85-
"({}) already exists.",
86-
cookie);
87-
return nullptr;
88-
}
82+
// Get a write-handle!
83+
auto handle = connStore->getCookieToConnectionMapHandle();
8984

90-
/*
91-
* If we request a connection of the same name then
92-
* mark the existing connection as "want to disconnect".
93-
*/
85+
// If we request a connection of the same name then
86+
// mark the existing connection as "want to disconnect".
9487
const auto& connForName = handle->findConnHandlerByName(conn_name);
9588
if (connForName) {
9689
EP_LOG_INFO(
9790
"{} Disconnecting existing Dcp Consumer {} as it has the "
98-
"same "
99-
"name as a new connection {}",
91+
"same name as a new connection {}",
10092
connForName->logHeader(),
10193
connForName->getCookie(),
10294
cookie);
10395
connForName->setDisconnect();
10496
}
10597

106-
auto consumer = makeConsumer(engine, cookie, conn_name, consumerName);
107-
EP_LOG_DEBUG("{} Connection created", consumer->logHeader());
108-
auto* rawPtr = consumer.get();
10998
handle->addConnByCookie(cookie, std::move(consumer));
11099
return rawPtr;
111100
}
@@ -160,16 +149,13 @@ DcpProducer* DcpConnMap::newProducer(const void* cookie,
160149
std::string conn_name("eq_dcpq:");
161150
conn_name.append(name);
162151

152+
auto producer = std::make_shared<DcpProducer>(
153+
engine, cookie, conn_name, flags, true /*startTask*/);
154+
EP_LOG_DEBUG("{} Connection created", producer->logHeader());
155+
auto* result = producer.get();
156+
157+
// Get a write-handle!
163158
auto handle = connStore->getCookieToConnectionMapHandle();
164-
const auto& connForCookie = handle->findConnHandlerByCookie(cookie);
165-
if (connForCookie) {
166-
connForCookie->flagDisconnect();
167-
EP_LOG_INFO(
168-
"Failed to create Dcp Producer because connection "
169-
"({}) already exists.",
170-
cookie);
171-
return nullptr;
172-
}
173159

174160
// If we request a connection of the same name then mark the
175161
// existing connection as "want to disconnect" and pull it out from
@@ -182,8 +168,7 @@ DcpProducer* DcpConnMap::newProducer(const void* cookie,
182168
if (connForName) {
183169
EP_LOG_INFO(
184170
"{} Disconnecting existing Dcp Producer {} as it has the "
185-
"same "
186-
"name as a new connection {}",
171+
"same name as a new connection {}",
187172
connForName->logHeader(),
188173
connForName->getCookie(),
189174
cookie);
@@ -197,12 +182,7 @@ DcpProducer* DcpConnMap::newProducer(const void* cookie,
197182
// here will lead to issues described MB-36451.
198183
}
199184

200-
auto producer = std::make_shared<DcpProducer>(
201-
engine, cookie, conn_name, flags, true /*startTask*/);
202-
EP_LOG_DEBUG("{} Connection created", producer->logHeader());
203-
auto* result = producer.get();
204185
handle->addConnByCookie(cookie, producer);
205-
206186
return result;
207187
}
208188

engines/ep/tests/ep_testsuite_dcp.cc

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1749,61 +1749,6 @@ static enum test_result test_dcp_producer_open(EngineIface* h) {
17491749
return SUCCESS;
17501750
}
17511751

1752-
static enum test_result test_dcp_producer_open_same_cookie(EngineIface* h) {
1753-
auto* cookie = testHarness->create_cookie(h);
1754-
const std::string name("unittest");
1755-
uint32_t opaque = 0;
1756-
const uint32_t seqno = 0;
1757-
auto dcp = requireDcpIface(h);
1758-
1759-
checkeq(ENGINE_SUCCESS,
1760-
dcp->open(cookie,
1761-
opaque,
1762-
seqno,
1763-
cb::mcbp::request::DcpOpenPayload::Producer,
1764-
name,
1765-
R"({"consumer_name":"replica1"})"),
1766-
"Failed dcp producer open connection.");
1767-
1768-
const auto stat_type("eq_dcpq:" + name + ":type");
1769-
auto type = get_str_stat(h, stat_type.c_str(), "dcp");
1770-
checkeq(0, type.compare("producer"), "Producer not found");
1771-
/*
1772-
* Number of references is 2 (as opposed to 1) because a
1773-
* mock_connstuct is initialised to having 1 reference
1774-
* to represent a client being connected to it.
1775-
*/
1776-
checkeq(2,
1777-
testHarness->get_number_of_mock_cookie_references(cookie),
1778-
"Number of cookie references is not two");
1779-
/*
1780-
* engine_data needs to be reset so that it passes the check that
1781-
* a connection does not already exist on the same socket.
1782-
*/
1783-
testHarness->store_engine_specific(cookie, nullptr);
1784-
1785-
checkeq(ENGINE_DISCONNECT,
1786-
dcp->open(cookie,
1787-
opaque++,
1788-
seqno,
1789-
cb::mcbp::request::DcpOpenPayload::Producer,
1790-
name,
1791-
R"({"consumer_name":"replica1"})"),
1792-
"Failed to return ENGINE_DISCONNECT");
1793-
1794-
checkeq(2,
1795-
testHarness->get_number_of_mock_cookie_references(cookie),
1796-
"Number of cookie references is not two");
1797-
1798-
testHarness->destroy_cookie(cookie);
1799-
1800-
checkeq(1,
1801-
testHarness->get_number_of_mock_cookie_references(cookie),
1802-
"Number of cookie references is not one");
1803-
1804-
return SUCCESS;
1805-
}
1806-
18071752
static enum test_result test_dcp_noop(EngineIface* h) {
18081753
auto* cookie = testHarness->create_cookie(h);
18091754
const std::string name("unittest");
@@ -8246,8 +8191,6 @@ BaseTestCase testsuite_testcases[] = {
82468191
cleanup),
82478192
TestCase("test open producer", test_dcp_producer_open,
82488193
test_setup, teardown, nullptr, prepare, cleanup),
8249-
TestCase("test open producer same cookie", test_dcp_producer_open_same_cookie,
8250-
test_setup, teardown, nullptr, prepare, cleanup),
82518194
TestCase("test dcp noop", test_dcp_noop, test_setup, teardown, nullptr,
82528195
prepare, cleanup),
82538196
TestCase("test dcp noop failure", test_dcp_noop_fail, test_setup,

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 12 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1340,35 +1340,6 @@ TEST_P(ConnectionTest, test_mb17042_duplicate_name_consumer_connections) {
13401340
<< "Dead connections still remain";
13411341
}
13421342

1343-
TEST_P(ConnectionTest, test_mb17042_duplicate_cookie_producer_connections) {
1344-
MockDcpConnMap connMap(*engine);
1345-
connMap.initialize();
1346-
auto* cookie = create_mock_cookie(engine);
1347-
// Create a new Dcp producer
1348-
DcpProducer* producer = connMap.newProducer(cookie,
1349-
"test_producer1",
1350-
/*flags*/ 0);
1351-
1352-
// Create a duplicate Dcp producer
1353-
DcpProducer* duplicateproducer = connMap.newProducer(cookie,
1354-
"test_producer2",
1355-
/*flags*/ 0);
1356-
1357-
EXPECT_TRUE(producer->doDisconnect()) << "producer doDisconnect == false";
1358-
EXPECT_EQ(nullptr, duplicateproducer) << "duplicateproducer is not null";
1359-
1360-
// Disconnect the producer connection
1361-
connMap.disconnect(cookie);
1362-
// Cleanup the deadConnections
1363-
connMap.manageConnections();
1364-
// Should be zero deadConnections
1365-
EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
1366-
<< "Dead connections still remain";
1367-
}
1368-
1369-
1370-
1371-
13721343
TEST_P(ConnectionTest, test_producer_unknown_ctrl_msg) {
13731344
auto* cookie = create_mock_cookie(engine);
13741345
/* Create a new Dcp producer */
@@ -1386,28 +1357,6 @@ TEST_P(ConnectionTest, test_producer_unknown_ctrl_msg) {
13861357
destroy_mock_cookie(cookie);
13871358
}
13881359

1389-
TEST_P(ConnectionTest, test_mb17042_duplicate_cookie_consumer_connections) {
1390-
MockDcpConnMap connMap(*engine);
1391-
connMap.initialize();
1392-
auto* cookie = create_mock_cookie(engine);
1393-
// Create a new Dcp consumer
1394-
DcpConsumer* consumer = connMap.newConsumer(cookie, "test_consumer1");
1395-
1396-
// Create a duplicate Dcp consumer
1397-
DcpConsumer* duplicateconsumer =
1398-
connMap.newConsumer(cookie, "test_consumer2");
1399-
EXPECT_TRUE(consumer->doDisconnect()) << "consumer doDisconnect == false";
1400-
EXPECT_EQ(nullptr, duplicateconsumer) << "duplicateconsumer is not null";
1401-
1402-
// Disconnect the consumer connection
1403-
connMap.disconnect(cookie);
1404-
// Cleanup the deadConnections
1405-
connMap.manageConnections();
1406-
// Should be zero deadConnections
1407-
EXPECT_EQ(0, connMap.getNumberOfDeadConnections())
1408-
<< "Dead connections still remain";
1409-
}
1410-
14111360
TEST_P(ConnectionTest, test_update_of_last_message_time_in_consumer) {
14121361
auto* cookie = create_mock_cookie(engine);
14131362
Vbid vbid(0);
@@ -1774,12 +1723,12 @@ class DcpConnMapTest : public ::testing::Test {
17741723
* MB-36915: With a recent change, we unconditionally acquire an exclusive
17751724
* lock to vbstate in KVBucket::setVBucketState. But, the new lock
17761725
* introduces a potential deadlock by lock-inversion on connLock and
1777-
* vbstateLock in EPE::dcpOpen if a connection with the same cookie or name
1726+
* vbstateLock in EPE::dcpOpen if a connection with the same name
17781727
* already exists in conn-map. TSAN easily spots the issue as soon as we
17791728
* have an execution where two threads run in parallel and execute the code
17801729
* responsible for the potential deadlock, which is what this test achieves.
17811730
*/
1782-
void testLockInversionInSetVBucketStateAndNewProducer(ConnExistsBy by);
1731+
void testLockInversionInSetVBucketStateAndNewProducer();
17831732

17841733
SynchronousEPEngineUniquePtr engine;
17851734
const Vbid vbid = Vbid(0);
@@ -2227,8 +2176,7 @@ TEST_F(DcpConnMapTest, ConnAggStats) {
22272176
destroy_mock_cookie(statsCookie);
22282177
}
22292178

2230-
void DcpConnMapTest::testLockInversionInSetVBucketStateAndNewProducer(
2231-
ConnExistsBy by) {
2179+
void DcpConnMapTest::testLockInversionInSetVBucketStateAndNewProducer() {
22322180
auto& connMap = dynamic_cast<MockDcpConnMap&>(engine->getDcpConnMap());
22332181
auto* cookie = create_mock_cookie(engine.get());
22342182
const std::string connName = "producer";
@@ -2277,28 +2225,15 @@ void DcpConnMapTest::testLockInversionInSetVBucketStateAndNewProducer(
22772225
// Note: ActiveStream::setDead executed only if re-creating the same
22782226
// connection (ie, same cookie or connection name).
22792227
auto* cookie2 = create_mock_cookie(engine.get());
2280-
switch (by) {
2281-
case ConnExistsBy::Cookie: {
2282-
const std::string newConnName = "newName";
2283-
EXPECT_FALSE(connMap.newProducer(cookie, newConnName, flags));
2284-
// Note: Connection flagged as disconnected but not yet removed
2285-
// from conn-map. See comments in ConnMap::newProducer.
2286-
EXPECT_TRUE(connMap.doesVbConnExist(vbid, "eq_dcpq:" + connName));
2287-
break;
2288-
}
2289-
case ConnExistsBy ::Name: {
2290-
producer = connMap.newProducer(cookie2, connName, flags);
2291-
ASSERT_TRUE(producer);
2292-
// Check that the connection has been re-created with the same name
2293-
// and exists in vbConns at stream-req
2294-
{
2295-
SCOPED_TRACE("");
2296-
streamRequest(*producer);
2297-
}
2298-
EXPECT_TRUE(connMap.doesVbConnExist(vbid, "eq_dcpq:" + connName));
2299-
break;
2300-
}
2228+
producer = connMap.newProducer(cookie2, connName, flags);
2229+
ASSERT_TRUE(producer);
2230+
// Check that the connection has been re-created with the same name
2231+
// and exists in vbConns at stream-req
2232+
{
2233+
SCOPED_TRACE("");
2234+
streamRequest(*producer);
23012235
}
2236+
EXPECT_TRUE(connMap.doesVbConnExist(vbid, "eq_dcpq:" + connName));
23022237

23032238
t1.join();
23042239

@@ -2308,14 +2243,9 @@ void DcpConnMapTest::testLockInversionInSetVBucketStateAndNewProducer(
23082243
destroy_mock_cookie(cookie2);
23092244
}
23102245

2311-
TEST_F(DcpConnMapTest,
2312-
AvoidLockInversionInSetVBucketStateAndNewProducerExistingCookie) {
2313-
testLockInversionInSetVBucketStateAndNewProducer(ConnExistsBy::Cookie);
2314-
}
2315-
23162246
TEST_F(DcpConnMapTest,
23172247
AvoidLockInversionInSetVBucketStateAndNewProducerExistingName) {
2318-
testLockInversionInSetVBucketStateAndNewProducer(ConnExistsBy::Name);
2248+
testLockInversionInSetVBucketStateAndNewProducer();
23192249
}
23202250

23212251
class NotifyTest : public DCPTest {

0 commit comments

Comments
 (0)