diff --git a/CHANGELOG.md b/CHANGELOG.md index 62a004632..1f61c7ab1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ## 2025-06-23 - Runtime 0.18.0-alpha.13 - fix: time zone retrieval and logging [#906](https://github.com/hypermodeinc/modus/pull/906) +- feat: distribute actors across cluster when restoring on startup [#907](https://github.com/hypermodeinc/modus/pull/907) ## 2025-06-23 - Runtime 0.18.0-alpha.12 diff --git a/runtime/actors/actorsystem.go b/runtime/actors/actorsystem.go index 5217e50d8..8dafbe247 100644 --- a/runtime/actors/actorsystem.go +++ b/runtime/actors/actorsystem.go @@ -12,6 +12,7 @@ package actors import ( "context" "fmt" + "math/rand/v2" "time" "github.com/hypermodeinc/modus/runtime/db" @@ -92,44 +93,54 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error { // restart local actors that are already running, which will reload the plugin actors := _actorSystem.Actors() - localAgents := make(map[string]bool, len(actors)) for _, pid := range actors { if a, ok := pid.Actor().(*wasmAgentActor); ok { - localAgents[a.agentId] = true if err := goakt.Tell(ctx, pid, &messages.RestartAgent{}); err != nil { logger.Err(ctx, err).Str("agent_id", a.agentId).Msg("Failed to send restart agent message to actor.") } } } - // spawn actors for agents with state in the database, that are not already running - // check both locally and on remote nodes in the cluster // do this in a goroutine to avoid blocking the cluster engine startup go func() { - logger.Debug(ctx).Msg("Restoring agent actors from database.") - agents, err := db.QueryActiveAgents(ctx) - if err != nil { - logger.Err(ctx, err).Msg("Failed to query active agents from database.") - return + if err := restoreAgentActors(ctx, plugin.Name()); err != nil { + logger.Err(ctx, err).Msg("Failed to restore agent actors.") } - inCluster := _actorSystem.InCluster() - for _, agent := range agents { - if !localAgents[agent.Id] { - if inCluster { - actorName := 
getActorName(agent.Id) - if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil { - logger.Err(ctx, err).Msgf("Failed to check if actor %s exists in cluster.", actorName) - } else if exists { - // if the actor already exists in the cluster, skip spawning it - continue - } - } - if err := spawnActorForAgent(ctx, plugin.Name(), agent.Id, agent.Name, false); err != nil { - logger.Err(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id) - } + }() + + return nil +} + +// restoreAgentActors spawns actors for agents with state in the database that are not already running +func restoreAgentActors(ctx context.Context, pluginName string) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + + logger.Debug(ctx).Msg("Restoring agent actors from database.") + + // query the database for active agents + agents, err := db.QueryActiveAgents(ctx) + if err != nil { + return fmt.Errorf("failed to query active agents from database: %w", err) + } + + // shuffle the agents to help distribute the load across the cluster when multiple nodes are starting simultaneously + rand.Shuffle(len(agents), func(i, j int) { + agents[i], agents[j] = agents[j], agents[i] + }) + + // spawn actors for each agent that is not already running + for _, agent := range agents { + actorName := getActorName(agent.Id) + if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil { + logger.Err(ctx, err).Msgf("Failed to check if actor %s exists.", actorName) + } else if !exists { + err := spawnActorForAgent(ctx, pluginName, agent.Id, agent.Name, false) + if err != nil { + logger.Err(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id) } } - }() + } return nil }