@@ -61,60 +61,42 @@ DcpConnMap::~DcpConnMap() {
6161 LOG (EXTENSION_LOG_NOTICE, " Deleted dcpConnMap_" );
6262}
6363
64- std::shared_ptr<ConnHandler> DcpConnMap::checkForAndRemoveExistingConn (
65- LockHolder& lh,
66- const void * cookie,
67- const std::string& name,
68- const std::string& connType) {
69- /*
70- * If we request a connection of the same name then mark the existing
71- * connection as "want to disconnect" and erase it from the map. The caller
72- * will put it in deadConnections for us after it cancels associated
73- * background tasks as we do that outside of the connsLock.
74- */
75- auto oldConnection = findByName_UNLOCKED (lh, name);
76- if (oldConnection) {
77- LOG (EXTENSION_LOG_NOTICE,
78- " %s Disconnecting existing Dcp %s %s as it has the same "
79- " name as a new connection %p" ,
80- oldConnection->logHeader (), connType.c_str (), name.c_str (), cookie);
81- oldConnection->setDisconnect ();
82- map_.erase (oldConnection->getCookie ());
83- }
84-
85- return oldConnection;
86- }
87-
88- DcpConsumer* DcpConnMap::newConsumer (const void * cookie,
89- const std::string& name) {
90- DcpConsumer* result = nullptr ;
91- std::shared_ptr<ConnHandler> oldConnection;
64+ DcpConsumer *DcpConnMap::newConsumer (const void * cookie,
65+ const std::string &name)
66+ {
67+ LockHolder lh (connsLock);
9268
9369 std::string conn_name (" eq_dcpq:" );
9470 conn_name.append (name);
9571
96- {
97- LockHolder lh (connsLock);
98- const auto & iter = map_.find (cookie);
99- if (iter != map_.end ()) {
100- iter->second ->setDisconnect ();
101- LOG (EXTENSION_LOG_NOTICE,
102- " Failed to create Dcp Consumer because connection "
103- " (%p) already exists." , cookie);
104- return nullptr ;
105- }
106-
107- oldConnection = checkForAndRemoveExistingConn (
108- lh, cookie, conn_name, " Consumer" );
109- auto consumer = std::make_shared<DcpConsumer>(engine, cookie, conn_name);
110-
111- LOG (EXTENSION_LOG_INFO, " %s Connection created" , consumer->logHeader ());
72+ const auto & iter = map_.find (cookie);
73+ if (iter != map_.end ()) {
74+ iter->second ->setDisconnect ();
75+ LOG (EXTENSION_LOG_NOTICE,
76+ " Failed to create Dcp Consumer because connection "
77+ " (%p) already exists." , cookie);
78+ return nullptr ;
79+ }
11280
113- result = consumer.get ();
114- map_[cookie] = std::move (consumer);
81+ /*
82+ * If we request a connection of the same name then
83+ * mark the existing connection as "want to disconnect".
84+ */
85+ for (const auto & cookieToConn : map_) {
86+ if (cookieToConn.second ->getName () == conn_name) {
87+ LOG (EXTENSION_LOG_NOTICE,
88+ " %s Disconnecting existing Dcp Consumer %p as it has the same "
89+ " name as a new connection %p" ,
90+ cookieToConn.second ->logHeader (), cookieToConn.first , cookie);
91+ cookieToConn.second ->setDisconnect ();
92+ }
11593 }
11694
117- disconnectConn (std::move (oldConnection));
95+ std::shared_ptr<DcpConsumer> dc =
96+ std::make_shared<DcpConsumer>(engine, cookie, conn_name);
97+ auto * result = dc.get ();
98+ LOG (EXTENSION_LOG_INFO, " %s Connection created" , dc->logHeader ());
99+ map_[cookie] = std::move (dc);
118100 return result;
119101}
120102
@@ -153,35 +135,43 @@ DcpProducer* DcpConnMap::newProducer(const void* cookie,
153135 const std::string& name,
154136 uint32_t flags,
155137 Collections::Filter filter) {
156- DcpProducer* result = nullptr ;
157- std::shared_ptr<ConnHandler> oldConnection;
138+ LockHolder lh (connsLock);
158139
159140 std::string conn_name (" eq_dcpq:" );
160141 conn_name.append (name);
161- {
162- LockHolder lh (connsLock);
163142
164- const auto & iter = map_.find (cookie);
165- if (iter != map_.end ()) {
166- iter->second ->setDisconnect ();
167- LOG (EXTENSION_LOG_NOTICE,
168- " Failed to create Dcp Producer because connection "
169- " (%p) already exists." , cookie);
170- return nullptr ;
171- }
172-
173- oldConnection = checkForAndRemoveExistingConn (
174- lh, cookie, conn_name, " Producer" );
175- auto producer = std::make_shared<DcpProducer>(
176- engine, cookie, conn_name, flags, std::move (filter), true /* startTask*/ );
177-
178- LOG (EXTENSION_LOG_INFO, " %s Connection created" , producer->logHeader ());
143+ const auto & iter = map_.find (cookie);
144+ if (iter != map_.end ()) {
145+ iter->second ->setDisconnect ();
146+ LOG (EXTENSION_LOG_NOTICE,
147+ " Failed to create Dcp Producer because connection "
148+ " (%p) already exists." , cookie);
149+ return nullptr ;
150+ }
179151
180- result = producer.get ();
181- map_[cookie] = std::move (producer);
152+ /*
153+ * If we request a connection of the same name then
154+ * mark the existing connection as "want to disconnect".
155+ */
156+ for (const auto & cookieToConn : map_) {
157+ if (cookieToConn.second ->getName () == conn_name) {
158+ LOG (EXTENSION_LOG_NOTICE,
159+ " %s Disconnecting existing Dcp Producer %p as it has the same "
160+ " name as a new connection %p" ,
161+ cookieToConn.second ->logHeader (), cookieToConn.first , cookie);
162+ cookieToConn.second ->setDisconnect ();
163+ }
182164 }
183165
184- disconnectConn (std::move (oldConnection));
166+ auto producer = std::make_shared<DcpProducer>(engine,
167+ cookie,
168+ conn_name,
169+ flags,
170+ std::move (filter),
171+ true /* startTask*/ );
172+ LOG (EXTENSION_LOG_INFO, " %s Connection created" , producer->logHeader ());
173+ auto * result = producer.get ();
174+ map_[cookie] = std::move (producer);
185175
186176 return result;
187177}
@@ -313,10 +303,6 @@ void DcpConnMap::disconnect(const void *cookie) {
313303 // acquire PassiveStream::buffer.bufMutex; and that could deadlock
314304 // in EPBucket::setVBucketState, via
315305 // PassiveStream::processBufferedMessages.
316- disconnectConn (std::move (conn));
317- }
318-
319- void DcpConnMap::disconnectConn (std::shared_ptr<ConnHandler>&& conn) {
320306 if (conn) {
321307 auto producer = std::dynamic_pointer_cast<DcpProducer>(conn);
322308 if (producer) {
@@ -519,11 +505,6 @@ void DcpConnMap::consumerBatchSizeConfigChanged(size_t newValue) {
519505
520506std::shared_ptr<ConnHandler> DcpConnMap::findByName (const std::string& name) {
521507 LockHolder lh (connsLock);
522- return findByName_UNLOCKED (lh, name);
523- }
524-
525- std::shared_ptr<ConnHandler> DcpConnMap::findByName_UNLOCKED (
526- LockHolder& lh, const std::string& name) {
527508 for (const auto & cookieToConn : map_) {
528509 // If the connection is NOT about to be disconnected
529510 // and the names match
0 commit comments