@@ -159,6 +159,32 @@ std::string PassiveStream::getStreamTypeName() const {
159159}
160160
161161void PassiveStream::acceptStream (cb::mcbp::Status status, uint32_t add_opaque) {
162+ VBucketPtr vb = engine->getVBucket (vb_);
163+ if (!vb) {
164+ log (spdlog::level::level_enum::warn,
165+ " ({}) PassiveStream::acceptStream(): status:{} - Unable to find "
166+ " VBucket - cannot accept Stream." ,
167+ vb_,
168+ status);
169+ return ;
170+ }
171+
172+ auto consumer = consumerPtr.lock ();
173+ if (!consumer) {
174+ log (spdlog::level::level_enum::warn,
175+ " ({}) PassiveStream::acceptStream(): status:{} - Unable to lock "
176+ " Consumer - cannot accept Stream." ,
177+ vb_,
178+ status);
179+ return ;
180+ }
181+
182+ // For SyncReplication streams lookup the highPreparedeSqno to check if
183+ // we need to re-ACK (after accepting the stream).
184+ const int64_t highPreparedSeqno = consumer->isSyncReplicationEnabled ()
185+ ? vb->getHighPreparedSeqno ()
186+ : 0 ;
187+
162188 std::unique_lock<std::mutex> lh (streamMutex);
163189 if (isPending ()) {
164190 pushToReadyQ (std::make_unique<AddStreamResponse>(
@@ -169,15 +195,9 @@ void PassiveStream::acceptStream(cb::mcbp::Status status, uint32_t add_opaque) {
169195 // replica can commit any in-flight SyncWrites if no further
170196 // SyncWrites are done and no disk snapshots processed by this
171197 // replica.
172- {
173- auto consumer = consumerPtr.lock ();
174- if (consumer && consumer->isSyncReplicationEnabled ()) {
175- auto vb = engine->getVBucket (vb_);
176- if (vb && vb->getHighPreparedSeqno () != 0 ) {
177- pushToReadyQ (std::make_unique<SeqnoAcknowledgement>(
178- opaque_, vb_, vb->getHighPreparedSeqno ()));
179- }
180- }
198+ if (highPreparedSeqno) {
199+ pushToReadyQ (std::make_unique<SeqnoAcknowledgement>(
200+ opaque_, vb_, highPreparedSeqno));
181201 }
182202 transitionState (StreamState::AwaitingFirstSnapshotMarker);
183203 } else {
0 commit comments