Commit 0e33c11

mmaprototype: more test and comment improvements etc.
1 parent 7e006d5 commit 0e33c11

7 files changed: +160, -103 lines

pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go

Lines changed: 20 additions & 5 deletions

@@ -818,7 +818,12 @@ func (a *allocatorState) ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoad
 func (a *allocatorState) AdjustPendingChangesDisposition(changeIDs []ChangeID, success bool) {
 	a.mu.Lock()
 	defer a.mu.Unlock()
+	// NB: It is possible that some of the changeIDs have already been enacted
+	// via StoreLeaseholderMsg, and even been garbage collected. So no
+	// assumption can be made about whether these changeIDs will be found in the
+	// allocator's state.
 	if !success {
+		// Gather the changes that are found and need to be undone.
 		replicaChanges := make([]ReplicaChange, 0, len(changeIDs))
 		for _, changeID := range changeIDs {
 			change, ok := a.cs.pendingChanges[changeID]
@@ -834,25 +839,35 @@ func (a *allocatorState) AdjustPendingChangesDisposition(changeIDs []ChangeID, s
 				return
 			}
 			replicaChanges = append(replicaChanges, change.ReplicaChange)
-			// Else ignore this change. We don't want to pass this change to
-			// pre-check since it will likely violate an invariant and cause us to
-			// emit a noisy log message.
 		}
 		if len(replicaChanges) == 0 {
 			return
 		}
+		// Check that we can undo these changes. If not, log and return.
 		if err := a.cs.preCheckOnUndoReplicaChanges(replicaChanges); err != nil {
+			// TODO(sumeer): we should be able to panic here, once the interface
+			// contract says that all the proposed changes must be included in
+			// changeIDs. Without that contract, there may be a pair of changes
+			// (remove replica and lease from s1), (add replica and lease to s2),
+			// and the caller can provide the first changeID only, and the undo
+			// would cause two leaseholders. The pre-check would catch that here.
			log.KvDistribution.Infof(context.Background(), "did not undo change %v: due to %v", changeIDs, err)
 			return
 		}
 	}

 	for _, changeID := range changeIDs {
-		// We set !requireFound, since a StoreLeaseholderMsg that happened after
-		// the pending change was created and before this call to
+		// We set !requireFound, since some of these pending changes may no longer
+		// exist in the allocator's state. For example, a StoreLeaseholderMsg that
+		// happened after the pending change was created and before this call to
 		// AdjustPendingChangesDisposition may have already removed the pending
 		// change.
 		if success {
+			// TODO(sumeer): this code is implicitly assuming that all the changes
+			// on the rangeState are being enacted. And that is true of the current
+			// callers. We should explicitly state the assumption in the interface.
+			// Because if only some are being enacted, we ought to set
+			// pendingChangeNoRollback, and we don't bother to.
 			a.cs.pendingChangeEnacted(changeID, a.cs.ts.Now(), false)
 		} else {
 			a.cs.undoPendingChange(changeID, false)
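
A minimal caller-side sketch (not part of this commit, written as if inside the mmaprototype package) of the contract the TODOs above describe: every ChangeID belonging to one proposed change is reported together, so a failed change is undone as a unit. Only AdjustPendingChangesDisposition appears in the diff; the helper name reportDisposition and the enactErr plumbing are illustrative assumptions.

// reportDisposition is a hypothetical helper showing how a caller might
// report the outcome of a complex change (e.g. a rebalance consisting of a
// remove on s1 and an add on s2).
func reportDisposition(a *allocatorState, changeIDs []ChangeID, enactErr error) {
	if enactErr != nil {
		// Pass all the ChangeIDs of the proposed change together. Undoing only
		// one half of a (remove replica and lease from s1, add replica and
		// lease to s2) pair could leave two leaseholders, which is exactly what
		// the pre-check above guards against.
		a.AdjustPendingChangesDisposition(changeIDs, false /* success */)
		return
	}
	a.AdjustPendingChangesDisposition(changeIDs, true /* success */)
}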

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 45 additions & 18 deletions

@@ -536,6 +536,12 @@ func (prc PendingRangeChange) LeaseTransferFrom() roachpb.StoreID {
 	panic("unreachable")
 }

+// TODO(sumeer): we have various methods that take slices of either ChangeIDs
+// or pendingReplicaChanges or ReplicaChange, and have callers that already
+// have or could first construct a slice of pendingReplicaChanges, and avoid
+// various temporary slice construction and repeated map lookups. Clean this
+// up.
+
 // pendingReplicaChange is a proposed change to a single replica. Some
 // external entity (the leaseholder of the range) may choose to enact this
 // change. It may not be enacted if it will cause some invariant (like the
@@ -559,6 +565,10 @@ type pendingReplicaChange struct {
 	// earlier GC. It is used to hasten GC (for the remaining changes) when some
 	// subset of changes corresponding to the same complex change have been
 	// observed to be enacted.
+	//
+	// The GC of these changes happens on a different path than the usual GC,
+	// which can undo the changes -- this GC happens only when processing a
+	// RangeMsg from the leaseholder.
 	revisedGCTime time.Time

 	// TODO(kvoli,sumeerbhola): Consider adopting an explicit expiration time,
@@ -902,7 +912,12 @@ type rangeState struct {
 	// change for adding s4 includes both that it has a replica, and it has the
 	// lease, so we will not mark it done, and keep pretending that the whole
 	// change is pending. Since lease transfers are fast, we accept this
-	// imperfect modeling fidelity.
+	// imperfect modeling fidelity. One consequence of this imperfect modeling
+	// is that if in this example there are no further changes observed until
+	// GC, the allocator will undo both changes and go back to the state {s1,
+	// s2, s3} with s3 as the leaseholder. That is, it has forgotten that s4 was
+	// added. This is unavoidable and will be fixed by the first
+	// StoreLeaseholderMsg post-GC.
 	//
 	// 3. Non Atomicity Hazard
 	//
@@ -1627,9 +1642,7 @@ func (cs *clusterState) gcPendingChanges(now time.Time) {
 			changeIDs = append(changeIDs, pendingChange.ChangeID)
 		}
 		if err := cs.preCheckOnUndoReplicaChanges(replicaChanges); err != nil {
-			log.KvDistribution.Infof(context.Background(),
-				"did not undo changes to range %v: due to %v", rangeID, err)
-			continue
+			panic(err)
 		}
 		for _, changeID := range changeIDs {
 			cs.undoPendingChange(changeID, true)
@@ -1773,6 +1786,13 @@ func (cs *clusterState) createPendingChanges(changes ...ReplicaChange) []*pendin

 // preCheckOnApplyReplicaChanges does some validation of the changes being
 // proposed. It ensures the range is known and has no pending changes already.
+//
+// It only needs to be called for (a) new changes that are being proposed, or
+// (b) when we have reset the rangeState.replicas using a StoreLeaseholderMsg
+// and we have some previously proposed pending changes that have not been
+// enacted yet, and we want to re-validate them before adjusting
+// rangeState.replicas.
+//
 // For a removal, it validates that the replica exists. For non-removal, it
 // blind applies the change without validating whether the current state is
 // ReplicaChange.prev -- this blind application allows this pre-check to
@@ -1783,6 +1803,20 @@ func (cs *clusterState) createPendingChanges(changes ...ReplicaChange) []*pendin
 //
 // REQUIRES: all the changes are to the same range; there are 1, 2 or 4
 // changes.
+//
+// TODO(sumeer): the 4 changes part is a hack because the asim conformance
+// test produces a change (when running under SMA) which is:
+//
+//	r10 type: RemoveReplica target store n3,s3 (replica-id=5 type=NON_VOTER)->(replica-id=none type=VOTER_FULL)
+//	r10 type: RemoveReplica target store n2,s2 (replica-id=2 type=VOTER_FULL)->(replica-id=none type=VOTER_FULL)
+//	r10 type: AddReplica target store n3,s3 (replica-id=none type=VOTER_FULL)->(replica-id=unknown type=VOTER_FULL)
+//	r10 type: AddReplica target store n2,s2 (replica-id=none type=VOTER_FULL)->(replica-id=unknown type=NON_VOTER)]
+//
+// This change violates the requirement that there should be a single change
+// per store. Fix how this is modeled and disallow 4 changes.
+//
+// TODO(sumeer): allow arbitrary number of changes, but validate that at most
+// one change per store.
 func (cs *clusterState) preCheckOnApplyReplicaChanges(changes []ReplicaChange) error {
 	// preApplyReplicaChange is called before applying a change to the cluster
 	// state.
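
A hedged sketch (not from this commit, written as if inside the mmaprototype package) of the validation the second TODO above asks for: accept any number of changes but require at most one per store. The accessor for a change's target store is an assumption here (written as a hypothetical storeID helper); errors.AssertionFailedf and roachpb.StoreID are identifiers already used in this package.

// checkOneChangePerStore is an illustrative validation: it rejects a batch
// of changes that touches the same store more than once.
func checkOneChangePerStore(changes []ReplicaChange) error {
	seen := make(map[roachpb.StoreID]struct{}, len(changes))
	for _, c := range changes {
		sid := storeID(c) // hypothetical accessor for the change's target store
		if _, ok := seen[sid]; ok {
			return errors.AssertionFailedf("multiple changes for store s%d", sid)
		}
		seen[sid] = struct{}{}
	}
	return nil
}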
@@ -1833,23 +1867,16 @@ func (cs *clusterState) preCheckOnApplyReplicaChanges(changes []ReplicaChange) e
 	return replicaSetIsValid(copiedCurr.replicas)
 }

-// TODO: this is unnecessary since if we always check against the current
-// state before allowing a chang to be added (including re-addition after a
-// StoreLeaseholderMsg), we should never have invalidity during an undo.
-// Which is why this function now panics except for the trivial cases of no
-// changes or the range not existing in the cluster state.
-//
-// This is also justified by the current callers. If this were to return false
-// in non-trivial cases, what is the caller supposed to do? These changes have
-// been reflected on both the membership and load information. Undoing the
-// latter is trivial since it is just subtraction of numbers. But it can't
-// undo the membership changes. So we presumably have left membership in an
-// inconsistent state.
-
 // preCheckOnUndoReplicaChanges does some validation of the changes being
 // proposed for undo.
 //
-// REQUIRES: changes is non-empty, and all changes are to the same range.
+// REQUIRES: changes is non-empty; all changes are to the same range; the
+// rangeState.pendingChangeNoRollback is false.
+//
+// This method is defensive since if we always check against the current state
+// before allowing a change to be added (including re-addition after a
+// StoreLeaseholderMsg), we should never have invalidity during an undo, if
+// all the changes are being undone.
 func (cs *clusterState) preCheckOnUndoReplicaChanges(changes []ReplicaChange) error {
 	if len(changes) == 0 {
 		panic(errors.AssertionFailedf("no changes to undo"))

pkg/kv/kvserver/allocator/mmaprototype/cluster_state_test.go

Lines changed: 5 additions & 0 deletions

@@ -428,6 +428,11 @@ func TestClusterState(t *testing.T) {
 			return ss.status.String()

 		case "store-load-msg":
+			// TODO(sumeer): the load-time is passed as an argument, and is
+			// independent of ts. This is by necessity, since the load-time can
+			// be in the past, indicating gossip delay. However, having it be
+			// some arbitrary value can be confusing for the test reader.
+			// Consider making it relative to ts.
 			msg := parseStoreLoadMsg(t, d.Input)
 			cs.processStoreLoadMsg(context.Background(), &msg)
 			return ""

pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/multiple_ranges

Lines changed: 4 additions & 2 deletions

@@ -100,8 +100,9 @@ range-id=3 local-store=1 load=[cpu:30, write-bandwidth:10, byte-size:10] raft-cp
 store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
 store-id=2 replica-id=2 type=VOTER_FULL

-# Exercise the path that ensures consistency even though not-populated is
-# true.
+# Change the membership, while MaybeSpanConfIsPopulated=false, to ensure that
+# we notice the change in replicas. One of these ranges has changed the number
+# of replicas, and the other the replica-id of one of the store's replicas.
 store-leaseholder-msg
 store-id=1
 range-id=2 not-populated
@@ -111,6 +112,7 @@ store-id=1
 store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
 ----

+# The ranges reflect the latest state from the StoreLeaseholderMsg.
 ranges
 ----
 range-id=2 local-store=1 load=[cpu:20, write-bandwidth:10, byte-size:15] raft-cpu=10

pkg/kv/kvserver/allocator/mmaprototype/testdata/cluster_state/rebalance_replica

Lines changed: 29 additions & 24 deletions

@@ -21,19 +21,19 @@ set-store
 store-id=1 node-id=1 attrs=purple locality-tiers=region=us-west-1,zone=us-west-1a
 store-id=2 node-id=2 attrs=yellow locality-tiers=region=us-east-1,zone=us-east-1a
 ----
-node-id=1 failure-summary=ok locality-tiers=region=us-west-1,zone=us-west-1a,node=1
-store-id=1 membership=full attrs=purple locality-code=1:2:3:
-node-id=2 failure-summary=ok locality-tiers=region=us-east-1,zone=us-east-1a,node=2
-store-id=2 membership=full attrs=yellow locality-code=4:5:6:
+node-id=1 locality-tiers=region=us-west-1,zone=us-west-1a,node=1
+store-id=1 attrs=purple locality-code=1:2:3:
+node-id=2 locality-tiers=region=us-east-1,zone=us-east-1a,node=2
+store-id=2 attrs=yellow locality-code=4:5:6:

 store-load-msg
 store-id=1 node-id=1 load=[80,80,80] capacity=[100,100,100] secondary-load=0 load-time=0s
 ----

 get-load-info
 ----
-store-id=1 node-id=1 reported=[cpu:80, write-bandwidth:80, byte-size:80] adjusted=[cpu:80, write-bandwidth:80, byte-size:80] node-reported-cpu=80 node-adjusted-cpu=80 seq=1
-store-id=2 node-id=2 reported=[cpu:0, write-bandwidth:0, byte-size:0] adjusted=[cpu:0, write-bandwidth:0, byte-size:0] node-reported-cpu=0 node-adjusted-cpu=0 seq=0
+store-id=1 node-id=1 status=ok accepting all reported=[cpu:80, write-bandwidth:80, byte-size:80] adjusted=[cpu:80, write-bandwidth:80, byte-size:80] node-reported-cpu=80 node-adjusted-cpu=80 seq=1
+store-id=2 node-id=2 status=ok accepting all reported=[cpu:0, write-bandwidth:0, byte-size:0] adjusted=[cpu:0, write-bandwidth:0, byte-size:0] node-reported-cpu=0 node-adjusted-cpu=0 seq=0

 store-leaseholder-msg
 store-id=1
@@ -49,9 +49,9 @@ range-id=1 local-store=1 load=[cpu:80, write-bandwidth:80, byte-size:80] raft-cp

 get-load-info
 ----
-store-id=1 node-id=1 reported=[cpu:80, write-bandwidth:80, byte-size:80] adjusted=[cpu:80, write-bandwidth:80, byte-size:80] node-reported-cpu=80 node-adjusted-cpu=80 seq=1
+store-id=1 node-id=1 status=ok accepting all reported=[cpu:80, write-bandwidth:80, byte-size:80] adjusted=[cpu:80, write-bandwidth:80, byte-size:80] node-reported-cpu=80 node-adjusted-cpu=80 seq=1
 top-k-ranges (local-store-id=1) dim=CPURate: r1
-store-id=2 node-id=2 reported=[cpu:0, write-bandwidth:0, byte-size:0] adjusted=[cpu:0, write-bandwidth:0, byte-size:0] node-reported-cpu=0 node-adjusted-cpu=0 seq=0
+store-id=2 node-id=2 status=ok accepting all reported=[cpu:0, write-bandwidth:0, byte-size:0] adjusted=[cpu:0, write-bandwidth:0, byte-size:0] node-reported-cpu=0 node-adjusted-cpu=0 seq=0

 make-pending-changes range-id=1
 rebalance-replica: remove-store-id=1 add-store-id=2
@@ -71,9 +71,9 @@ range-id=1 local-store=1 load=[cpu:80, write-bandwidth:80, byte-size:80] raft-cp

 get-load-info
 ----
-store-id=1 node-id=1 reported=[cpu:80, write-bandwidth:80, byte-size:80] adjusted=[cpu:0, write-bandwidth:0, byte-size:0] node-reported-cpu=80 node-adjusted-cpu=0 seq=2
+store-id=1 node-id=1 status=ok accepting all reported=[cpu:80, write-bandwidth:80, byte-size:80] adjusted=[cpu:0, write-bandwidth:0, byte-size:0] node-reported-cpu=80 node-adjusted-cpu=0 seq=2
 top-k-ranges (local-store-id=1) dim=CPURate: r1
-store-id=2 node-id=2 reported=[cpu:0, write-bandwidth:0, byte-size:0] adjusted=[cpu:88, write-bandwidth:88, byte-size:88] node-reported-cpu=0 node-adjusted-cpu=88 seq=1
+store-id=2 node-id=2 status=ok accepting all reported=[cpu:0, write-bandwidth:0, byte-size:0] adjusted=[cpu:88, write-bandwidth:88, byte-size:88] node-reported-cpu=0 node-adjusted-cpu=88 seq=1

 # Same store load from s1. Results in no change.
 store-load-msg
@@ -82,9 +82,9 @@ store-load-msg

 get-load-info
 ----
-store-id=1 node-id=1 reported=[cpu:80, write-bandwidth:80, byte-size:80] adjusted=[cpu:0, write-bandwidth:0, byte-size:0] node-reported-cpu=80 node-adjusted-cpu=0 seq=4
+store-id=1 node-id=1 status=ok accepting all reported=[cpu:80, write-bandwidth:80, byte-size:80] adjusted=[cpu:0, write-bandwidth:0, byte-size:0] node-reported-cpu=80 node-adjusted-cpu=0 seq=4
 top-k-ranges (local-store-id=1) dim=CPURate: r1
-store-id=2 node-id=2 reported=[cpu:0, write-bandwidth:0, byte-size:0] adjusted=[cpu:88, write-bandwidth:88, byte-size:88] node-reported-cpu=0 node-adjusted-cpu=88 seq=1
+store-id=2 node-id=2 status=ok accepting all reported=[cpu:0, write-bandwidth:0, byte-size:0] adjusted=[cpu:88, write-bandwidth:88, byte-size:88] node-reported-cpu=0 node-adjusted-cpu=88 seq=1

 # Store leaseholder msg from s1 showing that s2 has a replica but not the lease.
 store-leaseholder-msg
@@ -105,6 +105,7 @@ change-id=2 store-id=1 node-id=1 range-id=1 load-delta=[cpu:-80, write-bandwidth
 prev=(replica-id=1 type=VOTER_FULL leaseholder=true)
 next=(replica-id=none type=VOTER_FULL)

+# Advance just to simulate some passage of time.
 tick seconds=5
 ----
 t=5s
@@ -131,8 +132,8 @@ ranges
 # The enacted changes are still adjusting the load.
 get-load-info
 ----
-store-id=1 node-id=1 reported=[cpu:80, write-bandwidth:80, byte-size:80] adjusted=[cpu:0, write-bandwidth:0, byte-size:0] node-reported-cpu=80 node-adjusted-cpu=0 seq=4
-store-id=2 node-id=2 reported=[cpu:0, write-bandwidth:0, byte-size:0] adjusted=[cpu:88, write-bandwidth:88, byte-size:88] node-reported-cpu=0 node-adjusted-cpu=88 seq=1
+store-id=1 node-id=1 status=ok accepting all reported=[cpu:80, write-bandwidth:80, byte-size:80] adjusted=[cpu:0, write-bandwidth:0, byte-size:0] node-reported-cpu=80 node-adjusted-cpu=0 seq=4
+store-id=2 node-id=2 status=ok accepting all reported=[cpu:0, write-bandwidth:0, byte-size:0] adjusted=[cpu:88, write-bandwidth:88, byte-size:88] node-reported-cpu=0 node-adjusted-cpu=88 seq=1

 # Store load msg from s2 showing updated load.
 store-load-msg
@@ -141,16 +142,18 @@ store-load-msg

 # Store load msg from s1 showing updated load.
 store-load-msg
-store-id=1 node-id=2 load=[5,5,5] capacity=[100,100,100] secondary-load=1 load-time=14s
+store-id=1 node-id=1 load=[5,5,5] capacity=[100,100,100] secondary-load=1 load-time=14s
 ----

-# Neither load is recent enough (computePendingChangesReflectedInLatestLoad
-# timeout) to be considered as accounting for the enacted changes. So s2
-# adjusted load appears very high and s1 adjusted load becomes negative.
+# Both of the load msgs had load-time=14s, while the enacted time was 5s.
+# Neither is recent enough, since lagForChangeReflectedInLoad is 10s (see
+# computePendingChangesReflectedInLatestLoad) to be considered as accounting
+# for the enacted changes. So s2 adjusted load appears very high and s1
+# adjusted load becomes negative.
 get-load-info
 ----
-store-id=1 node-id=1 reported=[cpu:5, write-bandwidth:5, byte-size:5] adjusted=[cpu:-75, write-bandwidth:-75, byte-size:-75] node-reported-cpu=80 node-adjusted-cpu=-80 seq=6
-store-id=2 node-id=2 reported=[cpu:80, write-bandwidth:80, byte-size:80] adjusted=[cpu:168, write-bandwidth:168, byte-size:168] node-reported-cpu=5 node-adjusted-cpu=173 seq=3
+store-id=1 node-id=1 status=ok accepting all reported=[cpu:5, write-bandwidth:5, byte-size:5] adjusted=[cpu:-75, write-bandwidth:-75, byte-size:-75] node-reported-cpu=5 node-adjusted-cpu=-75 seq=6
+store-id=2 node-id=2 status=ok accepting all reported=[cpu:80, write-bandwidth:80, byte-size:80] adjusted=[cpu:168, write-bandwidth:168, byte-size:168] node-reported-cpu=80 node-adjusted-cpu=168 seq=3

 # The enacted changes are still adjusting the load.
 get-pending-changes
@@ -170,15 +173,17 @@ store-load-msg

 # Store load msg from s1 showing updated load.
 store-load-msg
-store-id=1 node-id=2 load=[5,5,5] capacity=[100,100,100] secondary-load=1 load-time=16s
+store-id=1 node-id=1 load=[5,5,5] capacity=[100,100,100] secondary-load=1 load-time=16s
 ----

-# The enacted changes are no longer adjusting the load.
+# Both of the load msgs had load-time=16s, while the enacted time was 5s. So
+# they are recent enough to be considered as accounting for the enacted
+# changes. The enacted changes are no longer adjusting the load.
 get-pending-changes
 ----
 pending(0)

 get-load-info
 ----
-store-id=1 node-id=1 reported=[cpu:5, write-bandwidth:5, byte-size:5] adjusted=[cpu:5, write-bandwidth:5, byte-size:5] node-reported-cpu=80 node-adjusted-cpu=-80 seq=7
-store-id=2 node-id=2 reported=[cpu:80, write-bandwidth:80, byte-size:80] adjusted=[cpu:80, write-bandwidth:80, byte-size:80] node-reported-cpu=5 node-adjusted-cpu=165 seq=4
+store-id=1 node-id=1 status=ok accepting all reported=[cpu:5, write-bandwidth:5, byte-size:5] adjusted=[cpu:5, write-bandwidth:5, byte-size:5] node-reported-cpu=5 node-adjusted-cpu=5 seq=7
+store-id=2 node-id=2 status=ok accepting all reported=[cpu:80, write-bandwidth:80, byte-size:80] adjusted=[cpu:80, write-bandwidth:80, byte-size:80] node-reported-cpu=80 node-adjusted-cpu=80 seq=4
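
A small sketch (not part of the test file, written as if inside the mmaprototype package) of the timing rule the comments above rely on: a store's reported load is treated as already reflecting an enacted change only once its load-time is sufficiently past the enactment time, where the lag is lagForChangeReflectedInLoad (10s per the comment). The function name below is illustrative; only the constant name and computePendingChangesReflectedInLatestLoad come from the test comments, and the exact boundary behavior is an assumption (the test only shows 14s failing and 16s passing with enactment at 5s).

// loadReflectsChange sketches the comparison: with the change enacted at
// t=5s and a 10s lag, load-time=14s is too early, so the load adjustments
// stay, while load-time=16s is late enough and they are dropped.
func loadReflectsChange(loadTime, enactedTime time.Time) bool {
	const lagForChangeReflectedInLoad = 10 * time.Second
	return !loadTime.Before(enactedTime.Add(lagForChangeReflectedInLoad))
}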
