Skip to content
This repository was archived by the owner on Sep 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ NOTE: all releases may include dependency updates, not specifically mentioned

- feat: integrate try-as library [#912](https://github.com/hypermodeinc/modus/pull/912)

## 2025-07-15 - Runtime v0.18.7

- feat: agent passivation [#952](https://github.com/hypermodeinc/modus/pull/952)

## 2025-07-14 - Runtime v0.18.6

- feat: restore agents on demand [#949](https://github.com/hypermodeinc/modus/pull/949)
Expand Down
1 change: 0 additions & 1 deletion runtime/actors/actorsystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ func (sh *shutdownHook) Execute(ctx context.Context, actorSystem goakt.ActorSyst
wg.Add(1)
go func() {
defer wg.Done()
ctx := actor.augmentContext(ctx, pid)
if err := actor.suspendAgent(ctx); err != nil {
const msg = "Failed to suspend agent actor."
sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", actor.agentId))
Expand Down
114 changes: 68 additions & 46 deletions runtime/actors/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

goakt "github.com/tochemey/goakt/v3/actor"
"github.com/tochemey/goakt/v3/goaktpb"
"github.com/tochemey/goakt/v3/passivation"

"github.com/rs/xid"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -89,15 +90,23 @@ func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName stri
initializing: initializing,
}

agentIdleTimeout := utils.GetDurationFromEnv("MODUS_AGENT_IDLE_TIMEOUT_SECONDS", 2, time.Second)
var agentPassivationStrategy = passivation.NewTimeBasedStrategy(agentIdleTimeout)

actorName := getActorName(agentId)
_, err := _actorSystem.Spawn(ctx, actorName, actor,
goakt.WithLongLived(),
goakt.WithPassivationStrategy(agentPassivationStrategy),
goakt.WithDependencies(&wasmAgentInfo{
AgentName: agentName,
PluginName: pluginName,
}),
)

if err != nil {
sentryutils.CaptureError(ctx, err, "Error spawning agent actor",
sentryutils.WithData("agent_id", agentId))
}

return err
}

Expand Down Expand Up @@ -195,60 +204,73 @@ func SendAgentMessage(ctx context.Context, agentId string, msgName string, data

actorName := getActorName(agentId)

// Pause passivation to ensure the actor is not passivated while processing the message.
if err := tell(ctx, actorName, &goaktpb.PausePassivation{}); errors.Is(err, goakt.ErrActorNotFound) {
state, err := db.GetAgentState(ctx, agentId)
if errors.Is(err, db.ErrAgentNotFound) {
return newAgentMessageErrorResponse(fmt.Sprintf("agent %s not found", agentId)), nil
} else if err != nil {
return nil, fmt.Errorf("error getting agent state for %s: %w", agentId, err)
}

switch AgentStatus(state.Status) {
case AgentStatusStopping, AgentStatusTerminated:
return newAgentMessageErrorResponse("agent is no longer available"), nil
}

// Restart the agent actor locally if it is not running.
var pluginName string
if plugin, ok := plugins.GetPluginFromContext(ctx); ok {
pluginName = plugin.Name()
} else {
return nil, errors.New("no plugin found in context")
}
agentName := state.Name
if err := spawnActorForAgent(ctx, pluginName, agentId, agentName, false); err != nil {
return nil, fmt.Errorf("error spawning actor for agent %s: %w", agentId, err)
}

// Try again.
if err := tell(ctx, actorName, &goaktpb.PausePassivation{}); err != nil {
return nil, fmt.Errorf("error sending message to agent: %w", err)
}
} else if err != nil {
sentryutils.CaptureError(ctx, err, "Error pausing passivation for agent",
sentryutils.WithData("agent_id", agentId))
return nil, fmt.Errorf("error sending message to agent: %w", err)
}

defer func() {
// Resume passivation after the message is sent.
if err := tell(ctx, actorName, &goaktpb.ResumePassivation{}); err != nil {
const msg = "Error resuming passivation after sending message to agent."
logger.Error(ctx, err).Str("agent_id", agentId).Msg(msg)
sentryutils.CaptureError(ctx, err, msg,
sentryutils.WithData("agent_id", agentId))
}
}()

msg := &messages.AgentRequest{
Name: msgName,
Data: data,
Respond: timeout > 0,
}

var err error
const maxRetries = 3
for attempt := 1; attempt <= maxRetries; attempt++ {
var res proto.Message
if timeout == 0 {
err = tell(ctx, actorName, msg)
} else {
res, err = ask(ctx, actorName, msg, time.Duration(timeout))
}

var res proto.Message
if timeout == 0 {
err = tell(ctx, actorName, msg)
if err == nil {
if res == nil {
return newAgentMessageDataResponse(nil), nil
} else if response, ok := res.(*messages.AgentResponse); ok {
return newAgentMessageDataResponse(response.Data), nil
} else {
res, err = ask(ctx, actorName, msg, time.Duration(timeout))
}

if err == nil {
if res == nil {
return newAgentMessageDataResponse(nil), nil
} else if response, ok := res.(*messages.AgentResponse); ok {
return newAgentMessageDataResponse(response.Data), nil
} else {
return nil, fmt.Errorf("unexpected agent response type: %T", res)
}
}

if errors.Is(err, goakt.ErrActorNotFound) {
state, err := db.GetAgentState(ctx, agentId)
if errors.Is(err, db.ErrAgentNotFound) {
return newAgentMessageErrorResponse(fmt.Sprintf("agent %s not found", agentId)), nil
} else if err != nil {
return nil, fmt.Errorf("error getting agent state for %s: %w", agentId, err)
}

switch AgentStatus(state.Status) {
case AgentStatusStopping, AgentStatusTerminated:
return newAgentMessageErrorResponse("agent is no longer available"), nil
}

// Restart the agent actor locally if it is not running.
var pluginName string
if plugin, ok := plugins.GetPluginFromContext(ctx); !ok {
return nil, fmt.Errorf("no plugin found in context")
} else {
pluginName = plugin.Name()
}
agentName := state.Name
if err := spawnActorForAgent(ctx, pluginName, agentId, agentName, false); err != nil {
return nil, fmt.Errorf("error spawning actor for agent %s: %w", agentId, err)
}

// Retry sending the message to the agent actor.
continue
return nil, fmt.Errorf("unexpected agent response type: %T", res)
}
}

Expand Down
5 changes: 4 additions & 1 deletion runtime/actors/wasmagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
)

type wasmAgentActor struct {
pid *goakt.PID
agentId string
agentName string
status AgentStatus
Expand Down Expand Up @@ -84,6 +85,8 @@ func (a *wasmAgentActor) Receive(rc *goakt.ReceiveContext) {
}

case *goaktpb.PostStart:
a.pid = rc.Self()

if a.initializing {
if err := a.startAgent(ctx); err != nil {
rc.Err(fmt.Errorf("error starting agent: %w", err))
Expand Down Expand Up @@ -140,7 +143,7 @@ func (a *wasmAgentActor) Receive(rc *goakt.ReceiveContext) {
}

func (a *wasmAgentActor) PostStop(ac *goakt.Context) error {
ctx := ac.Context()
ctx := a.augmentContext(ac.Context(), a.pid)
ctx = sentry.SetHubOnContext(ctx, a.sentryHub)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()
Expand Down
2 changes: 1 addition & 1 deletion runtime/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ require (
github.com/tetratelabs/wazero v1.9.0
github.com/tidwall/gjson v1.18.0
github.com/tidwall/sjson v1.2.5
github.com/tochemey/goakt/v3 v3.7.1
github.com/tochemey/goakt/v3 v3.7.2
github.com/travisjeffery/go-dynaport v1.0.0
github.com/twpayne/go-geom v1.6.1
github.com/wundergraph/graphql-go-tools/execution v1.4.0
Expand Down
4 changes: 2 additions & 2 deletions runtime/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -860,8 +860,8 @@ github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDW
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/tinylib/msgp v1.2.5 h1:WeQg1whrXRFiZusidTQqzETkRpGjFjcIhW6uqWH09po=
github.com/tinylib/msgp v1.2.5/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0=
github.com/tochemey/goakt/v3 v3.7.1 h1:HnigDV2jpx5AB3UTeHCqTxmNdf6WGbKUXM9k0rf3+hk=
github.com/tochemey/goakt/v3 v3.7.1/go.mod h1:zOV6ibP+V/e4l0pRN4vRJncgPENvhD2YDSPs1ZK1bcc=
github.com/tochemey/goakt/v3 v3.7.2 h1:TxZ3HsiJ37mXHIIjppCWGQKV+uASgw+dMPfjT9PexqQ=
github.com/tochemey/goakt/v3 v3.7.2/go.mod h1:zOV6ibP+V/e4l0pRN4vRJncgPENvhD2YDSPs1ZK1bcc=
github.com/tochemey/olric v0.2.3 h1:LGmsHLQBSEs3uasZNLT5MdS2pBMNJ71gSrXnYfkb62M=
github.com/tochemey/olric v0.2.3/go.mod h1:BAD82xys8R8IAWFV+GC0B8I+J4QsYZvmPS5NT/dhmtI=
github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw=
Expand Down
Loading