Skip to content

Commit 6a5f0d6

Browse files
committed
mma: fix minor todos related to pending changes
Some methods now take a PendingRangeChange as parameter, insted of a slice of pendingReplicaChanges, to make it clearer that these are changes for the same range. Also removed two todos: - The RangeID continues to be a field in pendingReplicaChange, since it is convenient (mainly for logging and testing), despite redundancy with the containing PendingRangeChange. This is because the contained relationship is not maintained in various maps keyed by the changeID. - The changes in a rangeState are not modeled as a PendingRangeChange since the concept is redundant -- they are already contained in a range specific container. Informs #157049 Epic: CRDB-55052 Release note: None
1 parent e837db7 commit 6a5f0d6

File tree

3 files changed

+29
-45
lines changed

3 files changed

+29
-45
lines changed

pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,10 @@ func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChang
291291
}
292292
if !success {
293293
// Check that we can undo these changes.
294-
if err := a.cs.preCheckOnUndoReplicaChanges(changes); err != nil {
294+
if err := a.cs.preCheckOnUndoReplicaChanges(PendingRangeChange{
295+
RangeID: change.RangeID,
296+
pendingReplicaChanges: changes,
297+
}); err != nil {
295298
panic(err)
296299
}
297300
}
@@ -308,7 +311,7 @@ func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChang
308311
func (a *allocatorState) RegisterExternalChange(change PendingRangeChange) (ok bool) {
309312
a.mu.Lock()
310313
defer a.mu.Unlock()
311-
if err := a.cs.preCheckOnApplyReplicaChanges(change.pendingReplicaChanges); err != nil {
314+
if err := a.cs.preCheckOnApplyReplicaChanges(change); err != nil {
312315
a.mmaMetrics.ExternalFailedToRegister.Inc(1)
313316
log.KvDistribution.Infof(context.Background(),
314317
"did not register external changes: due to %v", err)

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 22 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,10 @@ type ReplicaChange struct {
169169

170170
// target is the target {store,node} for the change.
171171
target roachpb.ReplicationTarget
172-
// TODO(sumeer): remove rangeID.
172+
// rangeID is the same as that in the PendingRangeChange.RangeID this change
173+
// is part of. It is duplicated here since the individual
174+
// pendingReplicaChanges are kept in various maps keyed by the changeID, and
175+
// having the RangeID field on each change is convenient.
173176
rangeID roachpb.RangeID
174177

175178
// NB: 0 is not a valid ReplicaID, but this component does not care about
@@ -1200,8 +1203,6 @@ type rangeState struct {
12001203
// Life-cycle matches clusterState.pendingChanges. The consolidated
12011204
// rangeState.pendingChanges across all ranges in clusterState.ranges will
12021205
// be identical to clusterState.pendingChanges.
1203-
//
1204-
// TODO(sumeer): replace by PendingRangeChange.
12051206
pendingChanges []*pendingReplicaChange
12061207
// When set, the pendingChanges can not be rolled back anymore. They have
12071208
// to be enacted, or discarded wholesale in favor of the latest RangeMsg
@@ -1607,7 +1608,10 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
16071608
// remainingChanges, but they are not the same slice.
16081609
rc := rs.pendingChanges
16091610
rs.pendingChanges = nil
1610-
err := cs.preCheckOnApplyReplicaChanges(remainingChanges)
1611+
err := cs.preCheckOnApplyReplicaChanges(PendingRangeChange{
1612+
RangeID: rangeMsg.RangeID,
1613+
pendingReplicaChanges: remainingChanges,
1614+
})
16111615
valid = err == nil
16121616
if err != nil {
16131617
reason = redact.Sprint(err)
@@ -1907,7 +1911,10 @@ func (cs *clusterState) gcPendingChanges(now time.Time) {
19071911
if !startTime.Add(pendingChangeGCDuration).Before(now) {
19081912
continue
19091913
}
1910-
if err := cs.preCheckOnUndoReplicaChanges(rs.pendingChanges); err != nil {
1914+
if err := cs.preCheckOnUndoReplicaChanges(PendingRangeChange{
1915+
RangeID: rangeID,
1916+
pendingReplicaChanges: rs.pendingChanges,
1917+
}); err != nil {
19111918
panic(err)
19121919
}
19131920
// Gather the changeIDs, since calls to undoPendingChange modify the
@@ -1980,24 +1987,6 @@ func printMapPendingChanges(changes map[changeID]*pendingReplicaChange) string {
19801987
return buf.String()
19811988
}
19821989

1983-
//lint:ignore U1000 used in tests
1984-
func printPendingChanges(changes []*pendingReplicaChange) string {
1985-
var buf strings.Builder
1986-
fmt.Fprintf(&buf, "pending(%d)", len(changes))
1987-
for _, change := range changes {
1988-
fmt.Fprintf(&buf, "\nchange-id=%d store-id=%v node-id=%v range-id=%v load-delta=%v start=%v",
1989-
change.changeID, change.target.StoreID, change.target.NodeID, change.rangeID,
1990-
change.loadDelta, change.startTime,
1991-
)
1992-
if !(change.enactedAtTime == time.Time{}) {
1993-
fmt.Fprintf(&buf, " enacted=%v",
1994-
change.enactedAtTime)
1995-
}
1996-
fmt.Fprintf(&buf, "\n prev=(%v)\n next=(%v)", change.prev, change.next)
1997-
}
1998-
return buf.String()
1999-
}
2000-
20011990
// addPendingRangeChange takes a range change containing a set of replica
20021991
// changes, and applies the changes as pending. The application updates the
20031992
// adjusted load, tracked pending changes and changeIDs to reflect the pending
@@ -2062,16 +2051,11 @@ func (cs *clusterState) addPendingRangeChange(change PendingRangeChange) {
20622051
// been added at a store, but it has not yet received the lease. Finally, it
20632052
// checks that after the changes are applied there is exactly one leaseholder.
20642053
// It returns a non-nil error if any of these checks fail.
2065-
//
2066-
// REQUIRES: all the changes are to the same range, and there is at most one
2067-
// change per store.
2068-
//
2069-
// TODO(sumeer): change to take PendingRangeChange as parameter
2070-
func (cs *clusterState) preCheckOnApplyReplicaChanges(changes []*pendingReplicaChange) error {
2071-
if len(changes) == 0 {
2054+
func (cs *clusterState) preCheckOnApplyReplicaChanges(rangeChange PendingRangeChange) error {
2055+
if len(rangeChange.pendingReplicaChanges) == 0 {
20722056
return nil
20732057
}
2074-
rangeID := changes[0].rangeID
2058+
rangeID := rangeChange.RangeID
20752059
curr, ok := cs.ranges[rangeID]
20762060
// Return early if range already has some pending changes or the range does not exist.
20772061
if !ok {
@@ -2086,7 +2070,7 @@ func (cs *clusterState) preCheckOnApplyReplicaChanges(changes []*pendingReplicaC
20862070
copiedCurr := rangeState{
20872071
replicas: append([]StoreIDAndReplicaState{}, curr.replicas...),
20882072
}
2089-
for _, change := range changes {
2073+
for _, change := range rangeChange.pendingReplicaChanges {
20902074
// Check that all changes correspond to the same range. Panic otherwise.
20912075
if change.rangeID != rangeID {
20922076
panic(errors.AssertionFailedf("unexpected change rangeID %d != %d", change.rangeID, rangeID))
@@ -2115,20 +2099,17 @@ func (cs *clusterState) preCheckOnApplyReplicaChanges(changes []*pendingReplicaC
21152099
// preCheckOnUndoReplicaChanges does some validation of the changes being
21162100
// proposed for undo.
21172101
//
2118-
// REQUIRES: changes is non-empty; all changes are to the same range; the
2119-
// rangeState.pendingChangeNoRollback is false.
2102+
// REQUIRES: the rangeState.pendingChangeNoRollback is false.
21202103
//
21212104
// This method is defensive since if we always check against the current state
21222105
// before allowing a change to be added (including re-addition after a
21232106
// StoreLeaseholderMsg), we should never have invalidity during an undo, if
21242107
// all the changes are being undone.
2125-
//
2126-
// TODO(sumeer): change to take PendingRangeChange as parameter
2127-
func (cs *clusterState) preCheckOnUndoReplicaChanges(changes []*pendingReplicaChange) error {
2128-
if len(changes) == 0 {
2129-
panic(errors.AssertionFailedf("no changes to undo"))
2108+
func (cs *clusterState) preCheckOnUndoReplicaChanges(rangeChange PendingRangeChange) error {
2109+
if len(rangeChange.pendingReplicaChanges) == 0 {
2110+
return nil
21302111
}
2131-
rangeID := changes[0].rangeID
2112+
rangeID := rangeChange.RangeID
21322113
curr, ok := cs.ranges[rangeID]
21332114
if !ok {
21342115
return errors.Errorf("range %v does not exist in cluster state", rangeID)
@@ -2137,7 +2118,7 @@ func (cs *clusterState) preCheckOnUndoReplicaChanges(changes []*pendingReplicaCh
21372118
copiedCurr := &rangeState{
21382119
replicas: append([]StoreIDAndReplicaState{}, curr.replicas...),
21392120
}
2140-
for _, change := range changes {
2121+
for _, change := range rangeChange.pendingReplicaChanges {
21412122
if change.rangeID != rangeID {
21422123
panic(errors.AssertionFailedf("unexpected change rangeID %d != %d", change.rangeID, rangeID))
21432124
}

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_rebalance_stores.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ func (re *rebalanceEnv) rebalanceReplicas(
413413
replicaChanges := makeRebalanceReplicaChanges(
414414
rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
415415
rangeChange := MakePendingRangeChange(rangeID, replicaChanges[:])
416-
if err = re.preCheckOnApplyReplicaChanges(rangeChange.pendingReplicaChanges); err != nil {
416+
if err = re.preCheckOnApplyReplicaChanges(rangeChange); err != nil {
417417
panic(errors.Wrapf(err, "pre-check failed for replica changes: %v for %v",
418418
replicaChanges, rangeID))
419419
}
@@ -591,7 +591,7 @@ func (re *rebalanceEnv) rebalanceLeases(
591591
replicaChanges := MakeLeaseTransferChanges(
592592
rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
593593
leaseChange := MakePendingRangeChange(rangeID, replicaChanges[:])
594-
if err := re.preCheckOnApplyReplicaChanges(leaseChange.pendingReplicaChanges); err != nil {
594+
if err := re.preCheckOnApplyReplicaChanges(leaseChange); err != nil {
595595
panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChange))
596596
}
597597
re.addPendingRangeChange(leaseChange)

0 commit comments

Comments
 (0)