4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,10 @@ NOTE: all releases may include dependency updates, not specifically mentioned

- feat: integrate try-as library [#912](https://github.com/hypermodeinc/modus/pull/912)

## 2025-07-14 - Runtime v0.18.6

- feat: restore agents on demand [#949](https://github.com/hypermodeinc/modus/pull/949)

## 2025-07-12 - Runtime v0.18.5

- fix: sentry source context [#940](https://github.com/hypermodeinc/modus/pull/940)
100 changes: 21 additions & 79 deletions runtime/actors/actorsystem.go
@@ -8,10 +8,9 @@ package actors
import (
"context"
"fmt"
"math/rand/v2"
"sync"
"time"

"github.com/hypermodeinc/modus/runtime/db"
"github.com/hypermodeinc/modus/runtime/logger"
"github.com/hypermodeinc/modus/runtime/messages"
"github.com/hypermodeinc/modus/runtime/pluginmanager"
@@ -82,7 +81,13 @@ func startActorSystem(ctx context.Context, actorSystem goakt.ActorSystem) error
}

// important: wait for the actor system to sync with the cluster before proceeding
waitForClusterSync(ctx)
if clusterEnabled() {
select {
case <-time.After(peerSyncInterval()):
case <-ctx.Done():
logger.Warn(context.WithoutCancel(ctx)).Msg("Context cancelled while waiting for cluster sync.")
}
}

return nil
}
@@ -106,68 +111,9 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
}
}

// do this in a goroutine to avoid blocking the cluster engine startup
go func() {
if err := restoreAgentActors(ctx, plugin.Name()); err != nil {
const msg = "Failed to restore agent actors."
sentryutils.CaptureError(ctx, err, msg)
logger.Error(ctx, err).Msg(msg)
}
}()

return nil
}

// restoreAgentActors spawns actors for agents that have state in the database but are not already running
func restoreAgentActors(ctx context.Context, pluginName string) error {
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

logger.Debug(ctx).Msg("Restoring agent actors from database.")

// query the database for active agents
agents, err := db.QueryActiveAgents(ctx)
if err != nil {
return fmt.Errorf("failed to query active agents from database: %w", err)
}

// shuffle the agents to help distribute the load across the cluster when multiple nodes are starting simultaneously
rand.Shuffle(len(agents), func(i, j int) {
agents[i], agents[j] = agents[j], agents[i]
})

// spawn actors for each agent that is not already running
for _, agent := range agents {
actorName := getActorName(agent.Id)
if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil {
const msg = "Failed to check if agent actor exists."
sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", agent.Id))
logger.Error(ctx, err).Str("agent_id", agent.Id).Msg(msg)
} else if !exists {
err := spawnActorForAgent(ctx, pluginName, agent.Id, agent.Name, false)
if err != nil {
const msg = "Failed to spawn actor for agent."
sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", agent.Id))
logger.Error(ctx, err).Str("agent_id", agent.Id).Msg(msg)
}
}
}

return nil
}

// Waits for the peer sync interval to pass, allowing time for the actor system to synchronize its
// list of actors with the remote nodes in the cluster. Cancels early if the context is done.
func waitForClusterSync(ctx context.Context) {
if clusterEnabled() {
select {
case <-time.After(peerSyncInterval()):
case <-ctx.Done():
logger.Warn(context.WithoutCancel(ctx)).Msg("Context cancelled while waiting for cluster sync.")
}
}
}

func Shutdown(ctx context.Context) {
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()
@@ -209,29 +155,25 @@ func (sh *shutdownHook) Execute(ctx context.Context, actorSystem goakt.ActorSyst

// Suspend all local running agent actors first, which allows them to gracefully stop and persist their state.
// In cluster mode, this will also allow the actor to resume on another node after this node shuts down.
// We use goroutines and a wait group to do this concurrently.
var wg sync.WaitGroup
for _, pid := range actors {
if actor, ok := pid.Actor().(*wasmAgentActor); ok && pid.IsRunning() {
if actor.status == AgentStatusRunning {
ctx := actor.augmentContext(ctx, pid)
if err := actor.suspendAgent(ctx); err != nil {
const msg = "Failed to suspend agent actor."
sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", actor.agentId))
logger.Error(ctx, err).Str("agent_id", actor.agentId).Msg(msg)
}
}
}
}

// Then shut down subscription actors. They will have received the suspend message already.
for _, pid := range actors {
if a, ok := pid.Actor().(*subscriptionActor); ok && pid.IsRunning() {
if err := pid.Shutdown(ctx); err != nil {
const msg = "Failed to shut down subscription actor."
sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", a.agentId))
logger.Error(ctx, err).Str("agent_id", a.agentId).Msg(msg)
wg.Add(1)
go func() {
defer wg.Done()
ctx := actor.augmentContext(ctx, pid)
if err := actor.suspendAgent(ctx); err != nil {
const msg = "Failed to suspend agent actor."
sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", actor.agentId))
logger.Error(ctx, err).Str("agent_id", actor.agentId).Msg(msg)
}
}()
}
}
}
wg.Wait()

// Then allow the actor system to continue with its shutdown process.
return nil
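Because the hunk above interleaves removed and added lines without +/- markers, the shape of the new shutdown hook can be hard to follow. Below is a minimal, self-contained sketch of the concurrent-suspend pattern it moves to; `agent` and `suspend` are hypothetical stand-ins for illustration, not the runtime's actual `wasmAgentActor` API.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// agent stands in for a running agent actor; suspend simulates persisting
// its state before shutdown. Both are illustrative only.
type agent struct{ id string }

func (a *agent) suspend(ctx context.Context) error {
	select {
	case <-time.After(50 * time.Millisecond): // pretend persistence work
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx := context.Background()
	agents := []*agent{{id: "a1"}, {id: "a2"}, {id: "a3"}}

	// Suspend every agent concurrently and wait for all goroutines to finish
	// before letting the shutdown sequence continue.
	var wg sync.WaitGroup
	for _, a := range agents {
		wg.Add(1)
		go func(a *agent) {
			defer wg.Done()
			if err := a.suspend(ctx); err != nil {
				fmt.Printf("failed to suspend agent %s: %v\n", a.id, err)
			}
		}(a)
	}
	wg.Wait()
	fmt.Println("all agents suspended; shutdown can proceed")
}
```

In the hook itself the goroutine closure captures the loop variables `pid` and `actor` directly, which is safe on Go 1.22+ where each iteration gets its own variable; passing the value as a parameter, as in this sketch, is the version-agnostic equivalent.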
72 changes: 49 additions & 23 deletions runtime/actors/agents.go
Expand Up @@ -98,12 +98,6 @@ func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName stri
}),
)

// Important: Wait for the actor system to sync with the cluster before proceeding.
// This ensures consistency across the cluster, so we don't accidentally spawn the same actor multiple times.
// GoAkt does not resolve such inconsistencies automatically, so we need to handle this manually.
// A short sync time should not be noticeable by the user.
waitForClusterSync(ctx)

return err
}

@@ -208,26 +202,58 @@ func SendAgentMessage(ctx context.Context, agentId string, msgName string, data
}

var err error
var res proto.Message
if timeout == 0 {
err = tell(ctx, actorName, msg)
} else {
res, err = ask(ctx, actorName, msg, time.Duration(timeout))
}
const maxRetries = 3
for attempt := 1; attempt <= maxRetries; attempt++ {

var res proto.Message
if timeout == 0 {
err = tell(ctx, actorName, msg)
} else {
res, err = ask(ctx, actorName, msg, time.Duration(timeout))
}

if errors.Is(err, goakt.ErrActorNotFound) {
return newAgentMessageErrorResponse("agent not found"), nil
} else if err != nil {
return nil, fmt.Errorf("error sending message to agent: %w", err)
}
if err == nil {
if res == nil {
return newAgentMessageDataResponse(nil), nil
} else if response, ok := res.(*messages.AgentResponse); ok {
return newAgentMessageDataResponse(response.Data), nil
} else {
return nil, fmt.Errorf("unexpected agent response type: %T", res)
}
}

if res == nil {
return newAgentMessageDataResponse(nil), nil
} else if response, ok := res.(*messages.AgentResponse); ok {
return newAgentMessageDataResponse(response.Data), nil
} else {
return nil, fmt.Errorf("unexpected agent response type: %T", res)
if errors.Is(err, goakt.ErrActorNotFound) {
state, err := db.GetAgentState(ctx, agentId)
if err != nil {
return nil, fmt.Errorf("error getting agent state for %s: %w", agentId, err)
}
if state == nil {
return newAgentMessageErrorResponse("agent not found"), nil
}

switch AgentStatus(state.Status) {
case AgentStatusStopping, AgentStatusTerminated:
return newAgentMessageErrorResponse("agent is no longer available"), nil
}

// Restart the agent actor locally if it is not running.
var pluginName string
if plugin, ok := plugins.GetPluginFromContext(ctx); !ok {
return nil, fmt.Errorf("no plugin found in context")
} else {
pluginName = plugin.Name()
}
agentName := state.Name
if err := spawnActorForAgent(ctx, pluginName, agentId, agentName, false); err != nil {
return nil, fmt.Errorf("error spawning actor for agent %s: %w", agentId, err)
}

// Retry sending the message to the agent actor.
continue
}
}

return nil, fmt.Errorf("error sending message to agent: %w", err)
}

func PublishAgentEvent(ctx context.Context, agentId, eventName string, eventData *string) error {
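The heart of the change in `SendAgentMessage` is the restore-on-demand retry: when the target actor is not found, look up the agent's persisted state, respawn the actor locally, and resend, up to `maxRetries` times. A minimal sketch of that control flow follows; `send`, `spawnFromState`, and the in-memory maps are illustrative stand-ins, not the runtime's real `tell`/`ask` or `db.GetAgentState` APIs.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for goakt.ErrActorNotFound.
var errActorNotFound = errors.New("actor not found")

// In-memory stand-ins for the actor system and the agent-state table.
var running = map[string]bool{}
var persisted = map[string]string{"agent-1": "running"} // agentId -> status

func send(agentId, msg string) (string, error) {
	if !running[agentId] {
		return "", errActorNotFound
	}
	return "ok: " + msg, nil
}

// spawnFromState restores an actor from persisted state, refusing agents
// that are stopping or terminated.
func spawnFromState(agentId string) error {
	status, ok := persisted[agentId]
	if !ok {
		return fmt.Errorf("agent %s not found", agentId)
	}
	if status == "stopping" || status == "terminated" {
		return fmt.Errorf("agent %s is no longer available", agentId)
	}
	running[agentId] = true
	return nil
}

// sendWithRestore retries the send after restoring the agent on demand.
func sendWithRestore(agentId, msg string) (string, error) {
	const maxRetries = 3
	var err error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		var res string
		res, err = send(agentId, msg)
		if err == nil {
			return res, nil
		}
		if errors.Is(err, errActorNotFound) {
			if spawnErr := spawnFromState(agentId); spawnErr != nil {
				return "", spawnErr
			}
			continue // retry now that the actor has been respawned
		}
		return "", err
	}
	return "", fmt.Errorf("giving up after %d attempts: %w", maxRetries, err)
}

func main() {
	res, err := sendWithRestore("agent-1", "hello")
	fmt.Println(res, err)
}
```

Running `main` hits a not-found error on the first attempt, restores `agent-1` from its persisted state, and succeeds on the second; agents recorded as stopping or terminated are reported to the caller instead of being respawned, which matches the intent of the hunk above.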
2 changes: 2 additions & 0 deletions runtime/actors/subscriber.go
@@ -68,6 +68,8 @@ func SubscribeForAgentEvents(ctx context.Context, agentId string, update func(da
return fmt.Errorf("failed to subscribe to topic: %w", err)
}

logger.Debug(ctx).Msgf("Subscribed to topic %s with subscription actor %s", topic, subActor.Name())

// When the context is done, we will unsubscribe and stop the subscription actor.
// For example, the GraphQL subscription is closed or the client disconnects.
go func() {