Skip to content

Commit 72d87ce

Browse files
committed
kvstorage: annotate engines in replica destruction
Epic: none Release note: none
1 parent 57e1480 commit 72d87ce

File tree

5 files changed

+27
-27
lines changed

5 files changed

+27
-27
lines changed

pkg/kv/kvserver/kvstorage/destroy.go

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -82,35 +82,27 @@ type DestroyReplicaInfo struct {
8282
// This call issues all writes ordered by key. This is to support a large
8383
// variety of uses, including SSTable generation for snapshot application.
8484
func DestroyReplica(
85-
ctx context.Context,
86-
reader storage.Reader,
87-
writer storage.Writer,
88-
info DestroyReplicaInfo,
89-
next roachpb.ReplicaID,
85+
ctx context.Context, rw ReadWriter, info DestroyReplicaInfo, next roachpb.ReplicaID,
9086
) error {
91-
return destroyReplicaImpl(ctx, reader, writer, info, next)
87+
return destroyReplicaImpl(ctx, rw, info, next)
9288
}
9389

9490
func destroyReplicaImpl(
95-
ctx context.Context,
96-
reader storage.Reader,
97-
writer storage.Writer,
98-
info DestroyReplicaInfo,
99-
next roachpb.ReplicaID,
91+
ctx context.Context, rw ReadWriter, info DestroyReplicaInfo, next roachpb.ReplicaID,
10092
) error {
10193
if next <= info.ReplicaID {
10294
return errors.AssertionFailedf("%v must not survive its own tombstone", info.FullReplicaID)
10395
}
10496
sl := MakeStateLoader(info.RangeID)
10597
// Assert that the ReplicaID in storage matches the one being removed.
106-
if loaded, err := sl.LoadRaftReplicaID(ctx, reader); err != nil {
98+
if loaded, err := sl.LoadRaftReplicaID(ctx, rw.State.RO); err != nil {
10799
return err
108100
} else if id := loaded.ReplicaID; id != info.ReplicaID {
109101
return errors.AssertionFailedf("%v has a mismatching ID %d", info.FullReplicaID, id)
110102
}
111103
// Assert that the provided tombstone moves the existing one strictly forward.
112104
// A failure would indicate that something is wrong in the replica lifecycle.
113-
if ts, err := sl.LoadRangeTombstone(ctx, reader); err != nil {
105+
if ts, err := sl.LoadRangeTombstone(ctx, rw.State.RO); err != nil {
114106
return err
115107
} else if next <= ts.NextReplicaID {
116108
return errors.AssertionFailedf("%v cannot rewind tombstone from %d to %d",
@@ -141,12 +133,13 @@ func destroyReplicaImpl(
141133
EndKey: keys.RangeTombstoneKey(info.RangeID),
142134
}
143135
if err := storage.ClearRangeWithHeuristic(
144-
ctx, reader, writer, span.Key, span.EndKey, ClearRangeThresholdPointKeys(),
136+
ctx, rw.State.RO, rw.State.WO,
137+
span.Key, span.EndKey, ClearRangeThresholdPointKeys(),
145138
); err != nil {
146139
return err
147140
}
148141
// Save a tombstone to ensure that replica IDs never get reused.
149-
if err := sl.SetRangeTombstone(ctx, writer, kvserverpb.RangeTombstone{
142+
if err := sl.SetRangeTombstone(ctx, rw.State.WO, kvserverpb.RangeTombstone{
150143
NextReplicaID: next, // NB: NextReplicaID > 0
151144
}); err != nil {
152145
return err
@@ -166,20 +159,21 @@ func destroyReplicaImpl(
166159
// TODO(#152845): with separated raft storage, clear only the unapplied suffix
167160
// of the raft log, which is in this span.
168161
if err := storage.ClearRangeWithHeuristic(
169-
ctx, reader, writer, span.Key, span.EndKey, ClearRangeThresholdPointKeys(),
162+
ctx, rw.Raft.RO, rw.Raft.WO,
163+
span.Key, span.EndKey, ClearRangeThresholdPointKeys(),
170164
); err != nil {
171165
return err
172166
}
173-
// TODO(#152845): this key should be cleared in the state machine engine.
174-
if err := sl.ClearRaftReplicaID(writer); err != nil {
167+
if err := sl.ClearRaftReplicaID(rw.State.WO); err != nil {
175168
return err
176169
}
177170
span = roachpb.Span{
178171
Key: span.EndKey.Next(), // RaftReplicaIDKey.Next()
179172
EndKey: keys.MakeRangeIDUnreplicatedPrefix(info.RangeID).PrefixEnd(),
180173
}
181174
if err := storage.ClearRangeWithHeuristic(
182-
ctx, reader, writer, span.Key, span.EndKey, ClearRangeThresholdPointKeys(),
175+
ctx, rw.Raft.RO, rw.Raft.WO,
176+
span.Key, span.EndKey, ClearRangeThresholdPointKeys(),
183177
); err != nil {
184178
return err
185179
}
@@ -190,7 +184,8 @@ func destroyReplicaImpl(
190184
Ranged: rditer.SelectAllRanged(info.Keys),
191185
}) {
192186
if err := storage.ClearRangeWithHeuristic(
193-
ctx, reader, writer, span.Key, span.EndKey, ClearRangeThresholdPointKeys(),
187+
ctx, rw.State.RO, rw.State.WO,
188+
span.Key, span.EndKey, ClearRangeThresholdPointKeys(),
194189
); err != nil {
195190
return err
196191
}
@@ -206,14 +201,14 @@ func destroyReplicaImpl(
206201
// function clears.
207202
// TODO(pav-kv): get rid of SelectOpts.
208203
func SubsumeReplica(
209-
ctx context.Context, reader storage.Reader, writer storage.Writer, info DestroyReplicaInfo,
204+
ctx context.Context, rw ReadWriter, info DestroyReplicaInfo,
210205
) (rditer.SelectOpts, error) {
211206
// Forget about the user keys.
212207
info.Keys = roachpb.RSpan{}
213208
return rditer.SelectOpts{
214209
ReplicatedByRangeID: true,
215210
UnreplicatedByRangeID: true,
216-
}, destroyReplicaImpl(ctx, reader, writer, info, MergedTombstoneReplicaID)
211+
}, destroyReplicaImpl(ctx, rw, info, MergedTombstoneReplicaID)
217212
}
218213

219214
// RemoveStaleRHSFromSplit removes all replicated data for the RHS replica of a

pkg/kv/kvserver/kvstorage/destroy_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ func TestDestroyReplica(t *testing.T) {
6868
})
6969
mutate("destroy", func(rw storage.ReadWriter) {
7070
require.NoError(t, DestroyReplica(
71-
ctx, rw, rw, DestroyReplicaInfo{FullReplicaID: r.id, Keys: r.keys}, r.id.ReplicaID+1,
71+
ctx, TODOReadWriter(rw),
72+
DestroyReplicaInfo{FullReplicaID: r.id, Keys: r.keys}, r.id.ReplicaID+1,
7273
))
7374
})
7475

pkg/kv/kvserver/replica_app_batch.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ func (b *replicaAppBatch) runPostAddTriggersReplicaOnly(
364364
rhsRepl.readOnlyCmdMu.Unlock()
365365

366366
if _, err := kvstorage.SubsumeReplica(
367-
ctx, b.batch, b.batch, rhsRepl.destroyInfoRaftMuLocked(),
367+
ctx, kvstorage.TODOReadWriter(b.batch), rhsRepl.destroyInfoRaftMuLocked(),
368368
); err != nil {
369369
return errors.Wrapf(err, "unable to subsume replica before merge")
370370
}
@@ -453,7 +453,8 @@ func (b *replicaAppBatch) runPostAddTriggersReplicaOnly(
453453
// above, and DestroyReplica will also add a range tombstone to the
454454
// batch, so that when we commit it, the removal is finalized.
455455
if err := kvstorage.DestroyReplica(
456-
ctx, b.batch, b.batch, b.r.destroyInfoRaftMuLocked(), change.NextReplicaID(),
456+
ctx, kvstorage.TODOReadWriter(b.batch),
457+
b.r.destroyInfoRaftMuLocked(), change.NextReplicaID(),
457458
); err != nil {
458459
return errors.Wrapf(err, "unable to destroy replica before removal")
459460
}

pkg/kv/kvserver/replica_destroy.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb
9191

9292
// TODO(sep-raft-log): need both engines separately here.
9393
if err := kvstorage.DestroyReplica(
94-
ctx, r.store.TODOEngine(), batch, r.destroyInfoRaftMuLocked(), nextReplicaID,
94+
ctx, kvstorage.TODOReaderWriter(r.store.TODOEngine(), batch),
95+
r.destroyInfoRaftMuLocked(), nextReplicaID,
9596
); err != nil {
9697
return err
9798
}

pkg/kv/kvserver/snapshot_apply_prepare.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,9 @@ func (s *snapWriteBuilder) clearSubsumedReplicaDiskData(ctx context.Context) err
149149
for _, sub := range s.subsume {
150150
// We have to create an SST for the subsumed replica's range-id local keys.
151151
if err := s.writeSST(ctx, func(ctx context.Context, w storage.Writer) error {
152-
opts, err := kvstorage.SubsumeReplica(ctx, reader, w, sub)
152+
opts, err := kvstorage.SubsumeReplica(
153+
ctx, kvstorage.TODOReaderWriter(reader, w), sub,
154+
)
153155
s.cleared = append(s.cleared, rditer.Select(sub.RangeID, opts)...)
154156
return err
155157
}); err != nil {

0 commit comments

Comments
 (0)