From a8f7f914fbc58727e56f1b9e906954c244652c91 Mon Sep 17 00:00:00 2001 From: iskettaneh <173953022+iskettaneh@users.noreply.github.com> Date: Thu, 20 Nov 2025 19:58:59 -0500 Subject: [PATCH] batcheval: PrepareLogEngineTruncation --- pkg/kv/kvserver/batcheval/BUILD.bazel | 1 + pkg/kv/kvserver/batcheval/cmd_truncate_log.go | 51 +++++++------------ pkg/kv/kvserver/batcheval/eval_context.go | 28 +++++----- pkg/kv/kvserver/replica.go | 5 -- pkg/kv/kvserver/replica_eval_context_span.go | 17 ++----- pkg/kv/kvserver/replica_raftlog.go | 44 ++++++++++++++++ 6 files changed, 78 insertions(+), 68 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index 09a20269314a..04474036ddee 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -78,6 +78,7 @@ go_library( "//pkg/kv/kvserver/spanset", "//pkg/kv/kvserver/txnwait", "//pkg/kv/kvserver/uncertainty", + "//pkg/raft", "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", diff --git a/pkg/kv/kvserver/batcheval/cmd_truncate_log.go b/pkg/kv/kvserver/batcheval/cmd_truncate_log.go index 41b8979479d5..0b53dd3d6acd 100644 --- a/pkg/kv/kvserver/batcheval/cmd_truncate_log.go +++ b/pkg/kv/kvserver/batcheval/cmd_truncate_log.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" + "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -66,21 +67,25 @@ func TruncateLog( // existing entry's term? // TODO(pav-kv): some day, make args.Index an inclusive compaction index, and // eliminate the remaining +-1 arithmetics. 
- firstIndex := cArgs.EvalCtx.GetCompactedIndex() + 1 - if firstIndex >= args.Index { - if log.V(3) { - log.KvExec.Infof(ctx, "attempting to truncate previously truncated raft log. FirstIndex:%d, TruncateFrom:%d", - firstIndex, args.Index) - } - return result.Result{}, nil - } - // args.Index is the first index to keep. - term, err := cArgs.EvalCtx.GetTerm(args.Index - 1) + // Use the log engine to compute stats for the raft log. + // TODO(#136358): After we precisely maintain the Raft Log size, we could stop + // needing the Log Engine to compute the stats. + compactedIndex, term, logEngineReader, err := cArgs.EvalCtx.PrepareLogEngineTruncation(args.Index) if err != nil { - return result.Result{}, errors.Wrap(err, "getting term") + if errors.Is(err, raft.ErrCompacted) { + // The log has already been truncated past this point - this is a no-op. + if log.V(3) { + log.KvExec.Infof(ctx, "attempting to truncate previously truncated raft log. FirstIndex:%d, TruncateFrom:%d", + compactedIndex+1, args.Index) + } + return result.Result{}, nil + } + return result.Result{}, errors.Wrap(err, "preparing log truncation") } + defer logEngineReader.Close() + firstIndex := compactedIndex + 1 // Compute the number of bytes freed by this truncation. Note that using // firstIndex only make sense for the leaseholder as we base this off its // own first index (other replicas may have other first indexes). In @@ -92,34 +97,12 @@ func TruncateLog( // flight TruncateLogRequests, and using the firstIndex will result in // duplicate accounting. The ExpectedFirstIndex, populated for clusters at // LooselyCoupledRaftLogTruncation, allows us to avoid this problem. - // - // We have an additional source of error not mitigated by - // ExpectedFirstIndex. There is nothing synchronizing firstIndex with the - // state visible in readWriter. The former uses the in-memory state or - // fetches directly from the Engine. 
The latter uses Engine state from some - // point in time which can fall anywhere in the time interval starting from - // when the readWriter was created up to where we create an MVCCIterator - // below. - // TODO(sumeer): we can eliminate this error as part of addressing - // https://github.com/cockroachdb/cockroach/issues/55461 and - // https://github.com/cockroachdb/cockroach/issues/70974 that discuss taking - // a consistent snapshot of some Replica state and the engine. if args.ExpectedFirstIndex > firstIndex { firstIndex = args.ExpectedFirstIndex } start := keys.RaftLogKey(rangeID, firstIndex) end := keys.RaftLogKey(rangeID, args.Index) - // TODO(pav-kv): GetCompactedIndex, GetTerm, and NewReader calls can disagree - // on the state of the log since we don't hold any Replica locks here. Move - // the computation inside Replica where locking can be controlled precisely. - // - // Use the log engine to compute stats for the raft log. - // TODO(#136358): After we precisely maintain the Raft Log size, we could stop - // needing the Log Engine to compute the stats. - logReader := cArgs.EvalCtx.LogEngine().NewReader(storage.StandardDurability) - defer logReader.Close() - // Compute the stats delta that were to occur should the log entries be // purged. We do this as a side effect of seeing a new TruncatedState, // downstream of Raft. @@ -128,7 +111,7 @@ func TruncateLog( // are not tracked in the raft log delta. The delta will be adjusted below // raft. // We can pass zero as nowNanos because we're only interested in SysBytes. 
- ms, err := storage.ComputeStats(ctx, logReader, start, end, 0 /* nowNanos */) + ms, err := storage.ComputeStats(ctx, logEngineReader, start, end, 0 /* nowNanos */) if err != nil { return result.Result{}, errors.Wrap(err, "while computing stats of Raft log freed by truncation") } diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index 00640fad5a64..f71f0db470ce 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -60,11 +60,10 @@ type EvalContext interface { GetNodeLocality() roachpb.Locality IsFirstRange() bool - GetCompactedIndex() kvpb.RaftIndex - GetTerm(index kvpb.RaftIndex) (kvpb.RaftTerm, error) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex - // LogEngine returns the engine that stores the raft log. - LogEngine() storage.Engine + PrepareLogEngineTruncation(firstIndexToKeep kvpb.RaftIndex) ( + kvpb.RaftIndex, kvpb.RaftTerm, storage.Reader, error, + ) Desc() *roachpb.RangeDescriptor ContainsKey(key roachpb.Key) bool @@ -228,13 +227,6 @@ func (m *mockEvalCtxImpl) GetConcurrencyManager() concurrency.Manager { } } -func (m *mockEvalCtxImpl) LogEngine() storage.Engine { - if m.MockEvalCtx.LogEngine != nil { - return m.MockEvalCtx.LogEngine - } - panic("LogEngine not configured") -} - func (m *mockEvalCtxImpl) NodeID() roachpb.NodeID { return m.MockEvalCtx.NodeID } @@ -250,12 +242,16 @@ func (m *mockEvalCtxImpl) GetRangeID() roachpb.RangeID { func (m *mockEvalCtxImpl) IsFirstRange() bool { panic("unimplemented") } -func (m *mockEvalCtxImpl) GetCompactedIndex() kvpb.RaftIndex { - return m.CompactedIndex -} -func (m *mockEvalCtxImpl) GetTerm(kvpb.RaftIndex) (kvpb.RaftTerm, error) { - return m.Term, nil +func (m *mockEvalCtxImpl) PrepareLogEngineTruncation( + kvpb.RaftIndex, +) (kvpb.RaftIndex, kvpb.RaftTerm, storage.Reader, error) { + if m.MockEvalCtx.LogEngine != nil { + return m.CompactedIndex, m.Term, m.MockEvalCtx.LogEngine.NewReader( + 
storage.StandardDurability), nil + } + panic("LogEngine not configured") } + func (m *mockEvalCtxImpl) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex { panic("unimplemented") } diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index e25b55002d78..d517b29ed6db 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1409,11 +1409,6 @@ func (r *Replica) StoreID() roachpb.StoreID { return r.store.StoreID() } -// LogEngine returns the log engine. -func (r *Replica) LogEngine() storage.Engine { - return r.store.LogEngine() -} - // EvalKnobs returns the EvalContext's Knobs. func (r *Replica) EvalKnobs() kvserverbase.BatchEvalTestingKnobs { return r.store.cfg.TestingKnobs.EvalKnobs diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go index 0a34b390df96..24961b55b58d 100644 --- a/pkg/kv/kvserver/replica_eval_context_span.go +++ b/pkg/kv/kvserver/replica_eval_context_span.go @@ -81,24 +81,15 @@ func (rec *SpanSetReplicaEvalContext) GetNodeLocality() roachpb.Locality { return rec.i.GetNodeLocality() } -// GetCompactedIndex returns the compacted index of the raft log. -func (rec *SpanSetReplicaEvalContext) GetCompactedIndex() kvpb.RaftIndex { - return rec.i.GetCompactedIndex() -} - -// GetTerm returns the term for the given index in the Raft log. -func (rec *SpanSetReplicaEvalContext) GetTerm(i kvpb.RaftIndex) (kvpb.RaftTerm, error) { - return rec.i.GetTerm(i) -} - // GetLeaseAppliedIndex returns the lease index of the last applied command. func (rec *SpanSetReplicaEvalContext) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex { return rec.i.GetLeaseAppliedIndex() } -// LogEngine returns the log engine. 
-func (rec *SpanSetReplicaEvalContext) LogEngine() storage.Engine { - return rec.i.LogEngine() +func (rec *SpanSetReplicaEvalContext) PrepareLogEngineTruncation( + firstIndexToKeep kvpb.RaftIndex, +) (kvpb.RaftIndex, kvpb.RaftTerm, storage.Reader, error) { + return rec.i.PrepareLogEngineTruncation(firstIndexToKeep) } // IsFirstRange returns true iff the replica belongs to the first range. diff --git a/pkg/kv/kvserver/replica_raftlog.go b/pkg/kv/kvserver/replica_raftlog.go index 505309ab0cd7..5d8ce6e4ef0e 100644 --- a/pkg/kv/kvserver/replica_raftlog.go +++ b/pkg/kv/kvserver/replica_raftlog.go @@ -15,6 +15,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) @@ -289,6 +291,48 @@ func (r *Replica) GetCompactedIndex() kvpb.RaftIndex { return r.raftCompactedIndexRLocked() } +// PrepareLogEngineTruncation prepares for truncating the Raft log up to +// firstIndexToKeep by returning the following fields atomically: +// - compactedIndex: The current compacted index of the Raft log. +// - term: The term of the last entry to truncate. +// - logReader: A reader over the log engine with a snapshot consistent with +// compactedIndex and term. +// - error: If the operation is a no-op (log is already compacted), it returns +// `ErrCompacted`. Otherwise, it returns an error if grabbing the +// term or pinning the log engine state failed. +// +// The caller must close the returned reader if the returned error is nil. +func (r *Replica) PrepareLogEngineTruncation( + firstIndexToKeep kvpb.RaftIndex, +) (kvpb.RaftIndex, kvpb.RaftTerm, storage.Reader, error) { + r.mu.RLock() + defer r.mu.RUnlock() + + // Check if the truncation is a no-op. 
+ compactedIndex := r.raftCompactedIndexRLocked() + firstIndex := compactedIndex + 1 + if firstIndex >= firstIndexToKeep { + // The log truncation is a no-op, return early. + return compactedIndex, 0, nil, raft.ErrCompacted + } + + // Grab the raft term of the last index to truncate. + term, err := r.raftTermShMuLocked(firstIndexToKeep - 1) + if err != nil { + return compactedIndex, 0, nil, err + } + + // Before releasing the read mutex, grab a consistent snapshot of the log + // engine reader. This ensures that the compactedIndex, and term are + // consistent with the log engine reader's snapshot. + logReader := r.store.LogEngine().NewReader(storage.StandardDurability) + if err := logReader.PinEngineStateForIterators(fs.BatchEvalReadCategory); err != nil { + logReader.Close() + return compactedIndex, 0, nil, err + } + return compactedIndex, term, logReader, nil +} + // LogSnapshot returns an immutable point-in-time snapshot of the log storage. // // Requires that r.raftMu is held for writing, and r.mu for reading. In