Skip to content

Commit dc890af

Browse files
committed
mmaprototype: thread ctx
1 parent c2c042f commit dc890af

File tree

5 files changed

+33
-28
lines changed

5 files changed

+33
-28
lines changed

pkg/kv/kvserver/allocator/mmaprototype/allocator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type ChangeOptions struct {
2929
// - changes to this interface to make the integration for the new allocator
3030
// be less different than integration with the old allocator.
3131
type Allocator interface {
32-
LoadSummaryForAllStores() string
32+
LoadSummaryForAllStores(context.Context) string
3333
Metrics() *MMAMetrics
3434
// Methods to update the state of the external world. The allocator starts
3535
// with no knowledge.

pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,8 @@ func (a *allocatorState) Metrics() *MMAMetrics {
242242
return a.mmaMetrics
243243
}
244244

245-
func (a *allocatorState) LoadSummaryForAllStores() string {
246-
return a.cs.loadSummaryForAllStores()
245+
func (a *allocatorState) LoadSummaryForAllStores(ctx context.Context) string {
246+
return a.cs.loadSummaryForAllStores(ctx)
247247
}
248248

249249
var mmaid = atomic.Int64{}
@@ -291,7 +291,7 @@ func (a *allocatorState) rebalanceStores(
291291
// is storeMembershipRemoving (decommissioning). These are currently handled
292292
// via replicate_queue.go.
293293
for storeID, ss := range a.cs.stores {
294-
sls := a.cs.meansMemo.getStoreLoadSummary(clusterMeans, storeID, ss.loadSeqNum)
294+
sls := a.cs.meansMemo.getStoreLoadSummary(ctx, clusterMeans, storeID, ss.loadSeqNum)
295295
log.Dev.VInfof(ctx, 2, "evaluating s%d: node load %s, store load %s, worst dim %s",
296296
storeID, sls.nls, sls.sls, sls.worstDim)
297297

@@ -477,7 +477,7 @@ func (a *allocatorState) rebalanceStores(
477477
clear(scratchNodes)
478478
means.stores = candsPL
479479
computeMeansForStoreSet(a.cs, &means, scratchNodes)
480-
sls := a.cs.computeLoadSummary(store.StoreID, &means.storeLoad, &means.nodeLoad)
480+
sls := a.cs.computeLoadSummary(ctx, store.StoreID, &means.storeLoad, &means.nodeLoad)
481481
log.Dev.VInfof(ctx, 2, "considering lease-transfer r%v from s%v: candidates are %v", rangeID, store.StoreID, candsPL)
482482
if sls.dimSummary[CPURate] < overloadSlow {
483483
// This store is not cpu overloaded relative to these candidates for
@@ -493,7 +493,7 @@ func (a *allocatorState) rebalanceStores(
493493
cand.storeID)
494494
continue
495495
}
496-
candSls := a.cs.computeLoadSummary(cand.storeID, &means.storeLoad, &means.nodeLoad)
496+
candSls := a.cs.computeLoadSummary(ctx, cand.storeID, &means.storeLoad, &means.nodeLoad)
497497
if sls.fd != fdOK {
498498
log.Dev.VInfof(ctx, 2, "skipping store s%d: failure detection status not OK", cand.storeID)
499499
continue
@@ -685,7 +685,7 @@ func (a *allocatorState) rebalanceStores(
685685
}
686686
}
687687
// TODO(sumeer): eliminate cands allocations by passing a scratch slice.
688-
cands, ssSLS := a.computeCandidatesForRange(disj[:], storesToExcludeForRange, store.StoreID)
688+
cands, ssSLS := a.computeCandidatesForRange(ctx, disj[:], storesToExcludeForRange, store.StoreID)
689689
log.Dev.VInfof(ctx, 2, "considering replica-transfer r%v from s%v: store load %v",
690690
rangeID, store.StoreID, ss.adjusted.load)
691691
if log.V(2) {
@@ -1359,12 +1359,15 @@ func (a *allocatorState) ensureAnalyzedConstraints(rstate *rangeState) bool {
13591359
// loadSheddingStore is only specified if this candidate computation is
13601360
// happening because of overload.
13611361
func (a *allocatorState) computeCandidatesForRange(
1362-
expr constraintsDisj, storesToExclude storeIDPostingList, loadSheddingStore roachpb.StoreID,
1362+
ctx context.Context,
1363+
expr constraintsDisj,
1364+
storesToExclude storeIDPostingList,
1365+
loadSheddingStore roachpb.StoreID,
13631366
) (_ candidateSet, sheddingSLS storeLoadSummary) {
13641367
means := a.cs.meansMemo.getMeans(expr)
13651368
if loadSheddingStore > 0 {
13661369
sheddingSS := a.cs.stores[loadSheddingStore]
1367-
sheddingSLS = a.cs.meansMemo.getStoreLoadSummary(means, loadSheddingStore, sheddingSS.loadSeqNum)
1370+
sheddingSLS = a.cs.meansMemo.getStoreLoadSummary(ctx, means, loadSheddingStore, sheddingSS.loadSeqNum)
13681371
if sheddingSLS.sls <= loadNoChange && sheddingSLS.nls <= loadNoChange {
13691372
// In this set of stores, this store no longer looks overloaded.
13701373
return candidateSet{}, sheddingSLS
@@ -1378,7 +1381,7 @@ func (a *allocatorState) computeCandidatesForRange(
13781381
continue
13791382
}
13801383
ss := a.cs.stores[storeID]
1381-
csls := a.cs.meansMemo.getStoreLoadSummary(means, storeID, ss.loadSeqNum)
1384+
csls := a.cs.meansMemo.getStoreLoadSummary(ctx, means, storeID, ss.loadSeqNum)
13821385
if csls.fd != fdOK {
13831386
continue
13841387
}

pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1404,7 +1404,7 @@ func (cs *clusterState) processStoreLeaseholderMsgInternal(
14041404
ss.adjusted.topKRanges[msg.StoreID] = topk
14051405
}
14061406
topk.startInit()
1407-
sls := cs.computeLoadSummary(ss.StoreID, &clusterMeans.storeLoad, &clusterMeans.nodeLoad)
1407+
sls := cs.computeLoadSummary(ctx, ss.StoreID, &clusterMeans.storeLoad, &clusterMeans.nodeLoad)
14081408
if ss.StoreID == localss.StoreID {
14091409
topk.dim = CPURate
14101410
} else {
@@ -1970,7 +1970,7 @@ func (cs *clusterState) canShedAndAddLoad(
19701970
// Or maybe CPU is the only dimension that matters at the node level. It feels
19711971
// sloppy/confusing though.
19721972
targetNS.adjustedCPU += deltaToAdd[CPURate]
1973-
targetSLS := computeLoadSummary(targetSS, targetNS, &means.storeLoad, &means.nodeLoad)
1973+
targetSLS := computeLoadSummary(ctx, targetSS, targetNS, &means.storeLoad, &means.nodeLoad)
19741974
// Undo the addition.
19751975
targetSS.adjusted.load.subtract(deltaToAdd)
19761976
targetNS.adjustedCPU -= deltaToAdd[CPURate]
@@ -1979,7 +1979,7 @@ func (cs *clusterState) canShedAndAddLoad(
19791979
srcNS := cs.nodes[srcSS.NodeID]
19801980
srcSS.adjusted.load.subtract(delta)
19811981
srcNS.adjustedCPU -= delta[CPURate]
1982-
srcSLS := computeLoadSummary(srcSS, srcNS, &means.storeLoad, &means.nodeLoad)
1982+
srcSLS := computeLoadSummary(ctx, srcSS, srcNS, &means.storeLoad, &means.nodeLoad)
19831983
// Undo the removal.
19841984
srcSS.adjusted.load.add(delta)
19851985
srcNS.adjustedCPU += delta[CPURate]
@@ -2141,39 +2141,39 @@ func (cs *clusterState) canShedAndAddLoad(
21412141
}
21422142

21432143
func (cs *clusterState) computeLoadSummary(
2144-
storeID roachpb.StoreID, msl *meanStoreLoad, mnl *meanNodeLoad,
2144+
ctx context.Context, storeID roachpb.StoreID, msl *meanStoreLoad, mnl *meanNodeLoad,
21452145
) storeLoadSummary {
21462146
ss := cs.stores[storeID]
21472147
ns := cs.nodes[ss.NodeID]
2148-
return computeLoadSummary(ss, ns, msl, mnl)
2148+
return computeLoadSummary(ctx, ss, ns, msl, mnl)
21492149
}
21502150

21512151
// TODO(wenyihu6): check to make sure obs here is correct
2152-
func (cs *clusterState) loadSummaryForAllStores() string {
2152+
func (cs *clusterState) loadSummaryForAllStores(ctx context.Context) string {
21532153
var b strings.Builder
21542154
clusterMeans := cs.meansMemo.getMeans(nil)
21552155
b.WriteString(fmt.Sprintf("cluster means: (stores-load %s) (stores-capacity %s)\n",
21562156
clusterMeans.storeLoad.load, clusterMeans.storeLoad.capacity))
21572157
b.WriteString(fmt.Sprintf("(nodes-cpu-load %d) (nodes-cpu-capacity %d)\n",
21582158
clusterMeans.nodeLoad.loadCPU, clusterMeans.nodeLoad.capacityCPU))
21592159
for storeID, ss := range cs.stores {
2160-
sls := cs.meansMemo.getStoreLoadSummary(clusterMeans, storeID, ss.loadSeqNum)
2160+
sls := cs.meansMemo.getStoreLoadSummary(ctx, clusterMeans, storeID, ss.loadSeqNum)
21612161
b.WriteString(fmt.Sprintf("evaluating store s%d for shedding: load summary %v", storeID, sls))
21622162
}
21632163
return b.String()
21642164
}
21652165

21662166
func computeLoadSummary(
2167-
ss *storeState, ns *nodeState, msl *meanStoreLoad, mnl *meanNodeLoad,
2167+
ctx context.Context, ss *storeState, ns *nodeState, msl *meanStoreLoad, mnl *meanNodeLoad,
21682168
) storeLoadSummary {
21692169
sls := loadLow
21702170
var highDiskSpaceUtil bool
21712171
var dimSummary [NumLoadDimensions]loadSummary
21722172
var worstDim LoadDimension
21732173
for i := range msl.load {
21742174
// TODO(kvoli,sumeerbhola): Handle negative adjusted store/node loads.
2175-
ls := loadSummaryForDimension(
2176-
ss.StoreID, 0 /*NodeID(for logging)*/, LoadDimension(i), ss.adjusted.load[i], ss.capacity[i], msl.load[i], msl.util[i])
2175+
ls := loadSummaryForDimension(ctx, ss.StoreID, 0, LoadDimension(i), ss.adjusted.load[i], ss.capacity[i],
2176+
msl.load[i], msl.util[i])
21772177
if ls > sls {
21782178
sls = ls
21792179
worstDim = LoadDimension(i)
@@ -2184,7 +2184,7 @@ func computeLoadSummary(
21842184
highDiskSpaceUtil = highDiskSpaceUtilization(ss.adjusted.load[i], ss.capacity[i])
21852185
}
21862186
}
2187-
nls := loadSummaryForDimension(0 /*StoreID(for logging)*/, ns.NodeID, CPURate, ns.adjustedCPU, ns.CapacityCPU, mnl.loadCPU, mnl.utilCPU)
2187+
nls := loadSummaryForDimension(ctx, 0, ns.NodeID, CPURate, ns.adjustedCPU, ns.CapacityCPU, mnl.loadCPU, mnl.utilCPU)
21882188
return storeLoadSummary{
21892189
worstDim: worstDim,
21902190
sls: sls,

pkg/kv/kvserver/allocator/mmaprototype/load.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ func (mm *meansMemo) clear() {
328328
type loadInfoProvider interface {
329329
getStoreReportedLoad(roachpb.StoreID) (roachpb.NodeID, *storeLoad)
330330
getNodeReportedLoad(roachpb.NodeID) *NodeLoad
331-
computeLoadSummary(roachpb.StoreID, *meanStoreLoad, *meanNodeLoad) storeLoadSummary
331+
computeLoadSummary(context.Context, roachpb.StoreID, *meanStoreLoad, *meanNodeLoad) storeLoadSummary
332332
}
333333

334334
// getMeans returns the means for an expression.
@@ -347,13 +347,13 @@ func (mm *meansMemo) getMeans(expr constraintsDisj) *meansForStoreSet {
347347
// the given set (encoded in means). It attempts to utilize a cached value if
348348
// curLoadSeqNum permits.
349349
func (mm *meansMemo) getStoreLoadSummary(
350-
means *meansForStoreSet, storeID roachpb.StoreID, curLoadSeqNum uint64,
350+
ctx context.Context, means *meansForStoreSet, storeID roachpb.StoreID, curLoadSeqNum uint64,
351351
) storeLoadSummary {
352352
summary, ok := means.tryGetStoreLoadSummary(storeID, curLoadSeqNum)
353353
if ok {
354354
return summary
355355
}
356-
summary = mm.loadInfoProvider.computeLoadSummary(storeID, &means.storeLoad, &means.nodeLoad)
356+
summary = mm.loadInfoProvider.computeLoadSummary(ctx, storeID, &means.storeLoad, &means.nodeLoad)
357357
means.putStoreLoadSummary(storeID, summary)
358358
return summary
359359
}
@@ -465,6 +465,7 @@ func (ls loadSummary) SafeFormat(w redact.SafePrinter, _ rune) {
465465

466466
// Computes the loadSummary for a particular load dimension.
467467
func loadSummaryForDimension(
468+
ctx context.Context,
468469
storeID roachpb.StoreID,
469470
nodeID roachpb.NodeID,
470471
dim LoadDimension,
@@ -509,10 +510,10 @@ func loadSummaryForDimension(
509510
defer func() {
510511
if log.V(2) {
511512
if storeID == 0 {
512-
log.Dev.Infof(context.Background(), "n%d[%v]: load=%d, mean_load=%d, fraction above=%.2f, load_summary=%v",
513+
log.Dev.Infof(ctx, "n%d[%v]: load=%d, mean_load=%d, fraction above=%.2f, load_summary=%v",
513514
nodeID, dim, load, meanLoad, fractionAbove, summary)
514515
} else {
515-
log.Dev.Infof(context.Background(), "s%d[%v]: load=%d, mean_load=%d, fraction above=%.2f, load_summary=%v",
516+
log.Dev.Infof(ctx, "s%d[%v]: load=%d, mean_load=%d, fraction above=%.2f, load_summary=%v",
516517
storeID, dim, load, meanLoad, fractionAbove, summary)
517518
}
518519
}

pkg/kv/kvserver/allocator/mmaprototype/load_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package mmaprototype
77

88
import (
9+
"context"
910
"fmt"
1011
"strings"
1112
"testing"
@@ -43,7 +44,7 @@ func (p *testLoadInfoProvider) getNodeReportedLoad(nodeID roachpb.NodeID) *NodeL
4344
}
4445

4546
func (p *testLoadInfoProvider) computeLoadSummary(
46-
roachpb.StoreID, *meanStoreLoad, *meanNodeLoad,
47+
context.Context, roachpb.StoreID, *meanStoreLoad, *meanNodeLoad,
4748
) storeLoadSummary {
4849
fmt.Fprintf(&p.b, "called computeLoadSummary: returning seqnum %d", p.returnedLoadSeqNum)
4950
return storeLoadSummary{
@@ -161,7 +162,7 @@ func TestMeansMemo(t *testing.T) {
161162
var loadSeqNum uint64
162163
d.ScanArgs(t, "load-seq-num", &loadSeqNum)
163164
loadProvider.returnedLoadSeqNum = loadSeqNum
164-
_ = mm.getStoreLoadSummary(mss, roachpb.StoreID(storeID), loadSeqNum)
165+
_ = mm.getStoreLoadSummary(context.Background(), mss, roachpb.StoreID(storeID), loadSeqNum)
165166
rv := loadProvider.b.String()
166167
loadProvider.b.Reset()
167168
return rv

0 commit comments

Comments
 (0)