Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/allocator/mmaprototype/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ type Allocator interface {
// associated node in the cluster.
ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoadMsg)

// UpdateStoreStatus sets the health and disposition of each store listed in
// storeStatuses to the status given for it. Stores not known to the allocator
// are ignored, and a message is logged for each.
// TODO(wenyihu6): if this is too expensive, we should only update status for
// stores that have changed.
UpdateStoreStatus(ctx context.Context, storeStatuses map[roachpb.StoreID]Status)

// Methods related to making changes.

// AdjustPendingChangeDisposition is optional feedback to inform the
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ func (a *allocatorState) ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoad
a.cs.processStoreLoadMsg(ctx, msg)
}

// UpdateStoreStatus implements the Allocator interface. It holds a.mu for the
// duration of the call and forwards storeStatuses to the cluster state's
// updateStoreStatuses.
func (a *allocatorState) UpdateStoreStatus(ctx context.Context, storeStatuses map[roachpb.StoreID]Status) {
a.mu.Lock()
defer a.mu.Unlock()
a.cs.updateStoreStatuses(ctx, storeStatuses)
}

// AdjustPendingChangeDisposition implements the Allocator interface.
func (a *allocatorState) AdjustPendingChangeDisposition(
ctx context.Context, change ExternalRangeChange, success bool,
Expand Down
29 changes: 19 additions & 10 deletions pkg/kv/kvserver/allocator/mmaprototype/cluster_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2180,16 +2180,10 @@ func (cs *clusterState) setStore(sal storeAttributesAndLocalityWithNodeTier) {
if !ok {
// This is the first time seeing this store.
ss := newStoreState()
// TODO(tbg): below is what we should be doing once asim and production code actually
// have a way to update the health status. For now, we just set it to healthy initially
// and that's where it will stay (outside of unit tests).
//
// At this point, the store's health is unknown. It will need to be marked
// as healthy separately. Until we know more, we won't place leases or
// replicas on it (nor will we try to shed any that are already reported to
// have replicas on it).
// ss.status = MakeStatus(HealthUnknown, LeaseDispositionRefusing, ReplicaDispositionRefusing)
ss.status = MakeStatus(HealthOK, LeaseDispositionOK, ReplicaDispositionOK)
// At this point, the store's health is unknown. It will be updated by cs.updateStoreStatuses. Until we know more, we
// won't place leases or replicas on it (nor will we try to shed any that
// are already reported to have replicas on it).
ss.status = MakeStatus(HealthUnknown, LeaseDispositionRefusing, ReplicaDispositionRefusing)
ss.overloadStartTime = cs.ts.Now()
ss.overloadEndTime = cs.ts.Now()
cs.stores[sal.StoreID] = ss
Expand All @@ -2216,6 +2210,21 @@ func (cs *clusterState) setStore(sal storeAttributesAndLocalityWithNodeTier) {
}
}

// updateStoreStatuses overwrites the status (health and dispositions) of every
// store in storeStatuses that is already present in cs.stores. Entries for
// stores that are not yet present are skipped, and an info-level message is
// logged for each of them.
func (cs *clusterState) updateStoreStatuses(
	ctx context.Context, storeStatuses map[roachpb.StoreID]Status,
) {
	for storeID, storeStatus := range storeStatuses {
		// Look the store up once and reuse the result for the write below,
		// rather than indexing cs.stores a second time.
		ss, ok := cs.stores[storeID]
		if !ok {
			// The store is not known to mma yet but is known to the store pool;
			// ignore the update. The store will be added via setStore when gossip
			// arrives, and then subsequent status updates will take effect.
			log.KvDistribution.Infof(ctx, "store %d not found in cluster state, skipping update", storeID)
			continue
		}
		ss.status = storeStatus
	}
}

//======================================================================
// clusterState accessors:
//
Expand Down
Loading
Loading