Skip to content

Commit 1b915b1

Browse files
committed
kvserver: use FullReplicaID in creation stack
Epic: none
Release note: none
1 parent e4b2d35 commit 1b915b1

File tree

6 files changed

+85
-96
lines changed

6 files changed

+85
-96
lines changed

pkg/kv/kvserver/kvstorage/replica_state.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,10 @@ func LoadReplicaState(
7171
return ls, nil
7272
}
7373

74+
func (r LoadedReplicaState) FullReplicaID() storage.FullReplicaID {
75+
return storage.FullReplicaID{RangeID: r.ReplState.Desc.RangeID, ReplicaID: r.ReplicaID}
76+
}
77+
7478
// check makes sure that the replica invariants hold for the loaded state.
7579
func (r LoadedReplicaState) check(storeID roachpb.StoreID) error {
7680
desc := r.ReplState.Desc
@@ -116,20 +120,16 @@ const CreateUninitReplicaTODO = 0
116120
// Returns kvpb.RaftGroupDeletedError if this replica can not be created
117121
// because it has been deleted.
118122
func CreateUninitializedReplica(
119-
ctx context.Context,
120-
eng storage.Engine,
121-
storeID roachpb.StoreID,
122-
rangeID roachpb.RangeID,
123-
replicaID roachpb.ReplicaID,
123+
ctx context.Context, eng storage.Engine, storeID roachpb.StoreID, id storage.FullReplicaID,
124124
) error {
125-
sl := stateloader.Make(rangeID)
125+
sl := stateloader.Make(id.RangeID)
126126
// Before creating the replica, see if there is a tombstone which would
127127
// indicate that this replica has been removed.
128128
// TODO(pav-kv): should also check that there is no existing replica, i.e.
129129
// ReplicaID load should find nothing.
130130
if ts, err := sl.LoadRangeTombstone(ctx, eng); err != nil {
131131
return err
132-
} else if replicaID < ts.NextReplicaID {
132+
} else if id.ReplicaID < ts.NextReplicaID {
133133
return &kvpb.RaftGroupDeletedError{}
134134
}
135135

@@ -140,12 +140,12 @@ func CreateUninitializedReplica(
140140
// non-existent. The only RangeID-specific key that can be present is the
141141
// RangeTombstone inspected above.
142142
_ = CreateUninitReplicaTODO
143-
if err := sl.SetRaftReplicaID(ctx, eng, replicaID); err != nil {
143+
if err := sl.SetRaftReplicaID(ctx, eng, id.ReplicaID); err != nil {
144144
return err
145145
}
146146

147147
// Make sure that storage invariants for this uninitialized replica hold.
148-
uninitDesc := roachpb.RangeDescriptor{RangeID: rangeID}
149-
_, err := LoadReplicaState(ctx, eng, storeID, &uninitDesc, replicaID)
148+
uninitDesc := roachpb.RangeDescriptor{RangeID: id.RangeID}
149+
_, err := LoadReplicaState(ctx, eng, storeID, &uninitDesc, id.ReplicaID)
150150
return err
151151
}

pkg/kv/kvserver/replica_init.go

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/cockroachdb/cockroach/pkg/roachpb"
3030
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
3131
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
32+
"github.com/cockroachdb/cockroach/pkg/storage"
3233
"github.com/cockroachdb/cockroach/pkg/util"
3334
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
3435
"github.com/cockroachdb/cockroach/pkg/util/envutil"
@@ -79,7 +80,7 @@ func loadInitializedReplicaForTesting(
7980
func newInitializedReplica(
8081
store *Store, loaded kvstorage.LoadedReplicaState, waitForPrevLeaseToExpire bool,
8182
) (*Replica, error) {
82-
r := newUninitializedReplicaWithoutRaftGroup(store, loaded.ReplState.Desc.RangeID, loaded.ReplicaID)
83+
r := newUninitializedReplicaWithoutRaftGroup(store, loaded.FullReplicaID())
8384
r.raftMu.Lock()
8485
defer r.raftMu.Unlock()
8586
r.mu.Lock()
@@ -99,10 +100,8 @@ func newInitializedReplica(
99100
//
100101
// TODO(#94912): we actually have another initialization path which should be
101102
// refactored: Replica.initFromSnapshotLockedRaftMuLocked().
102-
func newUninitializedReplica(
103-
store *Store, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID,
104-
) (*Replica, error) {
105-
r := newUninitializedReplicaWithoutRaftGroup(store, rangeID, replicaID)
103+
func newUninitializedReplica(store *Store, id storage.FullReplicaID) (*Replica, error) {
104+
r := newUninitializedReplicaWithoutRaftGroup(store, id)
106105
r.raftMu.Lock()
107106
defer r.raftMu.Unlock()
108107
r.mu.Lock()
@@ -118,17 +117,15 @@ func newUninitializedReplica(
118117
// newUninitializedReplica() instead. This only exists for
119118
// newInitializedReplica() to avoid creating the Raft group twice (once when
120119
// creating the uninitialized replica, and once when initializing it).
121-
func newUninitializedReplicaWithoutRaftGroup(
122-
store *Store, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID,
123-
) *Replica {
124-
uninitState := stateloader.UninitializedReplicaState(rangeID)
120+
func newUninitializedReplicaWithoutRaftGroup(store *Store, id storage.FullReplicaID) *Replica {
121+
uninitState := stateloader.UninitializedReplicaState(id.RangeID)
125122
r := &Replica{
126123
AmbientContext: store.cfg.AmbientCtx,
127-
RangeID: rangeID,
128-
replicaID: replicaID,
124+
RangeID: id.RangeID,
125+
replicaID: id.ReplicaID,
129126
creationTime: timeutil.Now(),
130127
store: store,
131-
abortSpan: abortspan.New(rangeID),
128+
abortSpan: abortspan.New(id.RangeID),
132129
concMgr: concurrency.NewManager(concurrency.Config{
133130
NodeDesc: store.nodeDesc,
134131
RangeDesc: uninitState.Desc,
@@ -145,7 +142,7 @@ func newUninitializedReplicaWithoutRaftGroup(
145142
}),
146143
allocatorToken: &plan.AllocatorToken{},
147144
}
148-
r.sideTransportClosedTimestamp.init(store.cfg.ClosedTimestampReceiver, rangeID)
145+
r.sideTransportClosedTimestamp.init(store.cfg.ClosedTimestampReceiver, id.RangeID)
149146
r.cachedClosedTimestampPolicy.Store(new(ctpb.RangeClosedTimestampPolicy))
150147

151148
r.mu.pendingLeaseRequest = makePendingLeaseRequest(r)
@@ -165,9 +162,9 @@ func newUninitializedReplicaWithoutRaftGroup(
165162
// Expose proposal data for external test packages.
166163
return store.cfg.TestingKnobs.TestingProposalSubmitFilter(kvserverbase.ProposalFilterArgs{
167164
Ctx: p.Context(),
168-
RangeID: rangeID,
165+
RangeID: id.RangeID,
169166
StoreID: store.StoreID(),
170-
ReplicaID: replicaID,
167+
ReplicaID: id.ReplicaID,
171168
Cmd: p.command,
172169
QuotaAlloc: p.quotaAlloc,
173170
CmdID: p.idKey,
@@ -196,7 +193,7 @@ func newUninitializedReplicaWithoutRaftGroup(
196193
// NB: state will be loaded when the replica gets initialized.
197194
r.shMu.state = uninitState
198195

199-
r.rangeStr.store(replicaID, uninitState.Desc)
196+
r.rangeStr.store(id.ReplicaID, uninitState.Desc)
200197
// Add replica log tag - the value is rangeStr.String().
201198
r.AmbientContext.AddLogTag("r", &r.rangeStr)
202199
r.raftCtx = logtags.AddTag(r.AnnotateCtx(context.Background()), "raft", nil /* value */)
@@ -205,14 +202,14 @@ func newUninitializedReplicaWithoutRaftGroup(
205202
// r.AmbientContext.AddLogTag("@", fmt.Sprintf("%x", unsafe.Pointer(r)))
206203

207204
r.raftMu.rangefeedCTLagObserver = newRangeFeedCTLagObserver()
208-
r.raftMu.stateLoader = stateloader.Make(rangeID)
205+
r.raftMu.stateLoader = stateloader.Make(id.RangeID)
209206

210207
// Initialize all the components of the log storage. The state of the log
211208
// storage, such as RaftTruncatedState and the last entry ID, will be loaded
212209
// when the replica is initialized.
213210
sideloaded := logstore.NewDiskSideloadStorage(
214211
store.cfg.Settings,
215-
rangeID,
212+
id.RangeID,
216213
// NB: sideloaded log entries are persisted in the state engine so that they
217214
// can be ingested to the state machine locally, when being applied.
218215
store.StateEngine().GetAuxiliaryDir(),
@@ -229,13 +226,13 @@ func newUninitializedReplicaWithoutRaftGroup(
229226
r.logStorage.mu.RWMutex = (*syncutil.RWMutex)(&r.mu.ReplicaMutex)
230227
r.logStorage.raftMu.Mutex = &r.raftMu.Mutex
231228
r.logStorage.ls = &logstore.LogStore{
232-
RangeID: rangeID,
229+
RangeID: id.RangeID,
233230
Engine: store.LogEngine(),
234231
Sideload: sideloaded,
235232
StateLoader: r.raftMu.stateLoader.StateLoader,
236233
// NOTE: use the same SyncWaiter loop for all raft log writes performed by a
237234
// given range ID, to ensure that callbacks are processed in order.
238-
SyncWaiter: store.syncWaiters[int(rangeID)%len(store.syncWaiters)],
235+
SyncWaiter: store.syncWaiters[int(id.RangeID)%len(store.syncWaiters)],
239236
Settings: store.cfg.Settings,
240237
DisableSyncLogWriteToss: buildutil.CrdbTestBuild &&
241238
store.TestingKnobs().DisableSyncLogWriteToss,

pkg/kv/kvserver/replica_raft.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2817,9 +2817,10 @@ func (r *Replica) acquireSplitLock(
28172817
ctx context.Context, split *roachpb.SplitTrigger,
28182818
) (func(), error) {
28192819
rightReplDesc, _ := split.RightDesc.GetReplicaDescriptor(r.StoreID())
2820-
rightRepl, _, err := r.store.getOrCreateReplica(
2821-
ctx, split.RightDesc.RangeID, rightReplDesc.ReplicaID, nil, /* creatingReplica */
2822-
)
2820+
rightRepl, _, err := r.store.getOrCreateReplica(ctx, storage.FullReplicaID{
2821+
RangeID: split.RightDesc.RangeID,
2822+
ReplicaID: rightReplDesc.ReplicaID,
2823+
}, nil /* creatingReplica */)
28232824
// If getOrCreateReplica returns RaftGroupDeletedError we know that the RHS
28242825
// has already been removed. This case is handled properly in splitPostApply.
28252826
if errors.HasType(err, (*kvpb.RaftGroupDeletedError)(nil)) {
@@ -2852,9 +2853,10 @@ func (r *Replica) acquireMergeLock(
28522853
// complete, after which the replica will realize it has been destroyed and
28532854
// reject the snapshot.
28542855
rightReplDesc, _ := merge.RightDesc.GetReplicaDescriptor(r.StoreID())
2855-
rightRepl, _, err := r.store.getOrCreateReplica(
2856-
ctx, merge.RightDesc.RangeID, rightReplDesc.ReplicaID, nil, /* creatingReplica */
2857-
)
2856+
rightRepl, _, err := r.store.getOrCreateReplica(ctx, storage.FullReplicaID{
2857+
RangeID: merge.RightDesc.RangeID,
2858+
ReplicaID: rightReplDesc.ReplicaID,
2859+
}, nil /* creatingReplica */)
28582860
if err != nil {
28592861
return nil, err
28602862
}

pkg/kv/kvserver/store_create_replica.go

Lines changed: 23 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1313
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
1414
"github.com/cockroachdb/cockroach/pkg/roachpb"
15+
"github.com/cockroachdb/cockroach/pkg/storage"
1516
"github.com/cockroachdb/cockroach/pkg/util/log"
1617
"github.com/cockroachdb/cockroach/pkg/util/retry"
1718
"github.com/cockroachdb/errors"
@@ -51,13 +52,10 @@ var errRetry = errors.New("retry: orphaned replica")
5152
//
5253
// The caller must not hold the store's lock.
5354
func (s *Store) getOrCreateReplica(
54-
ctx context.Context,
55-
rangeID roachpb.RangeID,
56-
replicaID roachpb.ReplicaID,
57-
creatingReplica *roachpb.ReplicaDescriptor,
55+
ctx context.Context, id storage.FullReplicaID, creatingReplica *roachpb.ReplicaDescriptor,
5856
) (_ *Replica, created bool, _ error) {
59-
if replicaID == 0 {
60-
log.Fatalf(ctx, "cannot construct a Replica for range %d with 0 id", rangeID)
57+
if id.ReplicaID == 0 {
58+
log.Fatalf(ctx, "cannot construct a Replica for range %d with 0 id", id.RangeID)
6159
}
6260
// We need a retry loop as the replica we find in the map may be in the
6361
// process of being removed or may need to be removed. Retries in the loop
@@ -71,12 +69,7 @@ func (s *Store) getOrCreateReplica(
7169
})
7270
for {
7371
r.Next()
74-
r, created, err := s.tryGetOrCreateReplica(
75-
ctx,
76-
rangeID,
77-
replicaID,
78-
creatingReplica,
79-
)
72+
r, created, err := s.tryGetOrCreateReplica(ctx, id, creatingReplica)
8073
if errors.Is(err, errRetry) {
8174
continue
8275
}
@@ -92,12 +85,9 @@ func (s *Store) getOrCreateReplica(
9285
// removed. Returns errRetry error if the replica is in a transitional state and
9386
// its retrieval needs to be retried. Other errors are permanent.
9487
func (s *Store) tryGetReplica(
95-
ctx context.Context,
96-
rangeID roachpb.RangeID,
97-
replicaID roachpb.ReplicaID,
98-
creatingReplica *roachpb.ReplicaDescriptor,
88+
ctx context.Context, id storage.FullReplicaID, creatingReplica *roachpb.ReplicaDescriptor,
9989
) (*Replica, error) {
100-
repl, found := s.mu.replicasByRangeID.Load(rangeID)
90+
repl, found := s.mu.replicasByRangeID.Load(id.RangeID)
10191
if !found {
10292
return nil, nil
10393
}
@@ -120,14 +110,14 @@ func (s *Store) tryGetReplica(
120110
}
121111

122112
// The current replica needs to be removed, remove it and go back around.
123-
if toTooOld := repl.replicaID < replicaID; toTooOld {
113+
if toTooOld := repl.replicaID < id.ReplicaID; toTooOld {
124114
if shouldLog := log.V(1); shouldLog {
125115
log.Infof(ctx, "found message for replica ID %d which is newer than %v",
126-
replicaID, repl)
116+
id.ReplicaID, repl)
127117
}
128118

129119
repl.mu.RUnlock()
130-
if err := s.removeReplicaRaftMuLocked(ctx, repl, replicaID, "superseded by newer Replica", RemoveOptions{
120+
if err := s.removeReplicaRaftMuLocked(ctx, repl, id.ReplicaID, "superseded by newer Replica", RemoveOptions{
131121
DestroyData: true,
132122
}); err != nil {
133123
log.Fatalf(ctx, "failed to remove replica: %v", err)
@@ -137,17 +127,17 @@ func (s *Store) tryGetReplica(
137127
}
138128
defer repl.mu.RUnlock()
139129

140-
if repl.replicaID > replicaID {
130+
if repl.replicaID > id.ReplicaID {
141131
// The sender is behind and is sending to an old replica.
142132
// We could silently drop this message but this way we'll inform the
143133
// sender that they may no longer exist.
144134
repl.raftMu.Unlock()
145135
return nil, &kvpb.RaftGroupDeletedError{}
146136
}
147-
if repl.replicaID != replicaID {
137+
if repl.replicaID != id.ReplicaID {
148138
// This case should have been caught by handleToReplicaTooOld.
149139
log.Fatalf(ctx, "intended replica id %d unexpectedly does not match the current replica %v",
150-
replicaID, repl)
140+
id.ReplicaID, repl)
151141
}
152142
return repl, nil
153143
}
@@ -159,13 +149,10 @@ func (s *Store) tryGetReplica(
159149
// tryGetOrCreateReplica will likely succeed, hence the loop in
160150
// getOrCreateReplica.
161151
func (s *Store) tryGetOrCreateReplica(
162-
ctx context.Context,
163-
rangeID roachpb.RangeID,
164-
replicaID roachpb.ReplicaID,
165-
creatingReplica *roachpb.ReplicaDescriptor,
152+
ctx context.Context, id storage.FullReplicaID, creatingReplica *roachpb.ReplicaDescriptor,
166153
) (_ *Replica, created bool, _ error) {
167154
// The common case: look up an existing replica.
168-
if repl, err := s.tryGetReplica(ctx, rangeID, replicaID, creatingReplica); err != nil {
155+
if repl, err := s.tryGetReplica(ctx, id, creatingReplica); err != nil {
169156
return nil, false, err
170157
} else if repl != nil {
171158
return repl, false, nil
@@ -175,24 +162,24 @@ func (s *Store) tryGetOrCreateReplica(
175162
// be racing at this point, so grab a "lock" over this rangeID (represented by
176163
// s.mu.creatingReplicas[rangeID]) by one goroutine, and retry others.
177164
s.mu.Lock()
178-
if _, ok := s.mu.creatingReplicas[rangeID]; ok {
165+
if _, ok := s.mu.creatingReplicas[id.RangeID]; ok {
179166
// Lost the race - another goroutine is currently creating that replica. Let
180167
// the caller retry so that they can eventually see it.
181168
s.mu.Unlock()
182169
return nil, false, errRetry
183170
}
184-
s.mu.creatingReplicas[rangeID] = struct{}{}
171+
s.mu.creatingReplicas[id.RangeID] = struct{}{}
185172
s.mu.Unlock()
186173
defer func() {
187174
s.mu.Lock()
188-
delete(s.mu.creatingReplicas, rangeID)
175+
delete(s.mu.creatingReplicas, id.RangeID)
189176
s.mu.Unlock()
190177
}()
191178
// Now we are the only goroutine trying to create a replica for this rangeID.
192179

193180
// Repeat the quick path in case someone has overtaken us while we were
194181
// grabbing the "lock".
195-
if repl, err := s.tryGetReplica(ctx, rangeID, replicaID, creatingReplica); err != nil {
182+
if repl, err := s.tryGetReplica(ctx, id, creatingReplica); err != nil {
196183
return nil, false, err
197184
} else if repl != nil {
198185
return repl, false, nil
@@ -204,16 +191,16 @@ func (s *Store) tryGetOrCreateReplica(
204191
// Replica for this rangeID, and that's us.
205192

206193
_ = kvstorage.CreateUninitReplicaTODO
194+
// TODO(sep-raft-log): needs both engines due to tombstone (which lives on
195+
// statemachine).
207196
if err := kvstorage.CreateUninitializedReplica(
208-
// TODO(sep-raft-log): needs both engines due to tombstone (which lives on
209-
// statemachine).
210-
ctx, s.TODOEngine(), s.StoreID(), rangeID, replicaID,
197+
ctx, s.TODOEngine(), s.StoreID(), id,
211198
); err != nil {
212199
return nil, false, err
213200
}
214201

215202
// Create a new uninitialized replica and lock it for raft processing.
216-
repl, err := newUninitializedReplica(s, rangeID, replicaID)
203+
repl, err := newUninitializedReplica(s, id)
217204
if err != nil {
218205
return nil, false, err
219206
}

pkg/kv/kvserver/store_raft.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
2020
"github.com/cockroachdb/cockroach/pkg/roachpb"
2121
"github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
22+
"github.com/cockroachdb/cockroach/pkg/storage"
2223
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
2324
"github.com/cockroachdb/cockroach/pkg/util/grunning"
2425
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -421,12 +422,9 @@ func (s *Store) withReplicaForRequest(
421422
f func(context.Context, *Replica) *kvpb.Error,
422423
) *kvpb.Error {
423424
// Lazily create the replica.
424-
r, _, err := s.getOrCreateReplica(
425-
ctx,
426-
req.RangeID,
427-
req.ToReplica.ReplicaID,
428-
&req.FromReplica,
429-
)
425+
r, _, err := s.getOrCreateReplica(ctx, storage.FullReplicaID{
426+
RangeID: req.RangeID, ReplicaID: req.ToReplica.ReplicaID,
427+
}, &req.FromReplica)
430428
if err != nil {
431429
return kvpb.NewError(err)
432430
}

0 commit comments

Comments
 (0)