diff --git a/CHANGELOG.md b/CHANGELOG.md index 73a788784..d2a4238c1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - feat: add kubernetes secrets provider and API to read secrets [#885](https://github.com/hypermodeinc/modus/pull/885) - feat: "start" and "stop" are now mutation prefixes [#889](https://github.com/hypermodeinc/modus/pull/889) +- fix: start agents synchronously, and add examples setting data on start [#890](https://github.com/hypermodeinc/modus/pull/890) ## 2025-06-10 - Runtime 0.18.0-alpha.6 diff --git a/runtime/actors/actorsystem.go b/runtime/actors/actorsystem.go index 0d2c67112..b5bf6f6d3 100644 --- a/runtime/actors/actorsystem.go +++ b/runtime/actors/actorsystem.go @@ -92,13 +92,15 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error { actors := _actorSystem.Actors() runningAgents := make(map[string]bool, len(actors)) for _, pid := range actors { - if actor, ok := pid.Actor().(*wasmAgentActor); ok { - runningAgents[actor.agentId] = true - actor.plugin = plugin - if err := pid.Restart(ctx); err != nil { - logger.Err(ctx, err).Msgf("Failed to restart actor for agent %s.", actor.agentId) + go func(f_ctx context.Context, f_pid *goakt.PID) { + if actor, ok := f_pid.Actor().(*wasmAgentActor); ok { + runningAgents[actor.agentId] = true + actor.plugin = plugin + if err := f_pid.Restart(f_ctx); err != nil { + logger.Err(f_ctx, err).Msgf("Failed to restart actor for agent %s.", actor.agentId) + } } - } + }(ctx, pid) } // spawn actors for agents with state in the database, that are not already running @@ -112,7 +114,11 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error { host := wasmhost.GetWasmHost(ctx) for _, agent := range agents { if !runningAgents[agent.Id] { - spawnActorForAgentAsync(host, plugin, agent.Id, agent.Name, false) + go func(f_ctx context.Context, agentId string, agentName string) { + if _, err := spawnActorForAgent(host, plugin, agentId, agentName, false); err != nil { + logger.Err(f_ctx, err).Msgf("Failed to spawn actor for agent %s.", agentId) + } + }(ctx, agent.Id, agent.Name) } } diff --git a/runtime/actors/agents.go b/runtime/actors/agents.go index 0215c3669..7fa7f0bf8 100644 --- a/runtime/actors/agents.go +++ b/runtime/actors/agents.go @@ -67,26 +67,21 @@ func StartAgent(ctx context.Context, agentName string) (*AgentInfo, error) { agentId := xid.New().String() host := wasmhost.GetWasmHost(ctx) - spawnActorForAgentAsync(host, plugin, agentId, agentName, true) + pid, err := spawnActorForAgent(host, plugin, agentId, agentName, true) + if err != nil { + return nil, fmt.Errorf("error spawning actor for agent %s: %w", agentId, err) + } + actor := pid.Actor().(*wasmAgentActor) info := &AgentInfo{ - Id: agentId, - Name: agentName, - Status: AgentStatusStarting, + Id: actor.agentId, + Name: actor.agentName, + Status: actor.status, } return info, nil } -func spawnActorForAgentAsync(host wasmhost.WasmHost, plugin *plugins.Plugin, agentId, agentName string, initializing bool) { - // We spawn the actor in a goroutine to avoid blocking while the actor is being spawned. - // This allows many agents to be spawned in parallel, if needed. - // Errors are logged but not returned, as the actor system will handle them. - go func() { - _, _ = spawnActorForAgent(host, plugin, agentId, agentName, initializing) - }() -} - func spawnActorForAgent(host wasmhost.WasmHost, plugin *plugins.Plugin, agentId, agentName string, initializing bool) (*goakt.PID, error) { // The actor needs to spawn in its own context, so we don't pass one in to this function. // If we did, then when the original context was cancelled or completed, the actor initialization would be cancelled too. diff --git a/sdk/assemblyscript/examples/agents/assembly/index.ts b/sdk/assemblyscript/examples/agents/assembly/index.ts index cae0ee120..a6d748158 100644 --- a/sdk/assemblyscript/examples/agents/assembly/index.ts +++ b/sdk/assemblyscript/examples/agents/assembly/index.ts @@ -25,6 +25,16 @@ export function startCounterAgent(): AgentInfo { return agents.start("Counter"); } +/** + * Starts a counter agent with an initial count value, + * and returns info including its ID and status. + */ +export function startCounterAgentWithData(initialCount: i32): AgentInfo { + const info = agents.start("Counter"); + updateCountAsync(info.id, initialCount); + return info; +} + /** * Stops the specified agent by ID, returning its status info. * This will terminate the agent, and it cannot be resumed or restarted. diff --git a/sdk/go/examples/agents/main.go b/sdk/go/examples/agents/main.go index bee0229d8..e9110d764 100644 --- a/sdk/go/examples/agents/main.go +++ b/sdk/go/examples/agents/main.go @@ -30,6 +30,18 @@ func StartCounterAgent() (agents.AgentInfo, error) { return agents.Start("Counter") } +// Starts a counter agent with an initial count value, +// and returns info including its ID and status. +func StartCounterAgentWithData(initialCount int) (agents.AgentInfo, error) { + info, err := agents.Start("Counter") + if err != nil { + return agents.AgentInfo{}, err + } + + err = UpdateCountAsync(info.Id, initialCount) + return info, err +} + // Stops the specified agent by ID, returning its status info. // This will terminate the agent, and it cannot be resumed or restarted. // However, a new agent with the same name can be started at any time.