
Commit 9d7f785

asim: integrate allocatorsync with lease and replicate queue properly
This commit properly integrates the lease and replicate queue changes with the mma allocator by calling allocatorSync.PreApply* and allocatorSync.PostApply, completing the allocator integration. Some data-driven test outputs changed because allocatorSync.PostApply now updates the store pool after operations (such as UpdateLocalStoresAfterLeaseTransfer), which was previously missing in asim. It is unclear whether this was an oversight or was previously considered unnecessary; more reading is needed.

Epic: none
Release note: none
1 parent d727567 commit 9d7f785
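
For context on what the PreApply*/PostApply calls do here: a queue registers an intended change with the allocator sync before pushing it into the simulator, and later acknowledges the outcome so the store pool can be refreshed (e.g. after a lease transfer). The sketch below is a minimal, self-contained model of that handshake; the AllocatorSync, SyncChangeID, and store-pool update shown are simplified stand-ins for the real mmaprototypehelpers and storepool APIs, not their actual signatures.

package main

import "fmt"

// SyncChangeID identifies a change registered with the allocator sync.
// The zero value plays the role of the invalid/sentinel ID.
type SyncChangeID int

const InvalidSyncChangeID SyncChangeID = 0

func (id SyncChangeID) IsValid() bool { return id != InvalidSyncChangeID }

// AllocatorSync is a simplified stand-in for the allocator sync helper.
type AllocatorSync struct {
	nextID  SyncChangeID
	pending map[SyncChangeID]string
}

func NewAllocatorSync() *AllocatorSync {
	return &AllocatorSync{pending: map[SyncChangeID]string{}}
}

// PreApply registers an intended change and returns its ID.
func (as *AllocatorSync) PreApply(desc string) SyncChangeID {
	as.nextID++
	as.pending[as.nextID] = desc
	return as.nextID
}

// PostApply settles a previously registered change. On success this is
// where the store pool would be refreshed (the analogue of the
// UpdateLocalStoresAfterLeaseTransfer update mentioned in the commit message).
func (as *AllocatorSync) PostApply(id SyncChangeID, success bool) {
	desc, ok := as.pending[id]
	if !ok {
		return
	}
	delete(as.pending, id)
	fmt.Printf("post-apply %q: success=%t, store pool updated\n", desc, success)
}

func main() {
	as := NewAllocatorSync()
	id := as.PreApply("transfer lease r1: s1 -> s2")
	// ... the simulated operation runs here ...
	if id.IsValid() {
		as.PostApply(id, true /* success */)
	}
}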

File tree

10 files changed (+228, -185 lines changed)


pkg/kv/kvserver/asim/queue/lease_queue.go

Lines changed: 13 additions & 7 deletions
@@ -25,11 +25,12 @@ import (
 type leaseQueue struct {
 	baseQueue
 	plan.ReplicaPlanner
-	storePool storepool.AllocatorStorePool
-	planner   plan.ReplicationPlanner
-	clock     *hlc.Clock
-	settings  *config.SimulationSettings
-	as        *mmaprototypehelpers.AllocatorSync
+	storePool        storepool.AllocatorStorePool
+	planner          plan.ReplicationPlanner
+	clock            *hlc.Clock
+	settings         *config.SimulationSettings
+	as               *mmaprototypehelpers.AllocatorSync
+	lastSyncChangeID mmaprototypehelpers.SyncChangeID
 }

 // NewLeaseQueue returns a new lease queue.
@@ -119,6 +120,11 @@ func (lq *leaseQueue) Tick(ctx context.Context, tick time.Time, s state.State) {
 		lq.next = lq.lastTick
 	}

+	if !tick.Before(lq.next) && lq.lastSyncChangeID.IsValid() {
+		lq.as.PostApply(ctx, lq.lastSyncChangeID, true /* success */)
+		lq.lastSyncChangeID = mmaprototypehelpers.InvalidSyncChangeID
+	}
+
 	for !tick.Before(lq.next) && lq.priorityQueue.Len() != 0 {
 		item := heap.Pop(lq).(*replicaItem)
 		if item == nil {
@@ -158,8 +164,8 @@ func (lq *leaseQueue) Tick(ctx context.Context, tick time.Time, s state.State) {
 			continue
 		}

-		lq.next = pushReplicateChange(
-			ctx, change, repl, tick, lq.settings.ReplicaChangeDelayFn(), lq.baseQueue.stateChanger)
+		lq.next, lq.lastSyncChangeID = pushReplicateChange(
+			ctx, change, repl, tick, lq.settings.ReplicaChangeDelayFn(), lq.baseQueue.stateChanger, lq.as)
 	}

 	lq.lastTick = tick
pkg/kv/kvserver/asim/queue/replicate_queue.go

Lines changed: 41 additions & 11 deletions
@@ -24,10 +24,11 @@ import (

 type replicateQueue struct {
 	baseQueue
-	planner  plan.ReplicationPlanner
-	clock    *hlc.Clock
-	settings *config.SimulationSettings
-	as       *mmaprototypehelpers.AllocatorSync
+	planner          plan.ReplicationPlanner
+	clock            *hlc.Clock
+	settings         *config.SimulationSettings
+	as               *mmaprototypehelpers.AllocatorSync
+	lastSyncChangeID mmaprototypehelpers.SyncChangeID
 }

 // NewReplicateQueue returns a new replicate queue.
@@ -118,6 +119,11 @@ func (rq *replicateQueue) Tick(ctx context.Context, tick time.Time, s state.State) {
 		rq.next = rq.lastTick
 	}

+	if !tick.Before(rq.next) && rq.lastSyncChangeID.IsValid() {
+		rq.as.PostApply(ctx, rq.lastSyncChangeID, true /* success */)
+		rq.lastSyncChangeID = mmaprototypehelpers.InvalidSyncChangeID
+	}
+
 	for !tick.Before(rq.next) && rq.priorityQueue.Len() != 0 {
 		item := heap.Pop(rq).(*replicaItem)
 		if item == nil {
@@ -152,8 +158,8 @@ func (rq *replicateQueue) Tick(ctx context.Context, tick time.Time, s state.State) {
 			continue
 		}

-		rq.next = pushReplicateChange(
-			ctx, change, repl, tick, rq.settings.ReplicaChangeDelayFn(), rq.baseQueue.stateChanger)
+		rq.next, rq.lastSyncChangeID = pushReplicateChange(
+			ctx, change, repl, tick, rq.settings.ReplicaChangeDelayFn(), rq.baseQueue.stateChanger, rq.as)
 	}

 	rq.lastTick = tick
@@ -166,29 +172,51 @@ func pushReplicateChange(
 	tick time.Time,
 	delayFn func(int64, bool) time.Duration,
 	stateChanger state.Changer,
-) time.Time {
+	as *mmaprototypehelpers.AllocatorSync,
+) (time.Time, mmaprototypehelpers.SyncChangeID) {
 	var stateChange state.Change
+	var changeID mmaprototypehelpers.SyncChangeID
 	next := tick
 	switch op := change.Op.(type) {
 	case plan.AllocationNoop:
 		// Nothing to do.
-		return next
+		return next, mmaprototypehelpers.InvalidSyncChangeID
 	case plan.AllocationFinalizeAtomicReplicationOp:
 		panic("unimplemented finalize atomic replication op")
 	case plan.AllocationTransferLeaseOp:
+		if as != nil {
+			// as may be nil in some tests.
+			changeID = as.NonMMAPreTransferLease(
+				ctx,
+				repl.Desc(),
+				repl.RangeUsageInfo(),
+				op.Source,
+				op.Target,
+				mmaprototypehelpers.ReplicateQueue,
+			)
+		}
 		stateChange = &state.LeaseTransferChange{
 			RangeID:        state.RangeID(change.Replica.GetRangeID()),
 			TransferTarget: state.StoreID(op.Target.StoreID),
 			Author:         state.StoreID(op.Source.StoreID),
 			Wait:           delayFn(repl.rng.Size(), false /* add */),
 		}
 	case plan.AllocationChangeReplicasOp:
-		log.VEventf(ctx, 1, "pushing state change for range=%s, details=%s", repl.rng, op.Details)
+		if as != nil {
+			// as may be nil in some tests.
+			changeID = as.NonMMAPreChangeReplicas(
+				ctx,
+				repl.Desc(),
+				repl.RangeUsageInfo(),
+				op.Chgs,
+				repl.StoreID(), /* leaseholder */
+			)
+		}
 		stateChange = &state.ReplicaChange{
 			RangeID: state.RangeID(change.Replica.GetRangeID()),
 			Changes: op.Chgs,
 			Author:  state.StoreID(op.LeaseholderStore),
-			Wait:    delayFn(repl.rng.Size(), true),
+			Wait:    delayFn(repl.rng.Size(), true /* add */),
 		}
 	default:
 		panic(fmt.Sprintf("Unknown operation %+v, unable to create state change", op))
@@ -199,6 +227,8 @@ func pushReplicateChange(
 		next = completeAt
 	} else {
 		log.VEventf(ctx, 1, "pushing state change failed")
+		as.PostApply(ctx, changeID, false /* success */)
+		changeID = mmaprototypehelpers.InvalidSyncChangeID
 	}
-	return next
+	return next, changeID
 }
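
On the other side of the handshake, pushReplicateChange now registers the planned operation before pushing it: lease transfers go through NonMMAPreTransferLease, replica changes through NonMMAPreChangeReplicas, and a push that is rejected is immediately settled with PostApply(..., false). The sketch below condenses that control flow using placeholder operation and sync types, not the real plan/kvserver APIs:

package main

import "fmt"

type SyncChangeID int

const InvalidSyncChangeID SyncChangeID = 0

// allocatorSync is a placeholder for the allocator sync helper.
type allocatorSync struct{ nextID SyncChangeID }

// preRegister stands in for the NonMMAPre* registration calls.
func (as *allocatorSync) preRegister(kind string) SyncChangeID {
	as.nextID++
	fmt.Printf("pre-apply %s as change %d\n", kind, as.nextID)
	return as.nextID
}

func (as *allocatorSync) PostApply(id SyncChangeID, success bool) {
	fmt.Printf("post-apply change %d, success=%t\n", id, success)
}

// op stands in for the planner's allocation operations.
type op interface{ kind() string }

type transferLeaseOp struct{ target int }
type changeReplicasOp struct{ numChanges int }

func (transferLeaseOp) kind() string  { return "lease transfer" }
func (changeReplicasOp) kind() string { return "replica change" }

// pushChange registers the operation before attempting to apply it and
// settles the registration immediately if the push is rejected. The
// returned ID, if valid, is acknowledged by the queue on a later tick.
func pushChange(as *allocatorSync, o op, push func() bool) SyncChangeID {
	var changeID SyncChangeID
	if as != nil { // as may be nil in some tests.
		changeID = as.preRegister(o.kind())
	}
	if !push() {
		as.PostApply(changeID, false /* success */)
		return InvalidSyncChangeID
	}
	return changeID
}

func main() {
	as := &allocatorSync{}
	id := pushChange(as, transferLeaseOp{target: 2}, func() bool { return true })
	fmt.Println("pending change:", id)
	id = pushChange(as, changeReplicasOp{numChanges: 2}, func() bool { return false })
	fmt.Println("pending change:", id)
}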

pkg/kv/kvserver/asim/tests/testdata/non_rand/example_add_node.txt

Lines changed: 15 additions & 15 deletions
@@ -44,22 +44,22 @@ plot stat=replicas sample=1
 ----
 [ASCII replicas plot: only the chart shape shifted slightly between old and new output]
 replicas
 initial store values: [s1=301, s2=0, s3=0] (stddev=141.89, mean=100.33, sum=301)
-last store values: [s1=301, s2=272, s3=265] (stddev=15.58, mean=279.33, sum=838)
+last store values: [s1=301, s2=271, s3=267] (stddev=15.17, mean=279.67, sum=839)

 # vim:ft=sh

pkg/kv/kvserver/asim/tests/testdata/non_rand/example_fulldisk.txt

Lines changed: 24 additions & 24 deletions
@@ -23,34 +23,34 @@ OK
 # store 5. This is shown below as it sheds replicas.
 plot stat=replicas
 ----
 [ASCII replicas plot: only the chart shape shifted slightly between old and new output]
 replicas
 initial store values: [s1=300, s2=300, s3=300, s4=300, s5=300] (stddev=0.00, mean=300.00, sum=1500)
-last store values: [s1=342, s2=336, s3=339, s4=339, s5=153] (stddev=74.42, mean=301.80, sum=1509)
+last store values: [s1=339, s2=340, s3=338, s4=338, s5=157] (stddev=72.70, mean=302.40, sum=1512)

 # Plot the % of disk storage capacity used. We should see s5 hovering right
 # around 92.5-95% (the storage capacity threshold value).
 plot stat=disk_fraction_used
 ----
 [ASCII disk_fraction_used plot: only the chart shape shifted slightly between old and new output]
 disk_fraction_used
 initial store values: [s1=0.20, s2=0.20, s3=0.20, s4=0.20, s5=1.05] (stddev=0.34, mean=0.37, sum=2)
-last store values: [s1=0.41, s2=0.40, s3=0.41, s4=0.41, s5=0.94] (stddev=0.22, mean=0.51, sum=3)
+last store values: [s1=0.41, s2=0.41, s3=0.41, s4=0.41, s5=0.96] (stddev=0.22, mean=0.52, sum=3)

pkg/kv/kvserver/asim/tests/testdata/non_rand/example_io_overload.txt

Lines changed: 21 additions & 21 deletions
@@ -18,11 +18,11 @@ OK
 # solid line at 0, which will be s5's replica count.
 plot stat=replicas
 ----
 [ASCII replicas plot: only the chart shape shifted slightly between old and new output]
 replicas
 initial store values: [s1=500, s2=500, s3=500, s4=0, s5=0] (stddev=244.95, mean=300.00, sum=1500)
-last store values: [s1=372, s2=381, s3=371, s4=376, s5=0] (stddev=150.04, mean=300.00, sum=1500)
+last store values: [s1=375, s2=375, s3=375, s4=375, s5=0] (stddev=150.00, mean=300.00, sum=1500)

 plot stat=leases
 ----
 [ASCII leases plot: only the chart shape shifted slightly between old and new output]
 leases
 initial store values: [s1=500, s2=0, s3=0, s4=0, s5=0] (stddev=200.00, mean=100.00, sum=500)
-last store values: [s1=230, s2=98, s3=67, s4=105, s5=0] (stddev=74.86, mean=100.00, sum=500)
+last store values: [s1=200, s2=100, s3=100, s4=100, s5=0] (stddev=63.25, mean=100.00, sum=500)
