@@ -1358,12 +1358,20 @@ bool DcpProducer::setStreamDeadStatus(Vbid vbid,
13581358 end_stream_status_t status) {
13591359 auto rv = streams.find (vbid.get ());
13601360 if (rv != streams.end ()) {
1361- for (auto handle = rv->second ->rlock (); !handle.end (); handle.next ()) {
1362- if (handle.get ()->compareStreamId (sid)) {
1363- handle.get ()->setDead (status);
1364- return true ;
1361+ std::shared_ptr<Stream> streamPtr;
1362+ // MB-35073: holding StreamContainer rlock while calling setDead
1363+ // has been seen to cause lock inversion elsewhere.
1364+ // Collect sharedptr then setDead once lock is released (itr out of
1365+ // scope).
1366+ for (auto itr = rv->second ->rlock (); !itr.end (); itr.next ()) {
1367+ if (itr.get ()->compareStreamId (sid)) {
1368+ streamPtr = itr.get ();
1369+ break ;
13651370 }
13661371 }
1372+ if (streamPtr) {
1373+ streamPtr->setDead (status);
1374+ }
13671375 return true ;
13681376 }
13691377
@@ -1378,12 +1386,22 @@ void DcpProducer::closeAllStreams() {
13781386 streams.end (),
13791387 [&vbvector](StreamsMap::value_type& vt) {
13801388 vbvector.push_back ((Vbid)vt.first );
1381- auto handle = vt.second ->wlock ();
1382- while (!handle.end ()) {
1383- handle.get ()->setDead (END_STREAM_DISCONNECTED);
1384- handle.next ();
1389+ std::vector<std::shared_ptr<Stream>> streamPtrs;
1390+ // MB-35073: holding StreamContainer lock while
1391+ // calling setDead leads to lock inversion - so
1392+ // collect sharedptrs in one pass then setDead once
1393+ // lock is released (itr out of scope).
1394+ {
1395+ auto handle = vt.second ->wlock ();
1396+ for (; !handle.end (); handle.next ()) {
1397+ streamPtrs.push_back (handle.get ());
1398+ }
1399+ handle.clear ();
1400+ }
1401+
1402+ for (auto streamPtr : streamPtrs) {
1403+ streamPtr->setDead (END_STREAM_DISCONNECTED);
13851404 }
1386- handle.clear ();
13871405 });
13881406 }
13891407 for (const auto vbid: vbvector) {
0 commit comments