diff --git a/.trunk/configs/cspell.json b/.trunk/configs/cspell.json index 173ef7132..1880d952e 100644 --- a/.trunk/configs/cspell.json +++ b/.trunk/configs/cspell.json @@ -119,6 +119,7 @@ "minilm", "modfile", "modusdb", + "modusgraph", "Msgf", "mydb", "mydgraph", diff --git a/CHANGELOG.md b/CHANGELOG.md index eebcaf5c2..3458bdc71 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ## UNRELEASED - feat: initial implementation of Modus Agents [#840](https://github.com/hypermodeinc/modus/pull/840) +- feat: persistence and lifecycle improvements for Modus Agents [#843](https://github.com/hypermodeinc/modus/pull/843) ## 2025-05-06 - CLI 0.17.4 diff --git a/go.work b/go.work index 3e23c26c5..4dac6a2b4 100644 --- a/go.work +++ b/go.work @@ -1,4 +1,4 @@ -go 1.24.2 +go 1.24.3 use ( ./lib/manifest diff --git a/runtime/actors/actorsystem.go b/runtime/actors/actorsystem.go index 113549221..b4a894ad4 100644 --- a/runtime/actors/actorsystem.go +++ b/runtime/actors/actorsystem.go @@ -13,9 +13,11 @@ import ( "context" "time" + "github.com/hypermodeinc/modus/runtime/db" "github.com/hypermodeinc/modus/runtime/logger" "github.com/hypermodeinc/modus/runtime/pluginmanager" "github.com/hypermodeinc/modus/runtime/plugins" + "github.com/hypermodeinc/modus/runtime/wasmhost" goakt "github.com/tochemey/goakt/v3/actor" ) @@ -45,18 +47,36 @@ func Initialize(ctx context.Context) { logger.Info(ctx).Msg("Actor system started.") - pluginmanager.RegisterPluginLoadedCallback(reloadAgentActors) + pluginmanager.RegisterPluginLoadedCallback(loadAgentActors) } -func reloadAgentActors(ctx context.Context, plugin *plugins.Plugin) error { - for _, pid := range _actorSystem.Actors() { - if actor, ok := pid.Actor().(*WasmAgentActor); ok { +func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error { + // reload modules for actors that are already running + actors := _actorSystem.Actors() + runningAgents := make(map[string]bool, len(actors)) + for _, pid := range actors { + if actor, ok := pid.Actor().(*wasmAgentActor); ok { + runningAgents[actor.agentId] = true if err := actor.reloadModule(ctx, plugin); err != nil { return err } } } + // spawn actors for agents with state in the database, that are not already running + // TODO: when we scale out with GoAkt cluster mode, we'll need to decide which node is responsible for spawning the actor + agents, err := db.QueryActiveAgents(ctx) + if err != nil { + logger.Err(ctx, err).Msg("Failed to query agents from database.") + return err + } + host := wasmhost.GetWasmHost(ctx) + for _, agent := range agents { + if !runningAgents[agent.Id] { + spawnActorForAgent(host, plugin, agent.Id, agent.Name, true, &agent.Data) + } + } + return nil } diff --git a/runtime/actors/agents.go b/runtime/actors/agents.go index 894801b31..56a08c19b 100644 --- a/runtime/actors/agents.go +++ b/runtime/actors/agents.go @@ -15,6 +15,7 @@ import ( "strings" "time" + "github.com/hypermodeinc/modus/runtime/db" "github.com/hypermodeinc/modus/runtime/logger" "github.com/hypermodeinc/modus/runtime/messages" "github.com/hypermodeinc/modus/runtime/plugins" @@ -26,27 +27,25 @@ import ( goakt "github.com/tochemey/goakt/v3/actor" ) -type AgentInfo struct { - Id string - Name string - Status AgentStatus +type agentInfo struct { + id string + name string + status agentStatus } -type AgentStatus = string +type agentStatus = string -// TODO: validate these statuses are needed and used correctly const ( - AgentStatusUninitialized AgentStatus = "uninitialized" - AgentStatusError AgentStatus = "error" - AgentStatusStarting AgentStatus = "starting" - AgentStatusStarted AgentStatus = "started" - AgentStatusStopping AgentStatus = "stopping" - AgentStatusStopped AgentStatus = "stopped" - AgentStatusSuspended AgentStatus = "suspended" - AgentStatusTerminated AgentStatus = "terminated" + agentStatusStarting agentStatus = "starting" + agentStatusRunning agentStatus = "running" + agentStatusSuspending agentStatus = "suspending" + agentStatusSuspended agentStatus = "suspended" + agentStatusRestoring agentStatus = "restoring" + agentStatusTerminating agentStatus = "terminating" + agentStatusTerminated agentStatus = "terminated" ) -func SpawnAgentActor(ctx context.Context, agentName string) (*AgentInfo, error) { +func SpawnAgentActor(ctx context.Context, agentName string) (*agentInfo, error) { plugin, ok := plugins.GetPluginFromContext(ctx) if !ok { return nil, fmt.Errorf("no plugin found in context") @@ -54,50 +53,75 @@ func SpawnAgentActor(ctx context.Context, agentName string) (*AgentInfo, error) agentId := xid.New().String() host := wasmhost.GetWasmHost(ctx) + spawnActorForAgent(host, plugin, agentId, agentName, false, nil) - // We spawn the actor in a goroutine to avoid blocking the host function while the actor is being spawned. + info := &agentInfo{ + id: agentId, + name: agentName, + status: agentStatusStarting, + } + + return info, nil +} + +func spawnActorForAgent(host wasmhost.WasmHost, plugin *plugins.Plugin, agentId, agentName string, restoring bool, initialState *string) { + // The actor needs to spawn in its own context, so we don't pass one in to this function. + // If we did, then when the original context was cancelled or completed, the actor initialization would be cancelled too. + + // We spawn the actor in a goroutine to avoid blocking while the actor is being spawned. + // This allows many agents to be spawned in parallel, if needed. go func() { - // The actor needs to spawn in a new context. Otherwise, when the original context is cancelled (such as when the function completes), - // the actor initialization would be cancelled too. ctx := context.Background() ctx = context.WithValue(ctx, utils.WasmHostContextKey, host) ctx = context.WithValue(ctx, utils.PluginContextKey, plugin) ctx = context.WithValue(ctx, utils.AgentIdContextKey, agentId) ctx = context.WithValue(ctx, utils.AgentNameContextKey, agentName) - actor := NewWasmAgentActor(agentId, agentName, plugin) - actorName := fmt.Sprintf("agent-%s", agentId) + actor := newWasmAgentActor(agentId, agentName, host, plugin) + actorName := getActorName(agentId) + + if restoring { + actor.status = agentStatusRestoring + actor.initialState = initialState + } else { + actor.status = agentStatusStarting + } + if _, err := _actorSystem.Spawn(ctx, actorName, actor); err != nil { - logger.Err(ctx, err).Msg("Error spawning actor.") + logger.Err(ctx, err).Msg("Error spawning actor for agent.") } }() +} + +func TerminateAgent(ctx context.Context, agentId string) bool { + pid, err := getActorPid(ctx, agentId) + if err != nil { + logger.Err(ctx, err).Msg("Error terminating agent.") + return false + } + + actor := pid.Actor().(*wasmAgentActor) + actor.status = agentStatusTerminating - info := &AgentInfo{ - Id: agentId, - Name: agentName, - Status: AgentStatusStarting, + if err := pid.Shutdown(ctx); err != nil { + logger.Err(ctx, err).Msg("Error terminating agent.") + return false } - return info, nil + return true } -type AgentMessageResponse struct { +type agentMessageResponse struct { Data *string } -func SendAgentMessage(ctx context.Context, agentId string, msgName string, data *string, timeout int64) (*AgentMessageResponse, error) { +func SendAgentMessage(ctx context.Context, agentId string, msgName string, data *string, timeout int64) (*agentMessageResponse, error) { - addr, pid, err := _actorSystem.ActorOf(ctx, getActorName(agentId)) + pid, err := getActorPid(ctx, agentId) if err != nil { - if strings.HasSuffix(err.Error(), " not found") { - return nil, fmt.Errorf("agent %s not found", agentId) - } else { - return nil, fmt.Errorf("error getting actor for agent %s: %w", agentId, err) - } + return nil, err } - _ = addr // TODO: this will be used when we implement remote actors with clustering - msg := &messages.AgentRequestMessage{ Name: msgName, Data: data, @@ -108,7 +132,7 @@ func SendAgentMessage(ctx context.Context, agentId string, msgName string, data if err := goakt.Tell(ctx, pid, msg); err != nil { return nil, fmt.Errorf("error sending message to agent %s: %w", pid.ID(), err) } - return &AgentMessageResponse{}, nil + return &agentMessageResponse{}, nil } res, err := goakt.Ask(ctx, pid, msg, time.Duration(timeout)) @@ -117,7 +141,7 @@ func SendAgentMessage(ctx context.Context, agentId string, msgName string, data } if response, ok := res.(*messages.AgentResponseMessage); ok { - return &AgentMessageResponse{response.Data}, nil + return &agentMessageResponse{response.Data}, nil } else { return nil, fmt.Errorf("unexpected response type: %T", res) } @@ -127,30 +151,61 @@ func getActorName(agentId string) string { return "agent-" + agentId } -type WasmAgentActor struct { - agentId string - agentName string - plugin *plugins.Plugin - host wasmhost.WasmHost - module wasm.Module - buffers utils.OutputBuffers +func getActorPid(ctx context.Context, agentId string) (*goakt.PID, error) { + + addr, pid, err := _actorSystem.ActorOf(ctx, getActorName(agentId)) + if err != nil { + if strings.HasSuffix(err.Error(), " not found") { + return nil, fmt.Errorf("agent %s not found", agentId) + } else { + return nil, fmt.Errorf("error getting actor for agent %s: %w", agentId, err) + } + } + + _ = addr // TODO: this will be used when we implement remote actors with clustering + + return pid, nil +} + +type wasmAgentActor struct { + agentId string + agentName string + status agentStatus + plugin *plugins.Plugin + host wasmhost.WasmHost + module wasm.Module + buffers utils.OutputBuffers + initialState *string } -func NewWasmAgentActor(agentId, agentName string, plugin *plugins.Plugin) *WasmAgentActor { - return &WasmAgentActor{ +func newWasmAgentActor(agentId, agentName string, host wasmhost.WasmHost, plugin *plugins.Plugin) *wasmAgentActor { + return &wasmAgentActor{ agentId: agentId, agentName: agentName, + host: host, plugin: plugin, } } -func (a *WasmAgentActor) PreStart(ac *goakt.Context) error { - ctx := ac.Context() +func (a *wasmAgentActor) PreStart(ac *goakt.Context) error { + ctx := a.newContext() + + switch a.status { + case agentStatusStarting: + logger.Info(ctx).Msg("Starting agent.") + case agentStatusRestoring, agentStatusSuspended: + a.status = agentStatusRestoring + logger.Info(ctx).Msg("Restoring agent.") + default: + return fmt.Errorf("invalid agent status for actor PreStart: %s", a.status) + } + + if err := a.saveState(ctx); err != nil { + logger.Err(ctx, err).Msg("Error saving agent state.") + } - logger.Info(ctx).Bool("user_visible", true).Msg("Starting agent.") start := time.Now() - a.host = wasmhost.GetWasmHost(ctx) a.buffers = utils.NewOutputBuffers() if mod, err := a.host.GetModuleInstance(ctx, a.plugin, a.buffers); err != nil { return err @@ -158,46 +213,106 @@ func (a *WasmAgentActor) PreStart(ac *goakt.Context) error { a.module = mod } - if err := a.activateAgent(ctx, false); err != nil { - logger.Err(ctx, err).Bool("user_visible", true).Msg("Error starting agent.") + if err := a.activateAgent(ctx); err != nil { + logger.Err(ctx, err).Msg("Error activating agent.") return err } + if a.status == agentStatusRestoring { + if err := a.setAgentState(ctx, a.initialState); err != nil { + logger.Err(ctx, err).Msg("Error restoring agent state.") + } + a.initialState = nil + } + duration := time.Since(start) - logger.Info(ctx).Dur("duration_ms", duration).Bool("user_visible", true).Msg("Agent started successfully.") + if a.status == agentStatusRestoring { + logger.Info(ctx).Msg("Agent restored successfully.") + } else { + logger.Info(ctx).Dur("duration_ms", duration).Msg("Agent started successfully.") + } + + a.status = agentStatusRunning + + if err := a.saveState(ctx); err != nil { + logger.Err(ctx, err).Msg("Error saving agent state.") + } + return nil } -func (a *WasmAgentActor) PostStop(ac *goakt.Context) error { - ctx := ac.Context() +func (a *wasmAgentActor) PostStop(ac *goakt.Context) error { + ctx := a.newContext() defer a.module.Close(ctx) - // the context may not have these values set - ctx = context.WithValue(ctx, utils.PluginContextKey, a.plugin) - ctx = context.WithValue(ctx, utils.AgentIdContextKey, a.agentId) - ctx = context.WithValue(ctx, utils.AgentNameContextKey, a.agentName) + switch a.status { + case agentStatusRunning, agentStatusSuspending: + a.status = agentStatusSuspending + logger.Info(ctx).Msg("Suspending agent.") + case agentStatusTerminating: + logger.Info(ctx).Msg("Terminating agent.") + + default: + return fmt.Errorf("invalid agent status for actor PostStop: %s", a.status) + } + + if err := a.saveState(ctx); err != nil { + logger.Err(ctx, err).Msg("Error saving agent state.") + } - logger.Info(ctx).Bool("user_visible", true).Msg("Stopping agent.") start := time.Now() if err := a.shutdownAgent(ctx); err != nil { - logger.Err(ctx, err).Bool("user_visible", true).Msg("Error stopping agent.") + logger.Err(ctx, err).Msg("Error shutting down agent.") return err } duration := time.Since(start) - logger.Info(ctx).Dur("duration_ms", duration).Bool("user_visible", true).Msg("Agent stopped successfully.") + switch a.status { + case agentStatusSuspending: + a.status = agentStatusSuspended + if err := a.saveState(ctx); err != nil { + return err + } + logger.Info(ctx).Msg("Agent suspended successfully.") + case agentStatusTerminating: + a.status = agentStatusTerminated + if err := a.saveState(ctx); err != nil { + return err + } + logger.Info(ctx).Dur("duration_ms", duration).Msg("Agent terminated successfully.") + default: + return fmt.Errorf("invalid agent status for actor PostStop: %s", a.status) + } + return nil } -func (a *WasmAgentActor) Receive(rc *goakt.ReceiveContext) { - // Create a new context to avoid having the actor handle the message in the host function context. - // This is important for async messages, where the calling function may return before the message is handled. - ctx := context.Background() - ctx = context.WithValue(ctx, utils.WasmHostContextKey, a.host) - ctx = context.WithValue(ctx, utils.PluginContextKey, a.plugin) - ctx = context.WithValue(ctx, utils.AgentIdContextKey, a.agentId) - ctx = context.WithValue(ctx, utils.AgentNameContextKey, a.agentName) +func (a *wasmAgentActor) saveState(ctx context.Context) error { + var data string + if a.module != nil { + if d, err := a.getAgentState(ctx); err != nil { + return fmt.Errorf("error getting state from agent: %w", err) + } else if d != nil { + data = *d + } + } + + if err := db.WriteAgentState(ctx, db.AgentState{ + Id: a.agentId, + Name: a.agentName, + Status: a.status, + Data: data, + UpdatedAt: time.Now().UTC().Format(time.RFC3339), + }); err != nil { + return fmt.Errorf("error saving state to database: %w", err) + } + + return nil +} + +func (a *wasmAgentActor) Receive(rc *goakt.ReceiveContext) { + ctx := a.newContext() switch msg := rc.Message().(type) { case *messages.AgentRequestMessage: @@ -244,12 +359,26 @@ func (a *WasmAgentActor) Receive(rc *goakt.ReceiveContext) { duration := time.Since(start) logger.Info(ctx).Str("msg_name", msg.Name).Dur("duration_ms", duration).Msg("Message handled successfully.") + // save the state after handling the message to ensure the state is up to date in case of hard termination + if err := a.saveState(ctx); err != nil { + logger.Err(ctx, err).Msg("Error saving agent state.") + } + default: rc.Unhandled() } } -func (a *WasmAgentActor) activateAgent(ctx context.Context, reloading bool) error { +func (a *wasmAgentActor) newContext() context.Context { + ctx := context.Background() + ctx = context.WithValue(ctx, utils.WasmHostContextKey, a.host) + ctx = context.WithValue(ctx, utils.PluginContextKey, a.plugin) + ctx = context.WithValue(ctx, utils.AgentIdContextKey, a.agentId) + ctx = context.WithValue(ctx, utils.AgentNameContextKey, a.agentName) + return ctx +} + +func (a *wasmAgentActor) activateAgent(ctx context.Context) error { fnInfo, err := a.host.GetFunctionInfo("_modus_agent_activate") if err != nil { @@ -259,7 +388,7 @@ func (a *WasmAgentActor) activateAgent(ctx context.Context, reloading bool) erro params := map[string]any{ "name": a.agentName, "id": a.agentId, - "reloading": reloading, + "reloading": a.status == agentStatusRestoring, } execInfo, err := a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, params) @@ -268,19 +397,24 @@ func (a *WasmAgentActor) activateAgent(ctx context.Context, reloading bool) erro return err } -func (a *WasmAgentActor) shutdownAgent(ctx context.Context) error { +func (a *wasmAgentActor) shutdownAgent(ctx context.Context) error { fnInfo, err := a.host.GetFunctionInfo("_modus_agent_shutdown") if err != nil { return err } - execInfo, err := a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, nil) + + params := map[string]any{ + "suspending": a.status == agentStatusSuspending, + } + + execInfo, err := a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, params) _ = execInfo // TODO return err } -func (a *WasmAgentActor) getAgentState(ctx context.Context) (*string, error) { +func (a *wasmAgentActor) getAgentState(ctx context.Context) (*string, error) { fnInfo, err := a.host.GetFunctionInfo("_modus_agent_get_state") if err != nil { @@ -307,7 +441,7 @@ func (a *WasmAgentActor) getAgentState(ctx context.Context) (*string, error) { } } -func (a *WasmAgentActor) setAgentState(ctx context.Context, data *string) error { +func (a *wasmAgentActor) setAgentState(ctx context.Context, data *string) error { fnInfo, err := a.host.GetFunctionInfo("_modus_agent_set_state") if err != nil { return err @@ -326,19 +460,19 @@ func (a *WasmAgentActor) setAgentState(ctx context.Context, data *string) error return err } -func (a *WasmAgentActor) reloadModule(ctx context.Context, plugin *plugins.Plugin) error { +func (a *wasmAgentActor) reloadModule(ctx context.Context, plugin *plugins.Plugin) error { // the context may not have these values set ctx = context.WithValue(ctx, utils.PluginContextKey, a.plugin) ctx = context.WithValue(ctx, utils.AgentIdContextKey, a.agentId) ctx = context.WithValue(ctx, utils.AgentNameContextKey, a.agentName) - logger.Info(ctx).Bool("user_visible", true).Msg("Reloading module for agent.") + logger.Info(ctx).Msg("Reloading module for agent.") // get the current state and close the module instance state, err := a.getAgentState(ctx) if err != nil { - logger.Err(ctx, err).Bool("user_visible", true).Msg("Error getting agent state.") + logger.Err(ctx, err).Msg("Error getting agent state.") return err } a.module.Close(ctx) @@ -353,18 +487,18 @@ func (a *WasmAgentActor) reloadModule(ctx context.Context, plugin *plugins.Plugi a.module = mod // activate the agent in the new module instance - if err := a.activateAgent(ctx, true); err != nil { - logger.Err(ctx, err).Bool("user_visible", true).Msg("Error reloading agent.") + if err := a.activateAgent(ctx); err != nil { + logger.Err(ctx, err).Msg("Error reloading agent.") return err } // restore the state in the new module instance if err := a.setAgentState(ctx, state); err != nil { - logger.Err(ctx, err).Bool("user_visible", true).Msg("Error setting agent state.") + logger.Err(ctx, err).Msg("Error setting agent state.") return err } - logger.Info(ctx).Bool("user_visible", true).Msg("Agent reloaded module successfully.") + logger.Info(ctx).Msg("Agent reloaded module successfully.") return nil } diff --git a/runtime/db/agentstate.go b/runtime/db/agentstate.go new file mode 100644 index 000000000..77c49c457 --- /dev/null +++ b/runtime/db/agentstate.go @@ -0,0 +1,139 @@ +/* + * Copyright 2025 Hypermode Inc. + * Licensed under the terms of the Apache License, Version 2.0 + * See the LICENSE file that accompanied this code for further details. + * + * SPDX-FileCopyrightText: 2025 Hypermode Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package db + +import ( + "context" + "fmt" + "time" + + "github.com/hypermodeinc/modus/runtime/logger" + "github.com/hypermodeinc/modus/runtime/utils" + + "github.com/hypermodeinc/modusgraph" + "github.com/jackc/pgx/v5" +) + +type AgentState struct { + Gid uint64 `json:"gid,omitempty"` + Id string `json:"id" db:"constraint=unique"` + Name string `json:"name"` + Status string `json:"status"` + Data string `json:"data,omitempty"` + UpdatedAt string `json:"updated"` +} + +func WriteAgentState(ctx context.Context, state AgentState) error { + if useModusDB() { + return writeAgentStateToModusDB(ctx, state) + } else { + return writeAgentStateToPostgresDB(ctx, state) + } +} + +func QueryActiveAgents(ctx context.Context) ([]AgentState, error) { + if useModusDB() { + return queryActiveAgentsFromModusDB(ctx) + } else { + return queryActiveAgentsFromPostgresDB(ctx) + } +} + +func writeAgentStateToModusDB(ctx context.Context, state AgentState) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + + if GlobalModusDbEngine == nil { + logger.Warn(ctx).Msg("ModusDB engine is not available. Agent state will not be saved.") + return nil + } + + gid, _, _, err := modusgraph.Upsert(ctx, GlobalModusDbEngine, state) + state.Gid = gid + + return err +} + +func queryActiveAgentsFromModusDB(ctx context.Context) ([]AgentState, error) { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + + _, results, err := modusgraph.Query[AgentState](ctx, GlobalModusDbEngine, modusgraph.QueryParams{ + Filter: &modusgraph.Filter{ + Not: &modusgraph.Filter{ + Field: "status", + String: modusgraph.StringPredicate{ + Equals: "terminated", + }, + }, + }, + // TODO: Sorting gives a dgraph error. Why? + // Sorting: &modusgraph.Sorting{ + // OrderDescField: "updated", + // OrderDescFirst: true, + // }, + }) + + if err != nil { + return nil, fmt.Errorf("failed to query agent state: %w", err) + } + + return results, nil +} + +func writeAgentStateToPostgresDB(ctx context.Context, state AgentState) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + + const query = "INSERT INTO agents (id, name, status, data, updated) VALUES ($1, $2, $3, $4, $5) " + + "ON CONFLICT (id) DO UPDATE SET name = $2, status = $3, data = $4, updated = $5" + + err := WithTx(ctx, func(tx pgx.Tx) error { + _, err := tx.Exec(ctx, query, state.Id, state.Name, state.Status, state.Data, state.UpdatedAt) + if err != nil { + return fmt.Errorf("failed to write agent state: %w", err) + } + return nil + }) + + return err +} + +func queryActiveAgentsFromPostgresDB(ctx context.Context) ([]AgentState, error) { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + + const query = "SELECT id, name, status, data, updated FROM agents " + + "WHERE status != 'terminated' ORDER BY updated DESC" + + results := make([]AgentState, 0) + err := WithTx(ctx, func(tx pgx.Tx) error { + rows, err := tx.Query(ctx, query) + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var a AgentState + var ts time.Time + if err := rows.Scan(&a.Id, &a.Name, &a.Status, &a.Data, &ts); err != nil { + return err + } + a.UpdatedAt = ts.UTC().Format(time.RFC3339) + results = append(results, a) + } + if err := rows.Err(); err != nil { + return err + } + return nil + }) + + return results, err +} diff --git a/runtime/db/db.go b/runtime/db/db.go index 23dfd71ee..205c9f826 100644 --- a/runtime/db/db.go +++ b/runtime/db/db.go @@ -13,6 +13,8 @@ import ( "context" "errors" "fmt" + "os" + "strconv" "sync" "time" @@ -55,7 +57,7 @@ func logDbWarningOrError(ctx context.Context, err error, msg string) { if _, ok := err.(*pgconn.ConnectError); ok { logger.Warn(ctx).Err(err).Msgf("Database connection error. %s", msg) } else if errors.Is(err, errDbNotConfigured) { - if !app.IsDevEnvironment() { + if !useModusDB() { logger.Warn(ctx).Msgf("Database has not been configured. %s", msg) } } else { @@ -393,7 +395,7 @@ func QueryCollectionVectorsFromCheckpoint(ctx context.Context, collectionName, s func Initialize(ctx context.Context) { // this will initialize the pool and start the worker _, err := globalRuntimePostgresWriter.GetPool(ctx) - if err != nil && !app.IsDevEnvironment() { + if err != nil && !useModusDB() { logger.Warn(ctx).Err(err).Msg("Metadata database is not available.") } go globalRuntimePostgresWriter.worker(ctx) @@ -429,3 +431,23 @@ func WithTx(ctx context.Context, fn func(pgx.Tx) error) error { return tx.Commit(ctx) } + +var _useModusDBOnce sync.Once +var _useModusDB bool + +func useModusDB() bool { + _useModusDBOnce.Do(func() { + // this gives us a way to force the use or disuse of ModusDB for development + s := os.Getenv("MODUS_DB_USE_MODUSDB") + if s != "" { + if value, err := strconv.ParseBool(s); err == nil { + _useModusDB = value + return + } + } + + // otherwise, it's based on the environment + _useModusDB = app.IsDevEnvironment() + }) + return _useModusDB +} diff --git a/runtime/db/inferencehistory.go b/runtime/db/inferencehistory.go index de920b098..bd2dce3fb 100644 --- a/runtime/db/inferencehistory.go +++ b/runtime/db/inferencehistory.go @@ -15,12 +15,12 @@ import ( "time" "github.com/hypermodeinc/modus/lib/manifest" - "github.com/hypermodeinc/modus/runtime/app" "github.com/hypermodeinc/modus/runtime/metrics" "github.com/hypermodeinc/modus/runtime/plugins" "github.com/hypermodeinc/modus/runtime/secrets" "github.com/hypermodeinc/modus/runtime/utils" - "github.com/hypermodeinc/modusdb" + + "github.com/hypermodeinc/modusgraph" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" ) @@ -184,10 +184,10 @@ func getInferenceDataJson(val any) ([]byte, error) { func WritePluginInfo(ctx context.Context, plugin *plugins.Plugin) { - if app.IsDevEnvironment() { + if useModusDB() { err := writePluginInfoToModusdb(ctx, plugin) if err != nil { - logDbWarningOrError(ctx, err, "Plugin info not written to ModusDB.") + logDbWarningOrError(ctx, err, "Plugin info not written to modusgraph.") } return } @@ -250,7 +250,7 @@ func writePluginInfoToModusdb(ctx context.Context, plugin *plugins.Plugin) error if GlobalModusDbEngine == nil { return nil } - _, _, err := modusdb.Create[Plugin](ctx, GlobalModusDbEngine, Plugin{ + _, _, err := modusgraph.Create[Plugin](ctx, GlobalModusDbEngine, Plugin{ Id: plugin.Id, Name: plugin.Metadata.Name(), Version: plugin.Metadata.Version(), @@ -306,10 +306,10 @@ func WriteInferenceHistoryToDB(ctx context.Context, batch []inferenceHistory) { return } - if app.IsDevEnvironment() { + if useModusDB() { err := writeInferenceHistoryToModusDb(ctx, batch) if err != nil { - logDbWarningOrError(ctx, err, "Inference history not written to ModusDB.") + logDbWarningOrError(ctx, err, "Inference history not written to modusgraph.") } return } @@ -377,7 +377,7 @@ func writeInferenceHistoryToModusDb(ctx context.Context, batch []inferenceHistor } else { pluginId = *data.pluginId } - _, _, err = modusdb.Create[Inference](ctx, GlobalModusDbEngine, Inference{ + _, _, err = modusgraph.Create[Inference](ctx, GlobalModusDbEngine, Inference{ Id: utils.GenerateUUIDv7(), ModelHash: data.model.Hash(), Input: string(input), @@ -400,7 +400,7 @@ func QueryPlugins(ctx context.Context) ([]Plugin, error) { if GlobalModusDbEngine == nil { return nil, nil } - _, plugins, err := modusdb.Query[Plugin](ctx, GlobalModusDbEngine, modusdb.QueryParams{}) + _, plugins, err := modusgraph.Query[Plugin](ctx, GlobalModusDbEngine, modusgraph.QueryParams{}) return plugins, err } @@ -408,6 +408,6 @@ func QueryInferences(ctx context.Context) ([]Inference, error) { if GlobalModusDbEngine == nil { return nil, nil } - _, inferences, err := modusdb.Query[Inference](ctx, GlobalModusDbEngine, modusdb.QueryParams{}) + _, inferences, err := modusgraph.Query[Inference](ctx, GlobalModusDbEngine, modusgraph.QueryParams{}) return inferences, err } diff --git a/runtime/db/migrations/000006_add_started_at_index.down.sql b/runtime/db/migrations/000006_add_started_at_index.down.sql new file mode 100644 index 000000000..4b17edd88 --- /dev/null +++ b/runtime/db/migrations/000006_add_started_at_index.down.sql @@ -0,0 +1,5 @@ +BEGIN; + +DROP INDEX IF EXISTS idx_started_at_desc; + +COMMIT; diff --git a/runtime/db/migrations/000006_add_started_at_index.up.sql b/runtime/db/migrations/000006_add_started_at_index.up.sql new file mode 100644 index 000000000..3dd1d69b6 --- /dev/null +++ b/runtime/db/migrations/000006_add_started_at_index.up.sql @@ -0,0 +1,5 @@ +BEGIN; + +CREATE INDEX idx_started_at_desc ON inferences (started_at DESC); + +COMMIT; diff --git a/runtime/db/migrations/000007_add_agent_state.down.sql b/runtime/db/migrations/000007_add_agent_state.down.sql new file mode 100644 index 000000000..bd8495b31 --- /dev/null +++ b/runtime/db/migrations/000007_add_agent_state.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS "agents"; diff --git a/runtime/db/migrations/000007_add_agent_state.up.sql b/runtime/db/migrations/000007_add_agent_state.up.sql new file mode 100644 index 000000000..9dceee7c2 --- /dev/null +++ b/runtime/db/migrations/000007_add_agent_state.up.sql @@ -0,0 +1,16 @@ +BEGIN; + +CREATE TABLE IF NOT EXISTS "agents" ( + "id" TEXT NOT NULL PRIMARY KEY, + "name" TEXT NOT NULL, + "status" TEXT NOT NULL, + "data" TEXT, + "updated" TIMESTAMP(3) WITH TIME ZONE NOT NULL +); + +CREATE INDEX IF NOT EXISTS agents_name_idx ON agents (name); +CREATE INDEX IF NOT EXISTS agents_status_idx ON agents (status); +CREATE INDEX IF NOT EXISTS agents_updated_idx ON agents (updated); +CREATE INDEX IF NOT EXISTS agents_status_updated_idx ON agents (status, updated); + +COMMIT; diff --git a/runtime/db/modusdb.go b/runtime/db/modusdb.go index 3ab384c11..4e22a4cf2 100644 --- a/runtime/db/modusdb.go +++ b/runtime/db/modusdb.go @@ -19,13 +19,14 @@ import ( "github.com/hypermodeinc/modus/runtime/app" "github.com/hypermodeinc/modus/runtime/logger" - "github.com/hypermodeinc/modusdb" + + "github.com/hypermodeinc/modusgraph" ) -var GlobalModusDbEngine *modusdb.Engine +var GlobalModusDbEngine *modusgraph.Engine func InitModusDb(ctx context.Context) { - if !app.IsDevEnvironment() || runtime.GOOS == "windows" { + if !useModusDB() || runtime.GOOS == "windows" { // ModusDB should only be initialized in dev environment, // and currently does not work on Windows. return @@ -41,8 +42,8 @@ func InitModusDb(ctx context.Context) { dataDir = filepath.Join(appPath, ".modusdb") } - if eng, err := modusdb.NewEngine(modusdb.NewDefaultConfig(dataDir)); err != nil { - logger.Fatal(ctx).Err(err).Msg("Failed to initialize modusdb.") + if eng, err := modusgraph.NewEngine(modusgraph.NewDefaultConfig(dataDir)); err != nil { + logger.Fatal(ctx).Err(err).Msg("Failed to initialize the local modusGraph database.") } else { GlobalModusDbEngine = eng } diff --git a/runtime/go.mod b/runtime/go.mod index 72376be17..09bc9a0a4 100644 --- a/runtime/go.mod +++ b/runtime/go.mod @@ -1,6 +1,6 @@ module github.com/hypermodeinc/modus/runtime -go 1.24.2 +go 1.24.3 // trunk-ignore-all(osv-scanner/GHSA-9w9f-6mg8-jp7w): not affected by bleve vulnerability @@ -26,7 +26,7 @@ require ( github.com/google/uuid v1.6.0 github.com/hypermodeinc/modus/lib/manifest v0.17.2 github.com/hypermodeinc/modus/lib/metadata v0.15.0 - github.com/hypermodeinc/modusdb v0.0.0-20250416120035-6d80353c1351 + github.com/hypermodeinc/modusgraph v0.0.0-20250515173656-5f22fe4d94b5 github.com/jackc/pgx/v5 v5.7.4 github.com/jensneuse/abstractlogger v0.0.4 github.com/joho/godotenv v1.5.1 @@ -64,7 +64,7 @@ require ( github.com/DataDog/datadog-go v4.8.3+incompatible // indirect github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20220622145613-731d59e8b567 // indirect github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect - github.com/IBM/sarama v1.45.0 // indirect + github.com/IBM/sarama v1.45.1 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/RoaringBitmap/roaring v1.9.4 // indirect github.com/Workiva/go-datastructures v1.1.5 // indirect @@ -111,6 +111,8 @@ require ( github.com/distribution/reference v0.6.0 // indirect github.com/dlclark/regexp2 v1.11.0 // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/dolan-in/dgman/v2 v2.0.0 // indirect + github.com/dolan-in/reflectwalk v1.0.2-0.20210101124621-dc2073a29d71 // indirect github.com/dop251/goja v0.0.0-20230906160731-9410bcaa81d2 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/eapache/go-resiliency v1.7.0 // indirect @@ -132,7 +134,7 @@ require ( github.com/golang/glog v1.2.4 // indirect github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/golang/snappy v0.0.4 // indirect + github.com/golang/snappy v1.0.0 // indirect github.com/google/btree v1.1.3 // indirect github.com/google/codesearch v1.2.0 // indirect github.com/google/flatbuffers v25.2.10+incompatible // indirect @@ -153,7 +155,7 @@ require ( github.com/hashicorp/hcl v1.0.1-vault-7 // indirect github.com/hashicorp/logutils v1.0.0 // indirect github.com/hashicorp/memberlist v0.5.3 // indirect - github.com/hashicorp/vault/api v1.15.0 // indirect + github.com/hashicorp/vault/api v1.16.0 // indirect github.com/hypermodeinc/dgraph/v24 v24.1.2 // indirect github.com/hypermodeinc/modus/lib/wasmextractor v0.13.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect @@ -174,7 +176,6 @@ require ( github.com/lestrrat-go/httpcc v1.0.1 // indirect github.com/lestrrat-go/httprc/v3 v3.0.0-beta2 // indirect github.com/lestrrat-go/option v1.0.1 // indirect - github.com/magiconair/properties v1.8.9 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/miekg/dns v1.1.65 // indirect @@ -208,7 +209,6 @@ require ( github.com/reugn/go-quartz v0.14.0 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect github.com/sagikazarmark/locafero v0.7.0 // indirect - github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect github.com/segmentio/asm v1.2.0 // indirect @@ -218,7 +218,7 @@ require ( github.com/spf13/afero v1.12.0 // indirect github.com/spf13/cobra v1.9.1 // indirect github.com/spf13/pflag v1.0.6 // indirect - github.com/spf13/viper v1.19.0 // indirect + github.com/spf13/viper v1.20.1 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/tidwall/btree v1.7.0 // indirect github.com/tidwall/jsonc v0.3.2 // indirect @@ -238,7 +238,7 @@ require ( github.com/xdg/stringprep v1.0.3 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.akshayshah.org/connectproto v0.6.0 // indirect - go.etcd.io/etcd/raft/v3 v3.5.18 // indirect + go.etcd.io/etcd/raft/v3 v3.5.21 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect diff --git a/runtime/go.sum b/runtime/go.sum index a817327a2..ebadde82b 100644 --- a/runtime/go.sum +++ b/runtime/go.sum @@ -56,8 +56,8 @@ github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20220622145613-731d59e8 github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20220622145613-731d59e8b567/go.mod h1:/VV3EFO/hTNQZHAqaj+CPGy2+ioFrP4EX3iRwozubhQ= github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM= github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= -github.com/IBM/sarama v1.45.0 h1:IzeBevTn809IJ/dhNKhP5mpxEXTmELuezO2tgHD9G5E= -github.com/IBM/sarama v1.45.0/go.mod h1:EEay63m8EZkeumco9TDXf2JT3uDnZsZqFgV46n4yZdY= +github.com/IBM/sarama v1.45.1 h1:nY30XqYpqyXOXSNoe2XCgjj9jklGM1Ye94ierUb1jQ0= +github.com/IBM/sarama v1.45.1/go.mod h1:qifDhA3VWSrQ1TjSMyxDl3nYL3oX2C83u+G6L79sq4w= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8= @@ -231,6 +231,10 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/dolan-in/dgman/v2 v2.0.0 h1:kMhUo3dYr4SCWJe87cNbBctD5f2fC+pX/cLz52pVYAE= +github.com/dolan-in/dgman/v2 v2.0.0/go.mod h1:JI2D8vtkMBp9Ec8/wBPuJPPT89C+HQnQYb0biuXo4do= +github.com/dolan-in/reflectwalk v1.0.2-0.20210101124621-dc2073a29d71 h1:v3bErDrPApxsyBlz8/8nFTCb7Ai0wecA8TokfEHIQ80= +github.com/dolan-in/reflectwalk v1.0.2-0.20210101124621-dc2073a29d71/go.mod h1:Y9TyDkSL5jQ18ZnDaSxOdCUhbb5SCeamqYFQ7LYxxFs= github.com/dop251/goja v0.0.0-20211022113120-dc8c55024d06/go.mod h1:R9ET47fwRVRPZnOGvHxxhuZcbrMCuiqOz3Rlrh4KSnk= github.com/dop251/goja v0.0.0-20230906160731-9410bcaa81d2 h1:3J+RqSTu+JuyCYjoe82vvUUljEfgp8i6+nyhUsaYAbg= github.com/dop251/goja v0.0.0-20230906160731-9410bcaa81d2/go.mod h1:QMWlm50DNe14hD7t24KEqZuUdC9sOTy8W6XbCU1mlw4= @@ -356,8 +360,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= +github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= @@ -459,8 +463,8 @@ github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/memberlist v0.5.3 h1:tQ1jOCypD0WvMemw/ZhhtH+PWpzcftQvgCorLu0hndk= github.com/hashicorp/memberlist v0.5.3/go.mod h1:h60o12SZn/ua/j0B6iKAZezA4eDaGsIuPO70eOaJ6WE= -github.com/hashicorp/vault/api v1.15.0 h1:O24FYQCWwhwKnF7CuSqP30S51rTV7vz1iACXE/pj5DA= -github.com/hashicorp/vault/api v1.15.0/go.mod h1:+5YTO09JGn0u+b6ySD/LLVf8WkJCPLAL2Vkmrn2+CM8= +github.com/hashicorp/vault/api v1.16.0 h1:nbEYGJiAPGzT9U4oWgaaB0g+Rj8E59QuHKyA5LhwQN4= +github.com/hashicorp/vault/api v1.16.0/go.mod h1:KhuUhzOD8lDSk29AtzNjgAu2kxRA9jL9NAbkFlqvkBA= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/hypermodeinc/dgraph/v24 v24.1.2 h1:fRj+yH36D9Hv79X9fiY31B5ud9g28dbKlOngcEv0JXI= @@ -471,8 +475,8 @@ github.com/hypermodeinc/modus/lib/metadata v0.15.0 h1:Qu75TZg7l43Fi61EhnjasTHZvz github.com/hypermodeinc/modus/lib/metadata v0.15.0/go.mod h1:vnIwX2DpQyGk93DawGgIaqC5jEdvMeA9tGtvefbwTJw= github.com/hypermodeinc/modus/lib/wasmextractor v0.13.0 h1:9o8qqAllL9qIPYqc5adF+Aw3XWLmLqPiBPMu0AIyiMI= github.com/hypermodeinc/modus/lib/wasmextractor v0.13.0/go.mod h1:YCesMU95vF5qkscLMKSYr92OloLe1KGwyiqW2i4OmnE= -github.com/hypermodeinc/modusdb v0.0.0-20250416120035-6d80353c1351 h1:1jvPsAk8Qj5E8r6JxdYgFEPlOzFoOP/QG0s48xUQQUI= -github.com/hypermodeinc/modusdb v0.0.0-20250416120035-6d80353c1351/go.mod h1:5Jv6e2zP2vfL9Dh+/QHJyX3RZtG9jnoXrNvuXT3jJ2U= +github.com/hypermodeinc/modusgraph v0.0.0-20250515173656-5f22fe4d94b5 h1:aYRr8bXMGvj2arLZtZeGDBYyUkxhehRHRscuCFSLprM= +github.com/hypermodeinc/modusgraph v0.0.0-20250515173656-5f22fe4d94b5/go.mod h1:rpFxbK8u1/Oru8y8Dd8w7b0jzeslCh2WUNr507iccGE= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= @@ -565,8 +569,6 @@ github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381 h1:bqDmpDG49ZRn github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= github.com/logrusorgru/aurora/v3 v3.0.0 h1:R6zcoZZbvVcGMvDCKo45A9U/lzYyzl5NfYIvznmDfE4= github.com/logrusorgru/aurora/v3 v3.0.0/go.mod h1:vsR12bk5grlLvLXAYrBsb5Oc/N+LxAlxggSjiwMnCUc= -github.com/magiconair/properties v1.8.9 h1:nWcCbLq1N2v/cpNsy5WvQ37Fb+YElfq20WJ/a8RkpQM= -github.com/magiconair/properties v1.8.9/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/matryer/moq v0.0.0-20200106131100-75d0ddfc0007/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= @@ -731,8 +733,6 @@ github.com/ryanuber/go-glob v1.0.0 h1:iQh3xXAumdQ+4Ufa5b25cRpC5TYKlno6hsv6Cb3pkB github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= github.com/sagikazarmark/locafero v0.7.0 h1:5MqpDsTGNDhY8sGp0Aowyf0qKsPrhewaLSsFaodPcyo= github.com/sagikazarmark/locafero v0.7.0/go.mod h1:2za3Cg5rMaTMoG/2Ulr9AwtFaIppKXTRYnozin4aB5k= -github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= -github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 h1:PKK9DyHxif4LZo+uQSgXNqs0jj5+xZwwfKHgph2lxBw= github.com/santhosh-tekuri/jsonschema/v6 v6.0.1/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= @@ -769,8 +769,8 @@ github.com/spf13/cobra v1.9.1 h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo= github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wxYW0= github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= -github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= -github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg= +github.com/spf13/viper v1.20.1 h1:ZMi+z/lvLyPSCoNtFCpqjy0S4kPbirhpTMwl8BkW9X4= +github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqjJvu4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -870,10 +870,10 @@ go.akshayshah.org/attest v1.0.0 h1:f66BDlh/xo2KjIfmtqOFlj5cpn6mvGrP1LXY3Tex4L0= go.akshayshah.org/attest v1.0.0/go.mod h1:PnWzcW5j9dkyGwTlBmUsYpPnHG0AUPrs1RQ+HrldWO0= go.akshayshah.org/connectproto v0.6.0 h1:tqmysQF2AfvUeYS03mRAAZTFpiQeXqhGIDnH1GO2D2U= go.akshayshah.org/connectproto v0.6.0/go.mod h1:uA9TR/6MhBlLn0fh8VXRyL26EKTJlimWao4jbz7JHbA= -go.etcd.io/etcd/client/pkg/v3 v3.5.18 h1:mZPOYw4h8rTk7TeJ5+3udUkfVGBqc+GCjOJYd68QgNM= -go.etcd.io/etcd/client/pkg/v3 v3.5.18/go.mod h1:BxVf2o5wXG9ZJV+/Cu7QNUiJYk4A29sAhoI5tIRsCu4= -go.etcd.io/etcd/raft/v3 v3.5.18 h1:gueCda+9U76Lvk6rINjNc/mXalUp0u8OK5CVESDZh4I= -go.etcd.io/etcd/raft/v3 v3.5.18/go.mod h1:XBaZHTJt3nLnpS8hMDR55Sxrq76cEC4xWYMBYSY3jcs= +go.etcd.io/etcd/client/pkg/v3 v3.5.21 h1:lPBu71Y7osQmzlflM9OfeIV2JlmpBjqBNlLtcoBqUTc= +go.etcd.io/etcd/client/pkg/v3 v3.5.21/go.mod h1:BgqT/IXPjK9NkeSDjbzwsHySX3yIle2+ndz28nVsjUs= +go.etcd.io/etcd/raft/v3 v3.5.21 h1:dOmE0mT55dIUsX77TKBLq+RgyumsQuYeiRQnW/ylugk= +go.etcd.io/etcd/raft/v3 v3.5.21/go.mod h1:fmcuY5R2SNkklU4+fKVBQi2biVp5vafMrWUEj4TJ4Cs= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= diff --git a/runtime/hostfunctions/agents.go b/runtime/hostfunctions/agents.go index 2054eb04a..e8b25b819 100644 --- a/runtime/hostfunctions/agents.go +++ b/runtime/hostfunctions/agents.go @@ -24,6 +24,12 @@ func init() { return fmt.Sprintf("Name: %s", agentName) })) + registerHostFunction(module_name, "terminateAgent", actors.TerminateAgent, + withErrorMessage("Error terminating agent."), + withMessageDetail(func(agentId string) string { + return fmt.Sprintf("AgentId: %s", agentId) + })) + registerHostFunction(module_name, "sendMessage", actors.SendAgentMessage, withErrorMessage("Error sending message to agent."), withMessageDetail(func(agentId string, msgName string, data *string, timeout int64) string { diff --git a/runtime/langsupport/utils.go b/runtime/langsupport/utils.go index 1a0a1e1f4..9d9553c10 100644 --- a/runtime/langsupport/utils.go +++ b/runtime/langsupport/utils.go @@ -9,7 +9,33 @@ package langsupport +import ( + "fmt" + "reflect" + + "github.com/hypermodeinc/modus/runtime/utils" +) + // AlignOffset returns the smallest y >= x such that y % a == 0. func AlignOffset(x, a uint32) uint32 { return (x + a - 1) &^ (a - 1) } + +// GetFieldObject retrieves the field object with the given name from either a map or a reflect.Value of a struct. +func GetFieldObject(fieldName string, mapObj map[string]any, rvObj reflect.Value) (any, error) { + if mapObj != nil { + // case sensitive when reading from map + if obj, ok := mapObj[fieldName]; !ok { + return nil, fmt.Errorf("field %s not found in map", fieldName) + } else { + return obj, nil + } + } else { + // case insensitive when reading from struct + if obj, err := utils.GetStructFieldValue(rvObj, fieldName, true); err != nil { + return nil, err + } else { + return obj, nil + } + } +} diff --git a/runtime/languages/assemblyscript/handler_classes.go b/runtime/languages/assemblyscript/handler_classes.go index 4f38eb35e..2b4049519 100644 --- a/runtime/languages/assemblyscript/handler_classes.go +++ b/runtime/languages/assemblyscript/handler_classes.go @@ -13,7 +13,6 @@ import ( "context" "fmt" "reflect" - "strings" "github.com/hypermodeinc/modus/lib/metadata" "github.com/hypermodeinc/modus/runtime/langsupport" @@ -91,13 +90,9 @@ func (h *classHandler) Write(ctx context.Context, wa langsupport.WasmAdapter, of fieldOffsets := h.typeInfo.ObjectFieldOffsets() for i, field := range h.typeDef.Fields { - var fieldObj any - if mapObj != nil { - // case sensitive when reading from map - fieldObj = mapObj[field.Name] - } else { - // case insensitive when reading from struct - fieldObj = rvObj.FieldByNameFunc(func(s string) bool { return strings.EqualFold(s, field.Name) }).Interface() + fieldObj, err := langsupport.GetFieldObject(field.Name, mapObj, rvObj) + if err != nil { + return nil, err } fieldOffset := offset + fieldOffsets[i] diff --git a/runtime/languages/golang/handler_structs.go b/runtime/languages/golang/handler_structs.go index 92af48516..f9ffb15b9 100644 --- a/runtime/languages/golang/handler_structs.go +++ b/runtime/languages/golang/handler_structs.go @@ -13,7 +13,6 @@ import ( "context" "fmt" "reflect" - "strings" "github.com/hypermodeinc/modus/lib/metadata" "github.com/hypermodeinc/modus/runtime/langsupport" @@ -106,13 +105,9 @@ func (h *structHandler) Write(ctx context.Context, wa langsupport.WasmAdapter, o cleaner := utils.NewCleanerN(numFields) for i, field := range h.typeDef.Fields { - var fieldObj any - if mapObj != nil { - // case sensitive when reading from map - fieldObj = mapObj[field.Name] - } else { - // case insensitive when reading from struct - fieldObj = rvObj.FieldByNameFunc(func(s string) bool { return strings.EqualFold(s, field.Name) }).Interface() + fieldObj, err := langsupport.GetFieldObject(field.Name, mapObj, rvObj) + if err != nil { + return nil, err } fieldOffset := offset + fieldOffsets[i] @@ -161,13 +156,9 @@ func (h *structHandler) Encode(ctx context.Context, wa langsupport.WasmAdapter, cleaner := utils.NewCleanerN(numFields) for i, field := range h.typeDef.Fields { - var fieldObj any - if mapObj != nil { - // case sensitive when reading from map - fieldObj = mapObj[field.Name] - } else { - // case insensitive when reading from struct - fieldObj = rvObj.FieldByNameFunc(func(s string) bool { return strings.EqualFold(s, field.Name) }).Interface() + fieldObj, err := langsupport.GetFieldObject(field.Name, mapObj, rvObj) + if err != nil { + return nil, cleaner, err } handler := h.fieldHandlers[i] diff --git a/runtime/utils/utils.go b/runtime/utils/utils.go index 5a842cb69..5a795873b 100644 --- a/runtime/utils/utils.go +++ b/runtime/utils/utils.go @@ -108,3 +108,38 @@ func CanBeNil(rt reflect.Type) bool { return false } } + +func GetStructFieldValue(rs reflect.Value, fieldName string, caseInsensitive bool) (any, error) { + if rs.Kind() != reflect.Struct { + return nil, fmt.Errorf("expected a struct, got %s", rs.Kind()) + } + + rsType := rs.Type() + + var field reflect.StructField + var found bool + if caseInsensitive { + field, found = rsType.FieldByNameFunc(func(s string) bool { return strings.EqualFold(s, fieldName) }) + } else { + field, found = rsType.FieldByName(fieldName) + } + + if !found { + return nil, fmt.Errorf("field %s not found in struct %s", fieldName, rs.Type().Name()) + } + + rf := rs.FieldByIndex(field.Index) + + // allow retrieving values of unexported fields + // see https://stackoverflow.com/a/43918797 + if !rf.CanInterface() { + if !rf.CanAddr() { + rs2 := reflect.New(rsType.Elem()) + rs2.Set(rs) + rf = rs2.FieldByIndex(field.Index) + } + rf = reflect.NewAt(rf.Type(), unsafe.Pointer(rf.UnsafeAddr())).Elem() + } + + return rf.Interface(), nil +} diff --git a/sdk/assemblyscript/examples/agents/assembly/counterAgent.ts b/sdk/assemblyscript/examples/agents/assembly/counterAgent.ts index a7bfbcc47..0cb2d4469 100644 --- a/sdk/assemblyscript/examples/agents/assembly/counterAgent.ts +++ b/sdk/assemblyscript/examples/agents/assembly/counterAgent.ts @@ -46,29 +46,52 @@ export class CounterAgent extends Agent { this.count = i32.parse(data); } - // When the agent is started, this method is automatically called. - // It is optional, but can be used to initialize state, retrieve data, etc. + // When the agent is first started, this method is automatically called. Implementing it is optional. + // If you don't need to do anything special when the agent starts, then you can omit it. + // It can be used to initialize state, retrieve data, etc. // This is a good place to set up any listeners or subscriptions. onStart(): void { console.info("Counter agent started"); } - // When the agent is stopped, this method is automatically called. - // It is optional, but can be used to clean up any resources, send final data, etc. - // This is a good place to unsubscribe from any listeners or subscriptions. - onStop(): void { - console.info("Counter agent stopped"); + // When the agent is suspended, this method is automatically called. Implementing it is optional. + // If you don't need to do anything special when the agent is suspended, then you can omit it. + // The agent may be suspended for a variety of reasons, including: + // - The agent code has being updated. + // - The host is shutting down or restarting. + // - The agent is being suspended to save resources. + // - The agent is being relocated to a different host. + // Note that the agent may be suspended and resumed multiple times during its lifetime, + // but the Modus Runtime will automatically save and restore the state of the agent, + // so you don't need to worry about that here. + onSuspend(): void { + console.info("Counter agent suspended"); } - // If the agent is reloaded, this method is automatically called. - // It is optional, but can be used to keep track of the agent's status. - onReload(): void { - console.info("Counter agent reloaded"); + // When the agent is restored, this method is automatically called. Implementing it is optional. + // If you don't need to do anything special when the agent is restored, then you can omit it. + onRestore(): void { + console.info("Counter agent restored"); + } + + // When the agent is terminated, this method is automatically called. Implementing it is optional. + // It can be used to send final data somewhere, such as a database or an API. + // This is a good place to unsubscribe from any listeners or subscriptions. + // Note that resources are automatically cleaned up when the agent is terminated, + // so you don't need to worry about that here. + // Once an agent is terminated, it cannot be restored. + onTerminate(): void { + console.info("Counter agent terminated"); } // This method is called when the agent receives a message. // This is how agents update their state and share data. + // You should implement this method to handle messages sent to this agent. onReceiveMessage(name: string, data: string | null): string | null { + // You can use the name of the message to determine what to do. + // You can either handle the message here, or pass it to another method or function. + // If you don't have any response to send back, return null. + // A "count" message just returns the current count. if (name == "count") { return this.count.toString(); diff --git a/sdk/assemblyscript/examples/agents/assembly/index.ts b/sdk/assemblyscript/examples/agents/assembly/index.ts index 39f3480b8..a4abbba7b 100644 --- a/sdk/assemblyscript/examples/agents/assembly/index.ts +++ b/sdk/assemblyscript/examples/agents/assembly/index.ts @@ -25,6 +25,15 @@ export function startCounterAgent(): AgentInfo { return agents.start("Counter"); } +/** + * Terminates the specified agent by ID. + * Once terminated, the agent cannot be restored or restarted. + * However, a new agent with the same name can be started at any time. + */ +export function terminateAgent(agentId: string): void { + agents.terminate(agentId); +} + /** * Returns the current count of the specified agent. */ diff --git a/sdk/assemblyscript/src/assembly/agent.ts b/sdk/assemblyscript/src/assembly/agent.ts index c67b14b63..f525f226a 100644 --- a/sdk/assemblyscript/src/assembly/agent.ts +++ b/sdk/assemblyscript/src/assembly/agent.ts @@ -48,23 +48,35 @@ export abstract class Agent { abstract setState(data: string | null): void; /** - * Called when the agent is started. + * Called when the agent is first started. * Override this method to perform any initialization. */ onStart(): void {} /** - * Called when the agent is stopped. - * Override this method to perform any cleanup. + * Called when the agent is suspended. + * Override this method if you want to do anything special when the agent is suspended, + * such as sending a notification. + * Note that you do not need to save the internal state of the agent here, + * as that is handled automatically. */ - onStop(): void {} + onSuspend(): void {} /** - * Called when the agent is reloaded. - * Override this method if you want to perform any actions when the agent is reloaded. - * Reloading is done when the agent is updated, or when the agent is resumed after being suspended. + * Called when the agent is restored. + * Override this method if you want to do anything special when the agent is restored, + * such as sending a notification. + * Note that you do not need to restore the internal state of the agent here, + * as that is handled automatically. */ - onReload(): void {} + onRestore(): void {} + + /** + * Called when the agent is terminated. + * Override this method to send or save any final data. + * Note that once an agent is terminated, it cannot be restored. + */ + onTerminate(): void {} /** * Called when the agent receives a message. @@ -89,6 +101,10 @@ export function registerAgent(): void { @external("modus_agents", "spawnAgentActor") declare function hostSpawnAgentActor(agentName: string): AgentInfo; +// @ts-expect-error: decorator +@external("modus_agents", "terminateAgent") +declare function hostTerminateAgent(agentId: string): bool; + /** * Starts an agent with the given name. * This can be called from any user code, such as function or another agent's methods. @@ -101,6 +117,20 @@ export function startAgent(name: string): AgentInfo { return hostSpawnAgentActor(name); } +/** + * Terminates an agent with the given ID. + * Once terminated, the agent cannot be restored. + */ +export function terminateAgent(agentId: string): void { + if (agentId == "") { + throw new Error("Agent ID cannot be empty."); + } + const ok = hostTerminateAgent(agentId); + if (!ok) { + throw new Error(`Failed to terminate agent ${agentId}.`); + } +} + /** * The Modus Runtime will call this function to activate an agent. * It is not intended to be called from user code. @@ -118,7 +148,7 @@ export function activateAgent(name: string, id: string, reloading: bool): void { activeAgentId = id; if (reloading) { - activeAgent!.onReload(); + activeAgent!.onRestore(); } else { activeAgent!.onStart(); } @@ -128,14 +158,16 @@ export function activateAgent(name: string, id: string, reloading: bool): void { * The Modus Runtime will call this function to shutdown an agent. * It is not intended to be called from user code. */ -export function shutdownAgent(): void { +export function shutdownAgent(suspending: bool): void { if (!activeAgent) { throw new Error("No active agent to shut down."); } - activeAgent!.onStop(); - activeAgent = null; - activeAgentId = null; + if (suspending) { + activeAgent!.onSuspend(); + } else { + activeAgent!.onTerminate(); + } } /** @@ -193,11 +225,7 @@ export class AgentInfo { */ readonly status: AgentStatus; - constructor( - id: string, - name: string, - status: AgentStatus = AgentStatus.Uninitialized, - ) { + constructor(id: string, name: string, status: AgentStatus = "") { this.id = id; this.name = name; this.status = status; diff --git a/sdk/assemblyscript/src/assembly/agents.ts b/sdk/assemblyscript/src/assembly/agents.ts index 311a2c515..5cec5c125 100644 --- a/sdk/assemblyscript/src/assembly/agents.ts +++ b/sdk/assemblyscript/src/assembly/agents.ts @@ -9,7 +9,11 @@ import { Duration } from "./enums"; -export { registerAgent as register, startAgent as start } from "./agent"; +export { + registerAgent as register, + startAgent as start, + terminateAgent as terminate, +} from "./agent"; // @ts-expect-error: decorator @external("modus_agents", "sendMessage") diff --git a/sdk/assemblyscript/src/assembly/enums.ts b/sdk/assemblyscript/src/assembly/enums.ts index 38fc11dcb..55f15106d 100644 --- a/sdk/assemblyscript/src/assembly/enums.ts +++ b/sdk/assemblyscript/src/assembly/enums.ts @@ -17,17 +17,14 @@ export enum Duration { hour = 60 * Duration.minute, } -// TODO: validate status values - // eslint-disable-next-line @typescript-eslint/no-namespace export namespace AgentStatus { - export const Uninitialized = "uninitialized"; - export const Error = "error"; export const Starting = "starting"; - export const Started = "started"; - export const Stopping = "stopping"; - export const Stopped = "stopped"; + export const Running = "running"; + export const Suspending = "suspending"; export const Suspended = "suspended"; + export const Restoring = "restoring"; + export const Terminating = "terminating"; export const Terminated = "terminated"; } export type AgentStatus = string; diff --git a/sdk/go/examples/agents/counterAgent.go b/sdk/go/examples/agents/counterAgent.go index b9d5acb98..bc3133526 100644 --- a/sdk/go/examples/agents/counterAgent.go +++ b/sdk/go/examples/agents/counterAgent.go @@ -61,26 +61,45 @@ func (c *CounterAgent) SetState(data *string) { } } -// When the agent is started, this method is automatically called. -// It is optional, but can be used to initialize state, retrieve data, etc. +// When the agent is first started, this method is automatically called. Implementing it is optional. +// If you don't need to do anything special when the agent starts, then you can omit it. +// It can be used to initialize state, retrieve data, etc. // This is a good place to set up any listeners or subscriptions. func (c *CounterAgent) OnStart() error { fmt.Println("Counter agent started") return nil } -// When the agent is stopped, this method is automatically called. -// It is optional, but can be used to clean up any resources, send final data, etc. -// This is a good place to unsubscribe from any listeners or subscriptions. -func (c *CounterAgent) OnStop() error { - fmt.Println("Counter agent stopped") +// When the agent is suspended, this method is automatically called. Implementing it is optional. +// If you don't need to do anything special when the agent is suspended, then you can omit it. +// The agent may be suspended for a variety of reasons, including: +// - The agent code has being updated. +// - The host is shutting down or restarting. +// - The agent is being suspended to save resources. +// - The agent is being relocated to a different host. +// Note that the agent may be suspended and resumed multiple times during its lifetime, +// but the Modus Runtime will automatically save and restore the state of the agent, +// so you don't need to worry about that here. +func (c *CounterAgent) OnSuspend() error { + fmt.Println("Counter agent suspended") return nil } -// If the agent is reloaded, this method is automatically called. -// It is optional, but can be used to keep track of the agent's status. -func (c *CounterAgent) OnReload() error { - fmt.Println("Counter agent reloaded") +// When the agent is restored, this method is automatically called. Implementing it is optional. +// If you don't need to do anything special when the agent is restored, then you can omit it. +func (c *CounterAgent) OnRestore() error { + fmt.Println("Counter agent restored") + return nil +} + +// When the agent is terminated, this method is automatically called. Implementing it is optional. +// It can be used to send final data somewhere, such as a database or an API. +// This is a good place to unsubscribe from any listeners or subscriptions. +// Note that resources are automatically cleaned up when the agent is terminated, +// so you don't need to worry about that here. +// Once an agent is terminated, it cannot be restored. +func (c *CounterAgent) OnTerminate() error { + fmt.Println("Counter agent terminated") return nil } diff --git a/sdk/go/examples/agents/main.go b/sdk/go/examples/agents/main.go index 7cf336d36..6771a9c4b 100644 --- a/sdk/go/examples/agents/main.go +++ b/sdk/go/examples/agents/main.go @@ -30,6 +30,13 @@ func StartCounterAgent() (agents.AgentInfo, error) { return agents.Start("Counter") } +// Terminates the specified agent by ID. +// Once terminated, the agent cannot be restored or restarted. +// However, a new agent with the same name can be started at any time. +func TerminateAgent(agentId string) error { + return agents.Terminate(agentId) +} + // Returns the current count of the specified agent. func GetCount(agentId string) (int, error) { count, err := agents.SendMessage(agentId, "count") diff --git a/sdk/go/pkg/agents/agents.go b/sdk/go/pkg/agents/agents.go index e2ad1829f..1b46e8920 100644 --- a/sdk/go/pkg/agents/agents.go +++ b/sdk/go/pkg/agents/agents.go @@ -25,16 +25,14 @@ type AgentInfo struct { type AgentStatus = string -// TODO: validate these statuses are needed and used correctly const ( - AgentStatusUninitialized AgentStatus = "uninitialized" - AgentStatusError AgentStatus = "error" - AgentStatusStarting AgentStatus = "starting" - AgentStatusStarted AgentStatus = "started" - AgentStatusStopping AgentStatus = "stopping" - AgentStatusStopped AgentStatus = "stopped" - AgentStatusSuspended AgentStatus = "suspended" - AgentStatusTerminated AgentStatus = "terminated" + AgentStatusStarting AgentStatus = "starting" + AgentStatusRunning AgentStatus = "running" + AgentStatusSuspending AgentStatus = "suspending" + AgentStatusSuspended AgentStatus = "suspended" + AgentStatusRestoring AgentStatus = "restoring" + AgentStatusTerminating AgentStatus = "terminating" + AgentStatusTerminated AgentStatus = "terminated" ) var agents = make(map[string]Agent) @@ -58,6 +56,15 @@ func Start(name string) (AgentInfo, error) { return *info, nil } +// Terminates an agent with the given ID. +// Once terminated, the agent cannot be restored. +func Terminate(agentId string) error { + if ok := hostTerminateAgent(&agentId); !ok { + return fmt.Errorf("failed to terminate agent %s", agentId) + } + return nil +} + // These functions are only invoked as wasm exports from the host. // Assigning them to discard variables avoids "unused" linting errors. var ( @@ -85,7 +92,7 @@ func activateAgent(name, id string, reloading bool) { activeAgentId = &id if reloading { - if err := agent.OnReload(); err != nil { + if err := agent.OnRestore(); err != nil { console.Errorf("Error reloading agent %s: %v", name, err) } } else { @@ -99,17 +106,22 @@ func activateAgent(name, id string, reloading bool) { // The Modus Runtime will call this function to shutdown an agent. // //go:export _modus_agent_shutdown -func shutdownAgent() { +func shutdownAgent(suspending bool) { if activeAgent == nil { console.Error("No active agent to shutdown.") return } - if err := (*activeAgent).OnStop(); err != nil { - console.Errorf("Error stopping agent %s: %v", (*activeAgent).Name(), err) + if suspending { + if err := (*activeAgent).OnSuspend(); err != nil { + console.Errorf("Error suspending agent %s: %v", (*activeAgent).Name(), err) + return + } } else { - activeAgent = nil - activeAgentId = nil + if err := (*activeAgent).OnTerminate(); err != nil { + console.Errorf("Error terminating agent %s: %v", (*activeAgent).Name(), err) + return + } } } @@ -177,14 +189,19 @@ type Agent interface { // Custom agents may implement this method to perform any initialization. OnStart() error - // OnStop is called when the agent is stopped. - // Custom agents may implement this method to perform any cleanup. - OnStop() error + // OnSuspend is called when the agent is suspended. + // Custom agents may implement this method if for example, to send a notification of the suspension. + // Note that you do not need to save the internal state of the agent here, as that is handled automatically. + OnSuspend() error + + // OnRestore is called when the agent is restored from a suspended state. + // Custom agents may implement this method if for example, to send a notification of the restoration. + // Note that you do not need to restore the internal state of the agent here, as that is handled automatically. + OnRestore() error - // OnReload is called when the agent is reloaded. - // Custom agents may implement this method to perform any actions when the agent is reloaded. - // Reloading is done when the agent is updated, or when the agent is resumed after being suspended. - OnReload() error + // OnTerminate is called when the agent is terminated. + // Custom agents may implement this method to send or save any final data. + OnTerminate() error // OnReceiveMessage is called when the agent receives a message. // Custom agents may implement this method to handle incoming messages. diff --git a/sdk/go/pkg/agents/imports_mock.go b/sdk/go/pkg/agents/imports_mock.go index 184058649..4ebad8ba6 100644 --- a/sdk/go/pkg/agents/imports_mock.go +++ b/sdk/go/pkg/agents/imports_mock.go @@ -17,6 +17,7 @@ import ( var SpawnAgentActorCallStack = testutils.NewCallStack() var SendMessageCallStack = testutils.NewCallStack() +var TerminateAgentCallStack = testutils.NewCallStack() func hostSpawnAgentActor(agentName *string) *AgentInfo { SpawnAgentActorCallStack.Push(agentName) @@ -39,3 +40,9 @@ func hostSendMessage(agentId, msgName, data *string, timeout int64) *MessageResp return nil } + +func hostTerminateAgent(agentId *string) bool { + TerminateAgentCallStack.Push(agentId) + + return agentId != nil && *agentId == "abc123" +} diff --git a/sdk/go/pkg/agents/imports_wasi.go b/sdk/go/pkg/agents/imports_wasi.go index f0a124a5c..f008dcb1f 100644 --- a/sdk/go/pkg/agents/imports_wasi.go +++ b/sdk/go/pkg/agents/imports_wasi.go @@ -40,3 +40,7 @@ func hostSendMessage(agentId, msgName, data *string, timeout int64) *MessageResp } return (*MessageResponse)(response) } + +//go:noescape +//go:wasmimport modus_agents terminateAgent +func hostTerminateAgent(agentId *string) bool