diff --git a/pkg/kv/kvserver/allocator/mmaprototype/BUILD.bazel b/pkg/kv/kvserver/allocator/mmaprototype/BUILD.bazel index 4eb139ce93a7..267d86f320d5 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/mmaprototype/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "load.go", "memo_helper.go", "messages.go", + "range_change.go", "rebalance_advisor.go", "store_load_summary.go", "store_set.go", diff --git a/pkg/kv/kvserver/allocator/mmaprototype/allocator.go b/pkg/kv/kvserver/allocator/mmaprototype/allocator.go index a79a47c1f1cb..dc9751176a9d 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/allocator.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/allocator.go @@ -56,16 +56,16 @@ type Allocator interface { // Calls to AdjustPendingChangeDisposition must be correctly sequenced with // full state updates from the local node provided in // ProcessNodeLoadResponse. - AdjustPendingChangeDisposition(change PendingRangeChange, success bool) + AdjustPendingChangeDisposition(change ExternalRangeChange, success bool) // RegisterExternalChange informs this allocator about yet to complete // changes to the cluster which were not initiated by this allocator. The // ownership of all state inside change is handed off to the callee. If ok - // is true, the change was registered, and the caller should subsequently - // use the same change in a subsequent call to + // is true, the change was registered, and the caller is returned an + // ExternalRangeChange that it should subsequently use in a call to // AdjustPendingChangeDisposition when the changes are completed, either // successfully or not. If ok is false, the change was not registered. - RegisterExternalChange(change PendingRangeChange) (ok bool) + RegisterExternalChange(change PendingRangeChange) (_ ExternalRangeChange, ok bool) // ComputeChanges is called periodically and frequently, say every 10s. // @@ -82,7 +82,7 @@ type Allocator interface { // the allocator, to avoid re-proposing the same change and to make // adjustments to the load. ComputeChanges( - ctx context.Context, msg *StoreLeaseholderMsg, opts ChangeOptions) []PendingRangeChange + ctx context.Context, msg *StoreLeaseholderMsg, opts ChangeOptions) []ExternalRangeChange // AdminRelocateOne is a helper for AdminRelocateRange. // diff --git a/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go b/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go index 95d935189861..3e53d4580c08 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go @@ -254,7 +254,7 @@ func (a *allocatorState) ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoad } // AdjustPendingChangeDisposition implements the Allocator interface. -func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChange, success bool) { +func (a *allocatorState) AdjustPendingChangeDisposition(change ExternalRangeChange, success bool) { a.mu.Lock() defer a.mu.Unlock() rs, ok := a.cs.ranges[change.RangeID] @@ -274,7 +274,7 @@ func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChang // can be made about whether these changes will be found in the allocator's // state. We gather the found changes. var changes []*pendingReplicaChange - for _, c := range change.pendingReplicaChanges { + for _, c := range change.Changes { ch, ok := a.cs.pendingChanges[c.changeID] if !ok { continue @@ -303,25 +303,27 @@ func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChang } // RegisterExternalChange implements the Allocator interface. -func (a *allocatorState) RegisterExternalChange(change PendingRangeChange) (ok bool) { +func (a *allocatorState) RegisterExternalChange( + change PendingRangeChange, +) (_ ExternalRangeChange, ok bool) { a.mu.Lock() defer a.mu.Unlock() if err := a.cs.preCheckOnApplyReplicaChanges(change); err != nil { a.mmaMetrics.ExternalFailedToRegister.Inc(1) log.KvDistribution.Infof(context.Background(), "did not register external changes: due to %v", err) - return false + return ExternalRangeChange{}, false } else { a.mmaMetrics.ExternaRegisterSuccess.Inc(1) } a.cs.addPendingRangeChange(change) - return true + return MakeExternalRangeChange(change), true } // ComputeChanges implements the Allocator interface. func (a *allocatorState) ComputeChanges( ctx context.Context, msg *StoreLeaseholderMsg, opts ChangeOptions, -) []PendingRangeChange { +) []ExternalRangeChange { a.mu.Lock() defer a.mu.Unlock() if msg.StoreID != opts.LocalStoreID { diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go index 225860d49426..d15c82c96222 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go @@ -14,7 +14,6 @@ import ( "strings" "time" - "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -160,6 +159,10 @@ func (s ReplicaChangeType) String() string { } } +func replicaExists(replicaID roachpb.ReplicaID) bool { + return replicaID >= 0 || replicaID == unknownReplicaID +} + // ReplicaChange describes a change to a replica. type ReplicaChange struct { // The load this change adds to a store. The values will be negative if the @@ -220,9 +223,6 @@ type ReplicaChange struct { // So the above comment is incorrect. We should clean this up. prev ReplicaState next ReplicaIDAndType - - // replicaChangeType is a function of (prev, next) as described above. - replicaChangeType ReplicaChangeType } func (rc ReplicaChange) String() string { @@ -231,30 +231,50 @@ func (rc ReplicaChange) String() string { // SafeFormat implements the redact.SafeFormatter interface. func (rc ReplicaChange) SafeFormat(w redact.SafePrinter, _ rune) { - w.Printf("r%v type: %v target store %v (%v)->(%v)", rc.rangeID, rc.replicaChangeType, rc.target, rc.prev, rc.next) + w.Printf("r%v type: %v target store %v (%v)->(%v)", rc.rangeID, rc.replicaChangeType(), rc.target, rc.prev, rc.next) } // isRemoval returns true if the change is a removal of a replica. func (rc ReplicaChange) isRemoval() bool { - return rc.replicaChangeType == RemoveReplica + return rc.replicaChangeType() == RemoveReplica } // isAddition returns true if the change is an addition of a replica. func (rc ReplicaChange) isAddition() bool { - return rc.replicaChangeType == AddReplica + return rc.replicaChangeType() == AddReplica } // isUpdate returns true if the change is an update to the replica type or // leaseholder status. This includes promotion/demotion changes. func (rc ReplicaChange) isUpdate() bool { - return rc.replicaChangeType == AddLease || rc.replicaChangeType == RemoveLease || - rc.replicaChangeType == ChangeReplica + changeType := rc.replicaChangeType() + return changeType == AddLease || changeType == RemoveLease || changeType == ChangeReplica } -// isPromoDemo returns true if the change is a promotion or demotion of a -// replica (potentially with a lease change). -func (rc ReplicaChange) isPromoDemo() bool { - return rc.replicaChangeType == ChangeReplica +func (rc ReplicaChange) replicaChangeType() ReplicaChangeType { + prevExists := replicaExists(rc.prev.ReplicaID) + nextExists := replicaExists(rc.next.ReplicaID) + if !prevExists && !nextExists { + return Unknown + } + if prevExists && !nextExists { + return RemoveReplica + } + // INVARIANT: nextExists. + + if !prevExists { + return AddReplica + } + if rc.prev.ReplicaType.ReplicaType == rc.next.ReplicaType.ReplicaType { + if rc.prev.ReplicaType.IsLeaseholder == rc.next.ReplicaType.IsLeaseholder { + return Unknown + } + if rc.next.IsLeaseholder { + return AddLease + } + return RemoveLease + } + return ChangeReplica } func MakeLeaseTransferChanges( @@ -298,18 +318,16 @@ func MakeLeaseTransferChanges( } removeLease := ReplicaChange{ - target: removeTarget, - rangeID: rangeID, - prev: remove.ReplicaState, - next: remove.ReplicaIDAndType, - replicaChangeType: RemoveLease, + target: removeTarget, + rangeID: rangeID, + prev: remove.ReplicaState, + next: remove.ReplicaIDAndType, } addLease := ReplicaChange{ - target: addTarget, - rangeID: rangeID, - prev: add.ReplicaState, - next: add.ReplicaIDAndType, - replicaChangeType: AddLease, + target: addTarget, + rangeID: rangeID, + prev: add.ReplicaState, + next: add.ReplicaIDAndType, } removeLease.next.IsLeaseholder = false addLease.next.IsLeaseholder = true @@ -344,8 +362,7 @@ func MakeAddReplicaChange( ReplicaID: noReplicaID, }, }, - next: replicaIDAndType, - replicaChangeType: AddReplica, + next: replicaIDAndType, } addReplica.next.ReplicaID = unknownReplicaID addReplica.loadDelta.add(loadVectorToAdd(rLoad.Load)) @@ -374,7 +391,6 @@ func MakeRemoveReplicaChange( next: ReplicaIDAndType{ ReplicaID: noReplicaID, }, - replicaChangeType: RemoveReplica, } removeReplica.loadDelta.subtract(rLoad.Load) if replicaState.IsLeaseholder { @@ -399,11 +415,10 @@ func MakeReplicaTypeChange( next.ReplicaID = unknownReplicaID next.ReplicaType.ReplicaType = mapReplicaTypeToVoterOrNonVoter(next.ReplicaType.ReplicaType) change := ReplicaChange{ - target: target, - rangeID: rangeID, - prev: prev, - next: next, - replicaChangeType: ChangeReplica, + target: target, + rangeID: rangeID, + prev: prev, + next: next, } if next.IsLeaseholder { change.secondaryLoadDelta[LeaseCount] = 1 @@ -454,26 +469,25 @@ func mapReplicaTypeToVoterOrNonVoter(rType roachpb.ReplicaType) roachpb.ReplicaT } } -// PendingRangeChange is a proposed set of change(s) to a range. It can -// consist of multiple pending replica changes, such as adding or removing -// replicas, or transferring the lease. There is at most one change per store -// in the set. -// -// NB: pendingReplicaChanges is not visible outside the package, so we can be -// certain that callers outside this package that hold a PendingRangeChange -// cannot mutate the internals other than clearing the state. -// -// Additionally, for a PendingRangeChange returned outside the package, we -// ensure that the pendingReplicaChanges slice itself is not shared with the -// rangeState.pendingChanges slice since the rangeState.pendingChanges slice -// can have entries removed from it (and swapped around as part of removal). +// TODO(sumeer): place PendingRangeChange in a different file after +// https://github.com/cockroachdb/cockroach/pull/158024 merges. // -// Some the state inside each *pendingReplicaChange is mutable at arbitrary -// points in time by the code inside this package (with the relevant locking, -// of course). Currently, this state is gcTime, enactedAtTime. Neither of it -// is read by the public methods on PendingRangeChange. +// TODO(sumeer): PendingRangeChange is exported only because external callers +// need to be able to represent load changes when calling +// RegisterExternalChange. The ExternalRangeChange type does not represent +// load. There is possibly a better way to structure this, by including the +// LoadVector and SecondaryLoadVector in the ExternalRangeChange type, and +// unexporting PendingRangeChange. + +// PendingRangeChange is a container for a proposed set of change(s) to a +// range. It can consist of multiple pending replica changes, such as adding +// or removing replicas, or transferring the lease. There is at most one +// change per store in the set. // -// TODO(sumeer): when we expand the set of mutable fields, make a deep copy. +// The clusterState or anything contained in it, does not contain +// PendingRangeChange, and instead individual *pendingReplicaChange are stored +// in various maps and slices. Note that *pendingReplicaChanges contain +// mutable fields. type PendingRangeChange struct { RangeID roachpb.RangeID pendingReplicaChanges []*pendingReplicaChange @@ -515,24 +529,35 @@ func (prc PendingRangeChange) String() string { // previous state and next state. func (prc PendingRangeChange) SafeFormat(w redact.SafePrinter, _ rune) { w.Printf("r%v=[", prc.RangeID) - found := false - if prc.IsTransferLease() { - w.Printf("transfer_to=%v", prc.LeaseTransferTarget()) - found = true - } - if prc.IsChangeReplicas() { - w.Printf("change_replicas=%v", prc.ReplicationChanges()) - found = true - } - if !found { - panic("unknown change type") + nextAddOrChangeReplicaStr := func(next ReplicaType) string { + if next.ReplicaType == roachpb.NON_VOTER { + return "non-voter" + } + if next.IsLeaseholder { + return "voter-leaseholder" + } + return "voter" } - w.Print(" cids=") for i, c := range prc.pendingReplicaChanges { if i > 0 { - w.Print(",") + w.Print(" ") + } + w.Printf("id:%d", c.changeID) + switch c.replicaChangeType() { + case Unknown: + w.Printf("unknown-change:s%v", c.target.StoreID) + case AddLease: + w.Printf("add-lease:s%v", c.target.StoreID) + case RemoveLease: + w.Printf("remove-lease:s%v", c.target.StoreID) + case AddReplica: + w.Printf("add-%s:s%v", nextAddOrChangeReplicaStr(c.next.ReplicaType), c.target.StoreID) + case RemoveReplica: + w.Printf("remove-replica:s%v", c.target.StoreID) + case ChangeReplica: + w.Printf("change-to-%s:s%v", nextAddOrChangeReplicaStr(c.next.ReplicaType), + c.target.StoreID) } - w.Printf("%v", c.changeID) } w.Print("]") } @@ -557,156 +582,34 @@ func (prc PendingRangeChange) SortForTesting() { }) } -// TODO(sumeer): A single PendingRangeChange can model a bunch of replica -// changes and a lease transfer. Classifying the change as either -// IsChangeReplicas or IsTransferLease is unnecessarily limiting. The only -// code that really relies on this is integration code: -// mma_store_rebalancer.go, allocator_sync.go, asim. We should fix those and -// consider removing these methods. - -// IsChangeReplicas returns true if the pending range change is a change -// replicas operation. -func (prc PendingRangeChange) IsChangeReplicas() bool { - for _, c := range prc.pendingReplicaChanges { - if c.isAddition() || c.isRemoval() || c.isPromoDemo() { - continue - } else { - return false - } - } - return true -} - -// IsTransferLease returns true if the pending range change is a transfer lease -// operation. -func (prc PendingRangeChange) IsTransferLease() bool { +// isPureTransferLease returns true if the pending range change is purely a +// transfer lease operation (i.e., it is not a combined replication change and +// lease transfer). +// +// TODO(sumeer): this is a duplicate of +// ExternalRangeChange.IsPureTransferLease. Consider unifying. +func (prc PendingRangeChange) isPureTransferLease() bool { if len(prc.pendingReplicaChanges) != 2 { return false } - var foundAddLease, foundRemoveLease bool + var addLease, removeLease int for _, c := range prc.pendingReplicaChanges { - if c.isAddition() || c.isRemoval() || c.isPromoDemo() { - // Any changes to the replica type or replicaID are not lease transfers, - // since they require a replication change. - return false - } - if c.prev.IsLeaseholder && !c.next.IsLeaseholder { - foundRemoveLease = true - } else if !c.prev.IsLeaseholder && c.next.IsLeaseholder { - foundAddLease = true - } else { - return false - } - } - return foundAddLease && foundRemoveLease -} - -// ReplicationChanges returns the replication changes for the pending range -// change. It panics if the pending range change is not a change replicas -// operation. -// -// TODO(tbg): The ReplicationChanges can include a new leaseholder replica, -// but the incoming leaseholder is not explicitly represented in -// kvpb.ReplicationChanges. This is an existing modeling deficiency in the -// kvserver code. In Replica.maybeTransferLeaseDuringLeaveJoint the first -// VOTER_INCOMING is considered the new leaseholder. So the code below places -// the new leaseholder (an ADD_VOTER) at index 0. It is not clear whether this -// is sufficient for all integration use-cases. Verify and fix as needed. -// -// TODO(sumeer): this method is limiting, since a single PendingRangeChange -// should be allowed to model any set of changes (see the existing TODO on -// IsChangeReplicas). -func (prc PendingRangeChange) ReplicationChanges() kvpb.ReplicationChanges { - if !prc.IsChangeReplicas() { - panic("RangeChange is not a change replicas") - } - chgs := make([]kvpb.ReplicationChange, 0, len(prc.pendingReplicaChanges)) - newLeaseholderIndex := -1 - for _, c := range prc.pendingReplicaChanges { - switch c.replicaChangeType { - case ChangeReplica, AddReplica, RemoveReplica: - // These are the only permitted cases. + switch c.replicaChangeType() { + case AddLease: + addLease++ + case RemoveLease: + removeLease++ default: - panic(errors.AssertionFailedf("change type %v is not a change replicas", c.replicaChangeType)) - } - // The kvserver code represents a change in replica type as an - // addition and a removal of the same replica. For example, if a - // replica changes from VOTER_FULL to NON_VOTER, we will emit a pair - // of {ADD_NON_VOTER, REMOVE_VOTER} for the replica. The ordering of - // this pair does not matter. - // - // TODO(tbg): confirm that the ordering does not matter. - if c.replicaChangeType == ChangeReplica || c.replicaChangeType == AddReplica { - chg := kvpb.ReplicationChange{Target: c.target} - isNewLeaseholder := false - switch c.next.ReplicaType.ReplicaType { - case roachpb.VOTER_FULL: - chg.ChangeType = roachpb.ADD_VOTER - if c.next.IsLeaseholder { - isNewLeaseholder = true - } - case roachpb.NON_VOTER: - chg.ChangeType = roachpb.ADD_NON_VOTER - default: - panic(errors.AssertionFailedf("unexpected replica type %s", c.next.ReplicaType.ReplicaType)) - } - if isNewLeaseholder { - if newLeaseholderIndex >= 0 { - panic(errors.AssertionFailedf( - "multiple new leaseholders in change replicas")) - } - newLeaseholderIndex = len(chgs) - } - chgs = append(chgs, chg) - } - if c.replicaChangeType == ChangeReplica || c.replicaChangeType == RemoveReplica { - chg := kvpb.ReplicationChange{Target: c.target} - prevType := mapReplicaTypeToVoterOrNonVoter(c.prev.ReplicaType.ReplicaType) - switch prevType { - case roachpb.VOTER_FULL: - chg.ChangeType = roachpb.REMOVE_VOTER - case roachpb.NON_VOTER: - chg.ChangeType = roachpb.REMOVE_NON_VOTER - default: - panic(errors.AssertionFailedf("unexpected replica type %s", c.prev.ReplicaType.ReplicaType)) - } - chgs = append(chgs, chg) - } - } - if newLeaseholderIndex >= 0 { - // Move the new leaseholder to index 0. - chgs[0], chgs[newLeaseholderIndex] = chgs[newLeaseholderIndex], chgs[0] - } - return chgs -} - -// LeaseTransferTarget returns the store ID of the store that is the target of -// the lease transfer. It panics if the pending range change is not a transfer -// lease operation. -func (prc PendingRangeChange) LeaseTransferTarget() roachpb.StoreID { - if !prc.IsTransferLease() { - panic("pendingRangeChange is not a lease transfer") - } - for _, c := range prc.pendingReplicaChanges { - if !c.prev.IsLeaseholder && c.next.IsLeaseholder { - return c.target.StoreID + // Any other change type is not a pure lease transfer (e.g. they + // require a replication change). + return false } } - panic("unreachable") -} - -// LeaseTransferFrom returns the store ID of the store that is the source of -// the lease transfer. It panics if the pending range change is not a -func (prc PendingRangeChange) LeaseTransferFrom() roachpb.StoreID { - if !prc.IsTransferLease() { - panic("pendingRangeChange is not a lease transfer") - } - for _, c := range prc.pendingReplicaChanges { - if c.prev.IsLeaseholder && !c.next.IsLeaseholder { - return c.target.StoreID - } + if addLease != 1 || removeLease != 1 { + panic(errors.AssertionFailedf("unexpected add (%d) or remove lease (%d) in lease transfer", + addLease, removeLease)) } - panic("unreachable") + return true } // pendingReplicaChange is a proposed change to a single replica. Some @@ -2037,7 +1940,7 @@ func (cs *clusterState) addPendingRangeChange(change PendingRangeChange) { // below. gcDuration := pendingReplicaChangeGCDuration - if change.IsTransferLease() { + if change.isPureTransferLease() { // Only the lease is being transferred. gcDuration = pendingLeaseTransferGCDuration } diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go index daa084af54a1..774816bd5c06 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go @@ -35,7 +35,7 @@ type rebalanceEnv struct { // dsm is the diversity scoring memo for computing diversity scores. dsm *diversityScoringMemo // changes accumulates the pending range changes made during rebalancing. - changes []PendingRangeChange + changes []ExternalRangeChange // rangeMoveCount tracks the number of range moves made. rangeMoveCount int // maxRangeMoveCount is the maximum number of range moves allowed in the @@ -121,7 +121,7 @@ type sheddingStore struct { // chance to shed leases. func (re *rebalanceEnv) rebalanceStores( ctx context.Context, localStoreID roachpb.StoreID, -) []PendingRangeChange { +) []ExternalRangeChange { re.mmaid++ id := re.mmaid ctx = logtags.AddTag(ctx, "mmaid", id) @@ -501,11 +501,11 @@ func (re *rebalanceEnv) rebalanceReplicas( replicaChanges, rangeID)) } re.addPendingRangeChange(rangeChange) - re.changes = append(re.changes, rangeChange) + re.changes = append(re.changes, MakeExternalRangeChange(rangeChange)) re.rangeMoveCount++ log.KvDistribution.VEventf(ctx, 2, "result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v", - rangeID, removeTarget.StoreID, addTarget.StoreID, re.changes[len(re.changes)-1], ss.adjusted.load, targetSS.adjusted.load) + rangeID, removeTarget.StoreID, addTarget.StoreID, &re.changes[len(re.changes)-1], ss.adjusted.load, targetSS.adjusted.load) } } @@ -674,14 +674,11 @@ func (re *rebalanceEnv) rebalanceLeasesFromLocalStoreID( panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange)) } re.addPendingRangeChange(leaseChange) - re.changes = append(re.changes, leaseChange) - if re.changes[len(re.changes)-1].IsChangeReplicas() || !re.changes[len(re.changes)-1].IsTransferLease() { - panic(fmt.Sprintf("lease transfer is invalid: %v", re.changes[len(re.changes)-1])) - } + re.changes = append(re.changes, MakeExternalRangeChange(leaseChange)) log.KvDistribution.Infof(ctx, "result(success): shedding r%v lease from s%v to s%v [change:%v] with "+ "resulting loads source:%v target:%v (means: %v) (frac_pending: (src:%.2f,target:%.2f) (src:%.2f,target:%.2f))", - rangeID, removeTarget.StoreID, addTarget.StoreID, re.changes[len(re.changes)-1], + rangeID, removeTarget.StoreID, addTarget.StoreID, &re.changes[len(re.changes)-1], ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load, ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease, targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease) diff --git a/pkg/kv/kvserver/allocator/mmaprototype/range_change.go b/pkg/kv/kvserver/allocator/mmaprototype/range_change.go new file mode 100644 index 000000000000..0d357f1e4383 --- /dev/null +++ b/pkg/kv/kvserver/allocator/mmaprototype/range_change.go @@ -0,0 +1,245 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package mmaprototype + +import ( + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" +) + +// ExternalRangeChange is a proposed set of change(s) to a range. It can +// consist of multiple replica changes, such as adding or removing replicas, +// or transferring the lease. There is at most one change per store in the +// set. +type ExternalRangeChange struct { + roachpb.RangeID + Changes []ExternalReplicaChange +} + +// ExternalReplicaChange is a proposed change to a single replica. Some +// external entity (the leaseholder of the range) may choose to enact this +// change. +type ExternalReplicaChange struct { + changeID + // Target is the target {store, node} for the change. + Target roachpb.ReplicationTarget + // Prev is the state before the change. See the detailed comment in + // ReplicaChange.prev, which is shared by this field (except that this is + // immutable). + Prev ReplicaIDAndType + // Next is the state after the change. See the detailed comment in + // ReplicaChange.next, which is shared by this field. + Next ReplicaIDAndType + // ChangeType is a function of (Prev, Next) and a convenience. + ChangeType ReplicaChangeType +} + +func MakeExternalRangeChange(change PendingRangeChange) ExternalRangeChange { + changes := make([]ExternalReplicaChange, len(change.pendingReplicaChanges)) + for i, rc := range change.pendingReplicaChanges { + changeType := rc.replicaChangeType() + if changeType == Unknown { + panic(errors.AssertionFailedf("unknown replica change type")) + } + changes[i] = ExternalReplicaChange{ + changeID: rc.changeID, + Target: rc.target, + Prev: rc.prev.ReplicaIDAndType, + Next: rc.next, + ChangeType: changeType, + } + } + return ExternalRangeChange{ + RangeID: change.RangeID, + Changes: changes, + } +} + +func (rc *ExternalRangeChange) String() string { + return redact.StringWithoutMarkers(rc) +} + +// SafeFormat implements the redact.SafeFormatter interface. +// +// This is adhoc for debugging. A nicer string format would include the +// previous state and next state. +func (rc *ExternalRangeChange) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("r%v=[", rc.RangeID) + found := false + if rc.IsPureTransferLease() { + w.Printf("transfer_to=%v", rc.LeaseTransferTarget()) + found = true + } + if rc.IsChangeReplicas() { + w.Printf("change_replicas=%v", rc.ReplicationChanges()) + found = true + } + if !found { + panic("unknown change type") + } + w.Print(" cids=") + for i, c := range rc.Changes { + if i > 0 { + w.Print(",") + } + w.Printf("%v", c.changeID) + } + w.Print("]") +} + +// TODO(sumeer): A single ExternalRangeChange can model a bunch of replica +// changes and a lease transfer. Classifying the change as either +// IsChangeReplicas or IsPureTransferLease is unnecessarily limiting. The only +// code that really relies on this is integration code: +// mma_store_rebalancer.go, allocator_sync.go, asim. We should fix those and +// consider removing these methods. + +// IsChangeReplicas returns true if the range change is a change replicas +// operation. +func (rc *ExternalRangeChange) IsChangeReplicas() bool { + for _, c := range rc.Changes { + if c.ChangeType == AddLease || c.ChangeType == RemoveLease { + return false + } + } + return true +} + +// IsPureTransferLease returns true if the range change is purely a transfer +// lease operation (i.e., it is not a combined replication change and lease +// transfer). +func (rc *ExternalRangeChange) IsPureTransferLease() bool { + if len(rc.Changes) != 2 { + return false + } + var addLease, removeLease int + for _, c := range rc.Changes { + switch c.ChangeType { + case AddLease: + addLease++ + case RemoveLease: + removeLease++ + default: + // Any other change type is not a pure lease transfer (e.g. they + // require a replication change). + return false + } + } + if addLease != 1 || removeLease != 1 { + panic(errors.AssertionFailedf("unexpected add (%d) or remove lease (%d) in lease transfer", + addLease, removeLease)) + } + return true +} + +// ReplicationChanges returns the replication changes for the range change. It +// panics if the range change is not a change replicas operation. +// +// TODO(tbg): The ReplicationChanges can include a new leaseholder replica, +// but the incoming leaseholder is not explicitly represented in +// kvpb.ReplicationChanges. This is an existing modeling deficiency in the +// kvserver code. In Replica.maybeTransferLeaseDuringLeaveJoint the first +// VOTER_INCOMING is considered the new leaseholder. So the code below places +// the new leaseholder (an ADD_VOTER) at index 0. It is not clear whether this +// is sufficient for all integration use-cases. Verify and fix as needed. +// +// TODO(sumeer): this method is limiting, since a single PendingRangeChange +// should be allowed to model any set of changes (see the existing TODO on +// IsChangeReplicas). +func (rc *ExternalRangeChange) ReplicationChanges() kvpb.ReplicationChanges { + if !rc.IsChangeReplicas() { + panic("RangeChange is not a change replicas") + } + chgs := make([]kvpb.ReplicationChange, 0, len(rc.Changes)) + newLeaseholderIndex := -1 + for _, c := range rc.Changes { + switch c.ChangeType { + case ChangeReplica, AddReplica, RemoveReplica: + // These are the only permitted cases. + default: + panic(errors.AssertionFailedf("change type %v is not a change replicas", c.ChangeType)) + } + // The kvserver code represents a change in replica type as an + // addition and a removal of the same replica. For example, if a + // replica changes from VOTER_FULL to NON_VOTER, we will emit a pair + // of {ADD_NON_VOTER, REMOVE_VOTER} for the replica. The ordering of + // this pair does not matter. + // + // TODO(tbg): confirm that the ordering does not matter. + if c.ChangeType == ChangeReplica || c.ChangeType == AddReplica { + chg := kvpb.ReplicationChange{Target: c.Target} + isNewLeaseholder := false + switch c.Next.ReplicaType.ReplicaType { + case roachpb.VOTER_FULL: + chg.ChangeType = roachpb.ADD_VOTER + if c.Next.IsLeaseholder { + isNewLeaseholder = true + } + case roachpb.NON_VOTER: + chg.ChangeType = roachpb.ADD_NON_VOTER + default: + panic(errors.AssertionFailedf("unexpected replica type %s", c.Next.ReplicaType.ReplicaType)) + } + if isNewLeaseholder { + if newLeaseholderIndex >= 0 { + panic(errors.AssertionFailedf( + "multiple new leaseholders in change replicas")) + } + newLeaseholderIndex = len(chgs) + } + chgs = append(chgs, chg) + } + if c.ChangeType == ChangeReplica || c.ChangeType == RemoveReplica { + chg := kvpb.ReplicationChange{Target: c.Target} + prevType := mapReplicaTypeToVoterOrNonVoter(c.Prev.ReplicaType.ReplicaType) + switch prevType { + case roachpb.VOTER_FULL: + chg.ChangeType = roachpb.REMOVE_VOTER + case roachpb.NON_VOTER: + chg.ChangeType = roachpb.REMOVE_NON_VOTER + default: + panic(errors.AssertionFailedf("unexpected replica type %s", c.Prev.ReplicaType.ReplicaType)) + } + chgs = append(chgs, chg) + } + } + if newLeaseholderIndex >= 0 { + // Move the new leaseholder to index 0. + chgs[0], chgs[newLeaseholderIndex] = chgs[newLeaseholderIndex], chgs[0] + } + return chgs +} + +// LeaseTransferTarget returns the store ID of the store that is the target of +// the lease transfer. It panics if the range change is not a transfer lease +// operation. +func (rc *ExternalRangeChange) LeaseTransferTarget() roachpb.StoreID { + if !rc.IsPureTransferLease() { + panic("pendingRangeChange is not a lease transfer") + } + for _, c := range rc.Changes { + if !c.Prev.IsLeaseholder && c.Next.IsLeaseholder { + return c.Target.StoreID + } + } + panic("unreachable") +} + +// LeaseTransferFrom returns the store ID of the store that is the source of +// the lease transfer. It panics if the range change is not a transfer lease. +func (rc *ExternalRangeChange) LeaseTransferFrom() roachpb.StoreID { + if !rc.IsPureTransferLease() { + panic("pendingRangeChange is not a lease transfer") + } + for _, c := range rc.Changes { + if c.Prev.IsLeaseholder && !c.Next.IsLeaseholder { + return c.Target.StoreID + } + } + panic("unreachable") +} diff --git a/pkg/kv/kvserver/asim/mmaintegration/mma_store_rebalancer.go b/pkg/kv/kvserver/asim/mmaintegration/mma_store_rebalancer.go index 9c37825049c0..5c4f524dbb59 100644 --- a/pkg/kv/kvserver/asim/mmaintegration/mma_store_rebalancer.go +++ b/pkg/kv/kvserver/asim/mmaintegration/mma_store_rebalancer.go @@ -56,7 +56,7 @@ type MMAStoreRebalancer struct { } type pendingChangeAndRangeUsageInfo struct { - change mmaprototype.PendingRangeChange + change mmaprototype.ExternalRangeChange usage allocator.RangeUsageInfo syncChangeID mmaintegration.SyncChangeID } @@ -174,7 +174,7 @@ func (msr *MMAStoreRebalancer) Tick(ctx context.Context, tick time.Time, s state // Record the last time a lease transfer was requested. for _, change := range msr.pendingChanges { - if change.change.IsTransferLease() { + if change.change.IsPureTransferLease() { msr.lastLeaseTransfer = tick } } @@ -187,7 +187,7 @@ func (msr *MMAStoreRebalancer) Tick(ctx context.Context, tick time.Time, s state } var curOp op.ControlledOperation - if msr.pendingChanges[msr.pendingChangeIdx].change.IsTransferLease() { + if msr.pendingChanges[msr.pendingChangeIdx].change.IsPureTransferLease() { curOp = op.NewTransferLeaseOp( tick, curChange.change.RangeID, diff --git a/pkg/kv/kvserver/mma_store_rebalancer.go b/pkg/kv/kvserver/mma_store_rebalancer.go index 4f7474e73c38..a322ba9e01ec 100644 --- a/pkg/kv/kvserver/mma_store_rebalancer.go +++ b/pkg/kv/kvserver/mma_store_rebalancer.go @@ -150,7 +150,7 @@ func (m *mmaStoreRebalancer) rebalance(ctx context.Context) bool { // applyChange safely applies a single change to the store. It handles the case // where the replica might not exist and provides proper error handling. func (m *mmaStoreRebalancer) applyChange( - ctx context.Context, change mmaprototype.PendingRangeChange, + ctx context.Context, change mmaprototype.ExternalRangeChange, ) error { repl := m.store.GetReplicaIfExists(change.RangeID) if repl == nil { @@ -160,7 +160,7 @@ func (m *mmaStoreRebalancer) applyChange( changeID := m.as.MMAPreApply(ctx, repl.RangeUsageInfo(), change) var err error switch { - case change.IsTransferLease(): + case change.IsPureTransferLease(): err = m.applyLeaseTransfer(ctx, repl, change) case change.IsChangeReplicas(): err = m.applyReplicaChanges(ctx, repl, change) @@ -175,7 +175,7 @@ func (m *mmaStoreRebalancer) applyChange( // applyLeaseTransfer applies a lease transfer change. func (m *mmaStoreRebalancer) applyLeaseTransfer( - ctx context.Context, repl replicaToApplyChanges, change mmaprototype.PendingRangeChange, + ctx context.Context, repl replicaToApplyChanges, change mmaprototype.ExternalRangeChange, ) error { return repl.AdminTransferLease( ctx, @@ -186,7 +186,7 @@ func (m *mmaStoreRebalancer) applyLeaseTransfer( // applyReplicaChanges applies replica membership changes. func (m *mmaStoreRebalancer) applyReplicaChanges( - ctx context.Context, repl replicaToApplyChanges, change mmaprototype.PendingRangeChange, + ctx context.Context, repl replicaToApplyChanges, change mmaprototype.ExternalRangeChange, ) error { // TODO(mma): We should be setting a timeout on the ctx here, in the case // where rebalancing takes a long time (stuck behind other snapshots). diff --git a/pkg/kv/kvserver/mmaintegration/BUILD.bazel b/pkg/kv/kvserver/mmaintegration/BUILD.bazel index 882e42b41157..82ef339e28dc 100644 --- a/pkg/kv/kvserver/mmaintegration/BUILD.bazel +++ b/pkg/kv/kvserver/mmaintegration/BUILD.bazel @@ -34,6 +34,7 @@ go_test( deps = [ "//pkg/kv/kvpb", "//pkg/kv/kvserver/allocator", + "//pkg/kv/kvserver/allocator/mmaprototype", "//pkg/roachpb", "//pkg/testutils/datapathutils", "//pkg/util/leaktest", diff --git a/pkg/kv/kvserver/mmaintegration/allocator_op.go b/pkg/kv/kvserver/mmaintegration/allocator_op.go index baf0e23028e3..6e72374f69cf 100644 --- a/pkg/kv/kvserver/mmaintegration/allocator_op.go +++ b/pkg/kv/kvserver/mmaintegration/allocator_op.go @@ -22,7 +22,7 @@ type trackedAllocatorChange struct { // change could not be registered with mma, in which case PostApply must not // inform mma. isMMARegistered bool - mmaChange mmaprototype.PendingRangeChange + mmaChange mmaprototype.ExternalRangeChange // Usage is range load usage. usage allocator.RangeUsageInfo // Exactly one of the following two fields will be set. diff --git a/pkg/kv/kvserver/mmaintegration/allocator_sync.go b/pkg/kv/kvserver/mmaintegration/allocator_sync.go index c0a25773544a..b9c10c7ed0ab 100644 --- a/pkg/kv/kvserver/mmaintegration/allocator_sync.go +++ b/pkg/kv/kvserver/mmaintegration/allocator_sync.go @@ -44,10 +44,11 @@ type storePool interface { type mmaState interface { // RegisterExternalChange is called by the allocator sync to register // external changes with the mma. - RegisterExternalChange(change mmaprototype.PendingRangeChange) (ok bool) + RegisterExternalChange( + change mmaprototype.PendingRangeChange) (_ mmaprototype.ExternalRangeChange, ok bool) // AdjustPendingChangeDisposition is called by the allocator sync to adjust // the disposition of pending changes to a range. - AdjustPendingChangeDisposition(change mmaprototype.PendingRangeChange, success bool) + AdjustPendingChangeDisposition(change mmaprototype.ExternalRangeChange, success bool) // BuildMMARebalanceAdvisor is called by the allocator sync to build a // MMARebalanceAdvisor for the given existing store and candidates. The // advisor should be later passed to IsInConflictWithMMA to determine if a @@ -151,10 +152,10 @@ func (as *AllocatorSync) NonMMAPreTransferLease( transferFrom, transferTo roachpb.ReplicationTarget, ) SyncChangeID { var isMMARegistered bool - var mmaChange mmaprototype.PendingRangeChange + var mmaChange mmaprototype.ExternalRangeChange if kvserverbase.LoadBasedRebalancingModeIsMMA(&as.st.SV) { - mmaChange = convertLeaseTransferToMMA(desc, usage, transferFrom, transferTo) - isMMARegistered = as.mmaAllocator.RegisterExternalChange(mmaChange) + change := convertLeaseTransferToMMA(desc, usage, transferFrom, transferTo) + mmaChange, isMMARegistered = as.mmaAllocator.RegisterExternalChange(change) } trackedChange := trackedAllocatorChange{ isMMARegistered: isMMARegistered, @@ -181,14 +182,14 @@ func (as *AllocatorSync) NonMMAPreChangeReplicas( leaseholderStoreID roachpb.StoreID, ) SyncChangeID { var isMMARegistered bool - var mmaChange mmaprototype.PendingRangeChange + var mmaChange mmaprototype.ExternalRangeChange if kvserverbase.LoadBasedRebalancingModeIsMMA(&as.st.SV) { var err error - mmaChange, err = convertReplicaChangeToMMA(desc, usage, changes, leaseholderStoreID) + change, err := convertReplicaChangeToMMA(desc, usage, changes, leaseholderStoreID) if err != nil { log.KvDistribution.Errorf(ctx, "failed to convert replica change to mma: %v", err) } else { - isMMARegistered = as.mmaAllocator.RegisterExternalChange(mmaChange) + mmaChange, isMMARegistered = as.mmaAllocator.RegisterExternalChange(change) } } trackedChange := trackedAllocatorChange{ @@ -212,7 +213,7 @@ func (as *AllocatorSync) NonMMAPreChangeReplicas( func (as *AllocatorSync) MMAPreApply( ctx context.Context, usage allocator.RangeUsageInfo, - pendingChange mmaprototype.PendingRangeChange, + pendingChange mmaprototype.ExternalRangeChange, ) SyncChangeID { trackedChange := trackedAllocatorChange{ isMMARegistered: true, @@ -220,7 +221,7 @@ func (as *AllocatorSync) MMAPreApply( usage: usage, } switch { - case pendingChange.IsTransferLease(): + case pendingChange.IsPureTransferLease(): trackedChange.leaseTransferOp = &leaseTransferOp{ transferFrom: pendingChange.LeaseTransferFrom(), transferTo: pendingChange.LeaseTransferTarget(), @@ -242,7 +243,7 @@ func (as *AllocatorSync) MMAPreApply( // MarkChangeAsFailed marks the given changes to the range as failed without // going through allocator sync. This is used when mma changes fail before // even registering with mma via MMAPreApply. -func (as *AllocatorSync) MarkChangeAsFailed(change mmaprototype.PendingRangeChange) { +func (as *AllocatorSync) MarkChangeAsFailed(change mmaprototype.ExternalRangeChange) { as.mmaAllocator.AdjustPendingChangeDisposition(change, false /* success */) } diff --git a/pkg/kv/kvserver/mmaintegration/mma_conversion_test.go b/pkg/kv/kvserver/mmaintegration/mma_conversion_test.go index a6b057277f83..2e31c548af91 100644 --- a/pkg/kv/kvserver/mmaintegration/mma_conversion_test.go +++ b/pkg/kv/kvserver/mmaintegration/mma_conversion_test.go @@ -13,6 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -113,8 +114,12 @@ func TestConvertReplicaChangeToMMA(t *testing.T) { mmaChanges.SortForTesting() var b strings.Builder fmt.Fprintf(&b, "PendingRangeChange: %s", mmaChanges.StringForTesting()) - if mmaChanges.IsChangeReplicas() { - fmt.Fprintf(&b, "As kvpb.ReplicationChanges:\n %v\n", mmaChanges.ReplicationChanges()) + externalChange := mmaprototype.MakeExternalRangeChange(mmaChanges) + if externalChange.IsChangeReplicas() { + fmt.Fprintf(&b, "As kvpb.ReplicationChanges:\n %v\n", externalChange.ReplicationChanges()) + } else if externalChange.IsPureTransferLease() { + fmt.Fprintf(&b, "Lease transfer from s%v to s%v\n", + externalChange.LeaseTransferFrom(), externalChange.LeaseTransferTarget()) } return b.String() }