diff --git a/CHANGELOG.md b/CHANGELOG.md index 9293a4bce..be2d02ccf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ NOTE: all releases may include dependency updates, not specifically mentioned - feat: integrate try-as library [#912](https://github.com/hypermodeinc/modus/pull/912) +## 2025-07-15 - Runtime v0.18.7 + +- feat: agent passivation [#952](https://github.com/hypermodeinc/modus/pull/952) + ## 2025-07-14 - Runtime v0.18.6 - feat: restore agents on demand [#949](https://github.com/hypermodeinc/modus/pull/949) diff --git a/runtime/actors/actorsystem.go b/runtime/actors/actorsystem.go index b805ccf87..a8262d4d4 100644 --- a/runtime/actors/actorsystem.go +++ b/runtime/actors/actorsystem.go @@ -163,7 +163,6 @@ func (sh *shutdownHook) Execute(ctx context.Context, actorSystem goakt.ActorSyst wg.Add(1) go func() { defer wg.Done() - ctx := actor.augmentContext(ctx, pid) if err := actor.suspendAgent(ctx); err != nil { const msg = "Failed to suspend agent actor." sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", actor.agentId)) diff --git a/runtime/actors/agents.go b/runtime/actors/agents.go index 6f480f228..fcbfbdd0e 100644 --- a/runtime/actors/agents.go +++ b/runtime/actors/agents.go @@ -20,6 +20,7 @@ import ( goakt "github.com/tochemey/goakt/v3/actor" "github.com/tochemey/goakt/v3/goaktpb" + "github.com/tochemey/goakt/v3/passivation" "github.com/rs/xid" "google.golang.org/protobuf/proto" @@ -89,15 +90,23 @@ func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName stri initializing: initializing, } + agentIdleTimeout := utils.GetDurationFromEnv("MODUS_AGENT_IDLE_TIMEOUT_SECONDS", 2, time.Second) + var agentPassivationStrategy = passivation.NewTimeBasedStrategy(agentIdleTimeout) + actorName := getActorName(agentId) _, err := _actorSystem.Spawn(ctx, actorName, actor, - goakt.WithLongLived(), + goakt.WithPassivationStrategy(agentPassivationStrategy), goakt.WithDependencies(&wasmAgentInfo{ AgentName: agentName, PluginName: pluginName, }), ) + if err != nil { + sentryutils.CaptureError(ctx, err, "Error spawning agent actor", + sentryutils.WithData("agent_id", agentId)) + } + return err } @@ -195,6 +204,52 @@ func SendAgentMessage(ctx context.Context, agentId string, msgName string, data actorName := getActorName(agentId) + // Pause passivation to ensure the actor is not passivated while processing the message. + if err := tell(ctx, actorName, &goaktpb.PausePassivation{}); errors.Is(err, goakt.ErrActorNotFound) { + state, err := db.GetAgentState(ctx, agentId) + if errors.Is(err, db.ErrAgentNotFound) { + return newAgentMessageErrorResponse(fmt.Sprintf("agent %s not found", agentId)), nil + } else if err != nil { + return nil, fmt.Errorf("error getting agent state for %s: %w", agentId, err) + } + + switch AgentStatus(state.Status) { + case AgentStatusStopping, AgentStatusTerminated: + return newAgentMessageErrorResponse("agent is no longer available"), nil + } + + // Restart the agent actor locally if it is not running. + var pluginName string + if plugin, ok := plugins.GetPluginFromContext(ctx); ok { + pluginName = plugin.Name() + } else { + return nil, errors.New("no plugin found in context") + } + agentName := state.Name + if err := spawnActorForAgent(ctx, pluginName, agentId, agentName, false); err != nil { + return nil, fmt.Errorf("error spawning actor for agent %s: %w", agentId, err) + } + + // Try again. + if err := tell(ctx, actorName, &goaktpb.PausePassivation{}); err != nil { + return nil, fmt.Errorf("error sending message to agent: %w", err) + } + } else if err != nil { + sentryutils.CaptureError(ctx, err, "Error pausing passivation for agent", + sentryutils.WithData("agent_id", agentId)) + return nil, fmt.Errorf("error sending message to agent: %w", err) + } + + defer func() { + // Resume passivation after the message is sent. + if err := tell(ctx, actorName, &goaktpb.ResumePassivation{}); err != nil { + const msg = "Error resuming passivation after sending message to agent." + logger.Error(ctx, err).Str("agent_id", agentId).Msg(msg) + sentryutils.CaptureError(ctx, err, msg, + sentryutils.WithData("agent_id", agentId)) + } + }() + msg := &messages.AgentRequest{ Name: msgName, Data: data, @@ -202,53 +257,20 @@ func SendAgentMessage(ctx context.Context, agentId string, msgName string, data } var err error - const maxRetries = 3 - for attempt := 1; attempt <= maxRetries; attempt++ { + var res proto.Message + if timeout == 0 { + err = tell(ctx, actorName, msg) + } else { + res, err = ask(ctx, actorName, msg, time.Duration(timeout)) + } - var res proto.Message - if timeout == 0 { - err = tell(ctx, actorName, msg) + if err == nil { + if res == nil { + return newAgentMessageDataResponse(nil), nil + } else if response, ok := res.(*messages.AgentResponse); ok { + return newAgentMessageDataResponse(response.Data), nil } else { - res, err = ask(ctx, actorName, msg, time.Duration(timeout)) - } - - if err == nil { - if res == nil { - return newAgentMessageDataResponse(nil), nil - } else if response, ok := res.(*messages.AgentResponse); ok { - return newAgentMessageDataResponse(response.Data), nil - } else { - return nil, fmt.Errorf("unexpected agent response type: %T", res) - } - } - - if errors.Is(err, goakt.ErrActorNotFound) { - state, err := db.GetAgentState(ctx, agentId) - if errors.Is(err, db.ErrAgentNotFound) { - return newAgentMessageErrorResponse(fmt.Sprintf("agent %s not found", agentId)), nil - } else if err != nil { - return nil, fmt.Errorf("error getting agent state for %s: %w", agentId, err) - } - - switch AgentStatus(state.Status) { - case AgentStatusStopping, AgentStatusTerminated: - return newAgentMessageErrorResponse("agent is no longer available"), nil - } - - // Restart the agent actor locally if it is not running. - var pluginName string - if plugin, ok := plugins.GetPluginFromContext(ctx); !ok { - return nil, fmt.Errorf("no plugin found in context") - } else { - pluginName = plugin.Name() - } - agentName := state.Name - if err := spawnActorForAgent(ctx, pluginName, agentId, agentName, false); err != nil { - return nil, fmt.Errorf("error spawning actor for agent %s: %w", agentId, err) - } - - // Retry sending the message to the agent actor. - continue + return nil, fmt.Errorf("unexpected agent response type: %T", res) } } diff --git a/runtime/actors/wasmagent.go b/runtime/actors/wasmagent.go index dea9ea51c..7531f5d89 100644 --- a/runtime/actors/wasmagent.go +++ b/runtime/actors/wasmagent.go @@ -27,6 +27,7 @@ import ( ) type wasmAgentActor struct { + pid *goakt.PID agentId string agentName string status AgentStatus @@ -84,6 +85,8 @@ func (a *wasmAgentActor) Receive(rc *goakt.ReceiveContext) { } case *goaktpb.PostStart: + a.pid = rc.Self() + if a.initializing { if err := a.startAgent(ctx); err != nil { rc.Err(fmt.Errorf("error starting agent: %w", err)) @@ -140,7 +143,7 @@ func (a *wasmAgentActor) Receive(rc *goakt.ReceiveContext) { } func (a *wasmAgentActor) PostStop(ac *goakt.Context) error { - ctx := ac.Context() + ctx := a.augmentContext(ac.Context(), a.pid) ctx = sentry.SetHubOnContext(ctx, a.sentryHub) span, ctx := sentryutils.NewSpanForCurrentFunc(ctx) defer span.Finish() diff --git a/runtime/go.mod b/runtime/go.mod index 6fec931b1..d647969d9 100644 --- a/runtime/go.mod +++ b/runtime/go.mod @@ -42,7 +42,7 @@ require ( github.com/tetratelabs/wazero v1.9.0 github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 - github.com/tochemey/goakt/v3 v3.7.1 + github.com/tochemey/goakt/v3 v3.7.2 github.com/travisjeffery/go-dynaport v1.0.0 github.com/twpayne/go-geom v1.6.1 github.com/wundergraph/graphql-go-tools/execution v1.4.0 diff --git a/runtime/go.sum b/runtime/go.sum index f5fd6be85..6d247a527 100644 --- a/runtime/go.sum +++ b/runtime/go.sum @@ -860,8 +860,8 @@ github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDW github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= github.com/tinylib/msgp v1.2.5 h1:WeQg1whrXRFiZusidTQqzETkRpGjFjcIhW6uqWH09po= github.com/tinylib/msgp v1.2.5/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0= -github.com/tochemey/goakt/v3 v3.7.1 h1:HnigDV2jpx5AB3UTeHCqTxmNdf6WGbKUXM9k0rf3+hk= -github.com/tochemey/goakt/v3 v3.7.1/go.mod h1:zOV6ibP+V/e4l0pRN4vRJncgPENvhD2YDSPs1ZK1bcc= +github.com/tochemey/goakt/v3 v3.7.2 h1:TxZ3HsiJ37mXHIIjppCWGQKV+uASgw+dMPfjT9PexqQ= +github.com/tochemey/goakt/v3 v3.7.2/go.mod h1:zOV6ibP+V/e4l0pRN4vRJncgPENvhD2YDSPs1ZK1bcc= github.com/tochemey/olric v0.2.3 h1:LGmsHLQBSEs3uasZNLT5MdS2pBMNJ71gSrXnYfkb62M= github.com/tochemey/olric v0.2.3/go.mod h1:BAD82xys8R8IAWFV+GC0B8I+J4QsYZvmPS5NT/dhmtI= github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw=