diff --git a/CHANGELOG.md b/CHANGELOG.md
index 620665315..0596a2c23 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,10 @@ NOTE: all releases may include dependency updates, not specifically mentioned
 
 - feat: integrate try-as library [#912](https://github.com/hypermodeinc/modus/pull/912)
 
+## 2025-07-14 - Runtime v0.18.6
+
+- feat: restore agents on demand [#949](https://github.com/hypermodeinc/modus/pull/949)
+
 ## 2025-07-12 - Runtime v0.18.5
 
 - fix: sentry source context [#940](https://github.com/hypermodeinc/modus/pull/940)
diff --git a/runtime/actors/actorsystem.go b/runtime/actors/actorsystem.go
index 2c0f91e2f..b805ccf87 100644
--- a/runtime/actors/actorsystem.go
+++ b/runtime/actors/actorsystem.go
@@ -8,10 +8,9 @@ package actors
 import (
 	"context"
 	"fmt"
-	"math/rand/v2"
+	"sync"
 	"time"
 
-	"github.com/hypermodeinc/modus/runtime/db"
 	"github.com/hypermodeinc/modus/runtime/logger"
 	"github.com/hypermodeinc/modus/runtime/messages"
 	"github.com/hypermodeinc/modus/runtime/pluginmanager"
@@ -82,7 +81,13 @@ func startActorSystem(ctx context.Context, actorSystem goakt.ActorSystem) error
 	}
 
 	// important: wait for the actor system to sync with the cluster before proceeding
-	waitForClusterSync(ctx)
+	if clusterEnabled() {
+		select {
+		case <-time.After(peerSyncInterval()):
+		case <-ctx.Done():
+			logger.Warn(context.WithoutCancel(ctx)).Msg("Context cancelled while waiting for cluster sync.")
+		}
+	}
 
 	return nil
 }
@@ -106,68 +111,9 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
 		}
 	}
 
-	// do this in a goroutine to avoid blocking the cluster engine startup
-	go func() {
-		if err := restoreAgentActors(ctx, plugin.Name()); err != nil {
-			const msg = "Failed to restore agent actors."
-			sentryutils.CaptureError(ctx, err, msg)
-			logger.Error(ctx, err).Msg(msg)
-		}
-	}()
-
 	return nil
 }
 
-// restoreAgentActors spawn actors for agents with state in the database, that are not already running
-func restoreAgentActors(ctx context.Context, pluginName string) error {
-	span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
-	defer span.Finish()
-
-	logger.Debug(ctx).Msg("Restoring agent actors from database.")
-
-	// query the database for active agents
-	agents, err := db.QueryActiveAgents(ctx)
-	if err != nil {
-		return fmt.Errorf("failed to query active agents from database: %w", err)
-	}
-
-	// shuffle the agents to help distribute the load across the cluster when multiple nodes are starting simultaneously
-	rand.Shuffle(len(agents), func(i, j int) {
-		agents[i], agents[j] = agents[j], agents[i]
-	})
-
-	// spawn actors for each agent that is not already running
-	for _, agent := range agents {
-		actorName := getActorName(agent.Id)
-		if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil {
-			const msg = "Failed to check if agent actor exists."
-			sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", agent.Id))
-			logger.Error(ctx, err).Str("agent_id", agent.Id).Msg(msg)
-		} else if !exists {
-			err := spawnActorForAgent(ctx, pluginName, agent.Id, agent.Name, false)
-			if err != nil {
-				const msg = "Failed to spawn actor for agent."
-				sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", agent.Id))
-				logger.Error(ctx, err).Str("agent_id", agent.Id).Msg(msg)
-			}
-		}
-	}
-
-	return nil
-}
-
-// Waits for the peer sync interval to pass, allowing time for the actor system to synchronize its
-// list of actors with the remote nodes in the cluster. Cancels early if the context is done.
-func waitForClusterSync(ctx context.Context) {
-	if clusterEnabled() {
-		select {
-		case <-time.After(peerSyncInterval()):
-		case <-ctx.Done():
-			logger.Warn(context.WithoutCancel(ctx)).Msg("Context cancelled while waiting for cluster sync.")
-		}
-	}
-}
-
 func Shutdown(ctx context.Context) {
 	span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
 	defer span.Finish()
@@ -209,29 +155,25 @@ func (sh *shutdownHook) Execute(ctx context.Context, actorSystem goakt.ActorSyst
 
 	// Suspend all local running agent actors first, which allows them to gracefully stop and persist their state.
 	// In cluster mode, this will also allow the actor to resume on another node after this node shuts down.
+	// We use goroutines and a wait group to do this concurrently.
+	var wg sync.WaitGroup
 	for _, pid := range actors {
 		if actor, ok := pid.Actor().(*wasmAgentActor); ok && pid.IsRunning() {
 			if actor.status == AgentStatusRunning {
-				ctx := actor.augmentContext(ctx, pid)
-				if err := actor.suspendAgent(ctx); err != nil {
-					const msg = "Failed to suspend agent actor."
-					sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", actor.agentId))
-					logger.Error(ctx, err).Str("agent_id", actor.agentId).Msg(msg)
-				}
-			}
-		}
-	}
-
-	// Then shut down subscription actors. They will have received the suspend message already.
-	for _, pid := range actors {
-		if a, ok := pid.Actor().(*subscriptionActor); ok && pid.IsRunning() {
-			if err := pid.Shutdown(ctx); err != nil {
-				const msg = "Failed to shut down subscription actor."
-				sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", a.agentId))
-				logger.Error(ctx, err).Str("agent_id", a.agentId).Msg(msg)
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					ctx := actor.augmentContext(ctx, pid)
+					if err := actor.suspendAgent(ctx); err != nil {
+						const msg = "Failed to suspend agent actor."
+						sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", actor.agentId))
+						logger.Error(ctx, err).Str("agent_id", actor.agentId).Msg(msg)
+					}
+				}()
 			}
 		}
 	}
+	wg.Wait()
 
 	// Then allow the actor system to continue with its shutdown process.
 	return nil
diff --git a/runtime/actors/agents.go b/runtime/actors/agents.go
index 0c19f7bd1..fd0803d4a 100644
--- a/runtime/actors/agents.go
+++ b/runtime/actors/agents.go
@@ -98,12 +98,6 @@ func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName stri
 		}),
 	)
 
-	// Important: Wait for the actor system to sync with the cluster before proceeding.
-	// This ensures consistency across the cluster, so we don't accidentally spawn the same actor multiple times.
-	// GoAkt does not resolve such inconsistencies automatically, so we need to handle this manually.
-	// A short sync time should not be noticeable by the user.
-	waitForClusterSync(ctx)
-
 	return err
 }
 
@@ -208,26 +202,58 @@ func SendAgentMessage(ctx context.Context, agentId string, msgName string, data
 	}
 
 	var err error
-	var res proto.Message
-	if timeout == 0 {
-		err = tell(ctx, actorName, msg)
-	} else {
-		res, err = ask(ctx, actorName, msg, time.Duration(timeout))
-	}
+	const maxRetries = 3
+	for attempt := 1; attempt <= maxRetries; attempt++ {
+
+		var res proto.Message
+		if timeout == 0 {
+			err = tell(ctx, actorName, msg)
+		} else {
+			res, err = ask(ctx, actorName, msg, time.Duration(timeout))
+		}
 
-	if errors.Is(err, goakt.ErrActorNotFound) {
-		return newAgentMessageErrorResponse("agent not found"), nil
-	} else if err != nil {
-		return nil, fmt.Errorf("error sending message to agent: %w", err)
-	}
+		if err == nil {
+			if res == nil {
+				return newAgentMessageDataResponse(nil), nil
+			} else if response, ok := res.(*messages.AgentResponse); ok {
+				return newAgentMessageDataResponse(response.Data), nil
+			} else {
+				return nil, fmt.Errorf("unexpected agent response type: %T", res)
+			}
+		}
 
-	if res == nil {
-		return newAgentMessageDataResponse(nil), nil
-	} else if response, ok := res.(*messages.AgentResponse); ok {
-		return newAgentMessageDataResponse(response.Data), nil
-	} else {
-		return nil, fmt.Errorf("unexpected agent response type: %T", res)
+		if errors.Is(err, goakt.ErrActorNotFound) {
+			state, err := db.GetAgentState(ctx, agentId)
+			if err != nil {
+				return nil, fmt.Errorf("error getting agent state for %s: %w", agentId, err)
+			}
+			if state == nil {
+				return newAgentMessageErrorResponse("agent not found"), nil
+			}
+
+			switch AgentStatus(state.Status) {
+			case AgentStatusStopping, AgentStatusTerminated:
+				return newAgentMessageErrorResponse("agent is no longer available"), nil
+			}
+
+			// Restart the agent actor locally if it is not running.
+			var pluginName string
+			if plugin, ok := plugins.GetPluginFromContext(ctx); !ok {
+				return nil, fmt.Errorf("no plugin found in context")
+			} else {
+				pluginName = plugin.Name()
+			}
+			agentName := state.Name
+			if err := spawnActorForAgent(ctx, pluginName, agentId, agentName, false); err != nil {
+				return nil, fmt.Errorf("error spawning actor for agent %s: %w", agentId, err)
+			}
+
+			// Retry sending the message to the agent actor.
+			continue
+		}
 	}
+
+	return nil, fmt.Errorf("error sending message to agent: %w", err)
 }
 
 func PublishAgentEvent(ctx context.Context, agentId, eventName string, eventData *string) error {
diff --git a/runtime/actors/subscriber.go b/runtime/actors/subscriber.go
index 4caff8054..bdb6e5ff4 100644
--- a/runtime/actors/subscriber.go
+++ b/runtime/actors/subscriber.go
@@ -68,6 +68,8 @@ func SubscribeForAgentEvents(ctx context.Context, agentId string, update func(da
 		return fmt.Errorf("failed to subscribe to topic: %w", err)
 	}
 
+	logger.Debug(ctx).Msgf("Subscribed to topic %s with subscription actor %s", topic, subActor.Name())
+
 	// When the context is done, we will unsubscribe and stop the subscription actor.
 	// For example, the GraphQL subscription is closed or the client disconnects.
 	go func() {