5 changes: 5 additions & 0 deletions pkg/kv/kvserver/allocator/mmaprototype/allocator.go
@@ -60,6 +60,11 @@ type Allocator interface {
// associated node in the cluster.
ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoadMsg)

// UpdateStoresStatuses updates the health and disposition for the stores in
// storeStatuses. Stores unknown to the allocator are ignored with logging.
UpdateStoresStatuses(ctx context.Context, storeStatuses map[roachpb.StoreID]Status)

// Methods related to making changes.

// AdjustPendingChangeDisposition is optional feedback to inform the
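For illustration, a minimal caller sketch of the new interface method. This is hedged: the variable names, the map literal, and the alloc/ctx values are assumptions, while UpdateStoresStatuses, MakeStatus, and the health/disposition constants all appear elsewhere in this diff.

// Illustrative only, not part of the PR: push a snapshot of store statuses
// into the allocator. Stores the allocator has not seen yet are ignored with
// a log message.
statuses := map[roachpb.StoreID]mmaprototype.Status{
	1: mmaprototype.MakeStatus(mmaprototype.HealthOK, mmaprototype.LeaseDispositionOK, mmaprototype.ReplicaDispositionOK),
	2: mmaprototype.MakeStatus(mmaprototype.HealthUnknown, mmaprototype.LeaseDispositionRefusing, mmaprototype.ReplicaDispositionRefusing),
}
alloc.UpdateStoresStatuses(ctx, statuses)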
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go
@@ -204,6 +204,15 @@ func (a *allocatorState) ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoad
a.cs.processStoreLoadMsg(ctx, msg)
}

// UpdateStoresStatuses implements the Allocator interface.
func (a *allocatorState) UpdateStoresStatuses(
ctx context.Context, storeStatuses map[roachpb.StoreID]Status,
) {
a.mu.Lock()
defer a.mu.Unlock()
a.cs.updateStoreStatuses(ctx, storeStatuses)
}

// AdjustPendingChangeDisposition implements the Allocator interface.
func (a *allocatorState) AdjustPendingChangeDisposition(
ctx context.Context, change ExternalRangeChange, success bool,
31 changes: 21 additions & 10 deletions pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go
@@ -2180,16 +2180,10 @@ func (cs *clusterState) setStore(sal storeAttributesAndLocalityWithNodeTier) {
if !ok {
// This is the first time seeing this store.
ss := newStoreState()
// TODO(tbg): below is what we should be doing once asim and production code actually
// have a way to update the health status. For now, we just set it to healthy initially
// and that's where it will stay (outside of unit tests).
//
// At this point, the store's health is unknown. It will need to be marked
// as healthy separately. Until we know more, we won't place leases or
// replicas on it (nor will we try to shed any that are already reported to
// have replicas on it).
// ss.status = MakeStatus(HealthUnknown, LeaseDispositionRefusing, ReplicaDispositionRefusing)
ss.status = MakeStatus(HealthOK, LeaseDispositionOK, ReplicaDispositionOK)
// At this point, the store's health is unknown. It will be updated by
// cs.updateStoreStatuses. Until we know more, we won't place leases or
// replicas on it (nor will we try to shed any that are already reported to
// have replicas on it).
ss.status = MakeStatus(HealthUnknown, LeaseDispositionRefusing, ReplicaDispositionRefusing)
ss.overloadStartTime = cs.ts.Now()
ss.overloadEndTime = cs.ts.Now()
cs.stores[sal.StoreID] = ss
@@ -2216,6 +2210,23 @@ func (cs *clusterState) setStore(sal storeAttributesAndLocalityWithNodeTier) {
}
}

// updateStoreStatuses updates each known store's health and disposition from
// storeStatuses. Stores that are known to the store pool but not yet to mma
// are ignored, with logging.
func (cs *clusterState) updateStoreStatuses(
ctx context.Context, storeStatuses map[roachpb.StoreID]Status,
) {
for storeID, storeStatus := range storeStatuses {
if _, ok := cs.stores[storeID]; !ok {
// The store is known to the store pool but not yet to mma - ignore the
// update. The store will be added via setStore when gossip arrives, and
// subsequent status updates will then take effect.
log.KvDistribution.Infof(ctx, "store %d not found in cluster state, skipping update", storeID)
continue
}
cs.stores[storeID].status = storeStatus
}
}

//======================================================================
// clusterState accessors:
//
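Taken together with setStore above, the intended lifecycle is: a store first seen via gossip starts out refusing leases and replicas, and only a later status update opens it up for placement again. A minimal sketch, assuming a test-style harness and a sal value (the identifiers themselves come from this diff):

cs.setStore(sal) // first sighting: HealthUnknown, refusing leases and replicas
cs.updateStoreStatuses(ctx, map[roachpb.StoreID]Status{
	sal.StoreID: MakeStatus(HealthOK, LeaseDispositionOK, ReplicaDispositionOK),
}) // now eligible to receive leases and replicas again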
@@ -0,0 +1,208 @@
# This test verifies mma's candidate exclusion and shedding behavior based on
# store status. Note that this test does not verify the correctness of the
# actual translation from store pool status to mma status. That is tested in
# the mmaintegration package.
#
# Setup: 3 stores
# - s1: source store (overloaded, wants to shed)
# - s2: always available (good target)
# - s3: test store (status changes to test each scenario)
set-store
store-id=1 node-id=1
store-id=2 node-id=2
store-id=3 node-id=3
----
node-id=1 locality-tiers=node=1
store-id=1 attrs=
node-id=2 locality-tiers=node=2
store-id=2 attrs=
node-id=3 locality-tiers=node=3
store-id=3 attrs=

# s1 is overloaded, s2 and s3 are low load
store-load-msg
store-id=1 node-id=1 load=[1000,0,0] capacity=[1000,1000,1000] secondary-load=0 load-time=0s
store-id=2 node-id=2 load=[100,0,0] capacity=[1000,1000,1000] secondary-load=0 load-time=0s
store-id=3 node-id=3 load=[100,0,0] capacity=[1000,1000,1000] secondary-load=0 load-time=0s
----

# Range r1: lease on s1, replicas on s1, s2, s3
store-leaseholder-msg
store-id=1
range-id=1 load=[100,0,0] raft-cpu=100
store-id=1 replica-id=1 type=VOTER_FULL leaseholder=true
store-id=2 replica-id=2 type=VOTER_FULL
store-id=3 replica-id=3 type=VOTER_FULL
config=num_replicas=3 constraints={} voter_constraints={}
----

# Baseline: all stores available
retain-ready-replica-target-stores-only in=(2,3)
----
[2 3]

retain-ready-lease-target-stores-only in=(2,3) range-id=1
----
[2 3]

# Dead: excluded from all targets (shedding leases, shedding replicas)
set-store-status store-id=3 health=dead leases=shedding replicas=shedding
----
dead shedding=leases,replicas

retain-ready-replica-target-stores-only in=(2,3)
----
skipping s3 for replica transfer: replica disposition shedding (health dead)
[2]

retain-ready-lease-target-stores-only in=(2,3) range-id=1
----
skipping s3 for lease transfer: lease disposition shedding (health dead)
[2]

# Unknown: excluded from all targets (refusing leases, refusing replicas)
set-store-status store-id=3 health=unknown leases=refusing replicas=refusing
----
unknown refusing=leases,replicas

retain-ready-replica-target-stores-only in=(2,3)
----
skipping s3 for replica transfer: replica disposition refusing (health unknown)
[2]

retain-ready-lease-target-stores-only in=(2,3) range-id=1
----
skipping s3 for lease transfer: lease disposition refusing (health unknown)
[2]

# Decommissioning: excluded from all targets (shedding leases, shedding replicas)
set-store-status store-id=3 health=ok leases=shedding replicas=shedding
----
ok shedding=leases,replicas

retain-ready-replica-target-stores-only in=(2,3)
----
skipping s3 for replica transfer: replica disposition shedding (health ok)
[2]

retain-ready-lease-target-stores-only in=(2,3) range-id=1
----
skipping s3 for lease transfer: lease disposition shedding (health ok)
[2]

# Draining: excluded from all targets (shedding leases, refusing replicas)
set-store-status store-id=3 health=ok leases=shedding replicas=refusing
----
ok refusing=replicas shedding=leases

retain-ready-replica-target-stores-only in=(2,3)
----
skipping s3 for replica transfer: replica disposition refusing (health ok)
[2]

retain-ready-lease-target-stores-only in=(2,3) range-id=1
----
skipping s3 for lease transfer: lease disposition shedding (health ok)
[2]

# Suspect: excluded from all targets (shedding leases, refusing replicas)
set-store-status store-id=3 health=unhealthy leases=shedding replicas=refusing
----
unhealthy refusing=replicas shedding=leases

retain-ready-replica-target-stores-only in=(2,3)
----
skipping s3 for replica transfer: replica disposition refusing (health unhealthy)
[2]

retain-ready-lease-target-stores-only in=(2,3) range-id=1
----
skipping s3 for lease transfer: lease disposition shedding (health unhealthy)
[2]

# Throttled: excluded from replica targets, can receive leases
set-store-status store-id=3 health=ok leases=ok replicas=refusing
----
ok refusing=replicas

retain-ready-replica-target-stores-only in=(2,3)
----
skipping s3 for replica transfer: replica disposition refusing (health ok)
[2]

retain-ready-lease-target-stores-only in=(2,3) range-id=1
----
[2 3]

# Available: accepts everything
set-store-status store-id=3 health=ok leases=ok replicas=ok
----
ok accepting all

retain-ready-replica-target-stores-only in=(2,3)
----
[2 3]

retain-ready-lease-target-stores-only in=(2,3) range-id=1
----
[2 3]

# Rebalance test: verify s3 (dead) is excluded during actual rebalance
set-store-status store-id=3 health=dead leases=shedding replicas=shedding
----
dead shedding=leases,replicas

rebalance-stores store-id=1
----
[mmaid=1] rebalanceStores begins
[mmaid=1] cluster means: (stores-load [cpu:400ns/s, write-bandwidth:0 B/s, byte-size:0 B]) (stores-capacity [cpu:1µs/s, write-bandwidth:1.0 kB/s, byte-size:1.0 kB]) (nodes-cpu-load 400) (nodes-cpu-capacity 1000)
[mmaid=1] load summary for dim=CPURate (s1): overloadUrgent, reason: fractionUsed > 90% [load=1000 meanLoad=400 fractionUsed=100.00% meanUtil=40.00% capacity=1000]
[mmaid=1] load summary for dim=WriteBandwidth (s1): loadNormal, reason: load is within 5% of mean [load=0 meanLoad=0 fractionUsed=0.00% meanUtil=0.00% capacity=1000]
[mmaid=1] load summary for dim=ByteSize (s1): loadNormal, reason: load is within 5% of mean [load=0 meanLoad=0 fractionUsed=0.00% meanUtil=0.00% capacity=1000]
[mmaid=1] load summary for dim=CPURate (n1): overloadUrgent, reason: fractionUsed > 90% [load=1000 meanLoad=400 fractionUsed=100.00% meanUtil=40.00% capacity=1000]
[mmaid=1] evaluating s1: node load overloadUrgent, store load overloadUrgent, worst dim CPURate
[mmaid=1] overload-continued s1 ((store=overloadUrgent worst=CPURate cpu=overloadUrgent writes=loadNormal bytes=loadNormal node=overloadUrgent high_disk=false frac_pending=0.00,0.00(true))) - within grace period
[mmaid=1] store s1 was added to shedding store list
[mmaid=1] load summary for dim=CPURate (s2): loadLow, reason: load is >10% below mean [load=100 meanLoad=400 fractionUsed=10.00% meanUtil=40.00% capacity=1000]
[mmaid=1] load summary for dim=WriteBandwidth (s2): loadNormal, reason: load is within 5% of mean [load=0 meanLoad=0 fractionUsed=0.00% meanUtil=0.00% capacity=1000]
[mmaid=1] load summary for dim=ByteSize (s2): loadNormal, reason: load is within 5% of mean [load=0 meanLoad=0 fractionUsed=0.00% meanUtil=0.00% capacity=1000]
[mmaid=1] load summary for dim=CPURate (n2): loadLow, reason: load is >10% below mean [load=100 meanLoad=400 fractionUsed=10.00% meanUtil=40.00% capacity=1000]
[mmaid=1] evaluating s2: node load loadLow, store load loadNormal, worst dim WriteBandwidth
[mmaid=1] load summary for dim=CPURate (s3): loadLow, reason: load is >10% below mean [load=100 meanLoad=400 fractionUsed=10.00% meanUtil=40.00% capacity=1000]
[mmaid=1] load summary for dim=WriteBandwidth (s3): loadNormal, reason: load is within 5% of mean [load=0 meanLoad=0 fractionUsed=0.00% meanUtil=0.00% capacity=1000]
[mmaid=1] load summary for dim=ByteSize (s3): loadNormal, reason: load is within 5% of mean [load=0 meanLoad=0 fractionUsed=0.00% meanUtil=0.00% capacity=1000]
[mmaid=1] load summary for dim=CPURate (n3): loadLow, reason: load is >10% below mean [load=100 meanLoad=400 fractionUsed=10.00% meanUtil=40.00% capacity=1000]
[mmaid=1] evaluating s3: node load loadLow, store load loadNormal, worst dim WriteBandwidth
[mmaid=1] start processing shedding store s1: cpu node load overloadUrgent, store load overloadUrgent, worst dim CPURate
[mmaid=1] top-K[CPURate] ranges for s1 with lease on local s1: r1:[cpu:100ns/s, write-bandwidth:0 B/s, byte-size:0 B]
[mmaid=1] local store s1 is CPU overloaded (overloadUrgent >= overloadSlow), attempting lease transfers first
[mmaid=1] considering lease-transfer r1 from s1: candidates are [2 3]
[mmaid=1] skipping s3 for lease transfer: lease disposition shedding (health dead)
[mmaid=1] load summary for dim=CPURate (s1): overloadUrgent, reason: fractionUsed > 90% [load=1000 meanLoad=550 fractionUsed=100.00% meanUtil=55.00% capacity=1000]
[mmaid=1] load summary for dim=WriteBandwidth (s1): loadNormal, reason: load is within 5% of mean [load=0 meanLoad=0 fractionUsed=0.00% meanUtil=0.00% capacity=1000]
[mmaid=1] load summary for dim=ByteSize (s1): loadNormal, reason: load is within 5% of mean [load=0 meanLoad=0 fractionUsed=0.00% meanUtil=0.00% capacity=1000]
[mmaid=1] load summary for dim=CPURate (n1): overloadUrgent, reason: fractionUsed > 90% [load=1000 meanLoad=550 fractionUsed=100.00% meanUtil=55.00% capacity=1000]
[mmaid=1] load summary for dim=CPURate (s2): loadLow, reason: load is >10% below mean [load=100 meanLoad=550 fractionUsed=10.00% meanUtil=55.00% capacity=1000]
[mmaid=1] load summary for dim=WriteBandwidth (s2): loadNormal, reason: load is within 5% of mean [load=0 meanLoad=0 fractionUsed=0.00% meanUtil=0.00% capacity=1000]
[mmaid=1] load summary for dim=ByteSize (s2): loadNormal, reason: load is within 5% of mean [load=0 meanLoad=0 fractionUsed=0.00% meanUtil=0.00% capacity=1000]
[mmaid=1] load summary for dim=CPURate (n2): loadLow, reason: load is >10% below mean [load=100 meanLoad=550 fractionUsed=10.00% meanUtil=55.00% capacity=1000]
[mmaid=1] sortTargetCandidateSetAndPick: candidates: s2(SLS:loadNormal, overloadedDimLoadSummary:loadLow), overloadedDim:CPURate, picked s2
[mmaid=1] load summary for dim=CPURate (s2): loadLow, reason: load is >10% below mean [load=100 meanLoad=550 fractionUsed=10.00% meanUtil=55.00% capacity=1000]
[mmaid=1] load summary for dim=WriteBandwidth (s2): loadNormal, reason: load is within 5% of mean [load=0 meanLoad=0 fractionUsed=0.00% meanUtil=0.00% capacity=1000]
[mmaid=1] load summary for dim=ByteSize (s2): loadNormal, reason: load is within 5% of mean [load=0 meanLoad=0 fractionUsed=0.00% meanUtil=0.00% capacity=1000]
[mmaid=1] load summary for dim=CPURate (n2): loadLow, reason: load is >10% below mean [load=100 meanLoad=550 fractionUsed=10.00% meanUtil=55.00% capacity=1000]
[mmaid=1] load summary for dim=CPURate (s1): overloadUrgent, reason: fractionUsed > 90% [load=1000 meanLoad=550 fractionUsed=100.00% meanUtil=55.00% capacity=1000]
[mmaid=1] load summary for dim=WriteBandwidth (s1): loadNormal, reason: load is within 5% of mean [load=0 meanLoad=0 fractionUsed=0.00% meanUtil=0.00% capacity=1000]
[mmaid=1] load summary for dim=ByteSize (s1): loadNormal, reason: load is within 5% of mean [load=0 meanLoad=0 fractionUsed=0.00% meanUtil=0.00% capacity=1000]
[mmaid=1] load summary for dim=CPURate (n1): overloadUrgent, reason: fractionUsed > 90% [load=1000 meanLoad=550 fractionUsed=100.00% meanUtil=55.00% capacity=1000]
[mmaid=1] can add load to n2s2: true targetSLS[(store=loadNormal worst=WriteBandwidth cpu=loadLow writes=loadNormal bytes=loadNormal node=loadLow high_disk=false frac_pending=0.00,0.00(true))] srcSLS[(store=overloadUrgent worst=CPURate cpu=overloadUrgent writes=loadNormal bytes=loadNormal node=overloadUrgent high_disk=false frac_pending=0.00,0.00(true))]
[mmaid=1] result(success): shedding r1 lease from s1 to s2 [change:r1=[transfer_to=2 cids=1,2]] with resulting loads source:[cpu:1µs/s, write-bandwidth:0 B/s, byte-size:0 B] target:[cpu:100ns/s, write-bandwidth:0 B/s, byte-size:0 B] (means: [cpu:550ns/s, write-bandwidth:0 B/s, byte-size:0 B]) (frac_pending: (src:0.00,target:0.00) (src:0.00,target:0.00))
[mmaid=1] skipping replica transfers for s1 to try more leases next time
[mmaid=1] rebalancing pass shed: {s1}
pending(2)
change-id=1 store-id=1 node-id=1 range-id=1 load-delta=[cpu:0s/s, write-bandwidth:0 B/s, byte-size:0 B] start=0s gc=1m0s
prev=(replica-id=1 type=VOTER_FULL leaseholder=true)
next=(replica-id=1 type=VOTER_FULL)
change-id=2 store-id=2 node-id=2 range-id=1 load-delta=[cpu:0s/s, write-bandwidth:0 B/s, byte-size:0 B] start=0s gc=1m0s
prev=(replica-id=2 type=VOTER_FULL)
next=(replica-id=2 type=VOTER_FULL leaseholder=true)
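The comments in the test above spell out the store-pool-to-mma translation it assumes (dead, unknown, decommissioning, draining, suspect, throttled, available); the actual translation is tested in the mmaintegration package. A hedged sketch of that mapping only: the spStatus enum is hypothetical, and HealthDead, HealthUnhealthy, and the *DispositionShedding spellings are assumptions, while MakeStatus, HealthOK, HealthUnknown, and the *DispositionOK/*DispositionRefusing values do appear in this diff.

// Hypothetical storepool-side states, for illustration only.
type spStatus int

const (
	spDead spStatus = iota
	spUnknown
	spDecommissioning
	spDraining
	spSuspect
	spThrottled
	spAvailable
)

// statusFromStorePool mirrors the mapping exercised by the test cases above;
// it is a sketch, not the mmaintegration implementation.
func statusFromStorePool(s spStatus) Status {
	switch s {
	case spDead:
		return MakeStatus(HealthDead, LeaseDispositionShedding, ReplicaDispositionShedding)
	case spUnknown:
		return MakeStatus(HealthUnknown, LeaseDispositionRefusing, ReplicaDispositionRefusing)
	case spDecommissioning:
		return MakeStatus(HealthOK, LeaseDispositionShedding, ReplicaDispositionShedding)
	case spDraining:
		return MakeStatus(HealthOK, LeaseDispositionShedding, ReplicaDispositionRefusing)
	case spSuspect:
		return MakeStatus(HealthUnhealthy, LeaseDispositionShedding, ReplicaDispositionRefusing)
	case spThrottled:
		return MakeStatus(HealthOK, LeaseDispositionOK, ReplicaDispositionRefusing)
	default: // spAvailable
		return MakeStatus(HealthOK, LeaseDispositionOK, ReplicaDispositionOK)
	}
}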
15 changes: 15 additions & 0 deletions pkg/kv/kvserver/allocator/storepool/store_pool.go
@@ -713,6 +713,21 @@ func (sp *StorePool) GetStores() map[roachpb.StoreID]roachpb.StoreDescriptor {
return stores
}

// GetStoreStatuses returns a map of store ID to store status for all known stores.
// TODO(wenyihu6): optimize the allocation cost of building this map.
func (sp *StorePool) GetStoreStatuses() map[roachpb.StoreID]StoreStatus {
now := sp.clock.Now()
timeUntilNodeDead := liveness.TimeUntilNodeDead.Get(&sp.st.SV)
timeAfterNodeSuspect := liveness.TimeAfterNodeSuspect.Get(&sp.st.SV)

result := make(map[roachpb.StoreID]StoreStatus)
sp.Details.StoreDetails.Range(func(storeID roachpb.StoreID, sd *StoreDetailMu) bool {
result[storeID] = sd.status(now, timeUntilNodeDead, sp.NodeLivenessFn, timeAfterNodeSuspect)
return true
})
return result
}

// GetStoreDetail returns the store detail for the given storeID.
func (sp *StorePool) GetStoreDetail(storeID roachpb.StoreID) *StoreDetailMu {
detail, ok := sp.Details.StoreDetails.Load(storeID)
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/asim/mmaintegration/mma_store_rebalancer.go
@@ -152,6 +152,11 @@ func (msr *MMAStoreRebalancer) Tick(ctx context.Context, tick time.Time, s state
msr.pendingChangeIdx = 0
msr.lastRebalanceTime = tick
log.KvDistribution.VInfof(ctx, 1, "no more pending changes to process, will call compute changes again")
// Refresh store status from StorePool before computing changes.
// This uses the real production RefreshStoreStatus, which queries
// StorePool (backed by StatusTracker via NodeLivenessFn) and
// translates to MMA's status model.
msr.allocator.UpdateStoresStatuses(ctx, msr.as.GetMMAStoreStatuses())
storeLeaseholderMsg := MakeStoreLeaseholderMsgFromState(s, msr.localStoreID)
pendingChanges := msr.allocator.ComputeChanges(ctx, &storeLeaseholderMsg, mmaprototype.ChangeOptions{
LocalStoreID: roachpb.StoreID(msr.localStoreID),
@@ -0,0 +1,53 @@
# This test verifies MMA's write bandwidth rebalancing respects store status.
#
# Setup: 4 nodes, 40 ranges (RF=3). All replicas start on s1,s2,s3 with s1
# as leaseholder. s4 is marked dead before the evaluation. Heavy write load (~19 MiB/s) creates
# pressure on all replicas. Replicate queue is disabled to isolate MMA behavior.
#
# Write bandwidth affects ALL replicas (not just leaseholder), so MMA should
# move replicas to balance write load. However, since s4 is dead, MMA should
# NOT move replicas there.
#
# Expected: MMA should avoid moving replicas to s4 (dead store).
gen_cluster nodes=4
----

setting split_queue_enabled=false
----

setting replicate_queue_enabled=false
----

# All ranges start on s1,s2,s3 with s1 as leaseholder. s4 has no replicas.
gen_ranges ranges=40 placement_type=replica_placement
{s1:*,s2,s3}:40
----
{s1:*,s2,s3}:40

# Write-heavy workload (rw_ratio=0 means 100% writes).
# High write bandwidth creates pressure on ALL replicas, not just leaseholder.
# This forces MMA to move replicas, not just leases.
gen_load rate=20000 rw_ratio=0 min_block=1000 max_block=1000
----
19 MiB/s goodput

# Node 4 becomes dead.
set_status node=4 liveness=dead
----

# Assert s4 (dead) should have 0 replicas.
assertion type=stat stat=replicas ticks=6 exact_bound=0 stores=(4)
----

eval duration=6m seed=42 metrics=(replicas,leases,write_bytes_per_second) cfgs=(mma-only)
----
leases#1: first: [s1=40, s2=0, s3=0, s4=0] (stddev=17.32, mean=10.00, sum=40)
leases#1: last: [s1=40, s2=0, s3=0, s4=0] (stddev=17.32, mean=10.00, sum=40)
leases#1: thrash_pct: [s1=0%, s2=0%, s3=0%, s4=0%] (sum=0%)
replicas#1: first: [s1=40, s2=40, s3=40, s4=0] (stddev=17.32, mean=30.00, sum=120)
replicas#1: last: [s1=40, s2=40, s3=40, s4=0] (stddev=17.32, mean=30.00, sum=120)
replicas#1: thrash_pct: [s1=0%, s2=0%, s3=0%, s4=0%] (sum=0%)
write_bytes_per_second#1: last: [s1=19999999, s2=19999999, s3=19999999, s4=0] (stddev=8660253.60, mean=14999999.25, sum=59999997)
write_bytes_per_second#1: thrash_pct: [s1=27020%, s2=27020%, s3=27020%, s4=0%] (sum=81060%)
artifacts[mma-only]: 9bd6f2a583adcd74
==========================
1 change: 1 addition & 0 deletions pkg/kv/kvserver/mma_store_rebalancer.go
@@ -125,6 +125,7 @@ func (m *mmaStoreRebalancer) start(ctx context.Context, stopper *stop.Stopper) {
// rebalance may return true if errors happen in the process and fail to apply
// the changes successfully.
func (m *mmaStoreRebalancer) rebalance(ctx context.Context, periodicCall bool) bool {
m.mma.UpdateStoresStatuses(ctx, m.as.GetMMAStoreStatuses())
knownStoresByMMA := m.mma.KnownStores()
storeLeaseholderMsg, numIgnoredRanges := m.store.MakeStoreLeaseholderMsg(ctx, knownStoresByMMA)
if numIgnoredRanges > 0 {