Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/mmaprototype/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"load.go",
"memo_helper.go",
"messages.go",
"range_change.go",
"rebalance_advisor.go",
"store_load_summary.go",
"store_set.go",
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/allocator/mmaprototype/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,16 @@ type Allocator interface {
// Calls to AdjustPendingChangeDisposition must be correctly sequenced with
// full state updates from the local node provided in
// ProcessNodeLoadResponse.
AdjustPendingChangeDisposition(change PendingRangeChange, success bool)
AdjustPendingChangeDisposition(change ExternalRangeChange, success bool)

// RegisterExternalChange informs this allocator about yet to complete
// changes to the cluster which were not initiated by this allocator. The
// ownership of all state inside change is handed off to the callee. If ok
// is true, the change was registered, and the caller should subsequently
// use the same change in a subsequent call to
// is true, the change was registered, and the caller is returned an
// ExternalRangeChange that it should subsequently use in a call to
// AdjustPendingChangeDisposition when the changes are completed, either
// successfully or not. If ok is false, the change was not registered.
RegisterExternalChange(change PendingRangeChange) (ok bool)
RegisterExternalChange(change PendingRangeChange) (_ ExternalRangeChange, ok bool)

// ComputeChanges is called periodically and frequently, say every 10s.
//
Expand All @@ -82,7 +82,7 @@ type Allocator interface {
// the allocator, to avoid re-proposing the same change and to make
// adjustments to the load.
ComputeChanges(
ctx context.Context, msg *StoreLeaseholderMsg, opts ChangeOptions) []PendingRangeChange
ctx context.Context, msg *StoreLeaseholderMsg, opts ChangeOptions) []ExternalRangeChange

// AdminRelocateOne is a helper for AdminRelocateRange.
//
Expand Down
14 changes: 8 additions & 6 deletions pkg/kv/kvserver/allocator/mmaprototype/allocator_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (a *allocatorState) ProcessStoreLoadMsg(ctx context.Context, msg *StoreLoad
}

// AdjustPendingChangeDisposition implements the Allocator interface.
func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChange, success bool) {
func (a *allocatorState) AdjustPendingChangeDisposition(change ExternalRangeChange, success bool) {
a.mu.Lock()
defer a.mu.Unlock()
rs, ok := a.cs.ranges[change.RangeID]
Expand All @@ -279,7 +279,7 @@ func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChang
// can be made about whether these changes will be found in the allocator's
// state. We gather the found changes.
var changes []*pendingReplicaChange
for _, c := range change.pendingReplicaChanges {
for _, c := range change.Changes {
ch, ok := a.cs.pendingChanges[c.changeID]
if !ok {
continue
Expand Down Expand Up @@ -308,25 +308,27 @@ func (a *allocatorState) AdjustPendingChangeDisposition(change PendingRangeChang
}

// RegisterExternalChange implements the Allocator interface.
func (a *allocatorState) RegisterExternalChange(change PendingRangeChange) (ok bool) {
func (a *allocatorState) RegisterExternalChange(
change PendingRangeChange,
) (_ ExternalRangeChange, ok bool) {
a.mu.Lock()
defer a.mu.Unlock()
if err := a.cs.preCheckOnApplyReplicaChanges(change); err != nil {
a.mmaMetrics.ExternalFailedToRegister.Inc(1)
log.KvDistribution.Infof(context.Background(),
"did not register external changes: due to %v", err)
return false
return ExternalRangeChange{}, false
} else {
a.mmaMetrics.ExternaRegisterSuccess.Inc(1)
}
a.cs.addPendingRangeChange(change)
return true
return MakeExternalRangeChange(change), true
}

// ComputeChanges implements the Allocator interface.
func (a *allocatorState) ComputeChanges(
ctx context.Context, msg *StoreLeaseholderMsg, opts ChangeOptions,
) []PendingRangeChange {
) []ExternalRangeChange {
a.mu.Lock()
defer a.mu.Unlock()
if msg.StoreID != opts.LocalStoreID {
Expand Down
Loading