Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ go_library(
"//pkg/kv/kvserver/spanset",
"//pkg/kv/kvserver/txnwait",
"//pkg/kv/kvserver/uncertainty",
"//pkg/raft",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
Expand Down
40 changes: 18 additions & 22 deletions pkg/kv/kvserver/batcheval/cmd_truncate_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -66,20 +67,25 @@ func TruncateLog(
// existing entry's term?
// TODO(pav-kv): some day, make args.Index an inclusive compaction index, and
// eliminate the remaining +-1 arithmetics.
firstIndex := cArgs.EvalCtx.GetCompactedIndex() + 1
if firstIndex >= args.Index {
if log.V(3) {
log.KvExec.Infof(ctx, "attempting to truncate previously truncated raft log. FirstIndex:%d, TruncateFrom:%d",
firstIndex, args.Index)
}
return result.Result{}, nil
}

// args.Index is the first index to keep.
term, err := cArgs.EvalCtx.GetTerm(args.Index - 1)
// Use the log engine to compute stats for the raft log.
// TODO(#136358): After we precisely maintain the Raft Log size, we could stop
// needing the Log Engine to compute the stats.
compactedIndex, term, logEngineReader, err := cArgs.EvalCtx.PrepareLogEngineTruncation(args.Index)
if err != nil {
return result.Result{}, errors.Wrap(err, "getting term")
if errors.Is(err, raft.ErrCompacted) {
// The log has already been truncated past this point - this is a no-op.
if log.V(3) {
log.KvExec.Infof(ctx, "attempting to truncate previously truncated raft log. FirstIndex:%d, TruncateFrom:%d",
compactedIndex+1, args.Index)
}
return result.Result{}, nil
}
return result.Result{}, errors.Wrap(err, "preparing log truncation")
}
defer logEngineReader.Close()

firstIndex := compactedIndex + 1

// Compute the number of bytes freed by this truncation. Note that using
// firstIndex only make sense for the leaseholder as we base this off its
Expand Down Expand Up @@ -110,16 +116,6 @@ func TruncateLog(
start := keys.RaftLogKey(rangeID, firstIndex)
end := keys.RaftLogKey(rangeID, args.Index)

// TODO(pav-kv): GetCompactedIndex, GetTerm, and NewReader calls can disagree
// on the state of the log since we don't hold any Replica locks here. Move
// the computation inside Replica where locking can be controlled precisely.
//
// Use the log engine to compute stats for the raft log.
// TODO(#136358): After we precisely maintain the Raft Log size, we could stop
// needing the Log Engine to compute the stats.
logReader := cArgs.EvalCtx.LogEngine().NewReader(storage.StandardDurability)
defer logReader.Close()

// Compute the stats delta that were to occur should the log entries be
// purged. We do this as a side effect of seeing a new TruncatedState,
// downstream of Raft.
Expand All @@ -128,7 +124,7 @@ func TruncateLog(
// are not tracked in the raft log delta. The delta will be adjusted below
// raft.
// We can pass zero as nowNanos because we're only interested in SysBytes.
ms, err := storage.ComputeStats(ctx, logReader, start, end, 0 /* nowNanos */)
ms, err := storage.ComputeStats(ctx, logEngineReader, start, end, 0 /* nowNanos */)
if err != nil {
return result.Result{}, errors.Wrap(err, "while computing stats of Raft log freed by truncation")
}
Expand Down
35 changes: 19 additions & 16 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand Down Expand Up @@ -60,11 +61,15 @@ type EvalContext interface {
GetNodeLocality() roachpb.Locality

IsFirstRange() bool
GetCompactedIndex() kvpb.RaftIndex
GetTerm(index kvpb.RaftIndex) (kvpb.RaftTerm, error)
GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex
// LogEngine returns the engine that stores the raft log.
LogEngine() storage.Engine
PrepareLogEngineTruncation(
firstIndexToKeep kvpb.RaftIndex,
) (
compactedIndex kvpb.RaftIndex,
term kvpb.RaftTerm,
logReader storage.Reader,
err error,
)

Desc() *roachpb.RangeDescriptor
ContainsKey(key roachpb.Key) bool
Expand Down Expand Up @@ -228,13 +233,6 @@ func (m *mockEvalCtxImpl) GetConcurrencyManager() concurrency.Manager {
}
}

func (m *mockEvalCtxImpl) LogEngine() storage.Engine {
if m.MockEvalCtx.LogEngine != nil {
return m.MockEvalCtx.LogEngine
}
panic("LogEngine not configured")
}

func (m *mockEvalCtxImpl) NodeID() roachpb.NodeID {
return m.MockEvalCtx.NodeID
}
Expand All @@ -250,12 +248,17 @@ func (m *mockEvalCtxImpl) GetRangeID() roachpb.RangeID {
func (m *mockEvalCtxImpl) IsFirstRange() bool {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) GetCompactedIndex() kvpb.RaftIndex {
return m.CompactedIndex
}
func (m *mockEvalCtxImpl) GetTerm(kvpb.RaftIndex) (kvpb.RaftTerm, error) {
return m.Term, nil
func (m *mockEvalCtxImpl) PrepareLogEngineTruncation(
kvpb.RaftIndex,
) (kvpb.RaftIndex, kvpb.RaftTerm, storage.Reader, error) {
if m.MockEvalCtx.LogEngine != nil {

return m.CompactedIndex, m.Term, m.MockEvalCtx.LogEngine.NewReader(
storage.StandardDurability), raft.ErrCompacted
}
panic("LogEngine not configured")
}

func (m *mockEvalCtxImpl) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex {
panic("unimplemented")
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,11 +1409,6 @@ func (r *Replica) StoreID() roachpb.StoreID {
return r.store.StoreID()
}

// LogEngine returns the log engine.
func (r *Replica) LogEngine() storage.Engine {
return r.store.LogEngine()
}

// EvalKnobs returns the EvalContext's Knobs.
func (r *Replica) EvalKnobs() kvserverbase.BatchEvalTestingKnobs {
return r.store.cfg.TestingKnobs.EvalKnobs
Expand Down
17 changes: 4 additions & 13 deletions pkg/kv/kvserver/replica_eval_context_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,15 @@ func (rec *SpanSetReplicaEvalContext) GetNodeLocality() roachpb.Locality {
return rec.i.GetNodeLocality()
}

// GetCompactedIndex returns the compacted index of the raft log.
func (rec *SpanSetReplicaEvalContext) GetCompactedIndex() kvpb.RaftIndex {
return rec.i.GetCompactedIndex()
}

// GetTerm returns the term for the given index in the Raft log.
func (rec *SpanSetReplicaEvalContext) GetTerm(i kvpb.RaftIndex) (kvpb.RaftTerm, error) {
return rec.i.GetTerm(i)
}

// GetLeaseAppliedIndex returns the lease index of the last applied command.
func (rec *SpanSetReplicaEvalContext) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex {
return rec.i.GetLeaseAppliedIndex()
}

// LogEngine returns the log engine.
func (rec *SpanSetReplicaEvalContext) LogEngine() storage.Engine {
return rec.i.LogEngine()
func (rec *SpanSetReplicaEvalContext) PrepareLogEngineTruncation(
firstIndexToKeep kvpb.RaftIndex,
) (kvpb.RaftIndex, kvpb.RaftTerm, storage.Reader, error) {
return rec.i.PrepareLogEngineTruncation(firstIndexToKeep)
}

// IsFirstRange returns true iff the replica belongs to the first range.
Expand Down
33 changes: 33 additions & 0 deletions pkg/kv/kvserver/replica_raftlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
)
Expand Down Expand Up @@ -289,6 +290,38 @@ func (r *Replica) GetCompactedIndex() kvpb.RaftIndex {
return r.raftCompactedIndexRLocked()
}

// PrepareLogEngineTruncation prepares for truncating the Raft log up to
// firstIndexToKeep by returning the following fields atomically:
// - compactedIndex: The current compacted index of the Raft log.
// - term: The term of the last entry to truncate.
// - logReader: A reader over the log engine for computing truncation stats.
// - noOp: True if the log is already truncated.
// - error: An error if we couldn't get the term.
//
// The caller must close the returned reader when non-nil.
func (r *Replica) PrepareLogEngineTruncation(
firstIndexToKeep kvpb.RaftIndex,
) (compactedIndex kvpb.RaftIndex, term kvpb.RaftTerm, logReader storage.Reader, err error) {
r.mu.RLock()
defer r.mu.RUnlock()

// Check if the truncation is a no-op.
compactedIndex = r.raftCompactedIndexRLocked()
firstIndex := compactedIndex + 1
if firstIndex >= firstIndexToKeep {
// The log truncation is a no-op.
return compactedIndex, term, logReader, raft.ErrCompacted
}

term, err = r.raftTermShMuLocked(firstIndexToKeep - 1)
if err != nil {
return compactedIndex, term, logReader, err
}

logReader = r.store.LogEngine().NewReader(storage.StandardDurability)
return compactedIndex, term, logReader, err
}

// LogSnapshot returns an immutable point-in-time snapshot of the log storage.
//
// Requires that r.raftMu is held for writing, and r.mu for reading. In
Expand Down
Loading