Skip to content

Commit e484d80

Browse files
craig[bot]pav-kv
andcommitted
Merge #156482
156482: kvserver: annotate engines in splitPreApply r=arulajmani a=pav-kv This PR annotates all storage operations with raft/state engine types, down the stack from `splitPreApply`. Epic: CRDB-55220 Co-authored-by: Pavel Kalinnikov <[email protected]>
2 parents 53ccb25 + d84f7f1 commit e484d80

File tree

5 files changed

+32
-25
lines changed

5 files changed

+32
-25
lines changed

pkg/kv/kvserver/kvstorage/destroy.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -195,16 +195,12 @@ func SubsumeReplica(
195195
}, destroyReplicaImpl(ctx, reader, writer, info, MergedTombstoneReplicaID)
196196
}
197197

198-
// RemoveStaleRHSFromSplit removes all data for the RHS replica of a split. This
199-
// is used in a situation when the RHS replica is already known to have been
200-
// removed from our store, so any pending writes that were supposed to
198+
// RemoveStaleRHSFromSplit removes all replicated data for the RHS replica of a
199+
// split. This is used in a situation when the RHS replica is already known to
200+
// have been removed from our store, so any pending writes that were supposed to
201201
// initialize the RHS replica should be dropped from the write batch.
202202
func RemoveStaleRHSFromSplit(
203-
ctx context.Context,
204-
reader storage.Reader,
205-
writer storage.Writer,
206-
rangeID roachpb.RangeID,
207-
keys roachpb.RSpan,
203+
ctx context.Context, stateRW State, rangeID roachpb.RangeID, keys roachpb.RSpan,
208204
) error {
209205
for _, span := range rditer.Select(rangeID, rditer.SelectOpts{
210206
// Since the RHS replica is uninitalized, we know there isn't anything in
@@ -219,7 +215,7 @@ func RemoveStaleRHSFromSplit(
219215
UnreplicatedByRangeID: false,
220216
}) {
221217
if err := storage.ClearRangeWithHeuristic(
222-
ctx, reader, writer, span.Key, span.EndKey, ClearRangeThresholdPointKeys(),
218+
ctx, stateRW.RO, stateRW.WO, span.Key, span.EndKey, ClearRangeThresholdPointKeys(),
223219
); err != nil {
224220
return err
225221
}

pkg/kv/kvserver/kvstorage/storage.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,17 @@ type ReadWriter struct {
5555

5656
// TODOState interprets the provided storage accessor as the State engine.
5757
//
58-
// TODO(pav-kv): remove when all callers have clarified their access patterns.
58+
// TODO(pav-kv): remove when all callers have clarified their access patterns,
59+
// and switched to WrapState() or other explicitly defined engine types.
5960
func TODOState(rw storage.ReadWriter) State {
6061
return State{RO: rw, WO: rw}
6162
}
6263

64+
// WrapState interprets the provided storage accessor as the State engine.
65+
func WrapState(rw StateRW) State {
66+
return State{RO: rw, WO: rw}
67+
}
68+
6369
// TODORaft interprets the provided storage accessor as the Raft engine.
6470
//
6571
// TODO(pav-kv): remove when all callers have clarified their access patterns.

pkg/kv/kvserver/replica_app_batch.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,10 @@ func (b *replicaAppBatch) runPostAddTriggersReplicaOnly(
321321
// NB: another reason why we shouldn't write HardState at evaluation time is
322322
// that it belongs to the log engine, whereas the evaluated batch must
323323
// contain only state machine updates.
324-
splitPreApply(ctx, b.r, b.batch, res.Split.SplitTrigger, cmd.Cmd.ClosedTimestamp)
324+
splitPreApply(
325+
ctx, b.r, kvstorage.StateRW(b.batch), kvstorage.TODORaft(b.batch),
326+
res.Split.SplitTrigger, cmd.Cmd.ClosedTimestamp,
327+
)
325328

326329
// The rangefeed processor will no longer be provided logical ops for
327330
// its entire range, so it needs to be shut down and all registrations

pkg/kv/kvserver/store_split.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
1414
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
1515
"github.com/cockroachdb/cockroach/pkg/roachpb"
16-
"github.com/cockroachdb/cockroach/pkg/storage"
1716
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
1817
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1918
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -29,7 +28,8 @@ import (
2928
func splitPreApply(
3029
ctx context.Context,
3130
r *Replica,
32-
readWriter storage.ReadWriter,
31+
stateRW kvstorage.StateRW,
32+
raftRW kvstorage.Raft,
3333
split roachpb.SplitTrigger,
3434
initClosedTS *hlc.Timestamp,
3535
) {
@@ -79,13 +79,17 @@ func splitPreApply(
7979
// on this Store, it doesn't have a RaftTruncatedState (which only initialized
8080
// replicas can have), so this deletion will not conflict with or corrupt it.
8181
//
82+
// NB: the key is cleared in stateRW rather than raftRW, deliberately. It
83+
// lives in the raft engine, but here we want to clear it from the state
84+
// engine batch, so that it doesn't make it to the state engine.
85+
//
8286
// TODO(#152847): remove this workaround when there are no historical
8387
// proposals with RaftTruncatedState, e.g. after a below-raft migration.
84-
if ts, err := rsl.LoadRaftTruncatedState(ctx, readWriter); err != nil {
88+
if ts, err := rsl.LoadRaftTruncatedState(ctx, stateRW); err != nil {
8589
log.KvExec.Fatalf(ctx, "cannot load RaftTruncatedState: %v", err)
8690
} else if ts == (kvserverpb.RaftTruncatedState{}) {
8791
// Common case. Do nothing.
88-
} else if err := rsl.ClearRaftTruncatedState(readWriter); err != nil {
92+
} else if err := rsl.ClearRaftTruncatedState(stateRW); err != nil {
8993
log.KvExec.Fatalf(ctx, "cannot clear RaftTruncatedState: %v", err)
9094
}
9195

@@ -122,7 +126,7 @@ func splitPreApply(
122126
}
123127
}
124128
if err := kvstorage.RemoveStaleRHSFromSplit(
125-
ctx, readWriter, readWriter, split.RightDesc.RangeID, split.RightDesc.RSpan(),
129+
ctx, kvstorage.WrapState(stateRW), split.RightDesc.RangeID, split.RightDesc.RSpan(),
126130
); err != nil {
127131
log.KvExec.Fatalf(ctx, "failed to clear range data for removed rhs: %v", err)
128132
}
@@ -138,10 +142,10 @@ func splitPreApply(
138142
//
139143
// [*] Note that uninitialized replicas may cast votes, and if they have, we
140144
// can't load the default Term and Vote values.
141-
if err := rsl.SynthesizeRaftState(ctx, readWriter, kvstorage.TODORaft(readWriter)); err != nil {
145+
if err := rsl.SynthesizeRaftState(ctx, stateRW, raftRW); err != nil {
142146
log.KvExec.Fatalf(ctx, "%v", err)
143147
}
144-
if err := rsl.SetRaftTruncatedState(ctx, readWriter, &kvserverpb.RaftTruncatedState{
148+
if err := rsl.SetRaftTruncatedState(ctx, raftRW.WO, &kvserverpb.RaftTruncatedState{
145149
Index: kvstorage.RaftInitialLogIndex,
146150
Term: kvstorage.RaftInitialLogTerm,
147151
}); err != nil {
@@ -159,7 +163,7 @@ func splitPreApply(
159163
initClosedTS = &hlc.Timestamp{}
160164
}
161165
initClosedTS.Forward(r.GetCurrentClosedTimestamp(ctx))
162-
if err := rsl.SetClosedTimestamp(ctx, readWriter, *initClosedTS); err != nil {
166+
if err := rsl.SetClosedTimestamp(ctx, stateRW, *initClosedTS); err != nil {
163167
log.KvExec.Fatalf(ctx, "%s", err)
164168
}
165169
}

pkg/kv/kvserver/store_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4043,12 +4043,10 @@ func TestSplitPreApplyInitializesTruncatedState(t *testing.T) {
40434043
}, &rightDesc.InternalReplicas[0])
40444044
require.NoError(t, err)
40454045

4046-
split := roachpb.SplitTrigger{
4047-
LeftDesc: leftDesc,
4048-
RightDesc: rightDesc,
4049-
}
4050-
4051-
splitPreApply(ctx, lhsRepl, batch, split, nil)
4046+
splitPreApply(
4047+
ctx, lhsRepl, kvstorage.StateRW(batch), kvstorage.TODORaft(batch),
4048+
roachpb.SplitTrigger{LeftDesc: leftDesc, RightDesc: rightDesc}, nil,
4049+
)
40524050

40534051
// Verify that the RHS truncated state is initialized as expected.
40544052
rsl := kvstorage.MakeStateLoader(rightDesc.RangeID)

0 commit comments

Comments
 (0)