Skip to content

Commit cad9544

Browse files
craig[bot]pav-kv
andcommitted
Merge #150173
150173: kvserver: use FullReplicaID in creation stack r=arulajmani a=pav-kv This PR moves `FullReplicaID` to be next to `roachpb.{RangeID,ReplicaID}`, and uses it (for type safety) in a few replica creation places where `RangeID` and `ReplicaID` are passed in together. Epic: CRDB-49111 Co-authored-by: Pavel Kalinnikov <[email protected]>
2 parents e4b2d35 + f17cf57 commit cad9544

17 files changed

+120
-136
lines changed

pkg/kv/kvserver/client_raft_log_queue_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
2727
"github.com/cockroachdb/cockroach/pkg/server"
2828
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
29-
"github.com/cockroachdb/cockroach/pkg/storage"
3029
"github.com/cockroachdb/cockroach/pkg/storage/fs"
3130
"github.com/cockroachdb/cockroach/pkg/testutils"
3231
"github.com/cockroachdb/cockroach/pkg/testutils/listenerutil"
@@ -263,7 +262,7 @@ func TestCrashWhileTruncatingSideloadedEntries(t *testing.T) {
263262
propFilter := newAtomicFunc(func(kvserverbase.ProposalFilterArgs) *kvpb.Error {
264263
return nil
265264
})
266-
applyThrottle := newAtomicFunc(func(storage.FullReplicaID) {})
265+
applyThrottle := newAtomicFunc(func(roachpb.FullReplicaID) {})
267266
postSideEffects := newAtomicFunc(func(args kvserverbase.ApplyFilterArgs) (int, *kvpb.Error) {
268267
return 0, nil
269268
})
@@ -293,7 +292,7 @@ func TestCrashWhileTruncatingSideloadedEntries(t *testing.T) {
293292
Store: &kvserver.StoreTestingKnobs{
294293
DisableRaftLogQueue: true, // we send a log truncation manually
295294
DisableSyncLogWriteToss: true, // always use async log writes
296-
TestingAfterRaftLogSync: func(id storage.FullReplicaID) { applyThrottle.get()(id) },
295+
TestingAfterRaftLogSync: func(id roachpb.FullReplicaID) { applyThrottle.get()(id) },
297296
TestingProposalFilter: func(args kvserverbase.ProposalFilterArgs) *kvpb.Error {
298297
return propFilter.get()(args)
299298
},
@@ -347,7 +346,7 @@ func TestCrashWhileTruncatingSideloadedEntries(t *testing.T) {
347346
// Before writing more commands, block the raft commands application flow on
348347
// the follower replica.
349348
unblockApply := make(chan struct{})
350-
applyThrottle.set(func(id storage.FullReplicaID) {
349+
applyThrottle.set(func(id roachpb.FullReplicaID) {
351350
if id == follower.ID() {
352351
applyThrottle.reset()
353352
<-unblockApply

pkg/kv/kvserver/kvstorage/datadriven_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (e *env) close() {
6060
func (e *env) handleNewReplica(
6161
t *testing.T,
6262
ctx context.Context,
63-
id storage.FullReplicaID,
63+
id roachpb.FullReplicaID,
6464
skipRaftReplicaID bool,
6565
k, ek roachpb.RKey,
6666
) *roachpb.RangeDescriptor {
@@ -154,7 +154,7 @@ func TestDataDriven(t *testing.T) {
154154
d.ScanArgs(t, "skip-raft-replica-id", &skipRaftReplicaID)
155155
}
156156
if desc := e.handleNewReplica(t, ctx,
157-
storage.FullReplicaID{RangeID: roachpb.RangeID(rangeID), ReplicaID: roachpb.ReplicaID(replicaID)},
157+
roachpb.FullReplicaID{RangeID: roachpb.RangeID(rangeID), ReplicaID: roachpb.ReplicaID(replicaID)},
158158
skipRaftReplicaID, keys.MustAddr(roachpb.Key(k)), keys.MustAddr(roachpb.Key(ek)),
159159
); desc != nil {
160160
fmt.Fprintln(&buf, desc)

pkg/kv/kvserver/kvstorage/init.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -388,8 +388,8 @@ type Replica struct {
388388
}
389389

390390
// ID returns the FullReplicaID.
391-
func (r Replica) ID() storage.FullReplicaID {
392-
return storage.FullReplicaID{
391+
func (r Replica) ID() roachpb.FullReplicaID {
392+
return roachpb.FullReplicaID{
393393
RangeID: r.RangeID,
394394
ReplicaID: r.ReplicaID,
395395
}

pkg/kv/kvserver/kvstorage/replica_state.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ func LoadReplicaState(
7171
return ls, nil
7272
}
7373

74+
func (r LoadedReplicaState) FullReplicaID() roachpb.FullReplicaID {
75+
return roachpb.FullReplicaID{RangeID: r.ReplState.Desc.RangeID, ReplicaID: r.ReplicaID}
76+
}
77+
7478
// check makes sure that the replica invariants hold for the loaded state.
7579
func (r LoadedReplicaState) check(storeID roachpb.StoreID) error {
7680
desc := r.ReplState.Desc
@@ -116,20 +120,16 @@ const CreateUninitReplicaTODO = 0
116120
// Returns kvpb.RaftGroupDeletedError if this replica can not be created
117121
// because it has been deleted.
118122
func CreateUninitializedReplica(
119-
ctx context.Context,
120-
eng storage.Engine,
121-
storeID roachpb.StoreID,
122-
rangeID roachpb.RangeID,
123-
replicaID roachpb.ReplicaID,
123+
ctx context.Context, eng storage.Engine, storeID roachpb.StoreID, id roachpb.FullReplicaID,
124124
) error {
125-
sl := stateloader.Make(rangeID)
125+
sl := stateloader.Make(id.RangeID)
126126
// Before creating the replica, see if there is a tombstone which would
127127
// indicate that this replica has been removed.
128128
// TODO(pav-kv): should also check that there is no existing replica, i.e.
129129
// ReplicaID load should find nothing.
130130
if ts, err := sl.LoadRangeTombstone(ctx, eng); err != nil {
131131
return err
132-
} else if replicaID < ts.NextReplicaID {
132+
} else if id.ReplicaID < ts.NextReplicaID {
133133
return &kvpb.RaftGroupDeletedError{}
134134
}
135135

@@ -140,12 +140,12 @@ func CreateUninitializedReplica(
140140
// non-existent. The only RangeID-specific key that can be present is the
141141
// RangeTombstone inspected above.
142142
_ = CreateUninitReplicaTODO
143-
if err := sl.SetRaftReplicaID(ctx, eng, replicaID); err != nil {
143+
if err := sl.SetRaftReplicaID(ctx, eng, id.ReplicaID); err != nil {
144144
return err
145145
}
146146

147147
// Make sure that storage invariants for this uninitialized replica hold.
148-
uninitDesc := roachpb.RangeDescriptor{RangeID: rangeID}
149-
_, err := LoadReplicaState(ctx, eng, storeID, &uninitDesc, replicaID)
148+
uninitDesc := roachpb.RangeDescriptor{RangeID: id.RangeID}
149+
_, err := LoadReplicaState(ctx, eng, storeID, &uninitDesc, id.ReplicaID)
150150
return err
151151
}

pkg/kv/kvserver/replica.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,8 +1096,8 @@ func (r *Replica) ReplicaID() roachpb.ReplicaID {
10961096
}
10971097

10981098
// ID returns the FullReplicaID for the Replica.
1099-
func (r *Replica) ID() storage.FullReplicaID {
1100-
return storage.FullReplicaID{RangeID: r.RangeID, ReplicaID: r.replicaID}
1099+
func (r *Replica) ID() roachpb.FullReplicaID {
1100+
return roachpb.FullReplicaID{RangeID: r.RangeID, ReplicaID: r.replicaID}
11011101
}
11021102

11031103
// LogStorageRaftMuLocked returns the Replica's log storage.

pkg/kv/kvserver/replica_init.go

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func loadInitializedReplicaForTesting(
7979
func newInitializedReplica(
8080
store *Store, loaded kvstorage.LoadedReplicaState, waitForPrevLeaseToExpire bool,
8181
) (*Replica, error) {
82-
r := newUninitializedReplicaWithoutRaftGroup(store, loaded.ReplState.Desc.RangeID, loaded.ReplicaID)
82+
r := newUninitializedReplicaWithoutRaftGroup(store, loaded.FullReplicaID())
8383
r.raftMu.Lock()
8484
defer r.raftMu.Unlock()
8585
r.mu.Lock()
@@ -99,10 +99,8 @@ func newInitializedReplica(
9999
//
100100
// TODO(#94912): we actually have another initialization path which should be
101101
// refactored: Replica.initFromSnapshotLockedRaftMuLocked().
102-
func newUninitializedReplica(
103-
store *Store, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID,
104-
) (*Replica, error) {
105-
r := newUninitializedReplicaWithoutRaftGroup(store, rangeID, replicaID)
102+
func newUninitializedReplica(store *Store, id roachpb.FullReplicaID) (*Replica, error) {
103+
r := newUninitializedReplicaWithoutRaftGroup(store, id)
106104
r.raftMu.Lock()
107105
defer r.raftMu.Unlock()
108106
r.mu.Lock()
@@ -118,17 +116,15 @@ func newUninitializedReplica(
118116
// newUninitializedReplica() instead. This only exists for
119117
// newInitializedReplica() to avoid creating the Raft group twice (once when
120118
// creating the uninitialized replica, and once when initializing it).
121-
func newUninitializedReplicaWithoutRaftGroup(
122-
store *Store, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID,
123-
) *Replica {
124-
uninitState := stateloader.UninitializedReplicaState(rangeID)
119+
func newUninitializedReplicaWithoutRaftGroup(store *Store, id roachpb.FullReplicaID) *Replica {
120+
uninitState := stateloader.UninitializedReplicaState(id.RangeID)
125121
r := &Replica{
126122
AmbientContext: store.cfg.AmbientCtx,
127-
RangeID: rangeID,
128-
replicaID: replicaID,
123+
RangeID: id.RangeID,
124+
replicaID: id.ReplicaID,
129125
creationTime: timeutil.Now(),
130126
store: store,
131-
abortSpan: abortspan.New(rangeID),
127+
abortSpan: abortspan.New(id.RangeID),
132128
concMgr: concurrency.NewManager(concurrency.Config{
133129
NodeDesc: store.nodeDesc,
134130
RangeDesc: uninitState.Desc,
@@ -145,7 +141,7 @@ func newUninitializedReplicaWithoutRaftGroup(
145141
}),
146142
allocatorToken: &plan.AllocatorToken{},
147143
}
148-
r.sideTransportClosedTimestamp.init(store.cfg.ClosedTimestampReceiver, rangeID)
144+
r.sideTransportClosedTimestamp.init(store.cfg.ClosedTimestampReceiver, id.RangeID)
149145
r.cachedClosedTimestampPolicy.Store(new(ctpb.RangeClosedTimestampPolicy))
150146

151147
r.mu.pendingLeaseRequest = makePendingLeaseRequest(r)
@@ -165,9 +161,9 @@ func newUninitializedReplicaWithoutRaftGroup(
165161
// Expose proposal data for external test packages.
166162
return store.cfg.TestingKnobs.TestingProposalSubmitFilter(kvserverbase.ProposalFilterArgs{
167163
Ctx: p.Context(),
168-
RangeID: rangeID,
164+
RangeID: id.RangeID,
169165
StoreID: store.StoreID(),
170-
ReplicaID: replicaID,
166+
ReplicaID: id.ReplicaID,
171167
Cmd: p.command,
172168
QuotaAlloc: p.quotaAlloc,
173169
CmdID: p.idKey,
@@ -196,7 +192,7 @@ func newUninitializedReplicaWithoutRaftGroup(
196192
// NB: state will be loaded when the replica gets initialized.
197193
r.shMu.state = uninitState
198194

199-
r.rangeStr.store(replicaID, uninitState.Desc)
195+
r.rangeStr.store(id.ReplicaID, uninitState.Desc)
200196
// Add replica log tag - the value is rangeStr.String().
201197
r.AmbientContext.AddLogTag("r", &r.rangeStr)
202198
r.raftCtx = logtags.AddTag(r.AnnotateCtx(context.Background()), "raft", nil /* value */)
@@ -205,14 +201,14 @@ func newUninitializedReplicaWithoutRaftGroup(
205201
// r.AmbientContext.AddLogTag("@", fmt.Sprintf("%x", unsafe.Pointer(r)))
206202

207203
r.raftMu.rangefeedCTLagObserver = newRangeFeedCTLagObserver()
208-
r.raftMu.stateLoader = stateloader.Make(rangeID)
204+
r.raftMu.stateLoader = stateloader.Make(id.RangeID)
209205

210206
// Initialize all the components of the log storage. The state of the log
211207
// storage, such as RaftTruncatedState and the last entry ID, will be loaded
212208
// when the replica is initialized.
213209
sideloaded := logstore.NewDiskSideloadStorage(
214210
store.cfg.Settings,
215-
rangeID,
211+
id.RangeID,
216212
// NB: sideloaded log entries are persisted in the state engine so that they
217213
// can be ingested to the state machine locally, when being applied.
218214
store.StateEngine().GetAuxiliaryDir(),
@@ -229,13 +225,13 @@ func newUninitializedReplicaWithoutRaftGroup(
229225
r.logStorage.mu.RWMutex = (*syncutil.RWMutex)(&r.mu.ReplicaMutex)
230226
r.logStorage.raftMu.Mutex = &r.raftMu.Mutex
231227
r.logStorage.ls = &logstore.LogStore{
232-
RangeID: rangeID,
228+
RangeID: id.RangeID,
233229
Engine: store.LogEngine(),
234230
Sideload: sideloaded,
235231
StateLoader: r.raftMu.stateLoader.StateLoader,
236232
// NOTE: use the same SyncWaiter loop for all raft log writes performed by a
237233
// given range ID, to ensure that callbacks are processed in order.
238-
SyncWaiter: store.syncWaiters[int(rangeID)%len(store.syncWaiters)],
234+
SyncWaiter: store.syncWaiters[int(id.RangeID)%len(store.syncWaiters)],
239235
Settings: store.cfg.Settings,
240236
DisableSyncLogWriteToss: buildutil.CrdbTestBuild &&
241237
store.TestingKnobs().DisableSyncLogWriteToss,

pkg/kv/kvserver/replica_raft.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2817,9 +2817,10 @@ func (r *Replica) acquireSplitLock(
28172817
ctx context.Context, split *roachpb.SplitTrigger,
28182818
) (func(), error) {
28192819
rightReplDesc, _ := split.RightDesc.GetReplicaDescriptor(r.StoreID())
2820-
rightRepl, _, err := r.store.getOrCreateReplica(
2821-
ctx, split.RightDesc.RangeID, rightReplDesc.ReplicaID, nil, /* creatingReplica */
2822-
)
2820+
rightRepl, _, err := r.store.getOrCreateReplica(ctx, roachpb.FullReplicaID{
2821+
RangeID: split.RightDesc.RangeID,
2822+
ReplicaID: rightReplDesc.ReplicaID,
2823+
}, nil /* creatingReplica */)
28232824
// If getOrCreateReplica returns RaftGroupDeletedError we know that the RHS
28242825
// has already been removed. This case is handled properly in splitPostApply.
28252826
if errors.HasType(err, (*kvpb.RaftGroupDeletedError)(nil)) {
@@ -2852,9 +2853,10 @@ func (r *Replica) acquireMergeLock(
28522853
// complete, after which the replica will realize it has been destroyed and
28532854
// reject the snapshot.
28542855
rightReplDesc, _ := merge.RightDesc.GetReplicaDescriptor(r.StoreID())
2855-
rightRepl, _, err := r.store.getOrCreateReplica(
2856-
ctx, merge.RightDesc.RangeID, rightReplDesc.ReplicaID, nil, /* creatingReplica */
2857-
)
2856+
rightRepl, _, err := r.store.getOrCreateReplica(ctx, roachpb.FullReplicaID{
2857+
RangeID: merge.RightDesc.RangeID,
2858+
ReplicaID: rightReplDesc.ReplicaID,
2859+
}, nil /* creatingReplica */)
28582860
if err != nil {
28592861
return nil, err
28602862
}

pkg/kv/kvserver/snapshot_apply_prepare.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
// snapWriteBuilder contains the data needed to prepare the on-disk state for a
2424
// snapshot.
2525
type snapWriteBuilder struct {
26-
id storage.FullReplicaID
26+
id roachpb.FullReplicaID
2727

2828
todoEng storage.Engine
2929
sl stateloader.StateLoader

pkg/kv/kvserver/snapshot_apply_prepare_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func TestPrepareSnapApply(t *testing.T) {
6969
}
7070
}
7171

72-
id := storage.FullReplicaID{RangeID: 123, ReplicaID: 4}
72+
id := roachpb.FullReplicaID{RangeID: 123, ReplicaID: 4}
7373
descA := desc(101, "a", "b")
7474
descB := desc(102, "b", "z")
7575
createRangeData(t, eng, *descA)

0 commit comments

Comments
 (0)