Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions service/sharddistributor/leader/process/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,16 @@ func (p *namespaceProcessor) runProcess(ctx context.Context) {

// runRebalancingLoop handles shard assignment and redistribution.
func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) {
// Buffered channel to allow one pending rebalance trigger.
triggerChan := make(chan string, 1)

// Perform an initial rebalance on startup.
err := p.rebalanceShards(ctx)
if err != nil {
p.logger.Error("initial rebalance failed", tag.Error(err))
}
Comment on lines 189 to 193
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Edge Case: Initial rebalance failure doesn't trigger immediate retry

When the initial rebalance at line 191 fails, the error is logged but no retry is queued to triggerChan. The new retry-on-failure logic (lines 220-226) only applies to failures within the main loop, not the initial attempt.

This means if the initial rebalance fails, the system must wait until the first periodic tick (cfg.Period, default 1s) or a state change to retry. This is likely acceptable since the periodic trigger fires quickly, but it's inconsistent with the PR's stated goal of "immediate retry after failure."

If desired, you could queue a retry after the initial failure:

if err != nil {
    p.logger.Error("initial rebalance failed", tag.Error(err))
    select {
    case triggerChan <- "Initial rebalance failed":
    default:
    }
}

Was this helpful? React with 👍 / 👎


updateChan, err := p.runRebalanceTriggeringLoop(ctx)
if err != nil {
if err := p.runRebalanceTriggeringLoop(ctx, triggerChan); err != nil {
p.logger.Error("failed to start rebalance triggering loop", tag.Error(err))
return
}
Expand All @@ -201,43 +202,46 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
p.logger.Info("Rebalancing loop cancelled.")
p.logger.Info("Rebalancing loop cancelled")
return

case update := <-updateChan:
case triggerReason := <-triggerChan:
// If an update comes in before the cooldown has expired,
// we wait until the cooldown has passed since the last rebalance before processing it.
// This ensures that we don't rebalance too frequently in response to a flurry of updates
p.timeSource.Sleep(nextRebalanceAllowedAt.Sub(p.timeSource.Now()))
nextRebalanceAllowedAt = p.timeSource.Now().Add(p.cfg.RebalanceCooldown)

p.logger.Info("Rebalancing triggered", tag.Dynamic("reason", update))
p.logger.Info("Rebalancing triggered", tag.Dynamic("triggerReason", triggerReason))
if err := p.rebalanceShards(ctx); err != nil {
p.logger.Error("rebalance failed", tag.Error(err))

// If rebalance fails, we want to trigger another rebalance ASAP,
// but with a cooldown to avoid rebalance storms if the underlying issue is persistent.
select {
case triggerChan <- "Previous rebalance failed":
default:
// If the channel is full, we skip sending the update to avoid blocking the loop.
}
}
}
}
}

// runRebalanceTriggeringLoop monitors for state changes and periodic triggers to initiate rebalancing.
// It does not block Subscribe calls, which avoids a growing backlog of updates.
func (p *namespaceProcessor) runRebalanceTriggeringLoop(ctx context.Context) (<-chan string, error) {
// Buffered channel to allow one pending rebalance trigger.
triggerChan := make(chan string, 1)

func (p *namespaceProcessor) runRebalanceTriggeringLoop(ctx context.Context, triggerChan chan<- string) error {
updateChan, err := p.shardStore.SubscribeToExecutorStatusChanges(ctx, p.namespaceCfg.Name)
if err != nil {
p.logger.Error("Failed to subscribe to state changes, stopping rebalancing loop.", tag.Error(err))
return nil, err
return err
}

go p.rebalanceTriggeringLoop(ctx, updateChan, triggerChan)
return triggerChan, nil
return nil
}

func (p *namespaceProcessor) rebalanceTriggeringLoop(ctx context.Context, updateChan <-chan int64, triggerChan chan<- string) {
defer close(triggerChan)

ticker := p.timeSource.NewTicker(p.cfg.Period)
defer ticker.Stop()

Expand Down
Loading