@@ -249,10 +249,12 @@ bool DcpConnMap::handleSlowStream(Vbid vbid, const CheckpointCursor* cursor) {
249249 size_t lock_num = vbid.get () % vbConnLockNum;
250250 std::lock_guard<std::mutex> lh (vbConnLocks[lock_num]);
251251
252- for (const auto & connection : vbConns[vbid.get ()]) {
253- auto * producer = dynamic_cast <DcpProducer*>(&connection.get ());
254-
255- // Check that this connection is actually a producer
252+ for (const auto & weakPtr : vbConns[vbid.get ()]) {
253+ auto connection = weakPtr.lock ();
254+ if (!connection) {
255+ continue ;
256+ }
257+ auto * producer = dynamic_cast <DcpProducer*>(connection.get ());
256258 if (producer && producer->handleSlowStream (vbid, cursor)) {
257259 return true ;
258260 }
@@ -396,20 +398,28 @@ void DcpConnMap::removeVBConnections(DcpProducer& prod) {
396398 size_t lock_num = vbid.get () % vbConnLockNum;
397399 std::lock_guard<std::mutex> lh (vbConnLocks[lock_num]);
398400 auto & vb_conns = vbConns[vbid.get ()];
399- auto * cookie = prod.getCookie ();
400- vb_conns.remove_if ([cookie](ConnHandler& conn) {
401- return cookie == (conn.getCookie ());
402- });
401+ for (auto itr = vb_conns.begin (); itr != vb_conns.end (); ++itr) {
402+ auto connection = (*itr).lock ();
403+ // Erase if we cannot lock, or if the cookie matches
404+ if (!connection ||
405+ (connection && prod.getCookie () == connection->getCookie ())) {
406+ vb_conns.erase (itr);
407+ break ;
408+ }
409+ }
403410 }
404411}
405412
406413void DcpConnMap::notifyVBConnections (Vbid vbid, uint64_t bySeqno) {
407414 size_t lock_num = vbid.get () % vbConnLockNum;
408415 std::lock_guard<std::mutex> lh (vbConnLocks[lock_num]);
409416
410- for (auto & connection : vbConns[vbid.get ()]) {
411- auto * producer = dynamic_cast <DcpProducer*>(&connection.get ());
412- // Check that this connection is actually a producer
417+ for (auto & weakPtr : vbConns[vbid.get ()]) {
418+ auto connection = weakPtr.lock ();
419+ if (!connection) {
420+ continue ;
421+ }
422+ auto * producer = dynamic_cast <DcpProducer*>(connection.get ());
413423 if (producer) {
414424 producer->notifySeqnoAvailable (vbid, bySeqno);
415425 }
@@ -425,9 +435,12 @@ void DcpConnMap::seqnoAckVBPassiveStream(Vbid vbid) {
425435 // only Producers).
426436 // @todo-durability: not clear yet if for Consumers we can simplify by
427437 // keeping a 1-to-1 VB-to-Consumer mapping
428- for (auto & connection : vbConns[vbid.get ()]) {
429- auto * consumer = dynamic_cast <DcpConsumer*>(&connection.get ());
430- // Check that this connection is actually a consumer
438+ for (auto & weakPtr : vbConns[vbid.get ()]) {
439+ auto connection = weakPtr.lock ();
440+ if (!connection) {
441+ continue ;
442+ }
443+ auto * consumer = dynamic_cast <DcpConsumer*>(connection.get ());
431444 if (consumer) {
432445 // Note: Sync Repl enabled at Consumer only if Producer supports it.
433446 // This is to prevent that 6.5 Consumers send DCP_SEQNO_ACK to
0 commit comments