diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index 09a20269314a..04474036ddee 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -78,6 +78,7 @@ go_library( "//pkg/kv/kvserver/spanset", "//pkg/kv/kvserver/txnwait", "//pkg/kv/kvserver/uncertainty", + "//pkg/raft", "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", diff --git a/pkg/kv/kvserver/batcheval/cmd_truncate_log.go b/pkg/kv/kvserver/batcheval/cmd_truncate_log.go index 41b8979479d5..134b28440787 100644 --- a/pkg/kv/kvserver/batcheval/cmd_truncate_log.go +++ b/pkg/kv/kvserver/batcheval/cmd_truncate_log.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" + "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -66,20 +67,25 @@ func TruncateLog( // existing entry's term? // TODO(pav-kv): some day, make args.Index an inclusive compaction index, and // eliminate the remaining +-1 arithmetics. - firstIndex := cArgs.EvalCtx.GetCompactedIndex() + 1 - if firstIndex >= args.Index { - if log.V(3) { - log.KvExec.Infof(ctx, "attempting to truncate previously truncated raft log. FirstIndex:%d, TruncateFrom:%d", - firstIndex, args.Index) - } - return result.Result{}, nil - } - // args.Index is the first index to keep. - term, err := cArgs.EvalCtx.GetTerm(args.Index - 1) + // Use the log engine to compute stats for the raft log. + // TODO(#136358): After we precisely maintain the Raft Log size, we could stop + // needing the Log Engine to compute the stats. + compactedIndex, term, logEngineReader, err := cArgs.EvalCtx.PrepareLogEngineTruncation(args.Index) if err != nil { - return result.Result{}, errors.Wrap(err, "getting term") + if errors.Is(err, raft.ErrCompacted) { + // The log has already been truncated past this point - this is a no-op. + if log.V(3) { + log.KvExec.Infof(ctx, "attempting to truncate previously truncated raft log. FirstIndex:%d, TruncateFrom:%d", + compactedIndex+1, args.Index) + } + return result.Result{}, nil + } + return result.Result{}, errors.Wrap(err, "preparing log truncation") } + defer logEngineReader.Close() + + firstIndex := compactedIndex + 1 // Compute the number of bytes freed by this truncation. Note that using // firstIndex only make sense for the leaseholder as we base this off its @@ -110,16 +116,6 @@ func TruncateLog( start := keys.RaftLogKey(rangeID, firstIndex) end := keys.RaftLogKey(rangeID, args.Index) - // TODO(pav-kv): GetCompactedIndex, GetTerm, and NewReader calls can disagree - // on the state of the log since we don't hold any Replica locks here. Move - // the computation inside Replica where locking can be controlled precisely. - // - // Use the log engine to compute stats for the raft log. - // TODO(#136358): After we precisely maintain the Raft Log size, we could stop - // needing the Log Engine to compute the stats. - logReader := cArgs.EvalCtx.LogEngine().NewReader(storage.StandardDurability) - defer logReader.Close() - // Compute the stats delta that were to occur should the log entries be // purged. We do this as a side effect of seeing a new TruncatedState, // downstream of Raft. @@ -128,7 +124,7 @@ func TruncateLog( // are not tracked in the raft log delta. The delta will be adjusted below // raft. // We can pass zero as nowNanos because we're only interested in SysBytes. - ms, err := storage.ComputeStats(ctx, logReader, start, end, 0 /* nowNanos */) + ms, err := storage.ComputeStats(ctx, logEngineReader, start, end, 0 /* nowNanos */) if err != nil { return result.Result{}, errors.Wrap(err, "while computing stats of Raft log freed by truncation") } diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index 00640fad5a64..60bdf078621c 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb" + "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" @@ -60,11 +61,15 @@ type EvalContext interface { GetNodeLocality() roachpb.Locality IsFirstRange() bool - GetCompactedIndex() kvpb.RaftIndex - GetTerm(index kvpb.RaftIndex) (kvpb.RaftTerm, error) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex - // LogEngine returns the engine that stores the raft log. - LogEngine() storage.Engine + PrepareLogEngineTruncation( + firstIndexToKeep kvpb.RaftIndex, + ) ( + compactedIndex kvpb.RaftIndex, + term kvpb.RaftTerm, + logReader storage.Reader, + err error, + ) Desc() *roachpb.RangeDescriptor ContainsKey(key roachpb.Key) bool @@ -228,13 +233,6 @@ func (m *mockEvalCtxImpl) GetConcurrencyManager() concurrency.Manager { } } -func (m *mockEvalCtxImpl) LogEngine() storage.Engine { - if m.MockEvalCtx.LogEngine != nil { - return m.MockEvalCtx.LogEngine - } - panic("LogEngine not configured") -} - func (m *mockEvalCtxImpl) NodeID() roachpb.NodeID { return m.MockEvalCtx.NodeID } @@ -250,12 +248,17 @@ func (m *mockEvalCtxImpl) GetRangeID() roachpb.RangeID { func (m *mockEvalCtxImpl) IsFirstRange() bool { panic("unimplemented") } -func (m *mockEvalCtxImpl) GetCompactedIndex() kvpb.RaftIndex { - return m.CompactedIndex -} -func (m *mockEvalCtxImpl) GetTerm(kvpb.RaftIndex) (kvpb.RaftTerm, error) { - return m.Term, nil +func (m *mockEvalCtxImpl) PrepareLogEngineTruncation( + kvpb.RaftIndex, +) (kvpb.RaftIndex, kvpb.RaftTerm, storage.Reader, error) { + if m.MockEvalCtx.LogEngine != nil { + + return m.CompactedIndex, m.Term, m.MockEvalCtx.LogEngine.NewReader( + storage.StandardDurability), raft.ErrCompacted + } + panic("LogEngine not configured") } + func (m *mockEvalCtxImpl) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex { panic("unimplemented") } diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index e25b55002d78..d517b29ed6db 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1409,11 +1409,6 @@ func (r *Replica) StoreID() roachpb.StoreID { return r.store.StoreID() } -// LogEngine returns the log engine. -func (r *Replica) LogEngine() storage.Engine { - return r.store.LogEngine() -} - // EvalKnobs returns the EvalContext's Knobs. func (r *Replica) EvalKnobs() kvserverbase.BatchEvalTestingKnobs { return r.store.cfg.TestingKnobs.EvalKnobs diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go index e7d901448c0f..fb1c90f04b62 100644 --- a/pkg/kv/kvserver/replica_eval_context_span.go +++ b/pkg/kv/kvserver/replica_eval_context_span.go @@ -81,24 +81,15 @@ func (rec *SpanSetReplicaEvalContext) GetNodeLocality() roachpb.Locality { return rec.i.GetNodeLocality() } -// GetCompactedIndex returns the compacted index of the raft log. -func (rec *SpanSetReplicaEvalContext) GetCompactedIndex() kvpb.RaftIndex { - return rec.i.GetCompactedIndex() -} - -// GetTerm returns the term for the given index in the Raft log. -func (rec *SpanSetReplicaEvalContext) GetTerm(i kvpb.RaftIndex) (kvpb.RaftTerm, error) { - return rec.i.GetTerm(i) -} - // GetLeaseAppliedIndex returns the lease index of the last applied command. func (rec *SpanSetReplicaEvalContext) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex { return rec.i.GetLeaseAppliedIndex() } -// LogEngine returns the log engine. -func (rec *SpanSetReplicaEvalContext) LogEngine() storage.Engine { - return rec.i.LogEngine() +func (rec *SpanSetReplicaEvalContext) PrepareLogEngineTruncation( + firstIndexToKeep kvpb.RaftIndex, +) (kvpb.RaftIndex, kvpb.RaftTerm, storage.Reader, error) { + return rec.i.PrepareLogEngineTruncation(firstIndexToKeep) } // IsFirstRange returns true iff the replica belongs to the first range. diff --git a/pkg/kv/kvserver/replica_raftlog.go b/pkg/kv/kvserver/replica_raftlog.go index 505309ab0cd7..0eedb2e7464c 100644 --- a/pkg/kv/kvserver/replica_raftlog.go +++ b/pkg/kv/kvserver/replica_raftlog.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog" "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" ) @@ -289,6 +290,38 @@ func (r *Replica) GetCompactedIndex() kvpb.RaftIndex { return r.raftCompactedIndexRLocked() } +// PrepareLogEngineTruncation prepares for truncating the Raft log up to +// firstIndexToKeep by returning the following fields atomically: +// - compactedIndex: The current compacted index of the Raft log. +// - term: The term of the last entry to truncate. +// - logReader: A reader over the log engine for computing truncation stats. +// - noOp: True if the log is already truncated. +// - error: An error if we couldn't get the term. +// +// The caller must close the returned reader when non-nil. +func (r *Replica) PrepareLogEngineTruncation( + firstIndexToKeep kvpb.RaftIndex, +) (compactedIndex kvpb.RaftIndex, term kvpb.RaftTerm, logReader storage.Reader, err error) { + r.mu.RLock() + defer r.mu.RUnlock() + + // Check if the truncation is a no-op. + compactedIndex = r.raftCompactedIndexRLocked() + firstIndex := compactedIndex + 1 + if firstIndex >= firstIndexToKeep { + // The log truncation is a no-op. + return compactedIndex, term, logReader, raft.ErrCompacted + } + + term, err = r.raftTermShMuLocked(firstIndexToKeep - 1) + if err != nil { + return compactedIndex, term, logReader, err + } + + logReader = r.store.LogEngine().NewReader(storage.StandardDurability) + return compactedIndex, term, logReader, err +} + // LogSnapshot returns an immutable point-in-time snapshot of the log storage. // // Requires that r.raftMu is held for writing, and r.mu for reading. In