Skip to content

Commit b724895

Browse files
craig[bot]iskettaneh
andcommitted
Merge #157932
157932: batchevel: make cmd_truncate_log use the log engine r=iskettaneh a=iskettaneh This is the last known place where touch the unreplicated rangeID local keyspace. This commit provides the LogEngine through EvalCtx so that cmd_truncate_log can use it to compute the stats. Fixes: #157895 Release note: None Co-authored-by: iskettaneh <[email protected]>
2 parents b2093a9 + 9260e27 commit b724895

File tree

5 files changed

+67
-10
lines changed

5 files changed

+67
-10
lines changed

pkg/kv/kvserver/batcheval/cmd_truncate_log.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,16 @@ func TruncateLog(
110110
start := keys.RaftLogKey(rangeID, firstIndex)
111111
end := keys.RaftLogKey(rangeID, args.Index)
112112

113+
// TODO(pav-kv): GetCompactedIndex, GetTerm, and NewReader calls can disagree
114+
// on the state of the log since we don't hold any Replica locks here. Move
115+
// the computation inside Replica where locking can be controlled precisely.
116+
//
117+
// Use the log engine to compute stats for the raft log.
118+
// TODO(#136358): After we precisely maintain the Raft Log size, we could stop
119+
// needing the Log Engine to compute the stats.
120+
logReader := cArgs.EvalCtx.LogEngine().NewReader(storage.StandardDurability)
121+
defer logReader.Close()
122+
113123
// Compute the stats delta that were to occur should the log entries be
114124
// purged. We do this as a side effect of seeing a new TruncatedState,
115125
// downstream of Raft.
@@ -118,7 +128,7 @@ func TruncateLog(
118128
// are not tracked in the raft log delta. The delta will be adjusted below
119129
// raft.
120130
// We can pass zero as nowNanos because we're only interested in SysBytes.
121-
ms, err := storage.ComputeStats(ctx, readWriter, start, end, 0 /* nowNanos */)
131+
ms, err := storage.ComputeStats(ctx, logReader, start, end, 0 /* nowNanos */)
122132
if err != nil {
123133
return result.Result{}, errors.Wrap(err, "while computing stats of Raft log freed by truncation")
124134
}

pkg/kv/kvserver/batcheval/cmd_truncate_log_test.go

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,22 +64,35 @@ func TestTruncateLog(t *testing.T) {
6464
)
6565

6666
st := cluster.MakeTestingClusterSettings()
67+
68+
// The truncate command takes the stateEng as any other command. However, it
69+
// also accepts the log engine via EvalCtx for the stats computation.
70+
stateEng := storage.NewDefaultInMemForTesting()
71+
logEng := storage.NewDefaultInMemForTesting()
72+
defer stateEng.Close()
73+
defer logEng.Close()
74+
6775
evalCtx := &MockEvalCtx{
6876
ClusterSettings: st,
6977
Desc: &roachpb.RangeDescriptor{RangeID: rangeID},
7078
Term: term,
7179
CompactedIndex: compactedIndex,
80+
LogEngine: logEng,
7281
}
7382

74-
eng := storage.NewDefaultInMemForTesting()
75-
defer eng.Close()
76-
7783
truncState := kvserverpb.RaftTruncatedState{
7884
Index: compactedIndex + 2,
7985
Term: term,
8086
}
8187

82-
putTruncatedState(t, eng, rangeID, truncState)
88+
putTruncatedState(t, stateEng, rangeID, truncState)
89+
90+
// Write some raft log entries to the log engine to create non-zero stats.
91+
// These entries will be "truncated" by our request.
92+
for i := compactedIndex + 1; i < compactedIndex+8; i++ {
93+
key := keys.RaftLogKey(rangeID, kvpb.RaftIndex(i))
94+
require.NoError(t, logEng.PutUnversioned(key, []byte("some-data")))
95+
}
8396

8497
// Send a truncation request.
8598
req := kvpb.TruncateLogRequest{
@@ -91,10 +104,8 @@ func TestTruncateLog(t *testing.T) {
91104
Args: &req,
92105
}
93106
resp := &kvpb.TruncateLogResponse{}
94-
res, err := TruncateLog(ctx, eng, cArgs, resp)
95-
if err != nil {
96-
t.Fatal(err)
97-
}
107+
res, err := TruncateLog(ctx, stateEng, cArgs, resp)
108+
require.NoError(t, err)
98109

99110
expTruncState := kvserverpb.RaftTruncatedState{
100111
Index: req.Index - 1,
@@ -103,9 +114,22 @@ func TestTruncateLog(t *testing.T) {
103114

104115
// The unreplicated key that we see should be the initial truncated
105116
// state (it's only updated below Raft).
106-
gotTruncatedState := readTruncStates(t, eng, rangeID)
117+
gotTruncatedState := readTruncStates(t, stateEng, rangeID)
107118
assert.Equal(t, truncState, gotTruncatedState)
108119

109120
assert.NotNil(t, res.Replicated.RaftTruncatedState)
110121
assert.Equal(t, expTruncState, *res.Replicated.RaftTruncatedState)
122+
123+
// Verify the stats match what's actually in the log engine.
124+
start := keys.RaftLogKey(rangeID, compactedIndex+1)
125+
end := keys.RaftLogKey(rangeID, compactedIndex+8)
126+
expectedStats, err := storage.ComputeStats(ctx, logEng, start, end, 0)
127+
require.NoError(t, err)
128+
assert.Equal(t, -expectedStats.SysBytes, res.Replicated.RaftLogDelta,
129+
"RaftLogDelta should match stats computed from log engine")
130+
131+
// The state machine engine's stats should be zero.
132+
zeroStats, err := storage.ComputeStats(ctx, stateEng, start, end, 0)
133+
require.NoError(t, err)
134+
require.Zero(t, zeroStats.SysBytes)
111135
}

pkg/kv/kvserver/batcheval/eval_context.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
2020
"github.com/cockroachdb/cockroach/pkg/roachpb"
2121
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
22+
"github.com/cockroachdb/cockroach/pkg/storage"
2223
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
2324
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2425
"github.com/cockroachdb/cockroach/pkg/util/limit"
@@ -62,6 +63,8 @@ type EvalContext interface {
6263
GetCompactedIndex() kvpb.RaftIndex
6364
GetTerm(index kvpb.RaftIndex) (kvpb.RaftTerm, error)
6465
GetLeaseAppliedIndex() kvpb.LeaseAppliedIndex
66+
// LogEngine returns the engine that stores the raft log.
67+
LogEngine() storage.Engine
6568

6669
Desc() *roachpb.RangeDescriptor
6770
ContainsKey(key roachpb.Key) bool
@@ -175,6 +178,7 @@ type MockEvalCtx struct {
175178
GCThreshold hlc.Timestamp
176179
Term kvpb.RaftTerm
177180
CompactedIndex kvpb.RaftIndex
181+
LogEngine storage.Engine
178182
CanCreateTxnRecordFn func() (bool, kvpb.TransactionAbortedReason)
179183
MinTxnCommitTSFn func() hlc.Timestamp
180184
LastReplicaGCTimestamp hlc.Timestamp
@@ -223,6 +227,14 @@ func (m *mockEvalCtxImpl) GetConcurrencyManager() concurrency.Manager {
223227
panic("ConcurrencyManager not configured")
224228
}
225229
}
230+
231+
func (m *mockEvalCtxImpl) LogEngine() storage.Engine {
232+
if m.MockEvalCtx.LogEngine != nil {
233+
return m.MockEvalCtx.LogEngine
234+
}
235+
panic("LogEngine not configured")
236+
}
237+
226238
func (m *mockEvalCtxImpl) NodeID() roachpb.NodeID {
227239
return m.MockEvalCtx.NodeID
228240
}

pkg/kv/kvserver/replica.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1409,6 +1409,11 @@ func (r *Replica) StoreID() roachpb.StoreID {
14091409
return r.store.StoreID()
14101410
}
14111411

1412+
// LogEngine returns the log engine.
1413+
func (r *Replica) LogEngine() storage.Engine {
1414+
return r.store.LogEngine()
1415+
}
1416+
14121417
// EvalKnobs returns the EvalContext's Knobs.
14131418
func (r *Replica) EvalKnobs() kvserverbase.BatchEvalTestingKnobs {
14141419
return r.store.cfg.TestingKnobs.EvalKnobs

pkg/kv/kvserver/replica_eval_context_span.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
2020
"github.com/cockroachdb/cockroach/pkg/roachpb"
2121
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
22+
"github.com/cockroachdb/cockroach/pkg/storage"
2223
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
2324
"github.com/cockroachdb/cockroach/pkg/util/hlc"
2425
"github.com/cockroachdb/cockroach/pkg/util/mon"
@@ -95,6 +96,11 @@ func (rec *SpanSetReplicaEvalContext) GetLeaseAppliedIndex() kvpb.LeaseAppliedIn
9596
return rec.i.GetLeaseAppliedIndex()
9697
}
9798

99+
// LogEngine returns the log engine.
100+
func (rec *SpanSetReplicaEvalContext) LogEngine() storage.Engine {
101+
return rec.i.LogEngine()
102+
}
103+
98104
// IsFirstRange returns true iff the replica belongs to the first range.
99105
func (rec *SpanSetReplicaEvalContext) IsFirstRange() bool {
100106
return rec.i.IsFirstRange()

0 commit comments

Comments
 (0)