Skip to content

Commit 6206a83

Browse files
jameseh96daverigby
authored andcommitted
MB-35061: Ensure Consumer is removed from vbConns when stream is erased
DcpConsumer::{registerStream,removeStream} were introduced in http://review.couchbase.org/#/c/103845/ . They are complementary methods to add a stream to the Consumer::streams map and add the Consumer to the DcpConnMap.vbConns (and vice versa), keeping the two consistent - a consumer without a (non-dead) stream for a given vb should not be in vbConns for that vb. In some cases, Consumer::streams.erase() was called directly, leaving the Consumer in the vbConns map, despite it no longer holding a relevant stream. Methods such as DcpConnMap::seqnoAckVBPassiveStream assume only a single Consumer will be stored per vbucket, and use the first found. This fails, as the "old" consumer does not have the associated stream. Change-Id: Ifedc7a11d0439c2ec9365c277c75e5119bbc991b Reviewed-on: http://review.couchbase.org/112784 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent d179ba0 commit 6206a83

File tree

4 files changed

+38
-8
lines changed

4 files changed

+38
-8
lines changed

engines/ep/src/connmap.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,3 +190,19 @@ void ConnMap::removeVBConnByVBId(const void* connCookie, Vbid vbid) {
190190
std::lock_guard<std::mutex> lh(vbConnLocks[lock_num]);
191191
removeVBConnByVBId_UNLOCKED(connCookie, vbid);
192192
}
193+
194+
bool ConnMap::vbConnectionExists(ConnHandler* conn, Vbid vbid) {
195+
size_t lock_num = vbid.get() % vbConnLockNum;
196+
std::lock_guard<std::mutex> lh(vbConnLocks[lock_num]);
197+
auto& connsForVb = vbConns[vbid.get()];
198+
199+
// Check whether the connhandler already exists in vbConns for the
200+
// provided vbid
201+
for (const auto& existingConn : connsForVb) {
202+
if (conn == existingConn.lock().get()) {
203+
return true;
204+
}
205+
}
206+
207+
return false;
208+
}

engines/ep/src/connmap.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ class ConnMap {
7676

7777
void removeVBConnByVBId(const void* connCookie, Vbid vbid);
7878

79+
/**
80+
* Checks (by pointer comparison) whether a ConnHandler is already
81+
* present in vbConns.
82+
*
83+
*/
84+
bool vbConnectionExists(ConnHandler* conn, Vbid vbid);
85+
7986
/**
8087
* Notifies the front-end synchronously on this thread that this paused
8188
* connection should be re-considered for work.

engines/ep/src/dcp/consumer.cc

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ ENGINE_ERROR_CODE DcpConsumer::addStream(uint32_t opaque,
291291
vbucket);
292292
return ENGINE_KEY_EEXISTS;
293293
} else {
294-
streams.erase(vbucket);
294+
removeStream(vbucket);
295295
}
296296
}
297297

@@ -1340,12 +1340,11 @@ void DcpConsumer::closeAllStreams() {
13401340

13411341
void DcpConsumer::closeStreamDueToVbStateChange(Vbid vbucket,
13421342
vbucket_state_t state) {
1343-
auto it = streams.erase(vbucket);
1344-
if (it.second) {
1343+
auto stream = removeStream(vbucket);
1344+
if (stream) {
13451345
logger->debug("({}) State changed to {}, closing passive stream!",
13461346
vbucket,
13471347
VBucket::toString(state));
1348-
auto& stream = it.first;
13491348
uint32_t bytesCleared = stream->setDead(END_STREAM_STATE);
13501349
flowControl.incrFreedBytes(bytesCleared);
13511350
scheduleNotifyIfNecessary();
@@ -1743,10 +1742,15 @@ void DcpConsumer::setDisconnect() {
17431742
void DcpConsumer::registerStream(std::shared_ptr<PassiveStream> stream) {
17441743
auto vbid = stream->getVBucket();
17451744
streams.insert({vbid, stream});
1746-
engine_.getDcpConnMap().addVBConnByVBId(shared_from_this(), vbid);
1745+
auto& connMap = engine_.getDcpConnMap();
1746+
1747+
Expects(!connMap.vbConnectionExists(this, vbid));
1748+
1749+
connMap.addVBConnByVBId(shared_from_this(), vbid);
17471750
}
17481751

1749-
void DcpConsumer::removeStream(Vbid vbid) {
1750-
streams.erase(vbid);
1752+
std::shared_ptr<PassiveStream> DcpConsumer::removeStream(Vbid vbid) {
1753+
auto eraseResult = streams.erase(vbid).first;
17511754
engine_.getDcpConnMap().removeVBConnByVBId(getCookie(), vbid);
1755+
return eraseResult;
17521756
}

engines/ep/src/dcp/consumer.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,9 +478,12 @@ class DcpConsumer : public ConnHandler,
478478
* Remove a stream from this Consumer and remove the VB-to-Consumer
479479
* mapping from DcpConnMap.
480480
*
481+
* Returns the removed stream ptr, or an empty shared_ptr if it
482+
* was not found
483+
*
481484
* @param vbid The stream to be removed
482485
*/
483-
void removeStream(Vbid vbid);
486+
std::shared_ptr<PassiveStream> removeStream(Vbid vbid);
484487

485488
/**
486489
* RAII helper class to update the flowControl object with the number of

0 commit comments

Comments
 (0)