This repository was archived by the owner on Jan 20, 2026. It is now read-only.

Worker Rebalance Fails on Unassigned Shards #50

@calebstewart

Describe the bug

I am seeing errors like this when restarting a worker instance:

Error in rebalance: AssignedToNotFoundForShard

After investigating the root cause, it seems that the DynamoCheckpoint.ListActiveWorkers method returns this error when a shard has no assigned lease owner. The name of the error makes sense conceptually, but is this really an error? Why does ListActiveWorkers need to fail in this scenario? My (possibly naive) assumption is that an unassigned shard does not affect the list of active workers, so the method should return whatever set of active workers it finds and ignore unassigned shards. For example:

// ListActiveWorkers returns a map of workers and their shards
func (checkpointer *DynamoCheckpoint) ListActiveWorkers(shardStatus map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) {
	err := checkpointer.syncLeases(shardStatus)
	if err != nil {
		return nil, err
	}

	workers := map[string][]*par.ShardStatus{}
	for _, shard := range shardStatus {
		if shard.GetCheckpoint() == ShardEnd {
			continue
		}

		leaseOwner := shard.GetLeaseOwner()
		if leaseOwner == "" {
			// Original code
			// checkpointer.log.Debugf("Shard Not Assigned Error. ShardID: %s, WorkerID: %s", shard.ID, checkpointer.kclConfig.WorkerID)
			// return nil, ErrShardNotAssigned
			checkpointer.log.Debugf("ListActiveWorkers: Shard Not Assigned. ShardID: %s, WorkerID: %s", shard.ID, checkpointer.kclConfig.WorkerID)
			continue
		}

		// append on a missing key starts from a nil slice, so no existence check is needed
		workers[leaseOwner] = append(workers[leaseOwner], shard)
	}
	return workers, nil
}
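
To make the proposed behavior concrete, here is a minimal, self-contained sketch that uses stand-in types rather than the library's own. The names `shard` and `groupByOwner`, and the worker and shard IDs, are hypothetical; only the skip-on-empty-owner logic mirrors the snippet above.

```go
package main

import "fmt"

// shard is a hypothetical stand-in for par.ShardStatus; it is not the library type.
type shard struct {
	ID         string
	LeaseOwner string // empty means no worker currently holds the lease
}

// groupByOwner maps each lease owner to its shards. Unassigned shards are
// skipped instead of causing a failure, as proposed above.
func groupByOwner(shards []shard) map[string][]shard {
	workers := map[string][]shard{}
	for _, s := range shards {
		if s.LeaseOwner == "" {
			continue // an unassigned shard does not change the set of active workers
		}
		workers[s.LeaseOwner] = append(workers[s.LeaseOwner], s)
	}
	return workers
}

func main() {
	shards := []shard{
		{ID: "shard-0", LeaseOwner: "worker-a"},
		{ID: "shard-1", LeaseOwner: ""}, // no owner, as observed during a restart in this report
		{ID: "shard-2", LeaseOwner: "worker-b"},
		{ID: "shard-3", LeaseOwner: "worker-a"},
	}
	workers := groupByOwner(shards)
	// Two active workers are found; shard-1 is simply left out.
	fmt.Println(len(workers), len(workers["worker-a"]), len(workers["worker-b"])) // prints "2 2 1"
}
```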

Reproduction steps

  1. Start multiple workers.
  2. Restart one of the workers.

Expected behavior

If the intention is to restrict rebalancing until all shards have leases, I don't think returning an error from rebalance is appropriate, because this is not an error condition. At worst, I would argue it is a warning, but in my opinion Worker.rebalance() should simply skip the rebalance, something like this:

func (w *Worker) rebalance() error {
	// ... snip ...

	workers, err := w.checkpointer.ListActiveWorkers(w.shardStatus)
	// ErrShardNotAssigned is the sentinel returned by ListActiveWorkers (see the original code above)
	if errors.Is(err, checkpoint.ErrShardNotAssigned) {
		// Not all shards have leases yet, so don't rebalance
		return nil
	} else if err != nil {
		log.Debugf("Error listing workers. workerID: %s. Error: %+v", w.workerID, err)
		return err
	}

	// ... snip ...
}

Additional context

This may be intended functionality, but it seems odd to me, so I figured I'd open a bug report to ask. If it is intended, I'd appreciate an explanation of the error. It happens regularly during restarts and has so far seemed to be a red herring rather than a real error, so it gives me a little fright every time I check the logs.

Also, if this is not something you have observed and you think I may be doing something wrong, please let me know. Happy to fix my code and close this issue if need be 😄

Labels: bug (Something isn't working)
