diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f7e8d0db..c6614ce21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ - feat: enforce WASI reactor mode [#859](https://github.com/hypermodeinc/modus/pull/859) - feat: return user and chat errors in API response [#863](https://github.com/hypermodeinc/modus/pull/863) - feat: list agents on health endpoint [#865](https://github.com/hypermodeinc/modus/pull/865) +- feat: add agent management APIs [#866](https://github.com/hypermodeinc/modus/pull/866) ## 2025-05-22 - Go SDK 0.18.0-alpha.3 diff --git a/runtime/actors/agents.go b/runtime/actors/agents.go index c0970ca7d..b11f51a6a 100644 --- a/runtime/actors/agents.go +++ b/runtime/actors/agents.go @@ -93,22 +93,58 @@ func spawnActorForAgent(host wasmhost.WasmHost, plugin *plugins.Plugin, agentId, }() } -func StopAgent(ctx context.Context, agentId string) bool { +func StopAgent(ctx context.Context, agentId string) (*AgentInfo, error) { pid, err := getActorPid(ctx, agentId) if err != nil { - logger.Err(ctx, err).Msg("Error stopping agent.") - return false + // see if it's in the database before erroring + if agent, e := db.GetAgentState(ctx, agentId); e == nil { + return &AgentInfo{ + Id: agent.Id, + Name: agent.Name, + Status: agent.Status, + }, nil + } + + return nil, fmt.Errorf("error stopping agent %s: %w", agentId, err) } + // it was found, so we can stop it actor := pid.Actor().(*wasmAgentActor) actor.status = AgentStatusStopping - if err := pid.Shutdown(ctx); err != nil { - logger.Err(ctx, err).Msg("Error stopping agent.") - return false + return nil, fmt.Errorf("error stopping agent %s: %w", agentId, err) } - return true + return &AgentInfo{ + Id: actor.agentId, + Name: actor.agentName, + Status: actor.status, + }, nil +} + +func GetAgentInfo(ctx context.Context, agentId string) (*AgentInfo, error) { + + // Try the local actor system first. + if pid, err := getActorPid(ctx, agentId); err == nil { + actor := pid.Actor().(*wasmAgentActor) + return &AgentInfo{ + Id: actor.agentId, + Name: actor.agentName, + Status: actor.status, + }, nil + } + + // Check the database as a fallback. + // This is useful if the actor is terminated, or running on another node. + if agent, err := db.GetAgentState(ctx, agentId); err == nil { + return &AgentInfo{ + Id: agent.Id, + Name: agent.Name, + Status: agent.Status, + }, nil + } + + return nil, fmt.Errorf("agent %s not found", agentId) } type agentMessageResponse struct { @@ -167,7 +203,25 @@ func getActorPid(ctx context.Context, agentId string) (*goakt.PID, error) { return pid, nil } -func ListAgents() []AgentInfo { +func ListActiveAgents(ctx context.Context) ([]AgentInfo, error) { + agents, err := db.QueryActiveAgents(ctx) + if err != nil { + return nil, fmt.Errorf("error listing active agents: %w", err) + } + + results := make([]AgentInfo, 0, len(agents)) + for _, agent := range agents { + results = append(results, AgentInfo{ + Id: agent.Id, + Name: agent.Name, + Status: agent.Status, + }) + } + + return results, nil +} + +func ListLocalAgents() []AgentInfo { if _actorSystem == nil { return nil } diff --git a/runtime/db/agentstate.go b/runtime/db/agentstate.go index 77c49c457..f859ed7dd 100644 --- a/runtime/db/agentstate.go +++ b/runtime/db/agentstate.go @@ -38,6 +38,14 @@ func WriteAgentState(ctx context.Context, state AgentState) error { } } +func GetAgentState(ctx context.Context, id string) (*AgentState, error) { + if useModusDB() { + return getAgentStateFromModusDB(ctx, id) + } else { + return getAgentStateFromPostgresDB(ctx, id) + } +} + func QueryActiveAgents(ctx context.Context) ([]AgentState, error) { if useModusDB() { return queryActiveAgentsFromModusDB(ctx) @@ -61,6 +69,21 @@ func writeAgentStateToModusDB(ctx context.Context, state AgentState) error { return err } +func getAgentStateFromModusDB(ctx context.Context, id string) (*AgentState, error) { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + + _, result, err := modusgraph.Get[AgentState](ctx, GlobalModusDbEngine, modusgraph.ConstrainedField{ + Key: "id", + Value: id, + }) + if err != nil { + return nil, fmt.Errorf("failed to query agent state: %w", err) + } + + return &result, nil +} + func queryActiveAgentsFromModusDB(ctx context.Context) ([]AgentState, error) { span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) defer span.Finish() @@ -106,6 +129,30 @@ func writeAgentStateToPostgresDB(ctx context.Context, state AgentState) error { return err } +func getAgentStateFromPostgresDB(ctx context.Context, id string) (*AgentState, error) { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + + const query = "SELECT id, name, status, data, updated FROM agents WHERE id = $1" + + var a AgentState + var ts time.Time + err := WithTx(ctx, func(tx pgx.Tx) error { + row := tx.QueryRow(ctx, query, id) + if err := row.Scan(&a.Id, &a.Name, &a.Status, &a.Data, &ts); err != nil { + return fmt.Errorf("failed to get agent state: %w", err) + } + a.UpdatedAt = ts.UTC().Format(time.RFC3339) + return nil + }) + + if err != nil { + return nil, err + } + + return &a, nil +} + func queryActiveAgentsFromPostgresDB(ctx context.Context) ([]AgentState, error) { span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) defer span.Finish() diff --git a/runtime/hostfunctions/agents.go b/runtime/hostfunctions/agents.go index 19435ef9d..6d760db42 100644 --- a/runtime/hostfunctions/agents.go +++ b/runtime/hostfunctions/agents.go @@ -30,6 +30,15 @@ func init() { return fmt.Sprintf("AgentId: %s", agentId) })) + registerHostFunction(module_name, "getAgentInfo", actors.GetAgentInfo, + withErrorMessage("Error getting agent info."), + withMessageDetail(func(agentId string) string { + return fmt.Sprintf("AgentId: %s", agentId) + })) + + registerHostFunction(module_name, "listAgents", actors.ListActiveAgents, + withErrorMessage("Error listing agents.")) + registerHostFunction(module_name, "sendMessage", actors.SendAgentMessage, withErrorMessage("Error sending message to agent."), withMessageDetail(func(agentId string, msgName string, data *string, timeout int64) string { diff --git a/runtime/httpserver/health.go b/runtime/httpserver/health.go index d10a46b7a..813b3750b 100644 --- a/runtime/httpserver/health.go +++ b/runtime/httpserver/health.go @@ -22,7 +22,7 @@ import ( var healthHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { env := app.Config().Environment() ver := app.VersionNumber() - agents := actors.ListAgents() + agents := actors.ListLocalAgents() // custom format the JSON response for easy readability diff --git a/sdk/assemblyscript/examples/agents/assembly/index.ts b/sdk/assemblyscript/examples/agents/assembly/index.ts index 40efb3124..cae0ee120 100644 --- a/sdk/assemblyscript/examples/agents/assembly/index.ts +++ b/sdk/assemblyscript/examples/agents/assembly/index.ts @@ -26,12 +26,26 @@ export function startCounterAgent(): AgentInfo { } /** - * Stops the specified agent by ID. + * Stops the specified agent by ID, returning its status info. * This will terminate the agent, and it cannot be resumed or restarted. * However, a new agent with the same name can be started at any time. */ -export function stopAgent(agentId: string): void { - agents.stop(agentId); +export function stopAgent(agentId: string): AgentInfo { + return agents.stop(agentId); +} + +/** + * Gets information about the specified agent. + */ +export function getAgentInfo(agentId: string): AgentInfo { + return agents.getInfo(agentId); +} + +/** + * List all agents, except those that have been fully terminated. + */ +export function ListAgents(): AgentInfo[] { + return agents.listAll(); } /** diff --git a/sdk/assemblyscript/src/assembly/agent.ts b/sdk/assemblyscript/src/assembly/agent.ts index 20409c92a..712b8cd8e 100644 --- a/sdk/assemblyscript/src/assembly/agent.ts +++ b/sdk/assemblyscript/src/assembly/agent.ts @@ -8,6 +8,7 @@ */ import { AgentStatus } from "./enums"; +import * as utils from "./utils"; const agents = new Map(); let activeAgent: Agent | null = null; @@ -103,7 +104,15 @@ declare function hostStartAgent(agentName: string): AgentInfo; // @ts-expect-error: decorator @external("modus_agents", "stopAgent") -declare function hostStopAgent(agentId: string): bool; +declare function hostStopAgent(agentId: string): AgentInfo; + +// @ts-expect-error: decorator +@external("modus_agents", "getAgentInfo") +declare function hostGetAgentInfo(agentId: string): AgentInfo; + +// @ts-expect-error: decorator +@external("modus_agents", "listAgents") +declare function hostListAgents(): AgentInfo[]; /** * Starts an agent with the given name. @@ -114,7 +123,11 @@ export function startAgent(name: string): AgentInfo { throw new Error(`Agent ${name} not found.`); } - return hostStartAgent(name); + const info = hostStartAgent(name); + if (utils.resultIsInvalid(info)) { + throw new Error(`Failed to start agent ${name}.`); + } + return info; } /** @@ -122,14 +135,40 @@ export function startAgent(name: string): AgentInfo { * This will terminate the agent, and it cannot be resumed or restarted. * This can be called from any user code, such as function or another agent's methods. */ -export function stopAgent(agentId: string): void { +export function stopAgent(agentId: string): AgentInfo { if (agentId == "") { throw new Error("Agent ID cannot be empty."); } - const ok = hostStopAgent(agentId); - if (!ok) { + const info = hostStopAgent(agentId); + if (utils.resultIsInvalid(info)) { throw new Error(`Failed to stop agent ${agentId}.`); } + return info; +} + +/** + * Gets information about an agent with the given ID. + */ +export function getAgentInfo(agentId: string): AgentInfo { + if (agentId == "") { + throw new Error("Agent ID cannot be empty."); + } + const info = hostGetAgentInfo(agentId); + if (utils.resultIsInvalid(info)) { + throw new Error(`Failed to get info for agent ${agentId}.`); + } + return info; +} + +/** + * Returns a list of all agents, except those that have been fully terminated. + */ +export function listAgents(): AgentInfo[] { + const agents = hostListAgents(); + if (utils.resultIsInvalid(agents)) { + throw new Error("Failed to list agents."); + } + return agents; } /** diff --git a/sdk/assemblyscript/src/assembly/agents.ts b/sdk/assemblyscript/src/assembly/agents.ts index 7e14ca33f..5317c5285 100644 --- a/sdk/assemblyscript/src/assembly/agents.ts +++ b/sdk/assemblyscript/src/assembly/agents.ts @@ -13,6 +13,8 @@ export { registerAgent as register, startAgent as start, stopAgent as stop, + getAgentInfo as getInfo, + listAgents as listAll, } from "./agent"; // @ts-expect-error: decorator diff --git a/sdk/go/examples/agents/main.go b/sdk/go/examples/agents/main.go index 69f85d61a..bee0229d8 100644 --- a/sdk/go/examples/agents/main.go +++ b/sdk/go/examples/agents/main.go @@ -30,13 +30,23 @@ func StartCounterAgent() (agents.AgentInfo, error) { return agents.Start("Counter") } -// Stops the specified agent by ID. +// Stops the specified agent by ID, returning its status info. // This will terminate the agent, and it cannot be resumed or restarted. // However, a new agent with the same name can be started at any time. -func StopAgent(agentId string) error { +func StopAgent(agentId string) (agents.AgentInfo, error) { return agents.Stop(agentId) } +// Gets information about the specified agent. +func GetAgentInfo(agentId string) (agents.AgentInfo, error) { + return agents.GetInfo(agentId) +} + +// List all agents, except those that have been fully terminated. +func ListAgents() ([]agents.AgentInfo, error) { + return agents.ListAll() +} + // Returns the current count of the specified agent. func GetCount(agentId string) (int, error) { count, err := agents.SendMessage(agentId, "count") diff --git a/sdk/go/pkg/agents/agents.go b/sdk/go/pkg/agents/agents.go index 6122f6312..b5816163b 100644 --- a/sdk/go/pkg/agents/agents.go +++ b/sdk/go/pkg/agents/agents.go @@ -56,13 +56,36 @@ func Start(name string) (AgentInfo, error) { return *info, nil } -// Stops an agent with the given ID. +// Stops an agent with the given ID and returns its status info. // This will terminate the agent, and it cannot be resumed or restarted. -func Stop(agentId string) error { - if ok := hostStopAgent(&agentId); !ok { - return fmt.Errorf("failed to stop agent %s", agentId) +func Stop(agentId string) (AgentInfo, error) { + if info := hostStopAgent(&agentId); info != nil { + return *info, nil } - return nil + return AgentInfo{}, fmt.Errorf("failed to stop agent %s", agentId) +} + +// Gets information about an agent with the given ID. +func GetInfo(agentId string) (AgentInfo, error) { + if len(agentId) == 0 { + return AgentInfo{}, errors.New("invalid agent ID") + } + + info := hostGetAgentInfo(&agentId) + if info == nil { + return AgentInfo{}, fmt.Errorf("agent %s not found", agentId) + } + + return *info, nil +} + +// Returns a list of all agents, except those that have been fully terminated. +func ListAll() ([]AgentInfo, error) { + agents := hostListAgents() + if agents == nil { + return nil, errors.New("failed to list agents") + } + return *agents, nil } // These functions are only invoked as wasm exports from the host. diff --git a/sdk/go/pkg/agents/imports_mock.go b/sdk/go/pkg/agents/imports_mock.go index 4e36b72f9..78c9f4708 100644 --- a/sdk/go/pkg/agents/imports_mock.go +++ b/sdk/go/pkg/agents/imports_mock.go @@ -18,6 +18,8 @@ import ( var StartAgentCallStack = testutils.NewCallStack() var SendMessageCallStack = testutils.NewCallStack() var StopAgentCallStack = testutils.NewCallStack() +var GetAgentInfoCallStack = testutils.NewCallStack() +var ListAgentsCallStack = testutils.NewCallStack() func hostStartAgent(agentName *string) *AgentInfo { StartAgentCallStack.Push(agentName) @@ -41,8 +43,38 @@ func hostSendMessage(agentId, msgName, data *string, timeout int64) *MessageResp return nil } -func hostStopAgent(agentId *string) bool { +func hostStopAgent(agentId *string) *AgentInfo { StopAgentCallStack.Push(agentId) - return agentId != nil && *agentId == "abc123" + if *agentId == "abc123" { + return &AgentInfo{ + Id: "abc123", + Name: "Counter", + Status: AgentStatusStopping, + } + } + return nil +} + +func hostGetAgentInfo(agentId *string) *AgentInfo { + GetAgentInfoCallStack.Push(agentId) + + if *agentId == "abc123" { + return &AgentInfo{ + Id: "abc123", + Name: "Counter", + Status: AgentStatusRunning, + } + } + + return nil +} + +func hostListAgents() *[]AgentInfo { + ListAgentsCallStack.Push() + + return &[]AgentInfo{ + {Id: "abc123", Name: "Counter", Status: AgentStatusRunning}, + {Id: "def456", Name: "Logger", Status: AgentStatusRunning}, + } } diff --git a/sdk/go/pkg/agents/imports_wasi.go b/sdk/go/pkg/agents/imports_wasi.go index 8198aaf2e..606de29c0 100644 --- a/sdk/go/pkg/agents/imports_wasi.go +++ b/sdk/go/pkg/agents/imports_wasi.go @@ -43,4 +43,40 @@ func hostSendMessage(agentId, msgName, data *string, timeout int64) *MessageResp //go:noescape //go:wasmimport modus_agents stopAgent -func hostStopAgent(agentId *string) bool +func _hostStopAgent(agentId *string) unsafe.Pointer + +//modus:import modus_agents stopAgent +func hostStopAgent(agentId *string) *AgentInfo { + info := _hostStopAgent(agentId) + if info == nil { + return nil + } + return (*AgentInfo)(info) +} + +//go:noescape +//go:wasmimport modus_agents getAgentInfo +func _hostGetAgentInfo(agentId *string) unsafe.Pointer + +//modus:import modus_agents getAgentInfo +func hostGetAgentInfo(agentId *string) *AgentInfo { + info := _hostGetAgentInfo(agentId) + if info == nil { + return nil + } + return (*AgentInfo)(info) +} + +//go:noescape +//go:wasmimport modus_agents listAgents +func _hostListAgents() unsafe.Pointer + +//modus:import modus_agents listAgents +func hostListAgents() *[]AgentInfo { + ptr := _hostListAgents() + if ptr == nil { + return nil + } + + return (*[]AgentInfo)(ptr) +}