Skip to content

Commit d294253

Browse files
committed
batchevel: PrepareLogEngineTruncation
1 parent 9260e27 commit d294253

File tree

6 files changed

+75
-56
lines changed

6 files changed

+75
-56
lines changed

pkg/kv/kvserver/batcheval/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ go_library(
7878
"//pkg/kv/kvserver/spanset",
7979
"//pkg/kv/kvserver/txnwait",
8080
"//pkg/kv/kvserver/uncertainty",
81+
"//pkg/raft",
8182
"//pkg/roachpb",
8283
"//pkg/settings",
8384
"//pkg/settings/cluster",

pkg/kv/kvserver/batcheval/cmd_truncate_log.go

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
1616
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset"
1717
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
18+
"github.com/cockroachdb/cockroach/pkg/raft"
1819
"github.com/cockroachdb/cockroach/pkg/roachpb"
1920
"github.com/cockroachdb/cockroach/pkg/storage"
2021
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -66,20 +67,25 @@ func TruncateLog(
6667
// existing entry's term?
6768
// TODO(pav-kv): some day, make args.Index an inclusive compaction index, and
6869
// eliminate the remaining +-1 arithmetics.
69-
firstIndex := cArgs.EvalCtx.GetCompactedIndex() + 1
70-
if firstIndex >= args.Index {
71-
if log.V(3) {
72-
log.KvExec.Infof(ctx, "attempting to truncate previously truncated raft log. FirstIndex:%d, TruncateFrom:%d",
73-
firstIndex, args.Index)
74-
}
75-
return result.Result{}, nil
76-
}
7770

78-
// args.Index is the first index to keep.
79-
term, err := cArgs.EvalCtx.GetTerm(args.Index - 1)
71+
// Use the log engine to compute stats for the raft log.
72+
// TODO(#136358): After we precisely maintain the Raft Log size, we could stop
73+
// needing the Log Engine to compute the stats.
74+
compactedIndex, term, logEngineReader, err := cArgs.EvalCtx.PrepareLogEngineTruncation(args.Index)
8075
if err != nil {
81-
return result.Result{}, errors.Wrap(err, "getting term")
76+
if errors.Is(err, raft.ErrCompacted) {
77+
// The log has already been truncated past this point - this is a no-op.
78+
if log.V(3) {
79+
log.KvExec.Infof(ctx, "attempting to truncate previously truncated raft log. FirstIndex:%d, TruncateFrom:%d",
80+
compactedIndex+1, args.Index)
81+
}
82+
return result.Result{}, nil
83+
}
84+
return result.Result{}, errors.Wrap(err, "preparing log truncation")
8285
}
86+
defer logEngineReader.Close()
87+
88+
firstIndex := compactedIndex + 1
8389

8490
// Compute the number of bytes freed by this truncation. Note that using
8591
// firstIndex only make sense for the leaseholder as we base this off its
@@ -110,16 +116,6 @@ func TruncateLog(
110116
start := keys.RaftLogKey(rangeID, firstIndex)
111117
end := keys.RaftLogKey(rangeID, args.Index)
112118

113-
// TODO(pav-kv): GetCompactedIndex, GetTerm, and NewReader calls can disagree
114-
// on the state of the log since we don't hold any Replica locks here. Move
115-
// the computation inside Replica where locking can be controlled precisely.
116-
//
117-
// Use the log engine to compute stats for the raft log.
118-
// TODO(#136358): After we precisely maintain the Raft Log size, we could stop
119-
// needing the Log Engine to compute the stats.
120-
logReader := cArgs.EvalCtx.LogEngine().NewReader(storage.StandardDurability)
121-
defer logReader.Close()
122-
123119
// Compute the stats delta that were to occur should the log entries be
124120
// purged. We do this as a side effect of seeing a new TruncatedState,
125121
// downstream of Raft.
@@ -128,7 +124,7 @@ func TruncateLog(
128124
// are not tracked in the raft log delta. The delta will be adjusted below
129125
// raft.
130126
// We can pass zero as nowNanos because we're only interested in SysBytes.
131-
ms, err := storage.ComputeStats(ctx, logReader, start, end, 0 /* nowNanos */)
127+
ms, err := storage.ComputeStats(ctx, logEngineReader, start, end, 0 /* nowNanos */)
132128
if err != nil {
133129
return result.Result{}, errors.Wrap(err, "while computing stats of Raft log freed by truncation")
134130
}

pkg/kv/kvserver/batcheval/eval_context.go

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
1818
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
1919
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
20+
"github.com/cockroachdb/cockroach/pkg/raft"
2021
"github.com/cockroachdb/cockroach/pkg/roachpb"
2122
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2223
"github.com/cockroachdb/cockroach/pkg/storage"
@@ -60,11 +61,15 @@ type EvalContext interface {
6061
GetNodeLocality() roachpb.Locality
6162

6263
IsFirstRange() bool
63-
GetCompactedIndex() kvpb.RaftIndex
64-
GetTerm(index kvpb.RaftIndex) (kvpb.RaftTerm, error)
6564
GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex
66-
// LogEngine returns the engine that stores the raft log.
67-
LogEngine() storage.Engine
65+
PrepareLogEngineTruncation(
66+
firstIndexToKeep kvpb.RaftIndex,
67+
) (
68+
compactedIndex kvpb.RaftIndex,
69+
term kvpb.RaftTerm,
70+
logReader storage.Reader,
71+
err error,
72+
)
6873

6974
Desc() *roachpb.RangeDescriptor
7075
ContainsKey(key roachpb.Key) bool
@@ -228,13 +233,6 @@ func (m *mockEvalCtxImpl) GetConcurrencyManager() concurrency.Manager {
228233
}
229234
}
230235

231-
func (m *mockEvalCtxImpl) LogEngine() storage.Engine {
232-
if m.MockEvalCtx.LogEngine != nil {
233-
return m.MockEvalCtx.LogEngine
234-
}
235-
panic("LogEngine not configured")
236-
}
237-
238236
func (m *mockEvalCtxImpl) NodeID() roachpb.NodeID {
239237
return m.MockEvalCtx.NodeID
240238
}
@@ -250,12 +248,17 @@ func (m *mockEvalCtxImpl) GetRangeID() roachpb.RangeID {
250248
func (m *mockEvalCtxImpl) IsFirstRange() bool {
251249
panic("unimplemented")
252250
}
253-
func (m *mockEvalCtxImpl) GetCompactedIndex() kvpb.RaftIndex {
254-
return m.CompactedIndex
255-
}
256-
func (m *mockEvalCtxImpl) GetTerm(kvpb.RaftIndex) (kvpb.RaftTerm, error) {
257-
return m.Term, nil
251+
func (m *mockEvalCtxImpl) PrepareLogEngineTruncation(
252+
kvpb.RaftIndex,
253+
) (kvpb.RaftIndex, kvpb.RaftTerm, storage.Reader, error) {
254+
if m.MockEvalCtx.LogEngine != nil {
255+
256+
return m.CompactedIndex, m.Term, m.MockEvalCtx.LogEngine.NewReader(
257+
storage.StandardDurability), raft.ErrCompacted
258+
}
259+
panic("LogEngine not configured")
258260
}
261+
259262
func (m *mockEvalCtxImpl) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex {
260263
panic("unimplemented")
261264
}

pkg/kv/kvserver/replica.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,11 +1409,6 @@ func (r *Replica) StoreID() roachpb.StoreID {
14091409
return r.store.StoreID()
14101410
}
14111411

1412-
// LogEngine returns the log engine.
1413-
func (r *Replica) LogEngine() storage.Engine {
1414-
return r.store.LogEngine()
1415-
}
1416-
14171412
// EvalKnobs returns the EvalContext's Knobs.
14181413
func (r *Replica) EvalKnobs() kvserverbase.BatchEvalTestingKnobs {
14191414
return r.store.cfg.TestingKnobs.EvalKnobs

pkg/kv/kvserver/replica_eval_context_span.go

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -81,24 +81,15 @@ func (rec *SpanSetReplicaEvalContext) GetNodeLocality() roachpb.Locality {
8181
return rec.i.GetNodeLocality()
8282
}
8383

84-
// GetCompactedIndex returns the compacted index of the raft log.
85-
func (rec *SpanSetReplicaEvalContext) GetCompactedIndex() kvpb.RaftIndex {
86-
return rec.i.GetCompactedIndex()
87-
}
88-
89-
// GetTerm returns the term for the given index in the Raft log.
90-
func (rec *SpanSetReplicaEvalContext) GetTerm(i kvpb.RaftIndex) (kvpb.RaftTerm, error) {
91-
return rec.i.GetTerm(i)
92-
}
93-
9484
// GetLeaseAppliedIndex returns the lease index of the last applied command.
9585
func (rec *SpanSetReplicaEvalContext) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex {
9686
return rec.i.GetLeaseAppliedIndex()
9787
}
9888

99-
// LogEngine returns the log engine.
100-
func (rec *SpanSetReplicaEvalContext) LogEngine() storage.Engine {
101-
return rec.i.LogEngine()
89+
func (rec *SpanSetReplicaEvalContext) PrepareLogEngineTruncation(
90+
firstIndexToKeep kvpb.RaftIndex,
91+
) (kvpb.RaftIndex, kvpb.RaftTerm, storage.Reader, error) {
92+
return rec.i.PrepareLogEngineTruncation(firstIndexToKeep)
10293
}
10394

10495
// IsFirstRange returns true iff the replica belongs to the first range.

pkg/kv/kvserver/replica_raftlog.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
1616
"github.com/cockroachdb/cockroach/pkg/raft"
1717
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
18+
"github.com/cockroachdb/cockroach/pkg/storage"
1819
"github.com/cockroachdb/cockroach/pkg/util/log"
1920
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
2021
)
@@ -289,6 +290,38 @@ func (r *Replica) GetCompactedIndex() kvpb.RaftIndex {
289290
return r.raftCompactedIndexRLocked()
290291
}
291292

293+
// PrepareLogEngineTruncation prepares for truncating the Raft log up to
294+
// firstIndexToKeep by returning the following fields atomically:
295+
// - compactedIndex: The current compacted index of the Raft log.
296+
// - term: The term of the last entry to truncate.
297+
// - logReader: A reader over the log engine for computing truncation stats.
298+
// - noOp: True if the log is already truncated.
299+
// - error: An error if we couldn't get the term.
300+
//
301+
// The caller must close the returned reader when non-nil.
302+
func (r *Replica) PrepareLogEngineTruncation(
303+
firstIndexToKeep kvpb.RaftIndex,
304+
) (compactedIndex kvpb.RaftIndex, term kvpb.RaftTerm, logReader storage.Reader, err error) {
305+
r.mu.RLock()
306+
defer r.mu.RUnlock()
307+
308+
// Check if the truncation is a no-op.
309+
compactedIndex = r.raftCompactedIndexRLocked()
310+
firstIndex := compactedIndex + 1
311+
if firstIndex >= firstIndexToKeep {
312+
// The log truncation is a no-op.
313+
return compactedIndex, term, logReader, raft.ErrCompacted
314+
}
315+
316+
term, err = r.raftTermShMuLocked(firstIndexToKeep - 1)
317+
if err != nil {
318+
return compactedIndex, term, logReader, err
319+
}
320+
321+
logReader = r.store.LogEngine().NewReader(storage.StandardDurability)
322+
return compactedIndex, term, logReader, err
323+
}
324+
292325
// LogSnapshot returns an immutable point-in-time snapshot of the log storage.
293326
//
294327
// Requires that r.raftMu is held for writing, and r.mu for reading. In

0 commit comments

Comments
 (0)