From d90d646fa7a68b25dfd8da847e92a9f912a0a857 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Fri, 21 Nov 2025 09:39:15 -0500 Subject: [PATCH 1/2] mma: move some methods of PendingRangeChange This is in preparation for them becoming methods on ExternalRangeChange. --- .../allocator/mmaprototype/BUILD.bazel | 1 + .../allocator/mmaprototype/cluster_state.go | 185 ---------------- .../allocator/mmaprototype/range_change.go | 197 ++++++++++++++++++ 3 files changed, 198 insertions(+), 185 deletions(-) create mode 100644 pkg/kv/kvserver/allocator/mmaprototype/range_change.go diff --git a/pkg/kv/kvserver/allocator/mmaprototype/BUILD.bazel b/pkg/kv/kvserver/allocator/mmaprototype/BUILD.bazel index 4eb139ce93a7..267d86f320d5 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/BUILD.bazel +++ b/pkg/kv/kvserver/allocator/mmaprototype/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "load.go", "memo_helper.go", "messages.go", + "range_change.go", "rebalance_advisor.go", "store_load_summary.go", "store_set.go", diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go index 225860d49426..8047c04d0975 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go @@ -14,7 +14,6 @@ import ( "strings" "time" - "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -505,38 +504,6 @@ func MakePendingRangeChange(rangeID roachpb.RangeID, changes []ReplicaChange) Pe } } -func (prc PendingRangeChange) String() string { - return redact.StringWithoutMarkers(prc) -} - -// SafeFormat implements the redact.SafeFormatter interface. -// -// This is adhoc for debugging. A nicer string format would include the -// previous state and next state. 
-func (prc PendingRangeChange) SafeFormat(w redact.SafePrinter, _ rune) { - w.Printf("r%v=[", prc.RangeID) - found := false - if prc.IsTransferLease() { - w.Printf("transfer_to=%v", prc.LeaseTransferTarget()) - found = true - } - if prc.IsChangeReplicas() { - w.Printf("change_replicas=%v", prc.ReplicationChanges()) - found = true - } - if !found { - panic("unknown change type") - } - w.Print(" cids=") - for i, c := range prc.pendingReplicaChanges { - if i > 0 { - w.Print(",") - } - w.Printf("%v", c.changeID) - } - w.Print("]") -} - // StringForTesting prints the untransformed internal state for testing. func (prc PendingRangeChange) StringForTesting() string { var b strings.Builder @@ -557,158 +524,6 @@ func (prc PendingRangeChange) SortForTesting() { }) } -// TODO(sumeer): A single PendingRangeChange can model a bunch of replica -// changes and a lease transfer. Classifying the change as either -// IsChangeReplicas or IsTransferLease is unnecessarily limiting. The only -// code that really relies on this is integration code: -// mma_store_rebalancer.go, allocator_sync.go, asim. We should fix those and -// consider removing these methods. - -// IsChangeReplicas returns true if the pending range change is a change -// replicas operation. -func (prc PendingRangeChange) IsChangeReplicas() bool { - for _, c := range prc.pendingReplicaChanges { - if c.isAddition() || c.isRemoval() || c.isPromoDemo() { - continue - } else { - return false - } - } - return true -} - -// IsTransferLease returns true if the pending range change is a transfer lease -// operation. -func (prc PendingRangeChange) IsTransferLease() bool { - if len(prc.pendingReplicaChanges) != 2 { - return false - } - var foundAddLease, foundRemoveLease bool - for _, c := range prc.pendingReplicaChanges { - if c.isAddition() || c.isRemoval() || c.isPromoDemo() { - // Any changes to the replica type or replicaID are not lease transfers, - // since they require a replication change. 
- return false - } - if c.prev.IsLeaseholder && !c.next.IsLeaseholder { - foundRemoveLease = true - } else if !c.prev.IsLeaseholder && c.next.IsLeaseholder { - foundAddLease = true - } else { - return false - } - } - return foundAddLease && foundRemoveLease -} - -// ReplicationChanges returns the replication changes for the pending range -// change. It panics if the pending range change is not a change replicas -// operation. -// -// TODO(tbg): The ReplicationChanges can include a new leaseholder replica, -// but the incoming leaseholder is not explicitly represented in -// kvpb.ReplicationChanges. This is an existing modeling deficiency in the -// kvserver code. In Replica.maybeTransferLeaseDuringLeaveJoint the first -// VOTER_INCOMING is considered the new leaseholder. So the code below places -// the new leaseholder (an ADD_VOTER) at index 0. It is not clear whether this -// is sufficient for all integration use-cases. Verify and fix as needed. -// -// TODO(sumeer): this method is limiting, since a single PendingRangeChange -// should be allowed to model any set of changes (see the existing TODO on -// IsChangeReplicas). -func (prc PendingRangeChange) ReplicationChanges() kvpb.ReplicationChanges { - if !prc.IsChangeReplicas() { - panic("RangeChange is not a change replicas") - } - chgs := make([]kvpb.ReplicationChange, 0, len(prc.pendingReplicaChanges)) - newLeaseholderIndex := -1 - for _, c := range prc.pendingReplicaChanges { - switch c.replicaChangeType { - case ChangeReplica, AddReplica, RemoveReplica: - // These are the only permitted cases. - default: - panic(errors.AssertionFailedf("change type %v is not a change replicas", c.replicaChangeType)) - } - // The kvserver code represents a change in replica type as an - // addition and a removal of the same replica. For example, if a - // replica changes from VOTER_FULL to NON_VOTER, we will emit a pair - // of {ADD_NON_VOTER, REMOVE_VOTER} for the replica. The ordering of - // this pair does not matter. 
- // - // TODO(tbg): confirm that the ordering does not matter. - if c.replicaChangeType == ChangeReplica || c.replicaChangeType == AddReplica { - chg := kvpb.ReplicationChange{Target: c.target} - isNewLeaseholder := false - switch c.next.ReplicaType.ReplicaType { - case roachpb.VOTER_FULL: - chg.ChangeType = roachpb.ADD_VOTER - if c.next.IsLeaseholder { - isNewLeaseholder = true - } - case roachpb.NON_VOTER: - chg.ChangeType = roachpb.ADD_NON_VOTER - default: - panic(errors.AssertionFailedf("unexpected replica type %s", c.next.ReplicaType.ReplicaType)) - } - if isNewLeaseholder { - if newLeaseholderIndex >= 0 { - panic(errors.AssertionFailedf( - "multiple new leaseholders in change replicas")) - } - newLeaseholderIndex = len(chgs) - } - chgs = append(chgs, chg) - } - if c.replicaChangeType == ChangeReplica || c.replicaChangeType == RemoveReplica { - chg := kvpb.ReplicationChange{Target: c.target} - prevType := mapReplicaTypeToVoterOrNonVoter(c.prev.ReplicaType.ReplicaType) - switch prevType { - case roachpb.VOTER_FULL: - chg.ChangeType = roachpb.REMOVE_VOTER - case roachpb.NON_VOTER: - chg.ChangeType = roachpb.REMOVE_NON_VOTER - default: - panic(errors.AssertionFailedf("unexpected replica type %s", c.prev.ReplicaType.ReplicaType)) - } - chgs = append(chgs, chg) - } - } - if newLeaseholderIndex >= 0 { - // Move the new leaseholder to index 0. - chgs[0], chgs[newLeaseholderIndex] = chgs[newLeaseholderIndex], chgs[0] - } - return chgs -} - -// LeaseTransferTarget returns the store ID of the store that is the target of -// the lease transfer. It panics if the pending range change is not a transfer -// lease operation. 
-func (prc PendingRangeChange) LeaseTransferTarget() roachpb.StoreID { - if !prc.IsTransferLease() { - panic("pendingRangeChange is not a lease transfer") - } - for _, c := range prc.pendingReplicaChanges { - if !c.prev.IsLeaseholder && c.next.IsLeaseholder { - return c.target.StoreID - } - } - panic("unreachable") -} - -// LeaseTransferFrom returns the store ID of the store that is the source of -// the lease transfer. It panics if the pending range change is not a -func (prc PendingRangeChange) LeaseTransferFrom() roachpb.StoreID { - if !prc.IsTransferLease() { - panic("pendingRangeChange is not a lease transfer") - } - for _, c := range prc.pendingReplicaChanges { - if c.prev.IsLeaseholder && !c.next.IsLeaseholder { - return c.target.StoreID - } - } - panic("unreachable") -} - // pendingReplicaChange is a proposed change to a single replica. Some // external entity (the leaseholder of the range) may choose to enact this // change. It may not be enacted if it will cause some invariant (like the diff --git a/pkg/kv/kvserver/allocator/mmaprototype/range_change.go b/pkg/kv/kvserver/allocator/mmaprototype/range_change.go new file mode 100644 index 000000000000..71c9cca1ae72 --- /dev/null +++ b/pkg/kv/kvserver/allocator/mmaprototype/range_change.go @@ -0,0 +1,197 @@ +// Copyright 2025 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package mmaprototype + +import ( + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" +) + +func (prc PendingRangeChange) String() string { + return redact.StringWithoutMarkers(prc) +} + +// SafeFormat implements the redact.SafeFormatter interface. +// +// This is adhoc for debugging. A nicer string format would include the +// previous state and next state. 
+func (prc PendingRangeChange) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("r%v=[", prc.RangeID) + found := false + if prc.IsTransferLease() { + w.Printf("transfer_to=%v", prc.LeaseTransferTarget()) + found = true + } + if prc.IsChangeReplicas() { + w.Printf("change_replicas=%v", prc.ReplicationChanges()) + found = true + } + if !found { + panic("unknown change type") + } + w.Print(" cids=") + for i, c := range prc.pendingReplicaChanges { + if i > 0 { + w.Print(",") + } + w.Printf("%v", c.changeID) + } + w.Print("]") +} + +// TODO(sumeer): A single PendingRangeChange can model a bunch of replica +// changes and a lease transfer. Classifying the change as either +// IsChangeReplicas or IsTransferLease is unnecessarily limiting. The only +// code that really relies on this is integration code: +// mma_store_rebalancer.go, allocator_sync.go, asim. We should fix those and +// consider removing these methods. + +// IsChangeReplicas returns true if the pending range change is a change +// replicas operation. +func (prc PendingRangeChange) IsChangeReplicas() bool { + for _, c := range prc.pendingReplicaChanges { + if c.isAddition() || c.isRemoval() || c.isPromoDemo() { + continue + } else { + return false + } + } + return true +} + +// IsTransferLease returns true if the pending range change is a transfer lease +// operation. +func (prc PendingRangeChange) IsTransferLease() bool { + if len(prc.pendingReplicaChanges) != 2 { + return false + } + var foundAddLease, foundRemoveLease bool + for _, c := range prc.pendingReplicaChanges { + if c.isAddition() || c.isRemoval() || c.isPromoDemo() { + // Any changes to the replica type or replicaID are not lease transfers, + // since they require a replication change. 
+ return false + } + if c.prev.IsLeaseholder && !c.next.IsLeaseholder { + foundRemoveLease = true + } else if !c.prev.IsLeaseholder && c.next.IsLeaseholder { + foundAddLease = true + } else { + return false + } + } + return foundAddLease && foundRemoveLease +} + +// ReplicationChanges returns the replication changes for the pending range +// change. It panics if the pending range change is not a change replicas +// operation. +// +// TODO(tbg): The ReplicationChanges can include a new leaseholder replica, +// but the incoming leaseholder is not explicitly represented in +// kvpb.ReplicationChanges. This is an existing modeling deficiency in the +// kvserver code. In Replica.maybeTransferLeaseDuringLeaveJoint the first +// VOTER_INCOMING is considered the new leaseholder. So the code below places +// the new leaseholder (an ADD_VOTER) at index 0. It is not clear whether this +// is sufficient for all integration use-cases. Verify and fix as needed. +// +// TODO(sumeer): this method is limiting, since a single PendingRangeChange +// should be allowed to model any set of changes (see the existing TODO on +// IsChangeReplicas). +func (prc PendingRangeChange) ReplicationChanges() kvpb.ReplicationChanges { + if !prc.IsChangeReplicas() { + panic("RangeChange is not a change replicas") + } + chgs := make([]kvpb.ReplicationChange, 0, len(prc.pendingReplicaChanges)) + newLeaseholderIndex := -1 + for _, c := range prc.pendingReplicaChanges { + switch c.replicaChangeType { + case ChangeReplica, AddReplica, RemoveReplica: + // These are the only permitted cases. + default: + panic(errors.AssertionFailedf("change type %v is not a change replicas", c.replicaChangeType)) + } + // The kvserver code represents a change in replica type as an + // addition and a removal of the same replica. For example, if a + // replica changes from VOTER_FULL to NON_VOTER, we will emit a pair + // of {ADD_NON_VOTER, REMOVE_VOTER} for the replica. The ordering of + // this pair does not matter. 
+ // + // TODO(tbg): confirm that the ordering does not matter. + if c.replicaChangeType == ChangeReplica || c.replicaChangeType == AddReplica { + chg := kvpb.ReplicationChange{Target: c.target} + isNewLeaseholder := false + switch c.next.ReplicaType.ReplicaType { + case roachpb.VOTER_FULL: + chg.ChangeType = roachpb.ADD_VOTER + if c.next.IsLeaseholder { + isNewLeaseholder = true + } + case roachpb.NON_VOTER: + chg.ChangeType = roachpb.ADD_NON_VOTER + default: + panic(errors.AssertionFailedf("unexpected replica type %s", c.next.ReplicaType.ReplicaType)) + } + if isNewLeaseholder { + if newLeaseholderIndex >= 0 { + panic(errors.AssertionFailedf( + "multiple new leaseholders in change replicas")) + } + newLeaseholderIndex = len(chgs) + } + chgs = append(chgs, chg) + } + if c.replicaChangeType == ChangeReplica || c.replicaChangeType == RemoveReplica { + chg := kvpb.ReplicationChange{Target: c.target} + prevType := mapReplicaTypeToVoterOrNonVoter(c.prev.ReplicaType.ReplicaType) + switch prevType { + case roachpb.VOTER_FULL: + chg.ChangeType = roachpb.REMOVE_VOTER + case roachpb.NON_VOTER: + chg.ChangeType = roachpb.REMOVE_NON_VOTER + default: + panic(errors.AssertionFailedf("unexpected replica type %s", c.prev.ReplicaType.ReplicaType)) + } + chgs = append(chgs, chg) + } + } + if newLeaseholderIndex >= 0 { + // Move the new leaseholder to index 0. + chgs[0], chgs[newLeaseholderIndex] = chgs[newLeaseholderIndex], chgs[0] + } + return chgs +} + +// LeaseTransferTarget returns the store ID of the store that is the target of +// the lease transfer. It panics if the pending range change is not a transfer +// lease operation. 
+func (prc PendingRangeChange) LeaseTransferTarget() roachpb.StoreID { + if !prc.IsTransferLease() { + panic("pendingRangeChange is not a lease transfer") + } + for _, c := range prc.pendingReplicaChanges { + if !c.prev.IsLeaseholder && c.next.IsLeaseholder { + return c.target.StoreID + } + } + panic("unreachable") +} + +// LeaseTransferFrom returns the store ID of the store that is the source of +// the lease transfer. It panics if the pending range change is not a +func (prc PendingRangeChange) LeaseTransferFrom() roachpb.StoreID { + if !prc.IsTransferLease() { + panic("pendingRangeChange is not a lease transfer") + } + for _, c := range prc.pendingReplicaChanges { + if c.prev.IsLeaseholder && !c.next.IsLeaseholder { + return c.target.StoreID + } + } + panic("unreachable") +} From d2b2bb3bbc742b7d2d0c44ab6a90a1e2dfa0d7ca Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Thu, 20 Nov 2025 11:23:31 -0500 Subject: [PATCH 2/2] mma: make external caller mostly use an ExternalRangeChange An ExternalRangeChange does not have the currently mutable fields of PendingRangeChange (and the set of mutable fields is going to expand in the near future). TODOs related to the limitation of viewing a change as a "change replicas" or "lease transfer" etc., are moved to ExternalRangeChange, so they are confined to methods that are called by integration code outside the mma package. There is still some potential future cleanup involving unexporting PendingRangeChange, which is not in scope for this PR. The PendingRangeChange becomes a temporary container for a []*pendingReplicaChange slice. Printing this struct no longer involves viewing it as a "change replicas" or "lease transfer". Furthermore, the ReplicaChange.replicaChangeType field is removed and the change type is now a function of prev and next. This sets us up for making prev mutable in a future PR. 
Informs #157049 Epic: CRDB-55052 Release note: None --- .../allocator/mmaprototype/allocator.go | 10 +- .../allocator/mmaprototype/allocator_state.go | 14 +- .../allocator/mmaprototype/cluster_state.go | 186 +++++++++++++----- .../cluster_state_rebalance_stores.go | 15 +- .../allocator/mmaprototype/range_change.go | 180 ++++++++++------- .../mmaintegration/mma_store_rebalancer.go | 6 +- pkg/kv/kvserver/mma_store_rebalancer.go | 8 +- pkg/kv/kvserver/mmaintegration/BUILD.bazel | 1 + .../kvserver/mmaintegration/allocator_op.go | 2 +- .../kvserver/mmaintegration/allocator_sync.go | 23 +-- .../mmaintegration/mma_conversion_test.go | 9 +- 11 files changed, 298 insertions(+), 156 deletions(-) diff --git a/pkg/kv/kvserver/allocator/mmaprototype/allocator.go b/pkg/kv/kvserver/allocator/mmaprototype/allocator.go index a79a47c1f1cb..dc9751176a9d 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/allocator.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/allocator.go @@ -56,16 +56,16 @@ type Allocator interface { // Calls to AdjustPendingChangeDisposition must be correctly sequenced with // full state updates from the local node provided in // ProcessNodeLoadResponse. - AdjustPendingChangeDisposition(change PendingRangeChange, success bool) + AdjustPendingChangeDisposition(change ExternalRangeChange, success bool) // RegisterExternalChange informs this allocator about yet to complete // changes to the cluster which were not initiated by this allocator. The // ownership of all state inside change is handed off to the callee. If ok - // is true, the change was registered, and the caller should subsequently - // use the same change in a subsequent call to + // is true, the change was registered, and the caller is returned an + // ExternalRangeChange that it should subsequently use in a call to // AdjustPendingChangeDisposition when the changes are completed, either // successfully or not. If ok is false, the change was not registered. 
- RegisterExternalChange(change PendingRangeChange) (ok bool) + RegisterExternalChange(change PendingRangeChange) (_ ExternalRangeChange, ok bool) // ComputeChanges is called periodically and frequently, say every 10s. // @@ -82,7 +82,7 @@ type Allocator interface { // the allocator, to avoid re-proposing the same change and to make // adjustments to the load. ComputeChanges( - ctx context.Context, msg *StoreLeaseholderMsg, opts ChangeOptions) []PendingRangeChange + ctx context.Context, msg *StoreLeaseholderMsg, opts ChangeOptions) []ExternalRangeChange // AdminRelocateOne is a helper for AdminRelocateRange. // diff --git a/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go b/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go index 95d935189861..3e53d4580c08 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go @@ -254,7 +254,7 @@ func (a *allocatorState) ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoad } // AdjustPendingChangeDisposition implements the Allocator interface. -func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChange, success bool) { +func (a *allocatorState) AdjustPendingChangeDisposition(change ExternalRangeChange, success bool) { a.mu.Lock() defer a.mu.Unlock() rs, ok := a.cs.ranges[change.RangeID] @@ -274,7 +274,7 @@ func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChang // can be made about whether these changes will be found in the allocator's // state. We gather the found changes. var changes []*pendingReplicaChange - for _, c := range change.pendingReplicaChanges { + for _, c := range change.Changes { ch, ok := a.cs.pendingChanges[c.changeID] if !ok { continue @@ -303,25 +303,27 @@ func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChang } // RegisterExternalChange implements the Allocator interface. 
-func (a *allocatorState) RegisterExternalChange(change PendingRangeChange) (ok bool) { +func (a *allocatorState) RegisterExternalChange( + change PendingRangeChange, +) (_ ExternalRangeChange, ok bool) { a.mu.Lock() defer a.mu.Unlock() if err := a.cs.preCheckOnApplyReplicaChanges(change); err != nil { a.mmaMetrics.ExternalFailedToRegister.Inc(1) log.KvDistribution.Infof(context.Background(), "did not register external changes: due to %v", err) - return false + return ExternalRangeChange{}, false } else { a.mmaMetrics.ExternaRegisterSuccess.Inc(1) } a.cs.addPendingRangeChange(change) - return true + return MakeExternalRangeChange(change), true } // ComputeChanges implements the Allocator interface. func (a *allocatorState) ComputeChanges( ctx context.Context, msg *StoreLeaseholderMsg, opts ChangeOptions, -) []PendingRangeChange { +) []ExternalRangeChange { a.mu.Lock() defer a.mu.Unlock() if msg.StoreID != opts.LocalStoreID { diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go index 8047c04d0975..d15c82c96222 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go @@ -159,6 +159,10 @@ func (s ReplicaChangeType) String() string { } } +func replicaExists(replicaID roachpb.ReplicaID) bool { + return replicaID >= 0 || replicaID == unknownReplicaID +} + // ReplicaChange describes a change to a replica. type ReplicaChange struct { // The load this change adds to a store. The values will be negative if the @@ -219,9 +223,6 @@ type ReplicaChange struct { // So the above comment is incorrect. We should clean this up. prev ReplicaState next ReplicaIDAndType - - // replicaChangeType is a function of (prev, next) as described above. 
- replicaChangeType ReplicaChangeType } func (rc ReplicaChange) String() string { @@ -230,30 +231,50 @@ func (rc ReplicaChange) String() string { // SafeFormat implements the redact.SafeFormatter interface. func (rc ReplicaChange) SafeFormat(w redact.SafePrinter, _ rune) { - w.Printf("r%v type: %v target store %v (%v)->(%v)", rc.rangeID, rc.replicaChangeType, rc.target, rc.prev, rc.next) + w.Printf("r%v type: %v target store %v (%v)->(%v)", rc.rangeID, rc.replicaChangeType(), rc.target, rc.prev, rc.next) } // isRemoval returns true if the change is a removal of a replica. func (rc ReplicaChange) isRemoval() bool { - return rc.replicaChangeType == RemoveReplica + return rc.replicaChangeType() == RemoveReplica } // isAddition returns true if the change is an addition of a replica. func (rc ReplicaChange) isAddition() bool { - return rc.replicaChangeType == AddReplica + return rc.replicaChangeType() == AddReplica } // isUpdate returns true if the change is an update to the replica type or // leaseholder status. This includes promotion/demotion changes. func (rc ReplicaChange) isUpdate() bool { - return rc.replicaChangeType == AddLease || rc.replicaChangeType == RemoveLease || - rc.replicaChangeType == ChangeReplica + changeType := rc.replicaChangeType() + return changeType == AddLease || changeType == RemoveLease || changeType == ChangeReplica } -// isPromoDemo returns true if the change is a promotion or demotion of a -// replica (potentially with a lease change). -func (rc ReplicaChange) isPromoDemo() bool { - return rc.replicaChangeType == ChangeReplica +func (rc ReplicaChange) replicaChangeType() ReplicaChangeType { + prevExists := replicaExists(rc.prev.ReplicaID) + nextExists := replicaExists(rc.next.ReplicaID) + if !prevExists && !nextExists { + return Unknown + } + if prevExists && !nextExists { + return RemoveReplica + } + // INVARIANT: nextExists. 
+ + if !prevExists { + return AddReplica + } + if rc.prev.ReplicaType.ReplicaType == rc.next.ReplicaType.ReplicaType { + if rc.prev.ReplicaType.IsLeaseholder == rc.next.ReplicaType.IsLeaseholder { + return Unknown + } + if rc.next.IsLeaseholder { + return AddLease + } + return RemoveLease + } + return ChangeReplica } func MakeLeaseTransferChanges( @@ -297,18 +318,16 @@ func MakeLeaseTransferChanges( } removeLease := ReplicaChange{ - target: removeTarget, - rangeID: rangeID, - prev: remove.ReplicaState, - next: remove.ReplicaIDAndType, - replicaChangeType: RemoveLease, + target: removeTarget, + rangeID: rangeID, + prev: remove.ReplicaState, + next: remove.ReplicaIDAndType, } addLease := ReplicaChange{ - target: addTarget, - rangeID: rangeID, - prev: add.ReplicaState, - next: add.ReplicaIDAndType, - replicaChangeType: AddLease, + target: addTarget, + rangeID: rangeID, + prev: add.ReplicaState, + next: add.ReplicaIDAndType, } removeLease.next.IsLeaseholder = false addLease.next.IsLeaseholder = true @@ -343,8 +362,7 @@ func MakeAddReplicaChange( ReplicaID: noReplicaID, }, }, - next: replicaIDAndType, - replicaChangeType: AddReplica, + next: replicaIDAndType, } addReplica.next.ReplicaID = unknownReplicaID addReplica.loadDelta.add(loadVectorToAdd(rLoad.Load)) @@ -373,7 +391,6 @@ func MakeRemoveReplicaChange( next: ReplicaIDAndType{ ReplicaID: noReplicaID, }, - replicaChangeType: RemoveReplica, } removeReplica.loadDelta.subtract(rLoad.Load) if replicaState.IsLeaseholder { @@ -398,11 +415,10 @@ func MakeReplicaTypeChange( next.ReplicaID = unknownReplicaID next.ReplicaType.ReplicaType = mapReplicaTypeToVoterOrNonVoter(next.ReplicaType.ReplicaType) change := ReplicaChange{ - target: target, - rangeID: rangeID, - prev: prev, - next: next, - replicaChangeType: ChangeReplica, + target: target, + rangeID: rangeID, + prev: prev, + next: next, } if next.IsLeaseholder { change.secondaryLoadDelta[LeaseCount] = 1 @@ -453,26 +469,25 @@ func mapReplicaTypeToVoterOrNonVoter(rType 
roachpb.ReplicaType) roachpb.ReplicaT } } -// PendingRangeChange is a proposed set of change(s) to a range. It can -// consist of multiple pending replica changes, such as adding or removing -// replicas, or transferring the lease. There is at most one change per store -// in the set. -// -// NB: pendingReplicaChanges is not visible outside the package, so we can be -// certain that callers outside this package that hold a PendingRangeChange -// cannot mutate the internals other than clearing the state. +// TODO(sumeer): place PendingRangeChange in a different file after +// https://github.com/cockroachdb/cockroach/pull/158024 merges. // -// Additionally, for a PendingRangeChange returned outside the package, we -// ensure that the pendingReplicaChanges slice itself is not shared with the -// rangeState.pendingChanges slice since the rangeState.pendingChanges slice -// can have entries removed from it (and swapped around as part of removal). +// TODO(sumeer): PendingRangeChange is exported only because external callers +// need to be able to represent load changes when calling +// RegisterExternalChange. The ExternalRangeChange type does not represent +// load. There is possibly a better way to structure this, by including the +// LoadVector and SecondaryLoadVector in the ExternalRangeChange type, and +// unexporting PendingRangeChange. + +// PendingRangeChange is a container for a proposed set of change(s) to a +// range. It can consist of multiple pending replica changes, such as adding +// or removing replicas, or transferring the lease. There is at most one +// change per store in the set. // -// Some the state inside each *pendingReplicaChange is mutable at arbitrary -// points in time by the code inside this package (with the relevant locking, -// of course). Currently, this state is gcTime, enactedAtTime. Neither of it -// is read by the public methods on PendingRangeChange. -// -// TODO(sumeer): when we expand the set of mutable fields, make a deep copy. 
+// The clusterState or anything contained in it, does not contain +// PendingRangeChange, and instead individual *pendingReplicaChange are stored +// in various maps and slices. Note that *pendingReplicaChanges contain +// mutable fields. type PendingRangeChange struct { RangeID roachpb.RangeID pendingReplicaChanges []*pendingReplicaChange @@ -504,6 +519,49 @@ func MakePendingRangeChange(rangeID roachpb.RangeID, changes []ReplicaChange) Pe } } +func (prc PendingRangeChange) String() string { + return redact.StringWithoutMarkers(prc) +} + +// SafeFormat implements the redact.SafeFormatter interface. +// +// This is adhoc for debugging. A nicer string format would include the +// previous state and next state. +func (prc PendingRangeChange) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("r%v=[", prc.RangeID) + nextAddOrChangeReplicaStr := func(next ReplicaType) string { + if next.ReplicaType == roachpb.NON_VOTER { + return "non-voter" + } + if next.IsLeaseholder { + return "voter-leaseholder" + } + return "voter" + } + for i, c := range prc.pendingReplicaChanges { + if i > 0 { + w.Print(" ") + } + w.Printf("id:%d", c.changeID) + switch c.replicaChangeType() { + case Unknown: + w.Printf("unknown-change:s%v", c.target.StoreID) + case AddLease: + w.Printf("add-lease:s%v", c.target.StoreID) + case RemoveLease: + w.Printf("remove-lease:s%v", c.target.StoreID) + case AddReplica: + w.Printf("add-%s:s%v", nextAddOrChangeReplicaStr(c.next.ReplicaType), c.target.StoreID) + case RemoveReplica: + w.Printf("remove-replica:s%v", c.target.StoreID) + case ChangeReplica: + w.Printf("change-to-%s:s%v", nextAddOrChangeReplicaStr(c.next.ReplicaType), + c.target.StoreID) + } + } + w.Print("]") +} + // StringForTesting prints the untransformed internal state for testing. 
func (prc PendingRangeChange) StringForTesting() string { var b strings.Builder @@ -524,6 +582,36 @@ func (prc PendingRangeChange) SortForTesting() { }) } +// isPureTransferLease returns true if the pending range change is purely a +// transfer lease operation (i.e., it is not a combined replication change and +// lease transfer). +// +// TODO(sumeer): this is a duplicate of +// ExternalRangeChange.IsPureTransferLease. Consider unifying. +func (prc PendingRangeChange) isPureTransferLease() bool { + if len(prc.pendingReplicaChanges) != 2 { + return false + } + var addLease, removeLease int + for _, c := range prc.pendingReplicaChanges { + switch c.replicaChangeType() { + case AddLease: + addLease++ + case RemoveLease: + removeLease++ + default: + // Any other change type is not a pure lease transfer (e.g. they + // require a replication change). + return false + } + } + if addLease != 1 || removeLease != 1 { + panic(errors.AssertionFailedf("unexpected add (%d) or remove lease (%d) in lease transfer", + addLease, removeLease)) + } + return true +} + // pendingReplicaChange is a proposed change to a single replica. Some // external entity (the leaseholder of the range) may choose to enact this // change. It may not be enacted if it will cause some invariant (like the @@ -1852,7 +1940,7 @@ func (cs *clusterState) addPendingRangeChange(change PendingRangeChange) { // below. gcDuration := pendingReplicaChangeGCDuration - if change.IsTransferLease() { + if change.isPureTransferLease() { // Only the lease is being transferred. 
gcDuration = pendingLeaseTransferGCDuration } diff --git a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go index daa084af54a1..774816bd5c06 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go @@ -35,7 +35,7 @@ type rebalanceEnv struct { // dsm is the diversity scoring memo for computing diversity scores. dsm *diversityScoringMemo // changes accumulates the pending range changes made during rebalancing. - changes []PendingRangeChange + changes []ExternalRangeChange // rangeMoveCount tracks the number of range moves made. rangeMoveCount int // maxRangeMoveCount is the maximum number of range moves allowed in the @@ -121,7 +121,7 @@ type sheddingStore struct { // chance to shed leases. func (re *rebalanceEnv) rebalanceStores( ctx context.Context, localStoreID roachpb.StoreID, -) []PendingRangeChange { +) []ExternalRangeChange { re.mmaid++ id := re.mmaid ctx = logtags.AddTag(ctx, "mmaid", id) @@ -501,11 +501,11 @@ func (re *rebalanceEnv) rebalanceReplicas( replicaChanges, rangeID)) } re.addPendingRangeChange(rangeChange) - re.changes = append(re.changes, rangeChange) + re.changes = append(re.changes, MakeExternalRangeChange(rangeChange)) re.rangeMoveCount++ log.KvDistribution.VEventf(ctx, 2, "result(success): rebalancing r%v from s%v to s%v [change: %v] with resulting loads source: %v target: %v", - rangeID, removeTarget.StoreID, addTarget.StoreID, re.changes[len(re.changes)-1], ss.adjusted.load, targetSS.adjusted.load) + rangeID, removeTarget.StoreID, addTarget.StoreID, &re.changes[len(re.changes)-1], ss.adjusted.load, targetSS.adjusted.load) } } @@ -674,14 +674,11 @@ func (re *rebalanceEnv) rebalanceLeasesFromLocalStoreID( panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange)) } re.addPendingRangeChange(leaseChange) - re.changes = 
append(re.changes, leaseChange) - if re.changes[len(re.changes)-1].IsChangeReplicas() || !re.changes[len(re.changes)-1].IsTransferLease() { - panic(fmt.Sprintf("lease transfer is invalid: %v", re.changes[len(re.changes)-1])) - } + re.changes = append(re.changes, MakeExternalRangeChange(leaseChange)) log.KvDistribution.Infof(ctx, "result(success): shedding r%v lease from s%v to s%v [change:%v] with "+ "resulting loads source:%v target:%v (means: %v) (frac_pending: (src:%.2f,target:%.2f) (src:%.2f,target:%.2f))", - rangeID, removeTarget.StoreID, addTarget.StoreID, re.changes[len(re.changes)-1], + rangeID, removeTarget.StoreID, addTarget.StoreID, &re.changes[len(re.changes)-1], ss.adjusted.load, targetSS.adjusted.load, means.storeLoad.load, ss.maxFractionPendingIncrease, ss.maxFractionPendingDecrease, targetSS.maxFractionPendingIncrease, targetSS.maxFractionPendingDecrease) diff --git a/pkg/kv/kvserver/allocator/mmaprototype/range_change.go b/pkg/kv/kvserver/allocator/mmaprototype/range_change.go index 71c9cca1ae72..0d357f1e4383 100644 --- a/pkg/kv/kvserver/allocator/mmaprototype/range_change.go +++ b/pkg/kv/kvserver/allocator/mmaprototype/range_change.go @@ -12,30 +12,78 @@ import ( "github.com/cockroachdb/redact" ) -func (prc PendingRangeChange) String() string { - return redact.StringWithoutMarkers(prc) +// ExternalRangeChange is a proposed set of change(s) to a range. It can +// consist of multiple replica changes, such as adding or removing replicas, +// or transferring the lease. There is at most one change per store in the +// set. +type ExternalRangeChange struct { + roachpb.RangeID + Changes []ExternalReplicaChange +} + +// ExternalReplicaChange is a proposed change to a single replica. Some +// external entity (the leaseholder of the range) may choose to enact this +// change. +type ExternalReplicaChange struct { + changeID + // Target is the target {store, node} for the change. + Target roachpb.ReplicationTarget + // Prev is the state before the change. 
See the detailed comment in + // ReplicaChange.prev, which is shared by this field (except that this is + // immutable). + Prev ReplicaIDAndType + // Next is the state after the change. See the detailed comment in + // ReplicaChange.next, which is shared by this field. + Next ReplicaIDAndType + // ChangeType is a function of (Prev, Next) and a convenience. + ChangeType ReplicaChangeType +} + +func MakeExternalRangeChange(change PendingRangeChange) ExternalRangeChange { + changes := make([]ExternalReplicaChange, len(change.pendingReplicaChanges)) + for i, rc := range change.pendingReplicaChanges { + changeType := rc.replicaChangeType() + if changeType == Unknown { + panic(errors.AssertionFailedf("unknown replica change type")) + } + changes[i] = ExternalReplicaChange{ + changeID: rc.changeID, + Target: rc.target, + Prev: rc.prev.ReplicaIDAndType, + Next: rc.next, + ChangeType: changeType, + } + } + return ExternalRangeChange{ + RangeID: change.RangeID, + Changes: changes, + } +} + +func (rc *ExternalRangeChange) String() string { + return redact.StringWithoutMarkers(rc) } // SafeFormat implements the redact.SafeFormatter interface. // // This is adhoc for debugging. A nicer string format would include the // previous state and next state. 
-func (prc PendingRangeChange) SafeFormat(w redact.SafePrinter, _ rune) { - w.Printf("r%v=[", prc.RangeID) +func (rc *ExternalRangeChange) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("r%v=[", rc.RangeID) found := false - if prc.IsTransferLease() { - w.Printf("transfer_to=%v", prc.LeaseTransferTarget()) + if rc.IsPureTransferLease() { + w.Printf("transfer_to=%v", rc.LeaseTransferTarget()) found = true } - if prc.IsChangeReplicas() { - w.Printf("change_replicas=%v", prc.ReplicationChanges()) + if rc.IsChangeReplicas() { + w.Printf("change_replicas=%v", rc.ReplicationChanges()) found = true } if !found { panic("unknown change type") } w.Print(" cids=") - for i, c := range prc.pendingReplicaChanges { + for i, c := range rc.Changes { if i > 0 { w.Print(",") } @@ -44,53 +92,53 @@ func (prc PendingRangeChange) SafeFormat(w redact.SafePrinter, _ rune) { w.Print("]") } -// TODO(sumeer): A single PendingRangeChange can model a bunch of replica +// TODO(sumeer): A single ExternalRangeChange can model a bunch of replica // changes and a lease transfer. Classifying the change as either -// IsChangeReplicas or IsTransferLease is unnecessarily limiting. The only +// IsChangeReplicas or IsPureTransferLease is unnecessarily limiting. The only // code that really relies on this is integration code: // mma_store_rebalancer.go, allocator_sync.go, asim. We should fix those and // consider removing these methods. -// IsChangeReplicas returns true if the pending range change is a change -// replicas operation. -func (prc PendingRangeChange) IsChangeReplicas() bool { - for _, c := range prc.pendingReplicaChanges { - if c.isAddition() || c.isRemoval() || c.isPromoDemo() { - continue - } else { +// IsChangeReplicas returns true if the range change is a change replicas +// operation. 
+func (rc *ExternalRangeChange) IsChangeReplicas() bool { + for _, c := range rc.Changes { + if c.ChangeType == AddLease || c.ChangeType == RemoveLease { return false } } return true } -// IsTransferLease returns true if the pending range change is a transfer lease -// operation. -func (prc PendingRangeChange) IsTransferLease() bool { - if len(prc.pendingReplicaChanges) != 2 { +// IsPureTransferLease returns true if the range change is purely a transfer +// lease operation (i.e., it is not a combined replication change and lease +// transfer). +func (rc *ExternalRangeChange) IsPureTransferLease() bool { + if len(rc.Changes) != 2 { return false } - var foundAddLease, foundRemoveLease bool - for _, c := range prc.pendingReplicaChanges { - if c.isAddition() || c.isRemoval() || c.isPromoDemo() { - // Any changes to the replica type or replicaID are not lease transfers, - // since they require a replication change. - return false - } - if c.prev.IsLeaseholder && !c.next.IsLeaseholder { - foundRemoveLease = true - } else if !c.prev.IsLeaseholder && c.next.IsLeaseholder { - foundAddLease = true - } else { + var addLease, removeLease int + for _, c := range rc.Changes { + switch c.ChangeType { + case AddLease: + addLease++ + case RemoveLease: + removeLease++ + default: + // Any other change type is not a pure lease transfer (e.g. they + // require a replication change). return false } } - return foundAddLease && foundRemoveLease + if addLease != 1 || removeLease != 1 { + panic(errors.AssertionFailedf("unexpected add (%d) or remove lease (%d) in lease transfer", + addLease, removeLease)) + } + return true } -// ReplicationChanges returns the replication changes for the pending range -// change. It panics if the pending range change is not a change replicas -// operation. +// ReplicationChanges returns the replication changes for the range change. It +// panics if the range change is not a change replicas operation. 
// // TODO(tbg): The ReplicationChanges can include a new leaseholder replica, // but the incoming leaseholder is not explicitly represented in @@ -103,18 +151,18 @@ func (prc PendingRangeChange) IsTransferLease() bool { // TODO(sumeer): this method is limiting, since a single PendingRangeChange // should be allowed to model any set of changes (see the existing TODO on // IsChangeReplicas). -func (prc PendingRangeChange) ReplicationChanges() kvpb.ReplicationChanges { - if !prc.IsChangeReplicas() { +func (rc *ExternalRangeChange) ReplicationChanges() kvpb.ReplicationChanges { + if !rc.IsChangeReplicas() { panic("RangeChange is not a change replicas") } - chgs := make([]kvpb.ReplicationChange, 0, len(prc.pendingReplicaChanges)) + chgs := make([]kvpb.ReplicationChange, 0, len(rc.Changes)) newLeaseholderIndex := -1 - for _, c := range prc.pendingReplicaChanges { - switch c.replicaChangeType { + for _, c := range rc.Changes { + switch c.ChangeType { case ChangeReplica, AddReplica, RemoveReplica: // These are the only permitted cases. default: - panic(errors.AssertionFailedf("change type %v is not a change replicas", c.replicaChangeType)) + panic(errors.AssertionFailedf("change type %v is not a change replicas", c.ChangeType)) } // The kvserver code represents a change in replica type as an // addition and a removal of the same replica. For example, if a @@ -123,19 +171,19 @@ func (prc PendingRangeChange) ReplicationChanges() kvpb.ReplicationChanges { // this pair does not matter. // // TODO(tbg): confirm that the ordering does not matter. 
- if c.replicaChangeType == ChangeReplica || c.replicaChangeType == AddReplica { - chg := kvpb.ReplicationChange{Target: c.target} + if c.ChangeType == ChangeReplica || c.ChangeType == AddReplica { + chg := kvpb.ReplicationChange{Target: c.Target} isNewLeaseholder := false - switch c.next.ReplicaType.ReplicaType { + switch c.Next.ReplicaType.ReplicaType { case roachpb.VOTER_FULL: chg.ChangeType = roachpb.ADD_VOTER - if c.next.IsLeaseholder { + if c.Next.IsLeaseholder { isNewLeaseholder = true } case roachpb.NON_VOTER: chg.ChangeType = roachpb.ADD_NON_VOTER default: - panic(errors.AssertionFailedf("unexpected replica type %s", c.next.ReplicaType.ReplicaType)) + panic(errors.AssertionFailedf("unexpected replica type %s", c.Next.ReplicaType.ReplicaType)) } if isNewLeaseholder { if newLeaseholderIndex >= 0 { @@ -146,16 +194,16 @@ func (prc PendingRangeChange) ReplicationChanges() kvpb.ReplicationChanges { } chgs = append(chgs, chg) } - if c.replicaChangeType == ChangeReplica || c.replicaChangeType == RemoveReplica { - chg := kvpb.ReplicationChange{Target: c.target} - prevType := mapReplicaTypeToVoterOrNonVoter(c.prev.ReplicaType.ReplicaType) + if c.ChangeType == ChangeReplica || c.ChangeType == RemoveReplica { + chg := kvpb.ReplicationChange{Target: c.Target} + prevType := mapReplicaTypeToVoterOrNonVoter(c.Prev.ReplicaType.ReplicaType) switch prevType { case roachpb.VOTER_FULL: chg.ChangeType = roachpb.REMOVE_VOTER case roachpb.NON_VOTER: chg.ChangeType = roachpb.REMOVE_NON_VOTER default: - panic(errors.AssertionFailedf("unexpected replica type %s", c.prev.ReplicaType.ReplicaType)) + panic(errors.AssertionFailedf("unexpected replica type %s", c.Prev.ReplicaType.ReplicaType)) } chgs = append(chgs, chg) } @@ -168,29 +216,29 @@ func (prc PendingRangeChange) ReplicationChanges() kvpb.ReplicationChanges { } // LeaseTransferTarget returns the store ID of the store that is the target of -// the lease transfer. 
It panics if the pending range change is not a transfer -// lease operation. -func (prc PendingRangeChange) LeaseTransferTarget() roachpb.StoreID { - if !prc.IsTransferLease() { +// the lease transfer. It panics if the range change is not a transfer lease +// operation. +func (rc *ExternalRangeChange) LeaseTransferTarget() roachpb.StoreID { + if !rc.IsPureTransferLease() { panic("pendingRangeChange is not a lease transfer") } - for _, c := range prc.pendingReplicaChanges { - if !c.prev.IsLeaseholder && c.next.IsLeaseholder { - return c.target.StoreID + for _, c := range rc.Changes { + if !c.Prev.IsLeaseholder && c.Next.IsLeaseholder { + return c.Target.StoreID } } panic("unreachable") } // LeaseTransferFrom returns the store ID of the store that is the source of -// the lease transfer. It panics if the pending range change is not a -func (prc PendingRangeChange) LeaseTransferFrom() roachpb.StoreID { - if !prc.IsTransferLease() { +// the lease transfer. It panics if the range change is not a transfer lease. 
+func (rc *ExternalRangeChange) LeaseTransferFrom() roachpb.StoreID { + if !rc.IsPureTransferLease() { panic("pendingRangeChange is not a lease transfer") } - for _, c := range prc.pendingReplicaChanges { - if c.prev.IsLeaseholder && !c.next.IsLeaseholder { - return c.target.StoreID + for _, c := range rc.Changes { + if c.Prev.IsLeaseholder && !c.Next.IsLeaseholder { + return c.Target.StoreID } } panic("unreachable") diff --git a/pkg/kv/kvserver/asim/mmaintegration/mma_store_rebalancer.go b/pkg/kv/kvserver/asim/mmaintegration/mma_store_rebalancer.go index 9c37825049c0..5c4f524dbb59 100644 --- a/pkg/kv/kvserver/asim/mmaintegration/mma_store_rebalancer.go +++ b/pkg/kv/kvserver/asim/mmaintegration/mma_store_rebalancer.go @@ -56,7 +56,7 @@ type MMAStoreRebalancer struct { } type pendingChangeAndRangeUsageInfo struct { - change mmaprototype.PendingRangeChange + change mmaprototype.ExternalRangeChange usage allocator.RangeUsageInfo syncChangeID mmaintegration.SyncChangeID } @@ -174,7 +174,7 @@ func (msr *MMAStoreRebalancer) Tick(ctx context.Context, tick time.Time, s state // Record the last time a lease transfer was requested. 
for _, change := range msr.pendingChanges { - if change.change.IsTransferLease() { + if change.change.IsPureTransferLease() { msr.lastLeaseTransfer = tick } } @@ -187,7 +187,7 @@ func (msr *MMAStoreRebalancer) Tick(ctx context.Context, tick time.Time, s state } var curOp op.ControlledOperation - if msr.pendingChanges[msr.pendingChangeIdx].change.IsTransferLease() { + if msr.pendingChanges[msr.pendingChangeIdx].change.IsPureTransferLease() { curOp = op.NewTransferLeaseOp( tick, curChange.change.RangeID, diff --git a/pkg/kv/kvserver/mma_store_rebalancer.go b/pkg/kv/kvserver/mma_store_rebalancer.go index 4f7474e73c38..a322ba9e01ec 100644 --- a/pkg/kv/kvserver/mma_store_rebalancer.go +++ b/pkg/kv/kvserver/mma_store_rebalancer.go @@ -150,7 +150,7 @@ func (m *mmaStoreRebalancer) rebalance(ctx context.Context) bool { // applyChange safely applies a single change to the store. It handles the case // where the replica might not exist and provides proper error handling. func (m *mmaStoreRebalancer) applyChange( - ctx context.Context, change mmaprototype.PendingRangeChange, + ctx context.Context, change mmaprototype.ExternalRangeChange, ) error { repl := m.store.GetReplicaIfExists(change.RangeID) if repl == nil { @@ -160,7 +160,7 @@ func (m *mmaStoreRebalancer) applyChange( changeID := m.as.MMAPreApply(ctx, repl.RangeUsageInfo(), change) var err error switch { - case change.IsTransferLease(): + case change.IsPureTransferLease(): err = m.applyLeaseTransfer(ctx, repl, change) case change.IsChangeReplicas(): err = m.applyReplicaChanges(ctx, repl, change) @@ -175,7 +175,7 @@ func (m *mmaStoreRebalancer) applyChange( // applyLeaseTransfer applies a lease transfer change. 
func (m *mmaStoreRebalancer) applyLeaseTransfer( - ctx context.Context, repl replicaToApplyChanges, change mmaprototype.PendingRangeChange, + ctx context.Context, repl replicaToApplyChanges, change mmaprototype.ExternalRangeChange, ) error { return repl.AdminTransferLease( ctx, @@ -186,7 +186,7 @@ func (m *mmaStoreRebalancer) applyLeaseTransfer( // applyReplicaChanges applies replica membership changes. func (m *mmaStoreRebalancer) applyReplicaChanges( - ctx context.Context, repl replicaToApplyChanges, change mmaprototype.PendingRangeChange, + ctx context.Context, repl replicaToApplyChanges, change mmaprototype.ExternalRangeChange, ) error { // TODO(mma): We should be setting a timeout on the ctx here, in the case // where rebalancing takes a long time (stuck behind other snapshots). diff --git a/pkg/kv/kvserver/mmaintegration/BUILD.bazel b/pkg/kv/kvserver/mmaintegration/BUILD.bazel index 882e42b41157..82ef339e28dc 100644 --- a/pkg/kv/kvserver/mmaintegration/BUILD.bazel +++ b/pkg/kv/kvserver/mmaintegration/BUILD.bazel @@ -34,6 +34,7 @@ go_test( deps = [ "//pkg/kv/kvpb", "//pkg/kv/kvserver/allocator", + "//pkg/kv/kvserver/allocator/mmaprototype", "//pkg/roachpb", "//pkg/testutils/datapathutils", "//pkg/util/leaktest", diff --git a/pkg/kv/kvserver/mmaintegration/allocator_op.go b/pkg/kv/kvserver/mmaintegration/allocator_op.go index baf0e23028e3..6e72374f69cf 100644 --- a/pkg/kv/kvserver/mmaintegration/allocator_op.go +++ b/pkg/kv/kvserver/mmaintegration/allocator_op.go @@ -22,7 +22,7 @@ type trackedAllocatorChange struct { // change could not be registered with mma, in which case PostApply must not // inform mma. isMMARegistered bool - mmaChange mmaprototype.PendingRangeChange + mmaChange mmaprototype.ExternalRangeChange // Usage is range load usage. usage allocator.RangeUsageInfo // Exactly one of the following two fields will be set. 
diff --git a/pkg/kv/kvserver/mmaintegration/allocator_sync.go b/pkg/kv/kvserver/mmaintegration/allocator_sync.go index c0a25773544a..b9c10c7ed0ab 100644 --- a/pkg/kv/kvserver/mmaintegration/allocator_sync.go +++ b/pkg/kv/kvserver/mmaintegration/allocator_sync.go @@ -44,10 +44,11 @@ type storePool interface { type mmaState interface { // RegisterExternalChange is called by the allocator sync to register // external changes with the mma. - RegisterExternalChange(change mmaprototype.PendingRangeChange) (ok bool) + RegisterExternalChange( + change mmaprototype.PendingRangeChange) (_ mmaprototype.ExternalRangeChange, ok bool) // AdjustPendingChangeDisposition is called by the allocator sync to adjust // the disposition of pending changes to a range. - AdjustPendingChangeDisposition(change mmaprototype.PendingRangeChange, success bool) + AdjustPendingChangeDisposition(change mmaprototype.ExternalRangeChange, success bool) // BuildMMARebalanceAdvisor is called by the allocator sync to build a // MMARebalanceAdvisor for the given existing store and candidates. 
The // advisor should be later passed to IsInConflictWithMMA to determine if a @@ -151,10 +152,10 @@ func (as *AllocatorSync) NonMMAPreTransferLease( transferFrom, transferTo roachpb.ReplicationTarget, ) SyncChangeID { var isMMARegistered bool - var mmaChange mmaprototype.PendingRangeChange + var mmaChange mmaprototype.ExternalRangeChange if kvserverbase.LoadBasedRebalancingModeIsMMA(&as.st.SV) { - mmaChange = convertLeaseTransferToMMA(desc, usage, transferFrom, transferTo) - isMMARegistered = as.mmaAllocator.RegisterExternalChange(mmaChange) + change := convertLeaseTransferToMMA(desc, usage, transferFrom, transferTo) + mmaChange, isMMARegistered = as.mmaAllocator.RegisterExternalChange(change) } trackedChange := trackedAllocatorChange{ isMMARegistered: isMMARegistered, @@ -181,14 +182,14 @@ func (as *AllocatorSync) NonMMAPreChangeReplicas( leaseholderStoreID roachpb.StoreID, ) SyncChangeID { var isMMARegistered bool - var mmaChange mmaprototype.PendingRangeChange + var mmaChange mmaprototype.ExternalRangeChange if kvserverbase.LoadBasedRebalancingModeIsMMA(&as.st.SV) { var err error - mmaChange, err = convertReplicaChangeToMMA(desc, usage, changes, leaseholderStoreID) + change, err := convertReplicaChangeToMMA(desc, usage, changes, leaseholderStoreID) if err != nil { log.KvDistribution.Errorf(ctx, "failed to convert replica change to mma: %v", err) } else { - isMMARegistered = as.mmaAllocator.RegisterExternalChange(mmaChange) + mmaChange, isMMARegistered = as.mmaAllocator.RegisterExternalChange(change) } } trackedChange := trackedAllocatorChange{ @@ -212,7 +213,7 @@ func (as *AllocatorSync) NonMMAPreChangeReplicas( func (as *AllocatorSync) MMAPreApply( ctx context.Context, usage allocator.RangeUsageInfo, - pendingChange mmaprototype.PendingRangeChange, + pendingChange mmaprototype.ExternalRangeChange, ) SyncChangeID { trackedChange := trackedAllocatorChange{ isMMARegistered: true, @@ -220,7 +221,7 @@ func (as *AllocatorSync) MMAPreApply( usage: usage, } switch { - 
case pendingChange.IsTransferLease(): + case pendingChange.IsPureTransferLease(): trackedChange.leaseTransferOp = &leaseTransferOp{ transferFrom: pendingChange.LeaseTransferFrom(), transferTo: pendingChange.LeaseTransferTarget(), @@ -242,7 +243,7 @@ func (as *AllocatorSync) MMAPreApply( // MarkChangeAsFailed marks the given changes to the range as failed without // going through allocator sync. This is used when mma changes fail before // even registering with mma via MMAPreApply. -func (as *AllocatorSync) MarkChangeAsFailed(change mmaprototype.PendingRangeChange) { +func (as *AllocatorSync) MarkChangeAsFailed(change mmaprototype.ExternalRangeChange) { as.mmaAllocator.AdjustPendingChangeDisposition(change, false /* success */) } diff --git a/pkg/kv/kvserver/mmaintegration/mma_conversion_test.go b/pkg/kv/kvserver/mmaintegration/mma_conversion_test.go index a6b057277f83..2e31c548af91 100644 --- a/pkg/kv/kvserver/mmaintegration/mma_conversion_test.go +++ b/pkg/kv/kvserver/mmaintegration/mma_conversion_test.go @@ -13,6 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/mmaprototype" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -113,8 +114,12 @@ func TestConvertReplicaChangeToMMA(t *testing.T) { mmaChanges.SortForTesting() var b strings.Builder fmt.Fprintf(&b, "PendingRangeChange: %s", mmaChanges.StringForTesting()) - if mmaChanges.IsChangeReplicas() { - fmt.Fprintf(&b, "As kvpb.ReplicationChanges:\n %v\n", mmaChanges.ReplicationChanges()) + externalChange := mmaprototype.MakeExternalRangeChange(mmaChanges) + if externalChange.IsChangeReplicas() { + fmt.Fprintf(&b, "As kvpb.ReplicationChanges:\n %v\n", externalChange.ReplicationChanges()) + } else if externalChange.IsPureTransferLease() { + fmt.Fprintf(&b, "Lease 
transfer from s%v to s%v\n", + externalChange.LeaseTransferFrom(), externalChange.LeaseTransferTarget()) } return b.String() }