pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go (9 changes: 4 additions & 5 deletions)
@@ -262,18 +262,14 @@ func (a *allocatorState) ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoad
func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChange, success bool) {
a.mu.Lock()
defer a.mu.Unlock()
rs, ok := a.cs.ranges[change.RangeID]
_, ok := a.cs.ranges[change.RangeID]
if !ok {
// Range no longer exists. This can happen if the StoreLeaseholderMsg
// which included the effect of the change that transferred the lease away
// was already processed, causing the range to no longer be tracked by the
// allocator.
return
}
if !success && rs.pendingChangeNoRollback {
// Not allowed to undo.
return
}
// NB: It is possible that some of the changes have already been enacted via
// StoreLeaseholderMsg, and even been garbage collected. So no assumption
// can be made about whether these changes will be found in the allocator's
@@ -284,6 +280,9 @@ func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChang
if !ok {
continue
}
// NB: the ch and c pointers are not identical even though they have the
// same changeID. We create two copies in
// clusterState.addPendingRangeChange, since the internal copy is mutable.
changes = append(changes, ch)
}
if len(changes) == 0 {
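The NB above about ch and c sharing a changeID while being distinct pointers can be made concrete with a short sketch. This is illustrative Go, not code from this PR; pendingChangeSketch and its fields are simplified stand-ins for pendingReplicaChange.

package main

import (
	"fmt"
	"time"
)

// pendingChangeSketch is a trimmed stand-in for pendingReplicaChange; the
// real struct carries more fields than shown here.
type pendingChangeSketch struct {
	changeID int64
	gcTime   time.Time // mutated only on the allocator's internal copy
}

func main() {
	// The copy handed back to callers and the copy tracked internally are
	// distinct allocations, so pointer equality between them is always false.
	external := &pendingChangeSketch{changeID: 7}
	internal := &pendingChangeSketch{}
	*internal = *external

	byID := map[int64]*pendingChangeSketch{internal.changeID: internal}

	fmt.Println(external == internal)                // false: different pointers
	fmt.Println(byID[external.changeID] == internal) // true: changeID is the join key
}

This is why the loop above matches changes by changeID rather than by comparing pointers.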
pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go (195 changes: 71 additions & 124 deletions)
@@ -204,12 +204,12 @@ type ReplicaChange struct {
// replica being demoted cannot retain the lease).
//
// NB: The prev value is always the state before the change. This is the
// source of truth provided by the leaseholder in the RangeMsg, so will
// have real ReplicaIDs (if already a replica) and real ReplicaTypes
// latest source of truth provided by the leaseholder in the RangeMsg, so
// will have real ReplicaIDs (if already a replica) and real ReplicaTypes
// (including types beyond VOTER_FULL and NON_VOTER). This source-of-truth
// claim is guaranteed by REQUIREMENT(change-computation) documented
// elsewhere, and the fact that new changes are computed only when there
// are no pending changes for a range.
// elsewhere, and the fact that new changes are computed only when there are
// no pending changes for a range.
//
// The ReplicaType in next is either the zero value (for removals), or
// {VOTER_FULL, NON_VOTER} for additions/change, i.e., it represents the
@@ -218,6 +218,9 @@ type ReplicaChange struct {
// TODO(tbg): in MakeLeaseTransferChanges, next.ReplicaType.ReplicaType is
// simply the current value, and not necessarily {VOTER_FULL, NON_VOTER}.
// So the above comment is incorrect. We should clean this up.
//
// The prev field is mutable after creation, to ensure that an undo restores
// the state to the latest source of truth from the leaseholder.
prev ReplicaState
next ReplicaIDAndType

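To make the prev/next convention above concrete, here is a hedged sketch with simplified stand-in types; the package's real ReplicaState and ReplicaIDAndType carry ReplicaIDs and roachpb.ReplicaTypes that are elided here. A removal encodes the zero value in next, while an addition or change carries VOTER_FULL or NON_VOTER.

package main

import "fmt"

// replicaTypeSketch stands in for roachpb.ReplicaType; only the cases
// relevant to the prev/next convention are modeled.
type replicaTypeSketch int

const (
	noReplicaSketch replicaTypeSketch = iota // zero value: next for removals
	voterFullSketch                          // stands in for VOTER_FULL
	nonVoterSketch                           // stands in for NON_VOTER
)

// replicaChangeSketch mirrors the prev/next pairing of ReplicaChange.
type replicaChangeSketch struct {
	prev replicaTypeSketch // latest leaseholder-reported state; may be any type
	next replicaTypeSketch // zero for removals, VOTER_FULL/NON_VOTER otherwise
}

func main() {
	removal := replicaChangeSketch{prev: voterFullSketch, next: noReplicaSketch}
	addition := replicaChangeSketch{prev: noReplicaSketch, next: nonVoterSketch}
	fmt.Println(removal, addition)
}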
@@ -459,21 +462,14 @@ func mapReplicaTypeToVoterOrNonVoter(rType roachpb.ReplicaType) roachpb.ReplicaT
// replicas, or transferring the lease. There is at most one change per store
// in the set.
//
// NB: pendingReplicaChanges is not visible outside the package, so we can be
// certain that callers outside this package that hold a PendingRangeChange
// cannot mutate the internals other than clearing the state.
//
// Additionally, for a PendingRangeChange returned outside the package, we
// ensure that the pendingReplicaChanges slice itself is not shared with the
// rangeState.pendingChanges slice since the rangeState.pendingChanges slice
// can have entries removed from it (and swapped around as part of removal).
//
// Some of the state inside each *pendingReplicaChange is mutable at arbitrary
// points in time by the code inside this package (with the relevant locking,
// of course). Currently, this state is gcTime and enactedAtTime. Neither of
// these is read by the public methods on PendingRangeChange.
// NB: pendingReplicaChanges is not visible outside the package.
//
// TODO(sumeer): when we expand the set of mutable fields, make a deep copy.
// The *pendingReplicaChange objects are pointers since the clusterState
// struct has multiple slices and maps that point to the same
// *pendingReplicaChange object, which is mutable. To prevent race conditions
// with exported functions on PendingRangeChange called from outside the
// package, the *pendingReplicaChange objects returned outside the package are
// copies that will not be mutated.
type PendingRangeChange struct {
RangeID roachpb.RangeID
pendingReplicaChanges []*pendingReplicaChange
@@ -727,9 +723,9 @@ type pendingReplicaChange struct {
// expiry. All replica changes in a PendingRangeChange have the same
// startTime.
startTime time.Time
// gcTime represents a time when the unenacted change should be GC'd, either
// using the normal GC undo path, or if rangeState.pendingChangeNoRollback
// is true, when processing a RangeMsg from the leaseholder.
// gcTime represents a time when the unenacted change should be GC'd.
//
// Mutable after creation.
gcTime time.Time

// TODO(kvoli,sumeerbhola): Consider adopting an explicit expiration time,
@@ -742,6 +738,8 @@ type pendingReplicaChange struct {
// information received from the leaseholder, this value is set, so that even
// if the store with a replica affected by this pending change does not tell
// us about the enactment, we can garbage collect this change.
//
// Mutable after creation.
enactedAtTime time.Time
}

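gcTime and enactedAtTime are the two fields called out above as mutable after creation. The following is an assumption-laden sketch of how such fields are typically consulted by a GC pass, not the allocator's actual logic; enactedRetention in particular is a hypothetical parameter.

package main

import (
	"fmt"
	"time"
)

// shouldGCSketch: an unenacted change is GC'd once its gcTime deadline has
// passed, while an enacted change is retained only for a short window so
// late store messages can still be reconciled against it. Sketch only.
func shouldGCSketch(now, gcTime, enactedAtTime time.Time, enactedRetention time.Duration) bool {
	if enactedAtTime.IsZero() {
		return now.After(gcTime) // never enacted: purely deadline-driven
	}
	return now.Sub(enactedAtTime) > enactedRetention
}

func main() {
	now := time.Now()
	fmt.Println(shouldGCSketch(now, now.Add(-time.Minute), time.Time{}, 10*time.Second)) // true: deadline passed
	fmt.Println(shouldGCSketch(now, now.Add(time.Minute), time.Time{}, 10*time.Second))  // false: still pending
}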
@@ -1052,7 +1050,7 @@ type rangeState struct {
// that are still at the initial state, or an intermediate state, it can
// continue anticipating that these pending changes will happen. Tracking
// what is pending also allows for undo in the case of explicit failure,
// notified by AdjustPendingChangesDisposition.
// notified by AdjustPendingChangesDisposition, or GC.
//
// 2. Lifecycle
// pendingChanges track proposed modifications to a range's replicas or
@@ -1078,17 +1076,9 @@ type rangeState struct {
// has been enacted in this case.
//
// 2. Undone as failed: corresponding replica and load change is rolled back.
// Note that for replica changes that originate from one action, all changes
// would be undone together.
// NB: pending changes of a range state originate from one decision.
// Therefore, when one pending change is enacted successfully, we mark this
// range state's pending changes as no rollback (read more about this in 3).
// If we are here trying to undo a pending change but the range state has
// already been marked as no rollback, we do not undo the remaining pending
// changes. Instead, we wait for a StoreLeaseholderMsg to discard the pending
// changes and revert the load adjustments after the
// partiallyEnactedGCDuration has elapsed since the first enacted change. The
// modeling here is imperfect (read more about this in 3).
// Note that for replica changes that originate from one action, some changes
// can be considered done because of the leaseholder msg, and others can be
// rolled back (say due to GC).
//
// This happens when:
// - The pending change failed to apply via
@@ -1149,14 +1139,10 @@ type rangeState struct {
// the replica and leaseholder to s4. An intermediate state that can be
// observed is {s1, s2, s3, s4} with the lease still at s3. But the pending
// change for adding s4 includes both that it has a replica, and it has the
// lease, so we will not mark it done, and keep pretending that the whole
// change is pending. Since lease transfers are fast, we accept this
// imperfect modeling fidelity. One consequence of this imperfect modeling
// is that if in this example there are no further changes observed until
// GC, the allocator will undo both changes and go back to the state {s1,
// s2, s3} with s3 as the leaseholder. That is, it has forgotten that s4 was
// added. This is unavoidable and will be fixed by the first
// StoreLeaseholderMsg post-GC.
// lease, so we will not mark it done, and keep pretending that the change
// is pending. However, we will change the prev state to indicate that s4
// has a replica, so that undo (say due to GC) rolls back to the latest
// source-of-truth from the leaseholder.
//
// 4. Non Atomicity Hazard
//
@@ -1165,20 +1151,19 @@ type rangeState struct {
// to contend with the hazard of having two leaseholders or no leaseholders.
// In the earlier example, say s3 and s4 were both local stores (a
// multi-store node), it may be possible to observe an intermediate state
// {s1, s2, s3, s4} where s4 is the leaseholder. If we subsequently get a
// spurious AdjustPendingChangesDisposition(success=false) call, or
// time-based GC causes the s3 removal to be undone, there will be two
// replicas marked as the leaseholder. The other extreme is believing that
// the s3 transfer is done and the s4 incoming replica (and lease) failed
// (this may not actually be possible because of the surrounding code).
// {s1, s2, s3, s4} where s4 is the leaseholder. We need to ensure that if
// we subsequently get a spurious
// AdjustPendingChangesDisposition(success=false) call, or time-based GC
// causes the s3 removal to be undone, there will not be two replicas marked
// as the leaseholder. The other extreme is believing that the s3 transfer
// is done and the s4 incoming replica (and lease) failed (this may not
// actually be possible because of the surrounding code).
//
// We deal with this hazard by observing that we've constructed multiple
// pending changes in order to observe intermediate changes in the common
// case of success. Once one change in the set of changes is considered
// enacted, we mark the whole remaining group as no-rollback. In the above
// case, if we see s4 has become the leaseholder, the s1 removal can't undo
// itself -- it can be dropped if it is considered subsumed when processing
// a RangeMsg, or it can be GC'd.
// This hazard is dealt with in the same way as outlined in the earlier
// example: when the leaseholder msg from s4 arrives listing {s1, s2, s3,
// s4} as replicas, the prev state for the s3 change is updated to indicate
// that it is not the leaseholder. This means that if the change is undone,
// it will return to a prev state where it has a replica but not the lease.
//
// Additionally, when processing a RangeMsg, if any of the pending changes
// is considered inconsistent, all the pending changes are discarded. This
@@ -1198,11 +1183,6 @@ type rangeState struct {
// rangeState.pendingChanges across all ranges in clusterState.ranges will
// be identical to clusterState.pendingChanges.
pendingChanges []*pendingReplicaChange
// When set, the pendingChanges can not be rolled back anymore. They have
// to be enacted, or discarded wholesale in favor of the latest RangeMsg
// from the leaseholder. It is reset to false when pendingChanges
// transitions from empty to non-empty.
pendingChangeNoRollback bool

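The interaction between enactment, prev refresh, and undo described in the lifecycle comment above can be summarized in a small sketch. The types and field names below are simplified assumptions, not the package's actual code: when a leaseholder report does not yet match next, prev is replaced with the report, so a later undo lands on the latest source of truth rather than on the state at proposal time.

package main

import "fmt"

// replicaSketch and changeSketch are simplified stand-ins for ReplicaState
// and ReplicaChange.
type replicaSketch struct {
	hasReplica    bool
	isLeaseholder bool
}

type changeSketch struct {
	prev replicaSketch // refreshed from each leaseholder report
	next replicaSketch // the proposed end state
}

// onLeaseholderReport sketches the idea in processStoreLeaseholderMsgInternal:
// a report matching next means the change is enacted; otherwise prev is
// replaced with the report so that an undo restores exactly that state.
func onLeaseholderReport(c *changeSketch, reported replicaSketch) (enacted bool) {
	if reported == c.next {
		return true
	}
	c.prev = reported
	return false
}

func main() {
	// s4 in the example above: add a replica and transfer the lease to it.
	c := &changeSketch{next: replicaSketch{hasReplica: true, isLeaseholder: true}}
	// Intermediate report: replica added, lease not yet transferred.
	enacted := onLeaseholderReport(c, replicaSketch{hasReplica: true})
	// A GC-driven undo now rolls back to "replica, no lease", not to "no replica".
	fmt.Println(enacted, c.prev) // false {true false}
}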
// If non-nil, it is up-to-date. Typically, non-nil for a range that has no
// pendingChanges and is not satisfying some constraint, since we don't want
@@ -1534,27 +1514,15 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
// The change has been enacted according to the leaseholder.
enactedChanges = append(enactedChanges, change)
} else {
// Not subsumed. Replace the prev with the latest source of truth from
// the leaseholder. Note, this can be the noReplicaID case from above.
change.prev = adjustedReplica
remainingChanges = append(remainingChanges, change)
}
}
gcRemainingChanges := false
if rs.pendingChangeNoRollback {
// A previous StoreLeaseholderMsg has enacted some changes, so the
// remainingChanges may be GC'able. All of them share the same GC time.
// Note that normal GC will not GC these, since normal GC needs to undo,
// and we are not allowed to undo these.
if len(remainingChanges) > 0 {
gcTime := remainingChanges[0].gcTime
if gcTime.Before(now) {
gcRemainingChanges = true
}
}
} else if len(enactedChanges) > 0 && len(remainingChanges) > 0 {
// First time this set of changes is seeing something enacted, and there
// are remaining changes.
if len(enactedChanges) > 0 && len(remainingChanges) > 0 {
// There are remaining changes, so potentially update their gcTime.
//
// No longer permitted to rollback.
rs.pendingChangeNoRollback = true
// All remaining changes have the same gcTime.
curGCTime := remainingChanges[0].gcTime
revisedGCTime := now.Add(partiallyEnactedGCDuration)
@@ -1598,27 +1566,19 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
// preCheckOnApplyReplicaChanges returns false if there are any pending
// changes, and these are the changes that are pending. This is hacky
// and should be cleaned up.
var valid bool
var reason redact.RedactableString
if gcRemainingChanges {
reason = "GCing remaining changes after partial enactment"
} else {
// NB: rs.pendingChanges contains the same changes as
// remainingChanges, but they are not the same slice.
rc := rs.pendingChanges
rs.pendingChanges = nil
err := cs.preCheckOnApplyReplicaChanges(PendingRangeChange{
RangeID: rangeMsg.RangeID,
pendingReplicaChanges: remainingChanges,
})
valid = err == nil
if err != nil {
reason = redact.Sprint(err)
}
// Restore it.
rs.pendingChanges = rc
}
if valid {
//
// NB: rs.pendingChanges contains the same changes as
// remainingChanges, but they are not the same slice.
rc := rs.pendingChanges
rs.pendingChanges = nil
err := cs.preCheckOnApplyReplicaChanges(PendingRangeChange{
RangeID: rangeMsg.RangeID,
pendingReplicaChanges: remainingChanges,
})
// Restore it.
rs.pendingChanges = rc

if err == nil {
// Re-apply the remaining changes. Note that the load change was not
// undone above, so we pass !applyLoadChange, to avoid applying it
// again. Also note that applyReplicaChange does not add to the various
@@ -1628,6 +1588,7 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
cs.applyReplicaChange(change.ReplicaChange, false)
}
} else {
reason := redact.Sprint(err)
// The current state provided by the leaseholder does not permit these
// changes, so we need to drop them. This should be rare, but can happen
// if the leaseholder executed a change that MMA was completely unaware
@@ -1861,7 +1822,6 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
topk := ss.adjusted.topKRanges[msg.StoreID]
topk.doneInit()
}

}

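The partial-enactment branch shown in the hunk above (len(enactedChanges) > 0 && len(remainingChanges) > 0) revises the survivors' gcTime using partiallyEnactedGCDuration. The diff is truncated before the actual update, so the sketch below is only an assumption about its shape; both the duration value and the only-tighten comparison are illustrative guesses.

package main

import (
	"fmt"
	"time"
)

// Illustrative stand-in for partiallyEnactedGCDuration; the real value is not
// shown in this diff.
const partiallyEnactedGCDurationSketch = 30 * time.Second

// reviseGCTimeSketch tightens a pending change's GC deadline after some of
// its sibling changes have been enacted, never extending it.
func reviseGCTimeSketch(now, current time.Time) time.Time {
	revised := now.Add(partiallyEnactedGCDurationSketch)
	if revised.Before(current) {
		return revised
	}
	return current
}

func main() {
	now := time.Now()
	fmt.Println(reviseGCTimeSketch(now, now.Add(5*time.Minute))) // pulled in to roughly now+30s
	fmt.Println(reviseGCTimeSketch(now, now.Add(5*time.Second))) // left unchanged
}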
// If the pending replica change does not happen within this GC duration, we
@@ -1895,15 +1855,6 @@ func (cs *clusterState) gcPendingChanges(now time.Time) {
if !ok {
panic(errors.AssertionFailedf("range %v not found in cluster state", rangeID))
}

// Unlike normal GC that reverts changes, we want to discard these pending
// changes. Do nothing here; processStoreLeaseholderMsgInternal will later
// detect and discard these pending changes. Note that
// processStoreLeaseholderMsgInternal will not revert the pending load
// change.
if rs.pendingChangeNoRollback {
continue
}
if len(rs.pendingChanges) == 0 {
panic(errors.AssertionFailedf("no pending changes in range %v", rangeID))
}
@@ -1945,8 +1896,6 @@ func (cs *clusterState) pendingChangeEnacted(cid changeID, enactedAt time.Time)
}

// undoPendingChange reverses the change with ID cid.
//
// REQUIRES: the change is not marked as no-rollback.
func (cs *clusterState) undoPendingChange(cid changeID) {
change, ok := cs.pendingChanges[cid]
if !ok {
Expand All @@ -1956,10 +1905,6 @@ func (cs *clusterState) undoPendingChange(cid changeID) {
if !ok {
panic(errors.AssertionFailedf("range %v not found in cluster state", change.rangeID))
}
if rs.pendingChangeNoRollback {
// One cannot undo changes once no-rollback is true.
panic(errors.AssertionFailedf("pending change is marked as no-rollback"))
}
// Wipe the analyzed constraints, as the range has changed.
rs.constraints = nil
rs.lastFailedChange = cs.ts.Now()
@@ -1992,6 +1937,10 @@ func printMapPendingChanges(changes map[changeID]*pendingReplicaChange) string {
// adjusted load, tracked pending changes and changeIDs to reflect the pending
// application. It updates the *pendingReplicaChanges inside the change.
//
// The change contains replica changes that will be returned outside the
// package, so a copy is made for package internal use (see the comment on
// PendingRangeChange about mutability).
//
// REQUIRES: all the replica changes are to the same range, and that the range
// has no pending changes.
func (cs *clusterState) addPendingRangeChange(change PendingRangeChange) {
@@ -2018,26 +1967,26 @@ func (cs *clusterState) addPendingRangeChange(change PendingRangeChange) {
// Only the lease is being transferred.
gcDuration = pendingLeaseTransferGCDuration
}
pendingChanges := change.pendingReplicaChanges
now := cs.ts.Now()
for _, pendingChange := range pendingChanges {
cs.applyReplicaChange(pendingChange.ReplicaChange, true)
for _, origPendingChange := range change.pendingReplicaChanges {
cs.applyReplicaChange(origPendingChange.ReplicaChange, true)
cs.changeSeqGen++
cid := cs.changeSeqGen
pendingChange.changeID = cid
pendingChange.startTime = now
pendingChange.gcTime = now.Add(gcDuration)
pendingChange.enactedAtTime = time.Time{}
origPendingChange.changeID = cid
origPendingChange.startTime = now
origPendingChange.gcTime = now.Add(gcDuration)
origPendingChange.enactedAtTime = time.Time{}
// Make a copy for internal tracking, since the internal state is mutable.
pendingChange := &pendingReplicaChange{}
*pendingChange = *origPendingChange
storeState := cs.stores[pendingChange.target.StoreID]
rangeState := cs.ranges[rangeID]
cs.pendingChanges[cid] = pendingChange
storeState.adjusted.loadPendingChanges[cid] = pendingChange
rangeState.pendingChanges = append(rangeState.pendingChanges, pendingChange)
rangeState.pendingChangeNoRollback = false
log.KvDistribution.VInfof(context.Background(), 3,
"addPendingRangeChange: change_id=%v, range_id=%v, change=%v",
cid, rangeID, pendingChange.ReplicaChange)
pendingChanges = append(pendingChanges, pendingChange)
}
}

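Because addPendingRangeChange copies each caller-supplied change before tracking it, later internal bookkeeping never shows through to the PendingRangeChange the caller holds. A hedged sketch of that property, again with a trimmed stand-in struct:

package main

import (
	"fmt"
	"time"
)

// pendingChangeSketch is a trimmed stand-in for pendingReplicaChange.
type pendingChangeSketch struct {
	changeID int64
	gcTime   time.Time
}

func main() {
	// Caller-visible copy, as carried inside a PendingRangeChange.
	external := &pendingChangeSketch{changeID: 7, gcTime: time.Now().Add(5 * time.Minute)}

	// Internal copy, as made in addPendingRangeChange.
	internal := &pendingChangeSketch{}
	*internal = *external

	// Internal bookkeeping (e.g. a later gcTime revision) mutates only the
	// internal copy.
	internal.gcTime = time.Now().Add(30 * time.Second)

	fmt.Println(external.gcTime.Equal(internal.gcTime)) // false: the caller's copy is untouched
}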
@@ -2105,8 +2054,6 @@ func (cs *clusterState) preCheckOnApplyReplicaChanges(rangeChange PendingRangeCh
// preCheckOnUndoReplicaChanges does some validation of the changes being
// proposed for undo.
//
// REQUIRES: the rangeState.pendingChangeNoRollback is false.
//
// This method is defensive since if we always check against the current state
// before allowing a change to be added (including re-addition after a
// StoreLeaseholderMsg), we should never have invalidity during an undo, if