@@ -631,6 +631,18 @@ ENGINE_ERROR_CODE DcpProducer::step(struct dcp_message_producers* producers) {
631631 se->getVbucket (),
632632 mapEndStreamStatus (getCookie (), se->getFlags ()),
633633 resp->getStreamId ());
634+
635+ if (resp->getStreamId () && sendStreamEndOnClientStreamClose) {
636+ if (!closeStreamInner (
637+ se->getVbucket (), resp->getStreamId (), true )
638+ .first ) {
639+ throw std::logic_error (
640+ " DcpProducer::step(StreamEnd): no stream was found "
641+ " for " +
642+ se->getVbucket ().to_string () + " " +
643+ resp->getStreamId ().to_string ());
644+ }
645+ }
634646 break ;
635647 }
636648 case DcpResponse::Event::Commit: {
@@ -1053,6 +1065,42 @@ bool DcpProducer::handleResponse(const protocol_binary_response_header* resp) {
10531065 return false ;
10541066}
10551067
1068+ std::pair<std::shared_ptr<Stream>, bool > DcpProducer::closeStreamInner (
1069+ Vbid vbucket, cb::mcbp::DcpStreamId sid, bool eraseFromMapIfFound) {
1070+ std::shared_ptr<Stream> stream;
1071+ bool vbFound = false ;
1072+
1073+ // Obtain exclusive access to the streams map and see if the vbucket is
1074+ // mapped.
1075+ std::lock_guard<StreamsMap> guard (streams);
1076+ auto rv = streams.find (vbucket, guard);
1077+ if (rv.second ) {
1078+ vbFound = true ;
1079+ // Vbucket is mapped, get exclusive access to the StreamContainer
1080+ auto handle = rv.first ->wlock ();
1081+ // Try and locate a matching stream
1082+ for (; !handle.end (); handle.next ()) {
1083+ if (handle.get ()->compareStreamId (sid)) {
1084+ stream = handle.get ();
1085+ break ;
1086+ }
1087+ }
1088+
1089+ if (eraseFromMapIfFound && stream) {
1090+ // Need to tidy up the map, we first call erase on the handle,
1091+ // which will erase the current element from the container
1092+ handle.erase ();
1093+
1094+ // If the container is now empty, remove it from the map, the
1095+ // shared_ptr (held by rv) will do the real destruction.
1096+ if (handle.empty ()) {
1097+ streams.erase (vbucket, guard);
1098+ }
1099+ }
1100+ }
1101+ return {stream, vbFound};
1102+ }
1103+
10561104ENGINE_ERROR_CODE DcpProducer::closeStream (uint32_t opaque,
10571105 Vbid vbucket,
10581106 cb::mcbp::DcpStreamId sid) {
@@ -1078,55 +1126,30 @@ ENGINE_ERROR_CODE DcpProducer::closeStream(uint32_t opaque,
10781126 }
10791127
10801128 /* We should not remove the stream from the streams map if we have to
1081- send the "STREAM_END" response asynchronously to the consumer */
1082- std::shared_ptr<Stream> stream;
1083-
1084- {
1085- // Obtain exclusive access to the streams map and see if the vbucket is
1086- // mapped.
1087- std::lock_guard<StreamsMap> guard (streams);
1088- auto rv = streams.find (vbucket, guard);
1089- if (rv.second ) {
1090- // Vbucket is mapped, get exclusive access to the StreamContainer
1091- auto handle = rv.first ->wlock ();
1092- // Try and locate a matching stream
1093- for (; !handle.end (); handle.next ()) {
1094- if (handle.get ()->compareStreamId (sid)) {
1095- stream = handle.get ();
1096- break ;
1097- }
1098- }
1099-
1100- if (!sendStreamEndOnClientStreamClose) {
1101- // Need to tidy up the map, we first call erase on the handle,
1102- // which will erase the current element from the container
1103- handle.erase ();
1104-
1105- // If the container is now empty, remove it from the map, the
1106- // shared_ptr (held by rv) will do the real destruction.
1107- if (handle.empty ()) {
1108- streams.erase (vbucket, guard);
1109- }
1110- }
1111- }
1112- } // end streams lock scope
1129+ send the "STREAM_END" response asynchronously to the consumer, so
1130+ use the value of sendStreamEndOnClientStreamClose to determine if the
1131+ stream should be removed if found*/
1132+ auto rv = closeStreamInner (vbucket, sid, !sendStreamEndOnClientStreamClose);
11131133
11141134 ENGINE_ERROR_CODE ret;
1115- if (!stream ) {
1135+ if (!rv. first ) {
11161136 logger->warn (
11171137 " ({}) Cannot close stream because no "
1118- " stream exists for this vbucket" ,
1119- vbucket);
1120- return ENGINE_KEY_ENOENT;
1138+ " stream exists for this vbucket {}" ,
1139+ vbucket,
1140+ sid);
1141+ return sid && rv.second ? ENGINE_DCP_STREAMID_INVALID
1142+ : ENGINE_KEY_ENOENT;
11211143 } else {
1122- if (!stream ->isActive ()) {
1144+ if (!rv. first ->isActive ()) {
11231145 logger->warn (
11241146 " ({}) Cannot close stream because "
1125- " stream is already marked as dead" ,
1126- vbucket);
1147+ " stream is already marked as dead {}" ,
1148+ vbucket,
1149+ sid);
11271150 ret = ENGINE_KEY_ENOENT;
11281151 } else {
1129- stream ->setDead (END_STREAM_CLOSED);
1152+ rv. first ->setDead (END_STREAM_CLOSED);
11301153 ret = ENGINE_SUCCESS;
11311154 }
11321155 if (!sendStreamEndOnClientStreamClose) {
0 commit comments