Commit ccdd264

craig[bot], sumeerbhola, and tbg committed
Merge #156586

156586: mmaprototype: improve modeling of pending changes r=tbg a=sumeerbhola

The goal was to fix a known bug where, in a multi-store setting in which a range (and lease) is being moved from one local store to another local store, a partial undo could leave 2 leaseholders in the internal state of the allocator.

As part of fixing this, we elaborate on why we are modeling a single "decision" as a set of pendingReplicaChanges, one per replica, and on the deficiencies of this modeling.

To address the bug, we sacrifice the universal undo behavior of a pendingReplicaChange. Instead, once some subset of the changes on a range is seen to be enacted, we set a no-rollback bit on the rangeState for the remaining set of pendingReplicaChanges. These can no longer be undone, and must only be discarded or considered subsumed. This discarding happens when processing a new StoreLeaseholderMsg.

As part of this change, we also shorten the GC duration of the remaining pending changes once some subset has been observed to be enacted. This should be generally beneficial, since for range moves it is the first change (the replica addition) that is time consuming, and is what caused us to use a slow (5min) GC duration.

Informs #156754

Epic: CRDB-55052

Release note: None

Co-authored-by: sumeerbhola <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
2 parents 061d9a3 + 0e33c11 commit ccdd264
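
To make the no-rollback mechanism described in the commit message concrete, here is a minimal, self-contained Go sketch. Only the pendingChangeNoRollback field mirrors the patch; the other fields, the GC-deadline bookkeeping, and the helper function are hypothetical stand-ins for the real mmaprototype structures, not the actual implementation.

package main

import (
	"fmt"
	"time"
)

type ChangeID int64

// rangeState is a simplified stand-in; pendingChangeNoRollback mirrors the
// field introduced by this commit.
type rangeState struct {
	pendingChangeIDs        []ChangeID
	pendingChangeNoRollback bool
}

// markSubsetEnacted removes the changes observed as enacted. If only a
// subset was enacted, the survivors can no longer be undone, so the
// no-rollback bit is set and their GC deadline is shortened.
func markSubsetEnacted(
	rs *rangeState, enacted map[ChangeID]bool, now time.Time,
	gcDeadline map[ChangeID]time.Time, shortGC time.Duration,
) {
	var remaining []ChangeID
	for _, id := range rs.pendingChangeIDs {
		if !enacted[id] {
			remaining = append(remaining, id)
		}
	}
	if len(remaining) < len(rs.pendingChangeIDs) && len(remaining) > 0 {
		rs.pendingChangeNoRollback = true
		for _, id := range remaining {
			gcDeadline[id] = now.Add(shortGC)
		}
	}
	rs.pendingChangeIDs = remaining
}

func main() {
	rs := &rangeState{pendingChangeIDs: []ChangeID{1, 2}}
	gc := map[ChangeID]time.Time{}
	// Change 1 (say, the slow replica addition) is observed as enacted;
	// change 2 survives with the no-rollback bit set and a short GC deadline.
	markSubsetEnacted(rs, map[ChangeID]bool{1: true}, time.Now(), gc, time.Minute)
	fmt.Println(rs.pendingChangeNoRollback, rs.pendingChangeIDs) // true [2]
}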

File tree

11 files changed: +1163 −223 lines changed


pkg/kv/kvserver/allocator/mmaprototype/allocator.go

Lines changed: 2 additions & 0 deletions
@@ -59,6 +59,8 @@ type Allocator interface {
   // Calls to AdjustPendingChangesDisposition must be correctly sequenced with
   // full state updates from the local node provided in
   // ProcessNodeLoadResponse.
+  //
+  // REQUIRES: len(changes) > 0 and all changes are to the same range.
   AdjustPendingChangesDisposition(changes []ChangeID, success bool)

   // RegisterExternalChanges informs this allocator about yet to complete
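
A hedged usage sketch of the tightened contract: the caller must pass a non-empty slice of change IDs that all belong to a single range. Allocator and ChangeID are the interface types above; the per-range grouping map and the function itself are hypothetical illustrations, not code from this patch.

// Hypothetical caller honoring the new REQUIRES clause: change IDs are
// grouped per range before the call, and empty groups are skipped.
func reportDispositions(a Allocator, idsByRange map[int64][]ChangeID, success bool) {
	for _, ids := range idsByRange {
		if len(ids) == 0 {
			continue // REQUIRES: len(changes) > 0
		}
		// All IDs in one slice belong to the same range, per the contract.
		a.AdjustPendingChangesDisposition(ids, success)
	}
}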

pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go

Lines changed: 40 additions & 12 deletions
@@ -546,9 +546,8 @@ func (a *allocatorState) rebalanceStores(
   }
   leaseChanges := MakeLeaseTransferChanges(
     rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
-  if valid, reason := a.cs.preCheckOnApplyReplicaChanges(leaseChanges[:]); !valid {
-    panic(fmt.Sprintf("pre-check failed for lease transfer %v: due to %v",
-      leaseChanges, reason))
+  if err := a.cs.preCheckOnApplyReplicaChanges(leaseChanges[:]); err != nil {
+    panic(errors.Wrapf(err, "pre-check failed for lease transfer %v", leaseChanges))
   }
   pendingChanges := a.cs.createPendingChanges(leaseChanges[:]...)
   changes = append(changes, PendingRangeChange{

@@ -764,9 +763,9 @@
   }
   replicaChanges := makeRebalanceReplicaChanges(
     rangeID, rstate.replicas, rstate.load, addTarget, removeTarget)
-  if valid, reason := a.cs.preCheckOnApplyReplicaChanges(replicaChanges[:]); !valid {
-    panic(fmt.Sprintf("pre-check failed for replica changes: %v due to %v for %v",
-      replicaChanges, reason, rangeID))
+  if err = a.cs.preCheckOnApplyReplicaChanges(replicaChanges[:]); err != nil {
+    panic(errors.Wrapf(err, "pre-check failed for replica changes: %v for %v",
+      replicaChanges, rangeID))
   }
   pendingChanges := a.cs.createPendingChanges(replicaChanges[:]...)
   changes = append(changes, PendingRangeChange{
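
Both hunks apply the same refactor: preCheckOnApplyReplicaChanges now returns an error instead of a (valid bool, reason string) pair, so call sites can wrap context onto it with errors.Wrapf. Below is a standalone sketch of that pattern; the ReplicaChange struct and the validity predicate are invented placeholders for the real cluster-state check, not the patch's implementation.

package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

type ReplicaChange struct{ StoreID int64 }

// preCheckOnApplyReplicaChanges (simplified stand-in): returns a descriptive
// error for the first inapplicable change, or nil if all pass.
func preCheckOnApplyReplicaChanges(
	changes []ReplicaChange, applicable func(ReplicaChange) bool,
) error {
	for i, c := range changes {
		if !applicable(c) {
			return errors.Errorf("change %d (store s%d) is not applicable", i, c.StoreID)
		}
	}
	return nil
}

func main() {
	changes := []ReplicaChange{{StoreID: 1}, {StoreID: 2}}
	if err := preCheckOnApplyReplicaChanges(changes, func(c ReplicaChange) bool {
		return c.StoreID != 2 // pretend s2 fails validation
	}); err != nil {
		// Mirrors the call sites above, which panic with the wrapped error.
		fmt.Println(errors.Wrapf(err, "pre-check failed for replica changes %v", changes))
	}
}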
@@ -819,27 +818,56 @@ func (a *allocatorState) ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoad
 func (a *allocatorState) AdjustPendingChangesDisposition(changeIDs []ChangeID, success bool) {
   a.mu.Lock()
   defer a.mu.Unlock()
+  // NB: It is possible that some of the changeIDs have already been enacted
+  // via StoreLeaseholderMsg, and even been garbage collected. So no
+  // assumption can be made about whether these changeIDs will be found in the
+  // allocator's state.
   if !success {
+    // Gather the changes that are found and need to be undone.
     replicaChanges := make([]ReplicaChange, 0, len(changeIDs))
     for _, changeID := range changeIDs {
       change, ok := a.cs.pendingChanges[changeID]
       if !ok {
+        continue
+      }
+      rs, ok := a.cs.ranges[change.rangeID]
+      if !ok {
+        panic(errors.AssertionFailedf("range %v not found in cluster state", change.rangeID))
+      }
+      if rs.pendingChangeNoRollback {
+        // All the changes are to the same range, so return.
         return
       }
       replicaChanges = append(replicaChanges, change.ReplicaChange)
     }
-    if valid, reason := a.cs.preCheckOnUndoReplicaChanges(replicaChanges); !valid {
-      log.KvDistribution.Infof(context.Background(), "did not undo change %v: due to %v", changeIDs, reason)
+    if len(replicaChanges) == 0 {
+      return
+    }
+    // Check that we can undo these changes. If not, log and return.
+    if err := a.cs.preCheckOnUndoReplicaChanges(replicaChanges); err != nil {
+      // TODO(sumeer): we should be able to panic here, once the interface
+      // contract says that all the proposed changes must be included in
+      // changeIDs. Without that contract, there may be a pair of changes
+      // (remove replica and lease from s1), (add replica and lease to s2),
+      // and the caller can provide the first changeID only, and the undo
+      // would cause two leaseholders. The pre-check would catch that here.
+      log.KvDistribution.Infof(context.Background(), "did not undo change %v: due to %v", changeIDs, err)
       return
     }
   }

   for _, changeID := range changeIDs {
-    // We set !requireFound, since a StoreLeaseholderMsg that happened after
-    // the pending change was created and before this call to
+    // We set !requireFound, since some of these pending changes may no longer
+    // exist in the allocator's state. For example, a StoreLeaseholderMsg that
+    // happened after the pending change was created and before this call to
     // AdjustPendingChangesDisposition may have already removed the pending
     // change.
     if success {
+      // TODO(sumeer): this code is implicitly assuming that all the changes
+      // on the rangeState are being enacted. And that is true of the current
+      // callers. We should explicitly state the assumption in the interface.
+      // Because if only some are being enacted, we ought to set
+      // pendingChangeNoRollback, and we don't bother to.
       a.cs.pendingChangeEnacted(changeID, a.cs.ts.Now(), false)
     } else {
       a.cs.undoPendingChange(changeID, false)
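
The two-leaseholder hazard named in the TODO above is easiest to see with a toy example. In the sketch below (all types invented purely for illustration), a lease transfer is modeled as the pair of changes from the comment; undoing only the removal half after the addition half has applied leaves both stores marked as leaseholders.

package main

import "fmt"

// change is a toy model: a store either gains (+1) or loses (-1) the lease.
type change struct {
	store      string
	leaseDelta int
}

func apply(leaseholders map[string]bool, c change) {
	leaseholders[c.store] = c.leaseDelta > 0
}

func main() {
	leaseholders := map[string]bool{"s1": true}
	// One "decision", modeled as two pendingReplicaChanges:
	pair := []change{
		{store: "s1", leaseDelta: -1}, // remove replica and lease from s1
		{store: "s2", leaseDelta: +1}, // add replica and lease to s2
	}
	for _, c := range pair {
		apply(leaseholders, c)
	}
	// Partial undo of only the first change, as in the TODO's scenario:
	leaseholders["s1"] = true
	fmt.Println(leaseholders) // map[s1:true s2:true]: two leaseholders
}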
@@ -852,10 +880,10 @@ func (a *allocatorState) AdjustPendingChangesDisposition(changeIDs []ChangeID, s
 func (a *allocatorState) RegisterExternalChanges(changes []ReplicaChange) []ChangeID {
   a.mu.Lock()
   defer a.mu.Unlock()
-  if valid, reason := a.cs.preCheckOnApplyReplicaChanges(changes); !valid {
+  if err := a.cs.preCheckOnApplyReplicaChanges(changes); err != nil {
     a.mmaMetrics.ExternalFailedToRegister.Inc(1)
     log.KvDistribution.Infof(context.Background(),
-      "did not register external changes: due to %v", reason)
+      "did not register external changes: due to %v", err)
     return nil
   } else {
     a.mmaMetrics.ExternaRegisterSuccess.Inc(1)
