@@ -1134,6 +1134,15 @@ cb::engine_errc PassiveStream::processDropScope(
11341134
11351135void PassiveStream::processMarker (SnapshotMarker* marker) {
11361136 VBucketPtr vb = engine->getVBucket (vb_);
1137+ if (!vb) {
1138+ return ;
1139+ }
1140+
1141+ // Vbucket must be in a permitted state to apply the snapshot
1142+ folly::SharedMutex::ReadHolder rlh (vb->getStateLock ());
1143+ if (!permittedVBStates.test (vb->getState ())) {
1144+ return ;
1145+ }
11371146
11381147 cur_snapshot_start.store (marker->getStartSeqno ());
11391148 cur_snapshot_end.store (marker->getEndSeqno ());
@@ -1142,117 +1151,113 @@ void PassiveStream::processMarker(SnapshotMarker* marker) {
11421151 ? Snapshot::Disk
11431152 : Snapshot::Memory);
11441153
1145- if (vb) {
1146- auto checkpointType = marker->getFlags () & MARKER_FLAG_DISK
1147- ? CheckpointType::Disk
1148- : CheckpointType::Memory;
1154+ auto checkpointType = marker->getFlags () & MARKER_FLAG_DISK
1155+ ? CheckpointType::Disk
1156+ : CheckpointType::Memory;
11491157
1150- const auto historical = marker->getFlags () & MARKER_FLAG_HISTORY
1151- ? CheckpointHistorical::Yes
1152- : CheckpointHistorical::No;
1158+ const auto historical = marker->getFlags () & MARKER_FLAG_HISTORY
1159+ ? CheckpointHistorical::Yes
1160+ : CheckpointHistorical::No;
11531161
1154- // Check whether the snapshot can be considered as an initial disk
1155- // checkpoint for the replica.
1156- if (checkpointType == CheckpointType::Disk && vb->getHighSeqno () == 0 ) {
1157- checkpointType = CheckpointType::InitialDisk;
1158- }
1162+ // Check whether the snapshot can be considered as an initial disk
1163+ // checkpoint for the replica.
1164+ if (checkpointType == CheckpointType::Disk && vb->getHighSeqno () == 0 ) {
1165+ checkpointType = CheckpointType::InitialDisk;
1166+ }
11591167
1160- auto & ckptMgr = *vb->checkpointManager ;
1161-
1162- std::optional<uint64_t > hcs = marker->getHighCompletedSeqno ();
1163- if ((marker->getFlags () & MARKER_FLAG_DISK) &&
1164- !supportsSyncReplication) {
1165- // If this stream doesn't support SyncReplication (i.e. the producer
1166- // is a pre-MadHatter version) then we should consider the HCS to be
1167- // present but zero for disk snapshot (not possible for any
1168- // SyncWrites to have completed yet). If SyncReplication is
1169- // supported then use the value from the marker.
1170- hcs = 0 ;
1171- }
1168+ auto & ckptMgr = *vb->checkpointManager ;
11721169
1173- if (marker->getFlags () & MARKER_FLAG_DISK && !hcs) {
1174- const auto msg = fmt::format (
1175- " PassiveStream::processMarker: stream:{} {}, flags:{}, "
1176- " flagsDecoded:{}, snapStart:{}, snapEnd:{}, HCS:{} - "
1177- " missing HCS" ,
1178- name_,
1179- vb_,
1180- marker->getFlags (),
1181- dcpMarkerFlagsToString (marker->getFlags ()),
1182- marker->getStartSeqno (),
1183- marker->getEndSeqno (),
1184- to_string_or_none (hcs));
1185- throw std::logic_error (msg);
1186- }
1170+ std::optional<uint64_t > hcs = marker->getHighCompletedSeqno ();
1171+ if ((marker->getFlags () & MARKER_FLAG_DISK) && !supportsSyncReplication) {
1172+ // If this stream doesn't support SyncReplication (i.e. the producer
1173+ // is a pre-MadHatter version) then we should consider the HCS to be
1174+ // present but zero for disk snapshot (not possible for any
1175+ // SyncWrites to have completed yet). If SyncReplication is
1176+ // supported then use the value from the marker.
1177+ hcs = 0 ;
1178+ }
11871179
1188- if (marker->getFlags () & MARKER_FLAG_DISK) {
1189- // A replica could receive a duplicate DCP prepare during a disk
1190- // snapshot if it had previously received an uncompleted prepare.
1191- // We can receive a disk snapshot when we either:
1192- // a) First connect
1193- // b) Get cursor dropped by the active
1194- //
1195- // We selectively allow these prepares to overwrite the old one by
1196- // setting a duplicate prepare window in the vBucket. This will
1197- // allow any currently outstanding prepares to be overwritten, but
1198- // not any new ones.
1199- vb->setDuplicatePrepareWindow ();
1200- }
1180+ if (marker->getFlags () & MARKER_FLAG_DISK && !hcs) {
1181+ const auto msg = fmt::format (
1182+ " PassiveStream::processMarker: stream:{} {}, flags:{}, "
1183+ " flagsDecoded:{}, snapStart:{}, snapEnd:{}, HCS:{} - "
1184+ " missing HCS" ,
1185+ name_,
1186+ vb_,
1187+ marker->getFlags (),
1188+ dcpMarkerFlagsToString (marker->getFlags ()),
1189+ marker->getStartSeqno (),
1190+ marker->getEndSeqno (),
1191+ to_string_or_none (hcs));
1192+ throw std::logic_error (msg);
1193+ }
12011194
1202- // We could be connected to a non sync-repl, so if the max-visible is
1203- // not transmitted (optional is false), set visible to snap-end
1204- auto visibleSeq =
1205- marker->getMaxVisibleSeqno ().value_or (marker->getEndSeqno ());
1195+ if (marker->getFlags () & MARKER_FLAG_DISK) {
1196+ // A replica could receive a duplicate DCP prepare during a disk
1197+ // snapshot if it had previously received an uncompleted prepare.
1198+ // We can receive a disk snapshot when we either:
1199+ // a) First connect
1200+ // b) Get cursor dropped by the active
1201+ //
1202+ // We selectively allow these prepares to overwrite the old one by
1203+ // setting a duplicate prepare window in the vBucket. This will
1204+ // allow any currently outstanding prepares to be overwritten, but
1205+ // not any new ones.
1206+ vb->setDuplicatePrepareWindow ();
1207+ }
12061208
1207- if (checkpointType == CheckpointType::InitialDisk) {
1208- // Case: receiving the first snapshot in a Disk snapshot.
1209- // Note that replica may never execute here as the active may switch
1210- // directly to in-memory and send the first snapshot in a Memory
1211- // snapshot.
1209+ // We could be connected to a non sync-repl, so if the max-visible is
1210+ // not transmitted (optional is false), set visible to snap-end
1211+ auto visibleSeq =
1212+ marker->getMaxVisibleSeqno ().value_or (marker->getEndSeqno ());
1213+ if (checkpointType == CheckpointType::InitialDisk) {
1214+ // Case: receiving the first snapshot in a Disk snapshot.
1215+ // Note that replica may never execute here as the active may switch
1216+ // directly to in-memory and send the first snapshot in a Memory
1217+ // snapshot.
1218+
1219+ vb->setReceivingInitialDiskSnapshot (true );
1220+ ckptMgr.createSnapshot (cur_snapshot_start.load (),
1221+ cur_snapshot_end.load (),
1222+ hcs,
1223+ checkpointType,
1224+ visibleSeq,
1225+ historical);
1226+ } else {
1227+ // Case: receiving any type of snapshot (Disk/Memory).
12121228
1213- vb-> setReceivingInitialDiskSnapshot ( true );
1229+ if (marker-> getFlags () & MARKER_FLAG_CHK) {
12141230 ckptMgr.createSnapshot (cur_snapshot_start.load (),
12151231 cur_snapshot_end.load (),
12161232 hcs,
12171233 checkpointType,
12181234 visibleSeq,
12191235 historical);
12201236 } else {
1221- // Case: receiving any type of snapshot (Disk/Memory).
1222-
1223- if (marker->getFlags () & MARKER_FLAG_CHK) {
1237+ // MB-42780: In general we cannot merge multiple snapshots into
1238+ // the same checkpoint. The only exception is for when replica
1239+ // receives multiple Memory checkpoints in a row.
1240+ // Since 6.5.0 the Active behaves correctly with regard to that
1241+ // (ie, the Active always sets the MARKER_FLAG_CHK in a snapshot
1242+ // transition tha involves Disk snapshots), but older Producers
1243+ // may still miss the MARKER_FLAG_CHK.
1244+ if (prevSnapType == Snapshot::Memory &&
1245+ cur_snapshot_type == Snapshot::Memory) {
1246+ ckptMgr.extendOpenCheckpoint (cur_snapshot_end.load (),
1247+ visibleSeq);
1248+ } else {
12241249 ckptMgr.createSnapshot (cur_snapshot_start.load (),
12251250 cur_snapshot_end.load (),
12261251 hcs,
12271252 checkpointType,
12281253 visibleSeq,
12291254 historical);
1230- } else {
1231- // MB-42780: In general we cannot merge multiple snapshots into
1232- // the same checkpoint. The only exception is for when replica
1233- // receives multiple Memory checkpoints in a row.
1234- // Since 6.5.0 the Active behaves correctly with regard to that
1235- // (ie, the Active always sets the MARKER_FLAG_CHK in a snapshot
1236- // transition tha involves Disk snapshots), but older Producers
1237- // may still miss the MARKER_FLAG_CHK.
1238- if (prevSnapType == Snapshot::Memory &&
1239- cur_snapshot_type == Snapshot::Memory) {
1240- ckptMgr.extendOpenCheckpoint (cur_snapshot_end.load (),
1241- visibleSeq);
1242- } else {
1243- ckptMgr.createSnapshot (cur_snapshot_start.load (),
1244- cur_snapshot_end.load (),
1245- hcs,
1246- checkpointType,
1247- visibleSeq,
1248- historical);
1249- }
12501255 }
12511256 }
1257+ }
12521258
1253- if (marker->getFlags () & MARKER_FLAG_ACK) {
1254- cur_snapshot_ack = true ;
1255- }
1259+ if (marker->getFlags () & MARKER_FLAG_ACK) {
1260+ cur_snapshot_ack = true ;
12561261 }
12571262}
12581263
0 commit comments