diff --git a/CHANGELOG.md b/CHANGELOG.md index e043cadc4..3dfe76659 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ # Change Log +## 2025-06-22 - Runtime 0.18.0-alpha.10 + +- fix: more cluster resiliency improvements [#901](https://github.com/hypermodeinc/modus/pull/901) + ## 2025-06-21 - Runtime 0.18.0-alpha.9 - fix: improve cluster resiliency [#900](https://github.com/hypermodeinc/modus/pull/900) diff --git a/runtime/actors/actorlogger.go b/runtime/actors/actorlogger.go index d940890d6..38f7a152a 100644 --- a/runtime/actors/actorlogger.go +++ b/runtime/actors/actorlogger.go @@ -13,8 +13,6 @@ import ( "fmt" "io" "log" - "slices" - "strings" "github.com/hypermodeinc/modus/runtime/logger" "github.com/hypermodeinc/modus/runtime/utils" @@ -22,12 +20,6 @@ import ( actorLog "github.com/tochemey/goakt/v3/log" ) -// some messages are ignored during shutdown because they are expected -var shutdownIgnoredMessages = []string{ - "Failed to acquire semaphore: context canceled", - " is down. modus is going to shutdown.", -} - func newActorLogger(logger *zerolog.Logger) *actorLogger { var minLevel zerolog.Level @@ -43,28 +35,10 @@ func newActorLogger(logger *zerolog.Logger) *actorLogger { } type actorLogger struct { - logger *zerolog.Logger - paused bool - shuttingDown bool -} - -func (al *actorLogger) Pause() { - al.paused = true -} - -func (al *actorLogger) Resume() { - al.paused = false + logger *zerolog.Logger } func (al *actorLogger) writeToLog(level zerolog.Level, msg string) { - if al.paused { - return - } - if al.shuttingDown && slices.ContainsFunc(shutdownIgnoredMessages, func(s string) bool { - return strings.Contains(msg, s) - }) { - return - } al.logger.WithLevel(level).Msg(msg) } diff --git a/runtime/actors/actorsystem.go b/runtime/actors/actorsystem.go index 10f1d82e9..cb5fe46cf 100644 --- a/runtime/actors/actorsystem.go +++ b/runtime/actors/actorsystem.go @@ -19,6 +19,7 @@ import ( "github.com/hypermodeinc/modus/runtime/messages" "github.com/hypermodeinc/modus/runtime/pluginmanager" "github.com/hypermodeinc/modus/runtime/plugins" + "github.com/hypermodeinc/modus/runtime/utils" "github.com/hypermodeinc/modus/runtime/wasmhost" goakt "github.com/tochemey/goakt/v3/actor" @@ -27,6 +28,8 @@ import ( var _actorSystem goakt.ActorSystem func Initialize(ctx context.Context) { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() wasmExt := &wasmExtension{ host: wasmhost.GetWasmHost(ctx), @@ -42,24 +45,47 @@ func Initialize(ctx context.Context) { } opts = append(opts, clusterOptions(ctx)...) - if actorSystem, err := goakt.NewActorSystem("modus", opts...); err != nil { + actorSystem, err := goakt.NewActorSystem("modus", opts...) + if err != nil { logger.Fatal(ctx).Err(err).Msg("Failed to create actor system.") - } else if err := actorSystem.Start(ctx); err != nil { + } + + if err := startActorSystem(ctx, actorSystem); err != nil { logger.Fatal(ctx).Err(err).Msg("Failed to start actor system.") - } else if err := actorSystem.Inject(&wasmAgentInfo{}); err != nil { + } + + if err := actorSystem.Inject(&wasmAgentInfo{}); err != nil { logger.Fatal(ctx).Err(err).Msg("Failed to inject wasm agent info into actor system.") - } else { - _actorSystem = actorSystem } - waitForClusterSync() + _actorSystem = actorSystem logger.Info(ctx).Msg("Actor system started.") pluginmanager.RegisterPluginLoadedCallback(loadAgentActors) } +func startActorSystem(ctx context.Context, actorSystem goakt.ActorSystem) error { + maxRetries := getIntFromEnv("MODUS_ACTOR_SYSTEM_START_MAX_RETRIES", 5) + retryInterval := getDurationFromEnv("MODUS_ACTOR_SYSTEM_START_RETRY_INTERVAL_SECONDS", 2, time.Second) + + for i := range maxRetries { + if err := actorSystem.Start(ctx); err != nil { + logger.Warn(ctx).Err(err).Int("attempt", i+1).Msgf("Failed to start actor system, retrying in %s...", retryInterval) + time.Sleep(retryInterval) + retryInterval *= 2 // Exponential backoff + continue + } + return nil + } + + return fmt.Errorf("failed to start actor system after %d retries", maxRetries) +} + func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + // restart local actors that are already running, which will reload the plugin actors := _actorSystem.Actors() localAgents := make(map[string]bool, len(actors)) @@ -72,36 +98,44 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error { } } - // spawn actors for agents with state in the database, that are not already running - // check both locally and on remote nodes in the cluster - logger.Debug(ctx).Msg("Restoring agent actors from database.") - agents, err := db.QueryActiveAgents(ctx) - if err != nil { - return fmt.Errorf("failed to query active agents: %w", err) - } - inCluster := _actorSystem.InCluster() - for _, agent := range agents { - if !localAgents[agent.Id] { - if inCluster { - actorName := getActorName(agent.Id) - if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil { - logger.Err(ctx, err).Msgf("Failed to check if actor %s exists in cluster.", actorName) - } else if exists { - // if the actor already exists in the cluster, skip spawning it - continue + // do this next part in a goroutine to avoid blocking the cluster engine startup + go func() { + waitForClusterSync() + + // spawn actors for agents with state in the database, that are not already running + // check both locally and on remote nodes in the cluster + logger.Debug(ctx).Msg("Restoring agent actors from database.") + agents, err := db.QueryActiveAgents(ctx) + if err != nil { + logger.Err(ctx, err).Msg("Failed to query active agents from database.") + return + } + inCluster := _actorSystem.InCluster() + for _, agent := range agents { + if !localAgents[agent.Id] { + if inCluster { + actorName := getActorName(agent.Id) + if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil { + logger.Err(ctx, err).Msgf("Failed to check if actor %s exists in cluster.", actorName) + } else if exists { + // if the actor already exists in the cluster, skip spawning it + continue + } + } + if err := spawnActorForAgent(ctx, plugin.Name(), agent.Id, agent.Name, false); err != nil { + logger.Err(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id) } - } - if err := spawnActorForAgent(ctx, plugin.Name(), agent.Id, agent.Name, false); err != nil { - logger.Err(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id) } } - } + }() return nil } func beforeShutdown(ctx context.Context) error { - _actorSystem.Logger().(*actorLogger).shuttingDown = true + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + logger.Info(ctx).Msg("Actor system shutting down...") actors := _actorSystem.Actors() @@ -127,7 +161,7 @@ func beforeShutdown(ctx context.Context) error { } } - waitForClusterSync() + // waitForClusterSync() // then allow the actor system to continue with its shutdown process return nil @@ -135,11 +169,14 @@ func beforeShutdown(ctx context.Context) error { func waitForClusterSync() { if clusterEnabled() { - time.Sleep(peerSyncInterval() * 2) + time.Sleep(nodesSyncInterval()) } } func Shutdown(ctx context.Context) { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + if _actorSystem == nil { logger.Fatal(ctx).Msg("Actor system is not initialized, cannot shutdown.") } diff --git a/runtime/actors/agents.go b/runtime/actors/agents.go index 5461b1477..b92b8fa06 100644 --- a/runtime/actors/agents.go +++ b/runtime/actors/agents.go @@ -63,6 +63,9 @@ const ( ) func StartAgent(ctx context.Context, agentName string) (*AgentInfo, error) { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + plugin, ok := plugins.GetPluginFromContext(ctx) if !ok { return nil, fmt.Errorf("no plugin found in context") @@ -77,6 +80,8 @@ func StartAgent(ctx context.Context, agentName string) (*AgentInfo, error) { } func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName string, initializing bool) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() ctx = context.WithoutCancel(ctx) ctx = context.WithValue(ctx, utils.AgentIdContextKey, agentId) @@ -99,6 +104,9 @@ func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName stri } func StopAgent(ctx context.Context, agentId string) (*AgentInfo, error) { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + actorName := getActorName(agentId) if err := tell(ctx, actorName, &messages.ShutdownAgent{}); err != nil { if !errors.Is(err, goakt.ErrActorNotFound) { @@ -122,6 +130,9 @@ func StopAgent(ctx context.Context, agentId string) (*AgentInfo, error) { } func getAgentInfoFromDatabase(ctx context.Context, agentId string) (*AgentInfo, error) { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + if agent, e := db.GetAgentState(ctx, agentId); e == nil { return &AgentInfo{ Id: agent.Id, @@ -133,6 +144,9 @@ func getAgentInfoFromDatabase(ctx context.Context, agentId string) (*AgentInfo, } func GetAgentInfo(ctx context.Context, agentId string) (*AgentInfo, error) { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + actorName := getActorName(agentId) request := &messages.AgentInfoRequest{} @@ -170,6 +184,9 @@ func newAgentMessageErrorResponse(errMsg string) *agentMessageResponse { } func SendAgentMessage(ctx context.Context, agentId string, msgName string, data *string, timeout int64) (*agentMessageResponse, error) { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + actorName := getActorName(agentId) msg := &messages.AgentRequest{ @@ -202,6 +219,8 @@ func SendAgentMessage(ctx context.Context, agentId string, msgName string, data } func PublishAgentEvent(ctx context.Context, agentId, eventName string, eventData *string) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() var data any if eventData != nil { @@ -264,6 +283,9 @@ func getAgentTopic(agentId string) string { } func ListActiveAgents(ctx context.Context) ([]AgentInfo, error) { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + agents, err := db.QueryActiveAgents(ctx) if err != nil { return nil, fmt.Errorf("error listing active agents: %w", err) @@ -281,11 +303,14 @@ func ListActiveAgents(ctx context.Context) ([]AgentInfo, error) { return results, nil } -func ListLocalAgents() []AgentInfo { +func ListLocalAgents(ctx context.Context) []AgentInfo { if _actorSystem == nil { return nil } + span, _ := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + actors := _actorSystem.Actors() results := make([]AgentInfo, 0, len(actors)) diff --git a/runtime/actors/cluster.go b/runtime/actors/cluster.go index b831fdf4e..568920dbb 100644 --- a/runtime/actors/cluster.go +++ b/runtime/actors/cluster.go @@ -11,6 +11,8 @@ package actors import ( "context" + "errors" + "fmt" "net/url" "os" "strings" @@ -18,6 +20,7 @@ import ( "github.com/hypermodeinc/modus/runtime/app" "github.com/hypermodeinc/modus/runtime/logger" + "github.com/hypermodeinc/modus/runtime/utils" goakt "github.com/tochemey/goakt/v3/actor" "github.com/tochemey/goakt/v3/discovery" @@ -28,6 +31,8 @@ import ( ) func clusterOptions(ctx context.Context) []goakt.Option { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() clusterMode := clusterMode() if clusterMode == clusterModeNone { @@ -45,45 +50,9 @@ func clusterOptions(ctx context.Context) []goakt.Option { Int("peers_port", peersPort). Msg("Clustering enabled.") - var disco discovery.Provider - switch clusterMode { - case clusterModeNats: - natsUrl := clusterNatsUrl() - clusterHost := clusterHost() - logger.Info(ctx). - Str("cluster_host", clusterHost). - Str("nats_server", natsUrl). - Msg("Using NATS for node discovery.") - - disco = nats.NewDiscovery(&nats.Config{ - NatsSubject: "modus-gossip", - NatsServer: natsUrl, - Host: clusterHost, - DiscoveryPort: discoveryPort, - }) - - case clusterModeKubernetes: - namespace, ok := app.KubernetesNamespace() - if !ok { - logger.Fatal(ctx). - Msg("Kubernetes cluster mode enabled, but a Kubernetes namespace was not found. Ensure running in a Kubernetes environment.") - return nil - } - - logger.Info(ctx). - Str("namespace", namespace). - Msg("Using Kubernetes for node discovery.") - - disco = kubernetes.NewDiscovery(&kubernetes.Config{ - Namespace: namespace, - PodLabels: getPodLabels(), - DiscoveryPortName: "discovery-port", - RemotingPortName: "remoting-port", - PeersPortName: "peers-port", - }) - - default: - panic("Unsupported cluster mode: " + clusterMode.String()) + disco, err := newDiscoveryProvider(ctx, clusterMode, discoveryPort) + if err != nil { + logger.Fatal(ctx).Err(err).Msg("Failed to create cluster discovery provider.") } var remotingHost string @@ -95,11 +64,10 @@ func clusterOptions(ctx context.Context) []goakt.Option { remotingHost = "0.0.0.0" } - readTimeout := time.Duration(getIntFromEnv("MODUS_CLUSTER_READ_TIMEOUT_SECONDS", 2)) * time.Second - writeTimeout := time.Duration(getIntFromEnv("MODUS_CLUSTER_WRITE_TIMEOUT_SECONDS", 2)) * time.Second + readTimeout := getDurationFromEnv("MODUS_CLUSTER_READ_TIMEOUT_SECONDS", 2, time.Second) + writeTimeout := getDurationFromEnv("MODUS_CLUSTER_WRITE_TIMEOUT_SECONDS", 2, time.Second) return []goakt.Option{ - goakt.WithPeerStateLoopInterval(peerSyncInterval()), goakt.WithRemote(remote.NewConfig(remotingHost, remotingPort)), goakt.WithCluster(goakt.NewClusterConfig(). WithDiscovery(disco). @@ -107,6 +75,9 @@ func clusterOptions(ctx context.Context) []goakt.Option { WithPeersPort(peersPort). WithReadTimeout(readTimeout). WithWriteTimeout(writeTimeout). + // WithPartitionCount(3). + WithClusterStateSyncInterval(nodesSyncInterval()). + WithPeersStateSyncInterval(peerSyncInterval()). WithKinds(&wasmAgentActor{}, &subscriptionActor{}), ), } @@ -202,9 +173,17 @@ func clusterPorts() (discoveryPort, remotingPort, peersPort int) { return } +// peerSyncInterval returns the interval at which the cluster peers sync their list of actors across the cluster. +// We use a tight sync interval of 1 second by default, to ensure quick peer discovery as agents are added or removed. func peerSyncInterval() time.Duration { - // we use a tight sync interval by default, to ensure quick peer discovery - return time.Duration(getIntFromEnv("MODUS_CLUSTER_PEER_SYNC_MS", 500)) * time.Millisecond + return getDurationFromEnv("MODUS_CLUSTER_PEER_SYNC_SECONDS", 1, time.Second) +} + +// nodesSyncInterval returns the interval at which the cluster syncs the list of active nodes across the cluster. +// On each interval, discovery will be triggered to find new nodes and update the cluster state. +// The default is 10 seconds, which is a reasonable balance between responsiveness and network overhead. +func nodesSyncInterval() time.Duration { + return getDurationFromEnv("MODUS_CLUSTER_NODES_SYNC_SECONDS", 10, time.Second) } func getPodLabels() map[string]string { @@ -228,3 +207,104 @@ func getPodLabels() map[string]string { "app.kubernetes.io/component": "runtime", } } + +func newDiscoveryProvider(ctx context.Context, clusterMode goaktClusterMode, discoveryPort int) (discovery.Provider, error) { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + + switch clusterMode { + case clusterModeNats: + natsUrl := clusterNatsUrl() + clusterHost := clusterHost() + + logger.Info(ctx). + Str("cluster_host", clusterHost). + Str("nats_server", natsUrl). + Msg("Using NATS for node discovery.") + + disco := nats.NewDiscovery(&nats.Config{ + NatsSubject: "modus-gossip", + NatsServer: natsUrl, + Host: clusterHost, + DiscoveryPort: discoveryPort, + }) + + return wrapProvider(ctx, disco), nil + + case clusterModeKubernetes: + namespace, ok := app.KubernetesNamespace() + if !ok { + return nil, errors.New("Kubernetes cluster mode enabled, but namespace was not found") + } + + logger.Info(ctx). + Str("namespace", namespace). + Msg("Using Kubernetes for node discovery.") + + disco := kubernetes.NewDiscovery(&kubernetes.Config{ + Namespace: namespace, + PodLabels: getPodLabels(), + DiscoveryPortName: "discovery-port", + RemotingPortName: "remoting-port", + PeersPortName: "peers-port", + }) + + return wrapProvider(ctx, disco), nil + } + + return nil, fmt.Errorf("unsupported cluster mode: %s", clusterMode) +} + +// wrapProvider wraps a discovery provider to add Sentry tracing to its methods. +func wrapProvider(ctx context.Context, provider discovery.Provider) discovery.Provider { + if provider == nil { + return nil + } + + return &providerWrapper{ctx, provider} +} + +// providerWrapper is a wrapper around a discovery provider that adds Sentry tracing to its methods. +type providerWrapper struct { + ctx context.Context + provider discovery.Provider +} + +func (w *providerWrapper) Close() error { + span, _ := utils.NewSentrySpanForCurrentFunc(w.ctx) + defer span.Finish() + + return w.provider.Close() +} + +func (w *providerWrapper) Deregister() error { + span, _ := utils.NewSentrySpanForCurrentFunc(w.ctx) + defer span.Finish() + + return w.provider.Deregister() +} + +func (w *providerWrapper) DiscoverPeers() ([]string, error) { + span, _ := utils.NewSentrySpanForCurrentFunc(w.ctx) + defer span.Finish() + + return w.provider.DiscoverPeers() +} + +func (w *providerWrapper) ID() string { + return w.provider.ID() +} + +func (w *providerWrapper) Initialize() error { + span, _ := utils.NewSentrySpanForCurrentFunc(w.ctx) + defer span.Finish() + + return w.provider.Initialize() +} + +func (w *providerWrapper) Register() error { + span, _ := utils.NewSentrySpanForCurrentFunc(w.ctx) + defer span.Finish() + + return w.provider.Register() +} diff --git a/runtime/actors/misc.go b/runtime/actors/misc.go index e76506f29..74cf7bfe7 100644 --- a/runtime/actors/misc.go +++ b/runtime/actors/misc.go @@ -17,6 +17,7 @@ import ( "time" "github.com/hypermodeinc/modus/runtime/logger" + "github.com/hypermodeinc/modus/runtime/utils" goakt "github.com/tochemey/goakt/v3/actor" "google.golang.org/protobuf/proto" @@ -25,6 +26,9 @@ import ( // Sends a message to an actor identified by its name. // Uses either Tell or RemoteTell based on whether the actor is local or remote. func tell(ctx context.Context, actorName string, message proto.Message) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + addr, pid, err := _actorSystem.ActorOf(ctx, actorName) if err != nil { return err @@ -39,6 +43,9 @@ func tell(ctx context.Context, actorName string, message proto.Message) error { // Sends a message to an actor identified by its name, then waits for a response within the timeout duration. // Uses either Ask or RemoteAsk based on whether the actor is local or remote. func ask(ctx context.Context, actorName string, message proto.Message, timeout time.Duration) (response proto.Message, err error) { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + addr, pid, err := _actorSystem.ActorOf(ctx, actorName) if err != nil { return nil, err @@ -69,3 +76,14 @@ func getIntFromEnv(envVar string, defaultValue int) int { return value } + +// Retrieves a duration value from an environment variable. +func getDurationFromEnv(envVar string, defaultValue int, unit time.Duration) time.Duration { + intVal := getIntFromEnv(envVar, defaultValue) + if intVal <= 0 { + duration := time.Duration(defaultValue) * unit + logger.Warnf("Invalid value for %s. Using %s instead.", envVar, duration) + return duration + } + return time.Duration(intVal) * unit +} diff --git a/runtime/actors/subscriber.go b/runtime/actors/subscriber.go index 8164aa08b..f947dd280 100644 --- a/runtime/actors/subscriber.go +++ b/runtime/actors/subscriber.go @@ -30,6 +30,8 @@ type agentEvent struct { } func SubscribeForAgentEvents(ctx context.Context, agentId string, update func(data []byte), done func()) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() if a, err := GetAgentInfo(ctx, agentId); err != nil { return err @@ -106,6 +108,9 @@ func (a *subscriptionActor) PostStop(ac *goakt.Context) error { func (a *subscriptionActor) Receive(rc *goakt.ReceiveContext) { if msg, ok := rc.Message().(*messages.AgentEvent); ok { + span, _ := utils.NewSentrySpanForCurrentFunc(rc.Context()) + defer span.Finish() + event := &agentEvent{ Name: msg.Name, Data: msg.Data, diff --git a/runtime/actors/wasmagent.go b/runtime/actors/wasmagent.go index c70eaa7f5..c20908d8f 100644 --- a/runtime/actors/wasmagent.go +++ b/runtime/actors/wasmagent.go @@ -42,10 +42,13 @@ type wasmAgentActor struct { func (a *wasmAgentActor) PreStart(ac *goakt.Context) error { ctx := ac.Context() - a.buffers = utils.NewOutputBuffers() + + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() wasmExt := ac.Extension(wasmExtensionId).(*wasmExtension) a.host = wasmExt.host + a.buffers = utils.NewOutputBuffers() if id, ok := strings.CutPrefix(ac.ActorName(), "agent-"); ok { a.agentId = id @@ -68,6 +71,8 @@ func (a *wasmAgentActor) PreStart(ac *goakt.Context) error { func (a *wasmAgentActor) Receive(rc *goakt.ReceiveContext) { ctx := a.augmentContext(rc.Context(), rc.Self()) + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() switch msg := rc.Message().(type) { @@ -134,6 +139,8 @@ func (a *wasmAgentActor) Receive(rc *goakt.ReceiveContext) { func (a *wasmAgentActor) PostStop(ac *goakt.Context) error { ctx := ac.Context() + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() // suspend the agent if it's not already suspended or terminated if a.status != AgentStatusSuspended && a.status != AgentStatusTerminated { @@ -151,6 +158,9 @@ func (a *wasmAgentActor) PostStop(ac *goakt.Context) error { } func (a *wasmAgentActor) handleAgentRequest(ctx context.Context, rc *goakt.ReceiveContext, msg *messages.AgentRequest) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + if a.status != AgentStatusRunning { return fmt.Errorf("cannot process message because agent is %s", a.status) } @@ -210,6 +220,9 @@ func (a *wasmAgentActor) updateStatus(ctx context.Context, status AgentStatus) e return nil } + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + a.status = status if err := db.UpdateAgentStatus(ctx, a.agentId, string(status)); err != nil { @@ -225,6 +238,9 @@ func (a *wasmAgentActor) updateStatus(ctx context.Context, status AgentStatus) e } func (a *wasmAgentActor) saveState(ctx context.Context) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + var data string if a.module != nil { if d, err := a.getAgentState(ctx); err != nil { @@ -248,6 +264,9 @@ func (a *wasmAgentActor) saveState(ctx context.Context) error { } func (a *wasmAgentActor) restoreState(ctx context.Context) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + if a.module == nil { return fmt.Errorf("module is not initialized") } @@ -284,6 +303,8 @@ func (a *wasmAgentActor) augmentContext(ctx context.Context, pid *goakt.PID) con } func (a *wasmAgentActor) activateAgent(ctx context.Context) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() if plugin, found := pluginmanager.GetPluginByName(a.pluginName); found { a.plugin = plugin @@ -312,6 +333,9 @@ func (a *wasmAgentActor) activateAgent(ctx context.Context) error { } func (a *wasmAgentActor) deactivateAgent(ctx context.Context) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + if err := a.module.Close(ctx); err != nil { return err } @@ -320,6 +344,9 @@ func (a *wasmAgentActor) deactivateAgent(ctx context.Context) error { } func (a *wasmAgentActor) startAgent(ctx context.Context) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + logger.Info(ctx).Msg("Starting agent.") if err := a.saveState(ctx); err != nil { return err @@ -338,6 +365,9 @@ func (a *wasmAgentActor) startAgent(ctx context.Context) error { } func (a *wasmAgentActor) resumeAgent(ctx context.Context) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + logger.Info(ctx).Msg("Resuming agent.") if err := a.updateStatus(ctx, AgentStatusResuming); err != nil { return err @@ -356,6 +386,9 @@ func (a *wasmAgentActor) resumeAgent(ctx context.Context) error { } func (a *wasmAgentActor) suspendAgent(ctx context.Context) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + logger.Info(ctx).Msg("Suspending agent.") if err := a.updateStatus(ctx, AgentStatusSuspending); err != nil { return err @@ -374,6 +407,9 @@ func (a *wasmAgentActor) suspendAgent(ctx context.Context) error { } func (a *wasmAgentActor) stopAgent(ctx context.Context) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + logger.Info(ctx).Msg("Stopping agent.") if err := a.updateStatus(ctx, AgentStatusStopping); err != nil { return err @@ -392,6 +428,9 @@ func (a *wasmAgentActor) stopAgent(ctx context.Context) error { } func (a *wasmAgentActor) callEventHandler(ctx context.Context, action agentEventAction) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + fnInfo, err := a.host.GetFunctionInfo("_modus_agent_handle_event") if err != nil { return err @@ -406,6 +445,8 @@ func (a *wasmAgentActor) callEventHandler(ctx context.Context, action agentEvent } func (a *wasmAgentActor) getAgentState(ctx context.Context) (*string, error) { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() fnInfo, err := a.host.GetFunctionInfo("_modus_agent_get_state") if err != nil { @@ -433,6 +474,9 @@ func (a *wasmAgentActor) getAgentState(ctx context.Context) (*string, error) { } func (a *wasmAgentActor) setAgentState(ctx context.Context, data *string) error { + span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) + defer span.Finish() + fnInfo, err := a.host.GetFunctionInfo("_modus_agent_set_state") if err != nil { return err diff --git a/runtime/go.mod b/runtime/go.mod index b7e9e3fbc..4e1f84bb9 100644 --- a/runtime/go.mod +++ b/runtime/go.mod @@ -42,7 +42,7 @@ require ( github.com/tetratelabs/wazero v1.9.0 github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 - github.com/tochemey/goakt/v3 v3.6.4 + github.com/tochemey/goakt/v3 v3.6.5 github.com/travisjeffery/go-dynaport v1.0.0 github.com/twpayne/go-geom v1.6.1 github.com/wundergraph/graphql-go-tools/execution v1.3.2-0.20250618131920-dd0d9cc2a919 diff --git a/runtime/go.sum b/runtime/go.sum index a2be3e935..487023a93 100644 --- a/runtime/go.sum +++ b/runtime/go.sum @@ -860,16 +860,8 @@ github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDW github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= github.com/tinylib/msgp v1.2.5 h1:WeQg1whrXRFiZusidTQqzETkRpGjFjcIhW6uqWH09po= github.com/tinylib/msgp v1.2.5/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0= -github.com/tochemey/goakt/v3 v3.6.3-0.20250618223233-7f41e6a9633c h1:LG2v6TCSPGqhbzSEfaDpcF8xi2q+qQLOycvTDczf0f4= -github.com/tochemey/goakt/v3 v3.6.3-0.20250618223233-7f41e6a9633c/go.mod h1:k/i5dX6WqSMDzbFi/JY5+KRiL7TZQ1by/DeeSGZwWig= -github.com/tochemey/goakt/v3 v3.6.3-0.20250619184937-bee36bdbb97f h1:urkE4+wMItr8SgZy0vcbUjTdQzcnn++7Fsmp8fE1AC8= -github.com/tochemey/goakt/v3 v3.6.3-0.20250619184937-bee36bdbb97f/go.mod h1:k/i5dX6WqSMDzbFi/JY5+KRiL7TZQ1by/DeeSGZwWig= -github.com/tochemey/goakt/v3 v3.6.3-0.20250619212444-09365547acb0 h1:jlX5bfpRPZmL/mC5kSziVfoz345PcY/gCV4ubUoo2FU= -github.com/tochemey/goakt/v3 v3.6.3-0.20250619212444-09365547acb0/go.mod h1:k/i5dX6WqSMDzbFi/JY5+KRiL7TZQ1by/DeeSGZwWig= -github.com/tochemey/goakt/v3 v3.6.3 h1:OuIf65TMKmrjU/Hmkpdlc3Criv2ImFrUbypJKkoVU2M= -github.com/tochemey/goakt/v3 v3.6.3/go.mod h1:k/i5dX6WqSMDzbFi/JY5+KRiL7TZQ1by/DeeSGZwWig= -github.com/tochemey/goakt/v3 v3.6.4 h1:VyFTP1Ng1It0YQp/rYbdUPAxYN/VJWAm3w2rJQlfQVc= -github.com/tochemey/goakt/v3 v3.6.4/go.mod h1:6HHDMI+BXFPt+Pq59svoGdxXTme/UsqX868FG1gICO4= +github.com/tochemey/goakt/v3 v3.6.5 h1:Iun9Tv6L4xLmof0OzY3HH5zUR03AowiZDCTxB+dVaSo= +github.com/tochemey/goakt/v3 v3.6.5/go.mod h1:6HHDMI+BXFPt+Pq59svoGdxXTme/UsqX868FG1gICO4= github.com/tochemey/olric v0.2.3 h1:LGmsHLQBSEs3uasZNLT5MdS2pBMNJ71gSrXnYfkb62M= github.com/tochemey/olric v0.2.3/go.mod h1:BAD82xys8R8IAWFV+GC0B8I+J4QsYZvmPS5NT/dhmtI= github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw= @@ -1355,16 +1347,13 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -k8s.io/api v0.33.1 h1:tA6Cf3bHnLIrUK4IqEgb2v++/GYUtqiu9sRVk3iBXyw= -k8s.io/api v0.33.1/go.mod h1:87esjTn9DRSRTD4fWMXamiXxJhpOIREjWOSjsW1kEHw= +k8s.io/api v0.33.2 h1:YgwIS5jKfA+BZg//OQhkJNIfie/kmRsO0BmNaVSimvY= k8s.io/api v0.33.2/go.mod h1:fhrbphQJSM2cXzCWgqU29xLDuks4mu7ti9vveEnpSXs= k8s.io/apiextensions-apiserver v0.33.0 h1:d2qpYL7Mngbsc1taA4IjJPRJ9ilnsXIrndH+r9IimOs= k8s.io/apiextensions-apiserver v0.33.0/go.mod h1:VeJ8u9dEEN+tbETo+lFkwaaZPg6uFKLGj5vyNEwwSzc= -k8s.io/apimachinery v0.33.1 h1:mzqXWV8tW9Rw4VeW9rEkqvnxj59k1ezDUl20tFK/oM4= -k8s.io/apimachinery v0.33.1/go.mod h1:BHW0YOu7n22fFv/JkYOEfkUYNRN0fj0BlvMFWA7b+SM= +k8s.io/apimachinery v0.33.2 h1:IHFVhqg59mb8PJWTLi8m1mAoepkUNYmptHsV+Z1m5jY= k8s.io/apimachinery v0.33.2/go.mod h1:BHW0YOu7n22fFv/JkYOEfkUYNRN0fj0BlvMFWA7b+SM= -k8s.io/client-go v0.33.1 h1:ZZV/Ks2g92cyxWkRRnfUDsnhNn28eFpt26aGc8KbXF4= -k8s.io/client-go v0.33.1/go.mod h1:JAsUrl1ArO7uRVFWfcj6kOomSlCv+JpvIsp6usAGefA= +k8s.io/client-go v0.33.2 h1:z8CIcc0P581x/J1ZYf4CNzRKxRvQAwoAolYPbtQes+E= k8s.io/client-go v0.33.2/go.mod h1:9mCgT4wROvL948w6f6ArJNb7yQd7QsvqavDeZHvNmHo= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= diff --git a/runtime/httpserver/health.go b/runtime/httpserver/health.go index a0a54ae76..046fcf5cf 100644 --- a/runtime/httpserver/health.go +++ b/runtime/httpserver/health.go @@ -21,7 +21,7 @@ import ( var healthHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { env := app.Config().Environment() ver := app.VersionNumber() - agents := actors.ListLocalAgents() + agents := actors.ListLocalAgents(r.Context()) w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK)