Skip to content

Commit d65c1d6

Browse files
committed
mma: make external caller mostly use an ExternalRangeChange
An ExternalRangeChange does not have the currently mutable fields of PendingRangeChange (and the set of mutable fields is going to expand in the near future). TODOs related to the limitations of viewing a change as a "change replicas" or "lease transfer", etc., are moved to ExternalRangeChange, so they are compartmentalized in methods that are called by integration code outside the mma package. There is still some potential future cleanup involving unexporting PendingRangeChange, which is not in scope for this PR. The PendingRangeChange becomes a temporary container for a []*pendingReplicaChange slice. Printing this struct no longer involves viewing it as a "change replicas" or "lease transfer". Furthermore, the ReplicaChange.replicaChangeType field is removed and the change type is now a function of prev and next. This sets us up for making prev mutable in a future PR. Informs #157049 Epic: CRDB-55052 Release note: None
1 parent 6477907 commit d65c1d6

File tree

11 files changed

+297
-155
lines changed

11 files changed

+297
-155
lines changed

pkg/kv/kvserver/allocator/mmaprototype/allocator.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,16 @@ type Allocator interface {
5656
// Calls to AdjustPendingChangeDisposition must be correctly sequenced with
5757
// full state updates from the local node provided in
5858
// ProcessNodeLoadResponse.
59-
AdjustPendingChangeDisposition(change PendingRangeChange, success bool)
59+
AdjustPendingChangeDisposition(change ExternalRangeChange, success bool)
6060

6161
// RegisterExternalChange informs this allocator about yet to complete
6262
// changes to the cluster which were not initiated by this allocator. The
6363
// ownership of all state inside change is handed off to the callee. If ok
64-
// is true, the change was registered, and the caller should subsequently
65-
// use the same change in a subsequent call to
64+
// is true, the change was registered, and the caller is returned an
65+
// ExternalRangeChange that it should subsequently use in a call to
6666
// AdjustPendingChangeDisposition when the changes are completed, either
6767
// successfully or not. If ok is false, the change was not registered.
68-
RegisterExternalChange(change PendingRangeChange) (ok bool)
68+
RegisterExternalChange(change PendingRangeChange) (_ ExternalRangeChange, ok bool)
6969

7070
// ComputeChanges is called periodically and frequently, say every 10s.
7171
//
@@ -82,7 +82,7 @@ type Allocator interface {
8282
// the allocator, to avoid re-proposing the same change and to make
8383
// adjustments to the load.
8484
ComputeChanges(
85-
ctx context.Context, msg *StoreLeaseholderMsg, opts ChangeOptions) []PendingRangeChange
85+
ctx context.Context, msg *StoreLeaseholderMsg, opts ChangeOptions) []ExternalRangeChange
8686

8787
// AdminRelocateOne is a helper for AdminRelocateRange.
8888
//

pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ func (a *allocatorState) ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoad
254254
}
255255

256256
// AdjustPendingChangeDisposition implements the Allocator interface.
257-
func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChange, success bool) {
257+
func (a *allocatorState) AdjustPendingChangeDisposition(change ExternalRangeChange, success bool) {
258258
a.mu.Lock()
259259
defer a.mu.Unlock()
260260
rs, ok := a.cs.ranges[change.RangeID]
@@ -274,7 +274,7 @@ func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChang
274274
// can be made about whether these changes will be found in the allocator's
275275
// state. We gather the found changes.
276276
var changes []*pendingReplicaChange
277-
for _, c := range change.pendingReplicaChanges {
277+
for _, c := range change.Changes {
278278
ch, ok := a.cs.pendingChanges[c.changeID]
279279
if !ok {
280280
continue
@@ -303,25 +303,27 @@ func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChang
303303
}
304304

305305
// RegisterExternalChange implements the Allocator interface.
306-
func (a *allocatorState) RegisterExternalChange(change PendingRangeChange) (ok bool) {
306+
func (a *allocatorState) RegisterExternalChange(
307+
change PendingRangeChange,
308+
) (_ ExternalRangeChange, ok bool) {
307309
a.mu.Lock()
308310
defer a.mu.Unlock()
309311
if err := a.cs.preCheckOnApplyReplicaChanges(change); err != nil {
310312
a.mmaMetrics.ExternalFailedToRegister.Inc(1)
311313
log.KvDistribution.Infof(context.Background(),
312314
"did not register external changes: due to %v", err)
313-
return false
315+
return ExternalRangeChange{}, false
314316
} else {
315317
a.mmaMetrics.ExternaRegisterSuccess.Inc(1)
316318
}
317319
a.cs.addPendingRangeChange(change)
318-
return true
320+
return MakeExternalRangeChange(change), true
319321
}
320322

321323
// ComputeChanges implements the Allocator interface.
322324
func (a *allocatorState) ComputeChanges(
323325
ctx context.Context, msg *StoreLeaseholderMsg, opts ChangeOptions,
324-
) []PendingRangeChange {
326+
) []ExternalRangeChange {
325327
a.mu.Lock()
326328
defer a.mu.Unlock()
327329
if msg.StoreID != opts.LocalStoreID {

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 137 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,10 @@ func (s ReplicaChangeType) String() string {
159159
}
160160
}
161161

162+
func replicaExists(replicaID roachpb.ReplicaID) bool {
163+
return replicaID >= 0 || replicaID == unknownReplicaID
164+
}
165+
162166
// ReplicaChange describes a change to a replica.
163167
type ReplicaChange struct {
164168
// The load this change adds to a store. The values will be negative if the
@@ -219,9 +223,6 @@ type ReplicaChange struct {
219223
// So the above comment is incorrect. We should clean this up.
220224
prev ReplicaState
221225
next ReplicaIDAndType
222-
223-
// replicaChangeType is a function of (prev, next) as described above.
224-
replicaChangeType ReplicaChangeType
225226
}
226227

227228
func (rc ReplicaChange) String() string {
@@ -230,30 +231,50 @@ func (rc ReplicaChange) String() string {
230231

231232
// SafeFormat implements the redact.SafeFormatter interface.
232233
func (rc ReplicaChange) SafeFormat(w redact.SafePrinter, _ rune) {
233-
w.Printf("r%v type: %v target store %v (%v)->(%v)", rc.rangeID, rc.replicaChangeType, rc.target, rc.prev, rc.next)
234+
w.Printf("r%v type: %v target store %v (%v)->(%v)", rc.rangeID, rc.replicaChangeType(), rc.target, rc.prev, rc.next)
234235
}
235236

236237
// isRemoval returns true if the change is a removal of a replica.
237238
func (rc ReplicaChange) isRemoval() bool {
238-
return rc.replicaChangeType == RemoveReplica
239+
return rc.replicaChangeType() == RemoveReplica
239240
}
240241

241242
// isAddition returns true if the change is an addition of a replica.
242243
func (rc ReplicaChange) isAddition() bool {
243-
return rc.replicaChangeType == AddReplica
244+
return rc.replicaChangeType() == AddReplica
244245
}
245246

246247
// isUpdate returns true if the change is an update to the replica type or
247248
// leaseholder status. This includes promotion/demotion changes.
248249
func (rc ReplicaChange) isUpdate() bool {
249-
return rc.replicaChangeType == AddLease || rc.replicaChangeType == RemoveLease ||
250-
rc.replicaChangeType == ChangeReplica
250+
changeType := rc.replicaChangeType()
251+
return changeType == AddLease || changeType == RemoveLease || changeType == ChangeReplica
251252
}
252253

253-
// isPromoDemo returns true if the change is a promotion or demotion of a
254-
// replica (potentially with a lease change).
255-
func (rc ReplicaChange) isPromoDemo() bool {
256-
return rc.replicaChangeType == ChangeReplica
254+
func (rc ReplicaChange) replicaChangeType() ReplicaChangeType {
255+
prevExists := replicaExists(rc.prev.ReplicaID)
256+
nextExists := replicaExists(rc.next.ReplicaID)
257+
if !prevExists && !nextExists {
258+
return Unknown
259+
}
260+
if prevExists && !nextExists {
261+
return RemoveReplica
262+
}
263+
// INVARIANT: nextExists.
264+
265+
if !prevExists {
266+
return AddReplica
267+
}
268+
if rc.prev.ReplicaType.ReplicaType == rc.next.ReplicaType.ReplicaType {
269+
if rc.prev.ReplicaType.IsLeaseholder == rc.next.ReplicaType.IsLeaseholder {
270+
return Unknown
271+
}
272+
if rc.next.IsLeaseholder {
273+
return AddLease
274+
}
275+
return RemoveLease
276+
}
277+
return ChangeReplica
257278
}
258279

259280
func MakeLeaseTransferChanges(
@@ -297,18 +318,16 @@ func MakeLeaseTransferChanges(
297318
}
298319

299320
removeLease := ReplicaChange{
300-
target: removeTarget,
301-
rangeID: rangeID,
302-
prev: remove.ReplicaState,
303-
next: remove.ReplicaIDAndType,
304-
replicaChangeType: RemoveLease,
321+
target: removeTarget,
322+
rangeID: rangeID,
323+
prev: remove.ReplicaState,
324+
next: remove.ReplicaIDAndType,
305325
}
306326
addLease := ReplicaChange{
307-
target: addTarget,
308-
rangeID: rangeID,
309-
prev: add.ReplicaState,
310-
next: add.ReplicaIDAndType,
311-
replicaChangeType: AddLease,
327+
target: addTarget,
328+
rangeID: rangeID,
329+
prev: add.ReplicaState,
330+
next: add.ReplicaIDAndType,
312331
}
313332
removeLease.next.IsLeaseholder = false
314333
addLease.next.IsLeaseholder = true
@@ -343,8 +362,7 @@ func MakeAddReplicaChange(
343362
ReplicaID: noReplicaID,
344363
},
345364
},
346-
next: replicaIDAndType,
347-
replicaChangeType: AddReplica,
365+
next: replicaIDAndType,
348366
}
349367
addReplica.next.ReplicaID = unknownReplicaID
350368
addReplica.loadDelta.add(loadVectorToAdd(rLoad.Load))
@@ -373,7 +391,6 @@ func MakeRemoveReplicaChange(
373391
next: ReplicaIDAndType{
374392
ReplicaID: noReplicaID,
375393
},
376-
replicaChangeType: RemoveReplica,
377394
}
378395
removeReplica.loadDelta.subtract(rLoad.Load)
379396
if replicaState.IsLeaseholder {
@@ -398,11 +415,10 @@ func MakeReplicaTypeChange(
398415
next.ReplicaID = unknownReplicaID
399416
next.ReplicaType.ReplicaType = mapReplicaTypeToVoterOrNonVoter(next.ReplicaType.ReplicaType)
400417
change := ReplicaChange{
401-
target: target,
402-
rangeID: rangeID,
403-
prev: prev,
404-
next: next,
405-
replicaChangeType: ChangeReplica,
418+
target: target,
419+
rangeID: rangeID,
420+
prev: prev,
421+
next: next,
406422
}
407423
if next.IsLeaseholder {
408424
change.secondaryLoadDelta[LeaseCount] = 1
@@ -453,26 +469,25 @@ func mapReplicaTypeToVoterOrNonVoter(rType roachpb.ReplicaType) roachpb.ReplicaT
453469
}
454470
}
455471

456-
// PendingRangeChange is a proposed set of change(s) to a range. It can
457-
// consist of multiple pending replica changes, such as adding or removing
458-
// replicas, or transferring the lease. There is at most one change per store
459-
// in the set.
460-
//
461-
// NB: pendingReplicaChanges is not visible outside the package, so we can be
462-
// certain that callers outside this package that hold a PendingRangeChange
463-
// cannot mutate the internals other than clearing the state.
472+
// TODO(sumeer): place PendingRangeChange in a different file after
473+
// https://github.com/cockroachdb/cockroach/pull/158024 merges.
464474
//
465-
// Additionally, for a PendingRangeChange returned outside the package, we
466-
// ensure that the pendingReplicaChanges slice itself is not shared with the
467-
// rangeState.pendingChanges slice since the rangeState.pendingChanges slice
468-
// can have entries removed from it (and swapped around as part of removal).
475+
// TODO(sumeer): PendingRangeChange is exported only because external callers
476+
// need to be able to represent load changes when calling
477+
// RegisterExternalChange. The ExternalRangeChange type does not represent
478+
// load. There is possibly a better way to structure this, by including the
479+
// LoadVector and SecondaryLoadVector in the ExternalRangeChange type, and
480+
// unexporting PendingRangeChange.
481+
482+
// PendingRangeChange is a container for a proposed set of change(s) to a
483+
// range. It can consist of multiple pending replica changes, such as adding
484+
// or removing replicas, or transferring the lease. There is at most one
485+
// change per store in the set.
469486
//
470-
// Some the state inside each *pendingReplicaChange is mutable at arbitrary
471-
// points in time by the code inside this package (with the relevant locking,
472-
// of course). Currently, this state is gcTime, enactedAtTime. Neither of it
473-
// is read by the public methods on PendingRangeChange.
474-
//
475-
// TODO(sumeer): when we expand the set of mutable fields, make a deep copy.
487+
// The clusterState, or anything contained in it, does not contain
488+
// PendingRangeChange, and instead individual *pendingReplicaChange are stored
489+
// in various maps and slices. Note that each *pendingReplicaChange contains
490+
// mutable fields.
476491
type PendingRangeChange struct {
477492
RangeID roachpb.RangeID
478493
pendingReplicaChanges []*pendingReplicaChange
@@ -504,6 +519,49 @@ func MakePendingRangeChange(rangeID roachpb.RangeID, changes []ReplicaChange) Pe
504519
}
505520
}
506521

522+
func (prc PendingRangeChange) String() string {
523+
return redact.StringWithoutMarkers(prc)
524+
}
525+
526+
// SafeFormat implements the redact.SafeFormatter interface.
527+
//
528+
// This is ad hoc for debugging. A nicer string format would include the
529+
// previous state and next state.
530+
func (prc PendingRangeChange) SafeFormat(w redact.SafePrinter, _ rune) {
531+
w.Printf("r%v=[", prc.RangeID)
532+
nextAddOrChangeReplicaStr := func(next ReplicaType) string {
533+
if next.ReplicaType == roachpb.NON_VOTER {
534+
return "non-voter"
535+
}
536+
if next.IsLeaseholder {
537+
return "voter-leaseholder"
538+
}
539+
return "voter"
540+
}
541+
for i, c := range prc.pendingReplicaChanges {
542+
if i > 0 {
543+
w.Print(" ")
544+
}
545+
w.Printf("id:%d", c.changeID)
546+
switch c.replicaChangeType() {
547+
case Unknown:
548+
w.Printf("unknown-change:s%v", c.target.StoreID)
549+
case AddLease:
550+
w.Printf("add-lease:s%v", c.target.StoreID)
551+
case RemoveLease:
552+
w.Printf("remove-lease:s%v", c.target.StoreID)
553+
case AddReplica:
554+
w.Printf("add-%s:s%v", nextAddOrChangeReplicaStr(c.next.ReplicaType), c.target.StoreID)
555+
case RemoveReplica:
556+
w.Printf("remove-replica:s%v", c.target.StoreID)
557+
case ChangeReplica:
558+
w.Printf("change-to-%s:s%v", nextAddOrChangeReplicaStr(c.next.ReplicaType),
559+
c.target.StoreID)
560+
}
561+
}
562+
w.Print("]")
563+
}
564+
507565
// StringForTesting prints the untransformed internal state for testing.
508566
func (prc PendingRangeChange) StringForTesting() string {
509567
var b strings.Builder
@@ -524,6 +582,36 @@ func (prc PendingRangeChange) SortForTesting() {
524582
})
525583
}
526584

585+
// isPureTransferLease returns true if the pending range change is purely a
586+
// transfer lease operation (i.e., it is not a combined replication change and
587+
// lease transfer).
588+
//
589+
// TODO(sumeer): this is a duplicate of
590+
// ExternalRangeChange.IsPureTransferLease. Consider unifying.
591+
func (prc PendingRangeChange) isPureTransferLease() bool {
592+
if len(prc.pendingReplicaChanges) != 2 {
593+
return false
594+
}
595+
var addLease, removeLease int
596+
for _, c := range prc.pendingReplicaChanges {
597+
switch c.replicaChangeType() {
598+
case AddLease:
599+
addLease++
600+
case RemoveLease:
601+
removeLease++
602+
default:
603+
// Any other change type is not a pure lease transfer (e.g. they
604+
// require a replication change).
605+
return false
606+
}
607+
}
608+
if addLease != 1 || removeLease != 1 {
609+
panic(errors.AssertionFailedf("unexpected add (%d) or remove lease (%d) in lease transfer",
610+
addLease, removeLease))
611+
}
612+
return true
613+
}
614+
527615
// pendingReplicaChange is a proposed change to a single replica. Some
528616
// external entity (the leaseholder of the range) may choose to enact this
529617
// change. It may not be enacted if it will cause some invariant (like the
@@ -1852,7 +1940,7 @@ func (cs *clusterState) addPendingRangeChange(change PendingRangeChange) {
18521940
// below.
18531941

18541942
gcDuration := pendingReplicaChangeGCDuration
1855-
if change.IsTransferLease() {
1943+
if change.isPureTransferLease() {
18561944
// Only the lease is being transferred.
18571945
gcDuration = pendingLeaseTransferGCDuration
18581946
}

0 commit comments

Comments
 (0)