Skip to content

Commit a8f7f91

Browse files
committed
batchevel: PrepareLogEngineTruncation
1 parent f37e9c2 commit a8f7f91

File tree

6 files changed

+78
-68
lines changed

6 files changed

+78
-68
lines changed

pkg/kv/kvserver/batcheval/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ go_library(
7878
"//pkg/kv/kvserver/spanset",
7979
"//pkg/kv/kvserver/txnwait",
8080
"//pkg/kv/kvserver/uncertainty",
81+
"//pkg/raft",
8182
"//pkg/roachpb",
8283
"//pkg/settings",
8384
"//pkg/settings/cluster",

pkg/kv/kvserver/batcheval/cmd_truncate_log.go

Lines changed: 17 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
1616
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset"
1717
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
18+
"github.com/cockroachdb/cockroach/pkg/raft"
1819
"github.com/cockroachdb/cockroach/pkg/roachpb"
1920
"github.com/cockroachdb/cockroach/pkg/storage"
2021
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -66,21 +67,25 @@ func TruncateLog(
6667
// existing entry's term?
6768
// TODO(pav-kv): some day, make args.Index an inclusive compaction index, and
6869
// eliminate the remaining +-1 arithmetics.
69-
firstIndex := cArgs.EvalCtx.GetCompactedIndex() + 1
70-
if firstIndex >= args.Index {
71-
if log.V(3) {
72-
log.KvExec.Infof(ctx, "attempting to truncate previously truncated raft log. FirstIndex:%d, TruncateFrom:%d",
73-
firstIndex, args.Index)
74-
}
75-
return result.Result{}, nil
76-
}
7770

78-
// args.Index is the first index to keep.
79-
term, err := cArgs.EvalCtx.GetTerm(args.Index - 1)
71+
// Use the log engine to compute stats for the raft log.
72+
// TODO(#136358): After we precisely maintain the Raft Log size, we could stop
73+
// needing the Log Engine to compute the stats.
74+
compactedIndex, term, logEngineReader, err := cArgs.EvalCtx.PrepareLogEngineTruncation(args.Index)
8075
if err != nil {
81-
return result.Result{}, errors.Wrap(err, "getting term")
76+
if errors.Is(err, raft.ErrCompacted) {
77+
// The log has already been truncated past this point - this is a no-op.
78+
if log.V(3) {
79+
log.KvExec.Infof(ctx, "attempting to truncate previously truncated raft log. FirstIndex:%d, TruncateFrom:%d",
80+
compactedIndex+1, args.Index)
81+
}
82+
return result.Result{}, nil
83+
}
84+
return result.Result{}, errors.Wrap(err, "preparing log truncation")
8285
}
86+
defer logEngineReader.Close()
8387

88+
firstIndex := compactedIndex + 1
8489
// Compute the number of bytes freed by this truncation. Note that using
8590
// firstIndex only make sense for the leaseholder as we base this off its
8691
// own first index (other replicas may have other first indexes). In
@@ -92,34 +97,12 @@ func TruncateLog(
9297
// flight TruncateLogRequests, and using the firstIndex will result in
9398
// duplicate accounting. The ExpectedFirstIndex, populated for clusters at
9499
// LooselyCoupledRaftLogTruncation, allows us to avoid this problem.
95-
//
96-
// We have an additional source of error not mitigated by
97-
// ExpectedFirstIndex. There is nothing synchronizing firstIndex with the
98-
// state visible in readWriter. The former uses the in-memory state or
99-
// fetches directly from the Engine. The latter uses Engine state from some
100-
// point in time which can fall anywhere in the time interval starting from
101-
// when the readWriter was created up to where we create an MVCCIterator
102-
// below.
103-
// TODO(sumeer): we can eliminate this error as part of addressing
104-
// https://github.com/cockroachdb/cockroach/issues/55461 and
105-
// https://github.com/cockroachdb/cockroach/issues/70974 that discuss taking
106-
// a consistent snapshot of some Replica state and the engine.
107100
if args.ExpectedFirstIndex > firstIndex {
108101
firstIndex = args.ExpectedFirstIndex
109102
}
110103
start := keys.RaftLogKey(rangeID, firstIndex)
111104
end := keys.RaftLogKey(rangeID, args.Index)
112105

113-
// TODO(pav-kv): GetCompactedIndex, GetTerm, and NewReader calls can disagree
114-
// on the state of the log since we don't hold any Replica locks here. Move
115-
// the computation inside Replica where locking can be controlled precisely.
116-
//
117-
// Use the log engine to compute stats for the raft log.
118-
// TODO(#136358): After we precisely maintain the Raft Log size, we could stop
119-
// needing the Log Engine to compute the stats.
120-
logReader := cArgs.EvalCtx.LogEngine().NewReader(storage.StandardDurability)
121-
defer logReader.Close()
122-
123106
// Compute the stats delta that were to occur should the log entries be
124107
// purged. We do this as a side effect of seeing a new TruncatedState,
125108
// downstream of Raft.
@@ -128,7 +111,7 @@ func TruncateLog(
128111
// are not tracked in the raft log delta. The delta will be adjusted below
129112
// raft.
130113
// We can pass zero as nowNanos because we're only interested in SysBytes.
131-
ms, err := storage.ComputeStats(ctx, logReader, start, end, 0 /* nowNanos */)
114+
ms, err := storage.ComputeStats(ctx, logEngineReader, start, end, 0 /* nowNanos */)
132115
if err != nil {
133116
return result.Result{}, errors.Wrap(err, "while computing stats of Raft log freed by truncation")
134117
}

pkg/kv/kvserver/batcheval/eval_context.go

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,10 @@ type EvalContext interface {
6060
GetNodeLocality() roachpb.Locality
6161

6262
IsFirstRange() bool
63-
GetCompactedIndex() kvpb.RaftIndex
64-
GetTerm(index kvpb.RaftIndex) (kvpb.RaftTerm, error)
6563
GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex
66-
// LogEngine returns the engine that stores the raft log.
67-
LogEngine() storage.Engine
64+
PrepareLogEngineTruncation(firstIndexToKeep kvpb.RaftIndex) (
65+
kvpb.RaftIndex, kvpb.RaftTerm, storage.Reader, error,
66+
)
6867

6968
Desc() *roachpb.RangeDescriptor
7069
ContainsKey(key roachpb.Key) bool
@@ -228,13 +227,6 @@ func (m *mockEvalCtxImpl) GetConcurrencyManager() concurrency.Manager {
228227
}
229228
}
230229

231-
func (m *mockEvalCtxImpl) LogEngine() storage.Engine {
232-
if m.MockEvalCtx.LogEngine != nil {
233-
return m.MockEvalCtx.LogEngine
234-
}
235-
panic("LogEngine not configured")
236-
}
237-
238230
func (m *mockEvalCtxImpl) NodeID() roachpb.NodeID {
239231
return m.MockEvalCtx.NodeID
240232
}
@@ -250,12 +242,16 @@ func (m *mockEvalCtxImpl) GetRangeID() roachpb.RangeID {
250242
func (m *mockEvalCtxImpl) IsFirstRange() bool {
251243
panic("unimplemented")
252244
}
253-
func (m *mockEvalCtxImpl) GetCompactedIndex() kvpb.RaftIndex {
254-
return m.CompactedIndex
255-
}
256-
func (m *mockEvalCtxImpl) GetTerm(kvpb.RaftIndex) (kvpb.RaftTerm, error) {
257-
return m.Term, nil
245+
func (m *mockEvalCtxImpl) PrepareLogEngineTruncation(
246+
kvpb.RaftIndex,
247+
) (kvpb.RaftIndex, kvpb.RaftTerm, storage.Reader, error) {
248+
if m.MockEvalCtx.LogEngine != nil {
249+
return m.CompactedIndex, m.Term, m.MockEvalCtx.LogEngine.NewReader(
250+
storage.StandardDurability), nil
251+
}
252+
panic("LogEngine not configured")
258253
}
254+
259255
func (m *mockEvalCtxImpl) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex {
260256
panic("unimplemented")
261257
}

pkg/kv/kvserver/replica.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,11 +1409,6 @@ func (r *Replica) StoreID() roachpb.StoreID {
14091409
return r.store.StoreID()
14101410
}
14111411

1412-
// LogEngine returns the log engine.
1413-
func (r *Replica) LogEngine() storage.Engine {
1414-
return r.store.LogEngine()
1415-
}
1416-
14171412
// EvalKnobs returns the EvalContext's Knobs.
14181413
func (r *Replica) EvalKnobs() kvserverbase.BatchEvalTestingKnobs {
14191414
return r.store.cfg.TestingKnobs.EvalKnobs

pkg/kv/kvserver/replica_eval_context_span.go

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -81,24 +81,15 @@ func (rec *SpanSetReplicaEvalContext) GetNodeLocality() roachpb.Locality {
8181
return rec.i.GetNodeLocality()
8282
}
8383

84-
// GetCompactedIndex returns the compacted index of the raft log.
85-
func (rec *SpanSetReplicaEvalContext) GetCompactedIndex() kvpb.RaftIndex {
86-
return rec.i.GetCompactedIndex()
87-
}
88-
89-
// GetTerm returns the term for the given index in the Raft log.
90-
func (rec *SpanSetReplicaEvalContext) GetTerm(i kvpb.RaftIndex) (kvpb.RaftTerm, error) {
91-
return rec.i.GetTerm(i)
92-
}
93-
9484
// GetLeaseAppliedIndex returns the lease index of the last applied command.
9585
func (rec *SpanSetReplicaEvalContext) GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex {
9686
return rec.i.GetLeaseAppliedIndex()
9787
}
9888

99-
// LogEngine returns the log engine.
100-
func (rec *SpanSetReplicaEvalContext) LogEngine() storage.Engine {
101-
return rec.i.LogEngine()
89+
func (rec *SpanSetReplicaEvalContext) PrepareLogEngineTruncation(
90+
firstIndexToKeep kvpb.RaftIndex,
91+
) (kvpb.RaftIndex, kvpb.RaftTerm, storage.Reader, error) {
92+
return rec.i.PrepareLogEngineTruncation(firstIndexToKeep)
10293
}
10394

10495
// IsFirstRange returns true iff the replica belongs to the first range.

pkg/kv/kvserver/replica_raftlog.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftlog"
1616
"github.com/cockroachdb/cockroach/pkg/raft"
1717
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
18+
"github.com/cockroachdb/cockroach/pkg/storage"
19+
"github.com/cockroachdb/cockroach/pkg/storage/fs"
1820
"github.com/cockroachdb/cockroach/pkg/util/log"
1921
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
2022
)
@@ -289,6 +291,48 @@ func (r *Replica) GetCompactedIndex() kvpb.RaftIndex {
289291
return r.raftCompactedIndexRLocked()
290292
}
291293

294+
// PrepareLogEngineTruncation prepares for truncating the Raft log up to
295+
// firstIndexToKeep by returning the following fields atomically:
296+
// - compactedIndex: The current compacted index of the Raft log.
297+
// - term: The term of the last entry to truncate.
298+
// - logReader: A reader over the log engine with a snapshot consistent with
299+
// compactedIndex and term.
300+
// - error: If the operation is a no-op (log is already compacted), it returns
301+
// `ErrCompacted`. Otherwise, it returns an error if grabbing the
302+
// term failed.
303+
//
304+
// The caller must close the returned reader error is nil.
305+
func (r *Replica) PrepareLogEngineTruncation(
306+
firstIndexToKeep kvpb.RaftIndex,
307+
) (kvpb.RaftIndex, kvpb.RaftTerm, storage.Reader, error) {
308+
r.mu.RLock()
309+
defer r.mu.RUnlock()
310+
311+
// Check if the truncation is a no-op.
312+
compactedIndex := r.raftCompactedIndexRLocked()
313+
firstIndex := compactedIndex + 1
314+
if firstIndex >= firstIndexToKeep {
315+
// The log truncation is a no-op, return early.
316+
return compactedIndex, 0, nil, raft.ErrCompacted
317+
}
318+
319+
// Grab the raft term of the last index to truncate.
320+
term, err := r.raftTermShMuLocked(firstIndexToKeep - 1)
321+
if err != nil {
322+
return compactedIndex, 0, nil, err
323+
}
324+
325+
// Before releasing the read mutex, grab a consistent snapshot of the log
326+
// engine reader. This ensures that the compactedIndex, and term are
327+
// consistent with the log engine reader's snapshot.
328+
logReader := r.store.LogEngine().NewReader(storage.StandardDurability)
329+
if err := logReader.PinEngineStateForIterators(fs.BatchEvalReadCategory); err != nil {
330+
logReader.Close()
331+
return compactedIndex, term, logReader, err
332+
}
333+
return compactedIndex, term, logReader, nil
334+
}
335+
292336
// LogSnapshot returns an immutable point-in-time snapshot of the log storage.
293337
//
294338
// Requires that r.raftMu is held for writing, and r.mu for reading. In

0 commit comments

Comments
 (0)