
Commit 36b30ff

craig[bot] and tbg committed
Merge #145947
145947: kvserver: use vanilla truncation in applySnapshotRaftMuLocked r=tbg a=tbg

Previously, applySnapshotRaftMuLocked performed the log truncation that comes with snapshot application in an "ad-hoc" way. In particular, it wasn't clearing the sideloaded storage. We now call the same methods {stage,finalize}TruncationRaftMuLocked that are also called in the regular truncation code, which also clears the sideloaded storage. We also ensure that the raft log size is zero and trusted after the snapshot.

Follow-up to #145328.

Epic: CRDB-46488

Co-authored-by: Tobias Grieger <[email protected]>
2 parents 7cc372f + 45ac58b commit 36b30ff
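
For orientation, below is a minimal, self-contained sketch of the stage/finalize shape this commit gives to snapshot-induced truncation. The types and names here are hypothetical stand-ins, not CockroachDB's actual kvserver types: in-memory state is staged to an empty log before the snapshot becomes visible, and the sideloaded storage is cleared once it is.

package main

import "fmt"

type truncatedState struct{ index, term uint64 }

// logStorage is a toy stand-in for the replica's log storage state.
type logStorage struct {
	trunc       truncatedState
	size        int64
	sizeTrusted bool
	sideloaded  map[uint64][]byte // sideloaded entries keyed by log index
}

// stageApplySnapshot updates the in-memory view to an empty log at the
// snapshot index, before the on-disk entries disappear. After a snapshot
// the log is known empty, so its size is zero and trusted.
func (ls *logStorage) stageApplySnapshot(ts truncatedState) {
	ls.trunc = ts
	ls.size = 0
	ls.sizeTrusted = true
}

// finalizeApplySnapshot clears the sideloaded storage entirely once the
// snapshot is visible. A snapshot may regress the last index, so clearing
// everything (rather than only up to the new index) avoids leaking entries.
func (ls *logStorage) finalizeApplySnapshot() {
	ls.sideloaded = map[uint64][]byte{}
}

func main() {
	ls := &logStorage{size: 4096, sideloaded: map[uint64][]byte{190: {0x1}}}
	ls.stageApplySnapshot(truncatedState{index: 151, term: 3})
	ls.finalizeApplySnapshot()
	fmt.Println(ls.size, ls.sizeTrusted, len(ls.sideloaded)) // 0 true 0
}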

File tree

6 files changed: +97 -47 lines changed

pkg/kv/kvserver/raft_log_truncator.go

Lines changed: 1 addition & 1 deletion
@@ -162,7 +162,7 @@ type pendingTruncation struct {
 	// ReplicatedEvalResult.RaftLogDelta, this is <= 0.
 	logDeltaBytes  int64
 	isDeltaTrusted bool
-	// hasSideloaded is true if the truncated interval contains at least one
+	// hasSideloaded is true if the truncated interval could contain at least one
 	// sideloaded entry.
 	hasSideloaded bool
 }

pkg/kv/kvserver/replica_application_result.go

Lines changed: 64 additions & 1 deletion
@@ -11,6 +11,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
 	"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
@@ -505,6 +506,69 @@ func (r *Replica) stagePendingTruncationRaftMuLocked(pt pendingTruncation) {
 	r.asLogStorage().stagePendingTruncationRaftMuLocked(pt)
 }
 
+func (r *replicaLogStorage) stageApplySnapshotRaftMuLocked(
+	truncState kvserverpb.RaftTruncatedState,
+) {
+	r.raftMu.AssertHeld()
+
+	// A snapshot application implies a log truncation to the snapshot's index,
+	// and we apply the resulting memory state here (before the snapshot takes
+	// effect, i.e. the log entries disappear from storage). This avoids
+	// situations in which entries were already removed, but the in-mem state
+	// indicates that they ought to still exist.
+	//
+	// The truncation is finalized below, after the snapshot is visible.
+
+	// Clear the raft entry cache at the end of this method (after mu has been
+	// released). Any reader that obtains their log bounds after the critical
+	// section but before the clear will see an empty log anyway, since the
+	// in-memory state is already updated to reflect the truncation, even if
+	// entries are still present in the cache.
+	defer r.cache.Drop(r.ls.RangeID)
+
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	// On snapshots, the entire log is cleared. This is safe:
+	// - log entries preceding the entry represented by the snapshot are durable
+	//   via the snapshot itself, and
+	// - committed log entries ahead of the snapshot index were not acked by this
+	//   replica, or raft would not have accepted this snapshot.
+	//
+	// Here, we update the in-memory state to reflect this before making the
+	// corresponding change to on-disk state. This makes sure that concurrent
+	// readers don't try to access entries no longer present in the log.
+	r.updateStateRaftMuLockedMuLocked(logstore.RaftState{
+		LastIndex: truncState.Index,
+		LastTerm:  truncState.Term,
+		ByteSize:  0,
+	})
+	r.shMu.trunc = truncState
+	r.shMu.lastCheckSize = 0
+	r.shMu.sizeTrusted = true
+}
+
+func (r *replicaLogStorage) finalizeApplySnapshotRaftMuLocked(ctx context.Context) {
+	r.raftMu.AssertHeld()
+	// This mirrors finalizeTruncationRaftMuLocked, but a snapshot may regress the last
+	// index (to discard a divergent log). For example:
+	//
+	// Raft log (before snapshot):
+	// - entry 100-150: term 1 [committed]
+	// - entry 151-200: term 2
+	// Committed raft log (on leader):
+	// - entry 100-150: term 1
+	// - entry 151: term 3
+	//
+	// The replica may receive a snapshot at index 151. If we don't clear the
+	// sideloaded storage all the way up to the *old* last index, we may leak
+	// sideloaded entries. Rather than remember the old last index, we instead
+	// clear the sideloaded storage entirely. This is equivalent.
+	if err := r.ls.Sideload.Clear(ctx); err != nil {
+		log.Errorf(ctx, "while clearing sideloaded storage after snapshot: %+v", err)
+	}
+}
+
 func (r *replicaLogStorage) stagePendingTruncationRaftMuLocked(pt pendingTruncation) {
 	r.raftMu.AssertHeld()
 	// NB: The expected first index can be zero if this proposal is from before
@@ -513,7 +577,6 @@ func (r *replicaLogStorage) stagePendingTruncationRaftMuLocked(pt pendingTruncat
 	// this doesn't need any special casing.
 	pt.isDeltaTrusted = pt.isDeltaTrusted && r.shMu.trunc.Index+1 == pt.expectedFirstIndex
 
-	// TODO(pav-kv): move this logic to replicaLogStorage type.
 	func() {
 		r.mu.Lock()
 		defer r.mu.Unlock()
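
To make the divergent-log example in finalizeApplySnapshotRaftMuLocked concrete, here is a toy, runnable illustration (a hypothetical helper, not kvserver code) of what would leak if the sideloaded storage were only cleared up to the snapshot index rather than entirely:

package main

import "fmt"

// leakedIfClearedThrough returns the sideloaded entry indexes that would
// survive if sideloaded storage were only cleared up to clearThrough.
func leakedIfClearedThrough(sideloadedIndexes []uint64, clearThrough uint64) []uint64 {
	var leaked []uint64
	for _, idx := range sideloadedIndexes {
		if idx > clearThrough {
			leaked = append(leaked, idx)
		}
	}
	return leaked
}

func main() {
	// Divergent log with sideloaded entries at indexes 120, 160, and 190;
	// a snapshot arrives at index 151, regressing the last index from 200.
	sideloaded := []uint64{120, 160, 190}
	fmt.Println(leakedIfClearedThrough(sideloaded, 151)) // [160 190] would leak
}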

pkg/kv/kvserver/replica_application_state_machine.go

Lines changed: 3 additions & 8 deletions
@@ -42,7 +42,7 @@ type applyCommittedEntriesStats struct {
 	appBatchStats
 	followerStoreWriteBytes kvadmission.FollowerStoreWriteBytes
 	numBatchesProcessed     int // TODO(sep-raft-log): numBatches
-	stateAssertions         int
+	assertionsRequested     int
 	numConfChangeEntries    int
 }
 
@@ -206,14 +206,9 @@ func (sm *replicaStateMachine) ApplySideEffects(
 		// Some tests (TestRangeStatsInit) assumes that once the store has started
 		// and the first range has a lease that there will not be a later hard-state.
 		if shouldAssert {
-			// Assert that the on-disk state doesn't diverge from the in-memory
+			// Queue a check that the on-disk state doesn't diverge from the in-memory
 			// state as a result of the side effects.
-			sm.r.mu.RLock()
-			// TODO(sep-raft-log): either check only statemachine invariants or
-			// pass both engines in.
-			sm.r.assertStateRaftMuLockedReplicaMuRLocked(ctx, sm.r.store.TODOEngine())
-			sm.r.mu.RUnlock()
-			sm.applyStats.stateAssertions++
+			sm.applyStats.assertionsRequested++
 		}
 	} else if res := cmd.ReplicatedResult(); !res.IsZero() {
 		log.Fatalf(ctx, "failed to handle all side-effects of ReplicatedEvalResult: %v", res)
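
The shape of this change, sketched below with simplified stand-in types (not the actual kvserver ones): apply-time code now only requests an assertion by bumping a counter, and the expensive congruence check runs once per Ready cycle (see the replica_raft.go diff below) instead of inline for every applied batch.

package main

import "fmt"

type applyCommittedEntriesStats struct{ assertionsRequested int }

// applySideEffects is a toy stand-in for per-batch side-effect handling:
// instead of asserting state congruence inline, it queues a check.
func applySideEffects(stats *applyCommittedEntriesStats, shouldAssert bool) {
	if shouldAssert {
		stats.assertionsRequested++
	}
}

func main() {
	var stats applyCommittedEntriesStats
	for i := 0; i < 3; i++ {
		applySideEffects(&stats, true)
	}
	// At the end of the Ready cycle, one (expensive) check covers all batches.
	if stats.assertionsRequested > 0 {
		fmt.Println("asserting in-memory vs on-disk state once")
	}
}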

pkg/kv/kvserver/replica_raft.go

Lines changed: 17 additions & 1 deletion
@@ -823,7 +823,7 @@ func (s handleRaftReadyStats) SafeFormat(p redact.SafePrinter, _ rune) {
 	}
 	p.SafeString("]")
 
-	if n := s.apply.stateAssertions; n > 0 {
+	if n := s.apply.assertionsRequested; n > 0 {
 		p.Printf(", state_assertions=%d", n)
 	}
 	if s.snap.offered {
@@ -1082,6 +1082,11 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 		}
 	}
 
+	// If this field is set, by the end of the method (after snapshot, append,
+	// apply handling), we will verify invariants including checking that
+	// in-memory state is congruent with disk state.
+	var shouldAssert bool
+
 	// Grab the known leaseholder before applying to the state machine.
 	startingLeaseholderID := r.shMu.state.Lease.Replica.ReplicaID
 	refreshReason := noReason
@@ -1111,6 +1116,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	}
 
 	if app.Snapshot != nil {
+		shouldAssert = true
 		if inSnap.Desc == nil {
 			// If we didn't expect Raft to have a snapshot but it has one
 			// regardless, that is unexpected and indicates a programming
@@ -1236,6 +1242,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 		// it is now marked as destroyed.
 		return stats, err
 	}
+	shouldAssert = shouldAssert || stats.apply.assertionsRequested > 0
 
 	if r.store.cfg.KVAdmissionController != nil &&
 		stats.apply.followerStoreWriteBytes.NumEntries > 0 {
@@ -1268,6 +1275,15 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	if r.store.TestingKnobs().EnableUnconditionalRefreshesInRaftReady {
 		refreshReason = reasonNewLeaderOrConfigChange
 	}
+
+	if shouldAssert {
+		sm.r.mu.RLock()
+		// TODO(sep-raft-log): either check only statemachine invariants or
+		// pass both engines in.
+		sm.r.assertStateRaftMuLockedReplicaMuRLocked(ctx, sm.r.store.TODOEngine())
+		sm.r.mu.RUnlock()
+	}
+
 	if refreshReason != noReason {
 		r.mu.Lock()
 		r.refreshProposalsLocked(ctx, 0 /* refreshAtDelta */, refreshReason)

pkg/kv/kvserver/replica_raft_test.go

Lines changed: 1 addition & 1 deletion
@@ -103,7 +103,7 @@ func Test_handleRaftReadyStats_SafeFormat(t *testing.T) {
 				numAddSST:       3,
 				numAddSSTCopies: 1,
 			},
-			stateAssertions:      4,
+			assertionsRequested:  4,
 			numConfChangeEntries: 6,
 		},
 		append: logstore.AppendStats{

pkg/kv/kvserver/replica_raftstorage.go

Lines changed: 11 additions & 35 deletions
@@ -13,7 +13,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/snaprecv"
-	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
@@ -607,15 +606,11 @@ func (r *Replica) applySnapshotRaftMuLocked(
 	clearedSpans = append(clearedSpans, clearedUnreplicatedSpan)
 	clearedSpans = append(clearedSpans, clearedSubsumedSpans...)
 
-	// Drop the entry cache before ingestion, like a real truncation would.
-	//
-	// TODO(sep-raft-log): like a real truncation, we should also bump the
-	// in-memory truncated state to the snapshot index. We should also assert
-	// that this leads to a (logically) empty log (otherwise we wouldn't have
-	// accepted the snapshot).
-	//
-	// See: https://github.com/cockroachdb/cockroach/pull/145328#discussion_r2068209588
-	r.store.raftEntryCache.Drop(r.RangeID)
+	ls := r.asLogStorage()
+
+	// Stage the truncation, so that in-memory state reflects an
+	// empty log.
+	ls.stageApplySnapshotRaftMuLocked(truncState)
 
 	stats.subsumedReplicas = timeutil.Now()
 
@@ -655,6 +650,9 @@ func (r *Replica) applySnapshotRaftMuLocked(
 		// snapshot.
 		writeBytes = uint64(inSnap.SSTSize)
 	}
+	// The snapshot is visible, so finalize the truncation.
+	ls.finalizeApplySnapshotRaftMuLocked(ctx)
+
 	// The "ignored" here is to ignore the writes to create the AC linear models
 	// for LSM writes. Since these writes typically correspond to actual writes
 	// onto the disk, we account for them separately in
@@ -683,6 +681,9 @@ func (r *Replica) applySnapshotRaftMuLocked(
 		log.Fatalf(ctx, "snapshot RaftAppliedIndexTerm %d doesn't match its metadata term %d",
 			state.RaftAppliedIndexTerm, nonemptySnap.Metadata.Term)
 	}
+	if ls.shMu.size != 0 {
+		log.Fatalf(ctx, "expected empty raft log after snapshot, got %d", ls.shMu.size)
+	}
 
 	// Read the prior read summary for this range, which was included in the
 	// snapshot. We may need to use it to bump our timestamp cache if we
@@ -737,23 +738,6 @@ func (r *Replica) applySnapshotRaftMuLocked(
 	// without risking a lock-ordering deadlock.
 	r.store.mu.Unlock()
 
-	// The log has been cleared and reset to start at the snapshot's applied
-	// index/term. Update the in-memory metadata accordingly.
-	r.asLogStorage().updateStateRaftMuLockedMuLocked(logstore.RaftState{
-		LastIndex: truncState.Index,
-		LastTerm:  truncState.Term,
-		ByteSize:  0, // the log is empty now
-	})
-	ls := r.asLogStorage()
-	ls.shMu.trunc = truncState
-	// Snapshots typically have fewer log entries than the leaseholder. The next
-	// time we hold the lease, recompute the log size before making decisions.
-	//
-	// TODO(pav-kv): does this assume that snapshots can contain log entries,
-	// which is no longer true? The comment needs an update, and the decision to
-	// set this flag to false revisited.
-	ls.shMu.sizeTrusted = false
-
 	// Update the store stats for the data in the snapshot.
 	r.store.metrics.subtractMVCCStats(ctx, r.tenantMetricsRef, *r.shMu.state.Stats)
 	r.store.metrics.addMVCCStats(ctx, r.tenantMetricsRef, *state.Stats)
@@ -791,14 +775,6 @@ func (r *Replica) applySnapshotRaftMuLocked(
 
 	r.mu.Unlock()
 
-	// Assert that the in-memory and on-disk states of the Replica are congruent
-	// after the application of the snapshot. Do so under a read lock, as this
-	// operation can be expensive. This is safe, as we hold the Replica.raftMu
-	// across both Replica.mu critical sections.
-	r.mu.RLock()
-	r.assertStateRaftMuLockedReplicaMuRLocked(ctx, r.store.TODOEngine())
-	r.mu.RUnlock()
-
 	// The rangefeed processor is listening for the logical ops attached to
 	// each raft command. These will be lost during a snapshot, so disconnect
 	// the rangefeed, if one exists.
