1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
 ## 2025-06-23 - Runtime 0.18.0-alpha.13
 
 - fix: time zone retrieval and logging [#906](https://github.com/hypermodeinc/modus/pull/906)
+- feat: distribute actors across cluster when restoring on startup [#976](https://github.com/hypermodeinc/modus/pull/907)
 
 ## 2025-06-23 - Runtime 0.18.0-alpha.12
 
61 changes: 36 additions & 25 deletions runtime/actors/actorsystem.go
@@ -12,6 +12,7 @@ package actors
 import (
 	"context"
 	"fmt"
+	"math/rand/v2"
 	"time"
 
 	"github.com/hypermodeinc/modus/runtime/db"
@@ -92,44 +93,54 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {

 	// restart local actors that are already running, which will reload the plugin
 	actors := _actorSystem.Actors()
-	localAgents := make(map[string]bool, len(actors))
 	for _, pid := range actors {
 		if a, ok := pid.Actor().(*wasmAgentActor); ok {
-			localAgents[a.agentId] = true
 			if err := goakt.Tell(ctx, pid, &messages.RestartAgent{}); err != nil {
 				logger.Err(ctx, err).Str("agent_id", a.agentId).Msg("Failed to send restart agent message to actor.")
 			}
 		}
 	}
 
-	// spawn actors for agents with state in the database, that are not already running
-	// check both locally and on remote nodes in the cluster
 	// do this in a goroutine to avoid blocking the cluster engine startup
 	go func() {
-		logger.Debug(ctx).Msg("Restoring agent actors from database.")
-		agents, err := db.QueryActiveAgents(ctx)
-		if err != nil {
-			logger.Err(ctx, err).Msg("Failed to query active agents from database.")
-			return
+		if err := restoreAgentActors(ctx, plugin.Name()); err != nil {
+			logger.Err(ctx, err).Msg("Failed to restore agent actors.")
 		}
-		inCluster := _actorSystem.InCluster()
-		for _, agent := range agents {
-			if !localAgents[agent.Id] {
-				if inCluster {
-					actorName := getActorName(agent.Id)
-					if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil {
-						logger.Err(ctx, err).Msgf("Failed to check if actor %s exists in cluster.", actorName)
-					} else if exists {
-						// if the actor already exists in the cluster, skip spawning it
-						continue
-					}
-				}
-				if err := spawnActorForAgent(ctx, plugin.Name(), agent.Id, agent.Name, false); err != nil {
-					logger.Err(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id)
-				}
-			}
-		}
 	}()
 
 	return nil
 }
 
+// restoreAgentActors spawn actors for agents with state in the database, that are not already running
+func restoreAgentActors(ctx context.Context, pluginName string) error {
+	span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
+	defer span.Finish()
+
+	logger.Debug(ctx).Msg("Restoring agent actors from database.")
+
+	// query the database for active agents
+	agents, err := db.QueryActiveAgents(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to query active agents from database: %w", err)
+	}
+
+	// shuffle the agents to help distribute the load across the cluster when multiple nodes are starting simultaneously
+	rand.Shuffle(len(agents), func(i, j int) {
+		agents[i], agents[j] = agents[j], agents[i]
+	})
+
+	// spawn actors for each agent that is not already running
+	for _, agent := range agents {
+		actorName := getActorName(agent.Id)
+		if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil {
+			logger.Err(ctx, err).Msgf("Failed to check if actor %s exists.", actorName)
+		} else if !exists {
+			err := spawnActorForAgent(ctx, pluginName, agent.Id, agent.Name, false)
+			if err != nil {
+				logger.Err(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id)
+			}
+		}
+	}
+
+	return nil
+}
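For reference, here is a minimal standalone sketch (not part of the PR) of the shuffle pattern the new restoreAgentActors function relies on, assuming only the Go standard library; the agent IDs are placeholders. math/rand/v2 seeds its top-level functions automatically, so each starting node walks the restored agent list in its own order, and the ActorExists check lets later nodes skip agents that another node has already spawned.

package main

import (
	"fmt"
	"math/rand/v2"
)

func main() {
	// Placeholder agent IDs; in the runtime these would come from db.QueryActiveAgents.
	agents := []string{"agent-1", "agent-2", "agent-3", "agent-4"}

	// rand.Shuffle permutes the slice in place via the swap callback.
	// math/rand/v2 needs no explicit seeding, so two nodes starting at the
	// same moment will almost certainly produce different orders.
	rand.Shuffle(len(agents), func(i, j int) {
		agents[i], agents[j] = agents[j], agents[i]
	})

	fmt.Println(agents) // e.g. [agent-3 agent-1 agent-4 agent-2]
}

Shuffling is a lightweight alternative to coordinating startup order between nodes: contention is still resolved by the existence check, and the shuffle just spreads which node ends up spawning which agent.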