diff --git a/.gitignore b/.gitignore index 52030c1cc..f6bfe9eaa 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ # Ignore macOS system files .DS_Store +# Ignore VS Code workspace files +*.code-workspace + # Ignore environment variable files .env .env.* diff --git a/.trunk/configs/cspell.json b/.trunk/configs/cspell.json index 67854f80b..22706888a 100644 --- a/.trunk/configs/cspell.json +++ b/.trunk/configs/cspell.json @@ -48,6 +48,7 @@ "dynamicmap", "dynaport", "envfiles", + "Errf", "estree", "euclidian", "expfmt", diff --git a/.vscode/launch.json b/.vscode/launch.json index 672b5d8e0..1d700c767 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -18,6 +18,29 @@ "${workspaceFolder}/sdk/${input:appLanguage}/examples/${input:exampleApp}/build" ] }, + { + "name": "Debug Modus Runtime (cluster mode)", + "type": "go", + "request": "launch", + "mode": "auto", + "program": "${workspaceFolder}/runtime", + "env": { + "FORCE_COLOR": "1", + "MODUS_ENV": "dev", + "MODUS_CLUSTER_MODE": "NATS", + "MODUS_CLUSTER_NATS_URL": "nats://localhost:4222", + "MODUS_DEBUG_ACTORS": "true", + "MODUS_DEBUG": "true", + "MODUS_USE_MODUSDB": "false", + "MODUS_DB": "postgresql://postgres:postgres@localhost:5432/modus?sslmode=disable" // checkov:skip=CKV_SECRET_4 + }, + "args": [ + "--port=${input:httpServerPort}", + "--refresh=1s", + "--appPath", + "${workspaceFolder}/sdk/${input:appLanguage}/examples/${input:exampleApp}/build" + ] + }, { "name": "Debug Modus Runtime (input path)", "type": "go", @@ -225,6 +248,12 @@ "type": "promptString", "description": "Enter the S3 storage folder name", "default": "shared" + }, + { + "id": "httpServerPort", + "type": "promptString", + "description": "Enter the HTTP server port", + "default": "8686" } ] } diff --git a/CHANGELOG.md b/CHANGELOG.md index debf6df34..49d1ade19 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ## UNRELEASED +- feat: cluster mode [#895](https://github.com/hypermodeinc/modus/pull/895) - fix: default send message to have no timeout [#896](https://github.com/hypermodeinc/modus/pull/896) ## 2025-06-12 CLI 0.18.1 diff --git a/go.work b/go.work index 59f316ae5..fc50ec44a 100644 --- a/go.work +++ b/go.work @@ -1,4 +1,4 @@ -go 1.24.3 +go 1.24.4 use ( ./lib/manifest diff --git a/runtime/actors/actorsystem.go b/runtime/actors/actorsystem.go index b5bf6f6d3..df4f4d183 100644 --- a/runtime/actors/actorsystem.go +++ b/runtime/actors/actorsystem.go @@ -11,73 +11,41 @@ package actors import ( "context" - "fmt" - "os" - "strconv" "time" "github.com/hypermodeinc/modus/runtime/db" "github.com/hypermodeinc/modus/runtime/logger" + "github.com/hypermodeinc/modus/runtime/messages" "github.com/hypermodeinc/modus/runtime/pluginmanager" "github.com/hypermodeinc/modus/runtime/plugins" "github.com/hypermodeinc/modus/runtime/wasmhost" goakt "github.com/tochemey/goakt/v3/actor" - goakt_static "github.com/tochemey/goakt/v3/discovery/static" - goakt_remote "github.com/tochemey/goakt/v3/remote" - "github.com/travisjeffery/go-dynaport" ) var _actorSystem goakt.ActorSystem func Initialize(ctx context.Context) { + wasmExt := &wasmExtension{ + host: wasmhost.GetWasmHost(ctx), + } + opts := []goakt.Option{ goakt.WithLogger(newActorLogger(logger.Get(ctx))), - goakt.WithCoordinatedShutdown(beforeShutdown), goakt.WithPubSub(), goakt.WithActorInitTimeout(10 * time.Second), // TODO: adjust this value, or make it configurable goakt.WithActorInitMaxRetries(1), // TODO: adjust this value, or make it configurable - - // for now, keep passivation disabled so that agents can perform long-running tasks without the actor stopping. - // TODO: figure out how to deal with this better - goakt.WithPassivationDisabled(), - } - - // NOTE: we're not relying on cluster mode yet. The below code block is for future use and testing purposes only. - if clusterMode, _ := strconv.ParseBool(os.Getenv("MODUS_USE_CLUSTER_MODE")); clusterMode { - // TODO: static discovery should really only be used for local development and testing. - // In production, we should use a more robust discovery mechanism, such as Kubernetes or NATS. - // See https://tochemey.gitbook.io/goakt/features/service-discovery - - // We just get three random ports for now. - // In prod, these will need to be configured so they are consistent across all nodes. - ports := dynaport.Get(3) - var gossip_port = ports[0] - var peers_port = ports[1] - var remoting_port = ports[2] - - disco := goakt_static.NewDiscovery(&goakt_static.Config{ - Hosts: []string{ - fmt.Sprintf("localhost:%d", gossip_port), - }, - }) - - opts = append(opts, - goakt.WithRemote(goakt_remote.NewConfig("localhost", remoting_port)), - goakt.WithCluster(goakt.NewClusterConfig(). - WithDiscovery(disco). - WithDiscoveryPort(gossip_port). - WithPeersPort(peers_port). - WithKinds(&wasmAgentActor{}, &subscriptionActor{}), - ), - ) + goakt.WithExtensions(wasmExt), } + opts = append(opts, clusterOptions(ctx)...) if actorSystem, err := goakt.NewActorSystem("modus", opts...); err != nil { logger.Fatal(ctx).Err(err).Msg("Failed to create actor system.") } else if err := actorSystem.Start(ctx); err != nil { logger.Fatal(ctx).Err(err).Msg("Failed to start actor system.") + } else if err := actorSystem.Inject(&wasmAgentInfo{}); err != nil { + logger.Fatal(ctx).Err(err).Msg("Failed to inject wasm agent info into actor system.") } else { _actorSystem = actorSystem } @@ -88,46 +56,59 @@ func Initialize(ctx context.Context) { } func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error { - // restart actors that are already running, giving them the new plugin instance + // restart local actors that are already running, which will reload the plugin actors := _actorSystem.Actors() - runningAgents := make(map[string]bool, len(actors)) + localAgents := make(map[string]bool, len(actors)) for _, pid := range actors { - go func(f_ctx context.Context, f_pid *goakt.PID) { - if actor, ok := f_pid.Actor().(*wasmAgentActor); ok { - runningAgents[actor.agentId] = true - actor.plugin = plugin - if err := f_pid.Restart(f_ctx); err != nil { - logger.Err(f_ctx, err).Msgf("Failed to restart actor for agent %s.", actor.agentId) - } + if a, ok := pid.Actor().(*wasmAgentActor); ok { + localAgents[a.agentId] = true + if err := goakt.Tell(ctx, pid, &messages.RestartAgent{}); err != nil { + logger.Err(ctx, err).Str("agent_id", a.agentId).Msg("Failed to send restart agent message to actor.") } - }(ctx, pid) + } } // spawn actors for agents with state in the database, that are not already running - // TODO: when we scale out to allow more nodes in the cluster, we'll need to decide - // which node is responsible for spawning each actor. + // check both locally and on remote nodes in the cluster agents, err := db.QueryActiveAgents(ctx) if err != nil { logger.Err(ctx, err).Msg("Failed to query agents from database.") return err } - host := wasmhost.GetWasmHost(ctx) for _, agent := range agents { - if !runningAgents[agent.Id] { - go func(f_ctx context.Context, agentId string, agentName string) { - if _, err := spawnActorForAgent(host, plugin, agentId, agentName, false); err != nil { + if !localAgents[agent.Id] { + if _actorSystem.InCluster() { + actorName := getActorName(agent.Id) + if _, err := _actorSystem.RemoteActor(ctx, actorName); err == nil { + // found actor in cluster, no need to spawn it again + continue + } + } + go func(f_ctx context.Context, pluginName, agentId, agentName string) { + if err := spawnActorForAgent(f_ctx, pluginName, agentId, agentName, false); err != nil { logger.Err(f_ctx, err).Msgf("Failed to spawn actor for agent %s.", agentId) } - }(ctx, agent.Id, agent.Name) + }(ctx, plugin.Name(), agent.Id, agent.Name) } } return nil } -func beforeShutdown(ctx context.Context) error { +func beforeShutdown(ctx context.Context) { logger.Info(ctx).Msg("Actor system shutting down...") - return nil + + // stop all agent actors before shutdown so they can suspend properly + for _, pid := range _actorSystem.Actors() { + if _, ok := pid.Actor().(*wasmAgentActor); ok { + + // pass the pid so it can be used during shutdown as an event sender + ctx := context.WithValue(ctx, pidContextKey{}, pid) + if err := pid.Shutdown(ctx); err != nil { + logger.Err(ctx, err).Msgf("Failed to shutdown actor %s.", pid.Name()) + } + } + } } func Shutdown(ctx context.Context) { @@ -135,9 +116,21 @@ func Shutdown(ctx context.Context) { return } + beforeShutdown(ctx) + if err := _actorSystem.Stop(ctx); err != nil { logger.Err(ctx, err).Msg("Failed to shutdown actor system.") } logger.Info(ctx).Msg("Actor system shutdown complete.") } + +const wasmExtensionId = "wasm" + +type wasmExtension struct { + host wasmhost.WasmHost +} + +func (w *wasmExtension) ID() string { + return wasmExtensionId +} diff --git a/runtime/actors/agents.go b/runtime/actors/agents.go index da1e8b8d5..5461b1477 100644 --- a/runtime/actors/agents.go +++ b/runtime/actors/agents.go @@ -20,23 +20,26 @@ import ( "github.com/hypermodeinc/modus/runtime/messages" "github.com/hypermodeinc/modus/runtime/plugins" "github.com/hypermodeinc/modus/runtime/utils" - "github.com/hypermodeinc/modus/runtime/wasmhost" - "github.com/rs/xid" goakt "github.com/tochemey/goakt/v3/actor" "github.com/tochemey/goakt/v3/goaktpb" + + "github.com/rs/xid" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/structpb" "google.golang.org/protobuf/types/known/timestamppb" ) +type pidContextKey struct{} + type AgentInfo struct { Id string `json:"id"` Name string `json:"name"` Status AgentStatus `json:"status"` } -type AgentStatus = string +type AgentStatus string const ( AgentStatusStarting AgentStatus = "starting" @@ -50,7 +53,7 @@ const ( const agentStatusEventName = "agentStatusUpdated" -type agentEventAction = string +type agentEventAction string const ( agentEventActionInitialize agentEventAction = "initialize" @@ -66,97 +69,91 @@ func StartAgent(ctx context.Context, agentName string) (*AgentInfo, error) { } agentId := xid.New().String() - host := wasmhost.GetWasmHost(ctx) - pid, err := spawnActorForAgent(host, plugin, agentId, agentName, true) - if err != nil { + if err := spawnActorForAgent(ctx, plugin.Name(), agentId, agentName, true); err != nil { return nil, fmt.Errorf("error spawning actor for agent %s: %w", agentId, err) } - actor := pid.Actor().(*wasmAgentActor) - info := &AgentInfo{ - Id: actor.agentId, - Name: actor.agentName, - Status: actor.status, - } - - return info, nil + return GetAgentInfo(ctx, agentId) } -func spawnActorForAgent(host wasmhost.WasmHost, plugin *plugins.Plugin, agentId, agentName string, initializing bool) (*goakt.PID, error) { - // The actor needs to spawn in its own context, so we don't pass one in to this function. - // If we did, then when the original context was cancelled or completed, the actor initialization would be cancelled too. - ctx := context.Background() - ctx = context.WithValue(ctx, utils.WasmHostContextKey, host) - ctx = context.WithValue(ctx, utils.PluginContextKey, plugin) +func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName string, initializing bool) error { + + ctx = context.WithoutCancel(ctx) ctx = context.WithValue(ctx, utils.AgentIdContextKey, agentId) ctx = context.WithValue(ctx, utils.AgentNameContextKey, agentName) actor := &wasmAgentActor{ - agentId: agentId, - agentName: agentName, - host: host, - plugin: plugin, + // this only works because we always spawn locally the first time initializing: initializing, } actorName := getActorName(agentId) - pid, err := _actorSystem.Spawn(ctx, actorName, actor) - if err != nil { - logger.Err(ctx, err).Msg("Error spawning actor for agent.") - } - return pid, err + _, err := _actorSystem.Spawn(ctx, actorName, actor, + goakt.WithLongLived(), + goakt.WithDependencies(&wasmAgentInfo{ + AgentName: agentName, + PluginName: pluginName, + }), + ) + return err } func StopAgent(ctx context.Context, agentId string) (*AgentInfo, error) { - info, pid, err := ensureAgentReady(ctx, agentId) - if pid == nil && info != nil { - return info, nil - } else if err != nil { + actorName := getActorName(agentId) + if err := tell(ctx, actorName, &messages.ShutdownAgent{}); err != nil { + if !errors.Is(err, goakt.ErrActorNotFound) { + return nil, fmt.Errorf("error stopping agent %s: %w", agentId, err) + } + } + + // Don't ask the actor, because it might already be stopped. + info, err := getAgentInfoFromDatabase(ctx, agentId) + if err != nil { return nil, err } - // shut down the actor, which will then stop the agent - actor := pid.Actor().(*wasmAgentActor) - if actor.status != AgentStatusStopping && actor.status != AgentStatusTerminated { - actor.terminating = true - if err := pid.Shutdown(ctx); err != nil { - return nil, fmt.Errorf("error stopping agent %s: %w", agentId, err) - } + // If the agent is not yet terminated, we'll send back "stopping" + // so we don't have to wait for the actor to be stopped synchronously. + if info.Status != AgentStatusTerminated { + info.Status = AgentStatusStopping } + return info, nil - return &AgentInfo{ - Id: actor.agentId, - Name: actor.agentName, - Status: actor.status, - }, nil } -func GetAgentInfo(ctx context.Context, agentId string) (*AgentInfo, error) { - info, _, err := getAgentInfo(ctx, agentId) - return info, err +func getAgentInfoFromDatabase(ctx context.Context, agentId string) (*AgentInfo, error) { + if agent, e := db.GetAgentState(ctx, agentId); e == nil { + return &AgentInfo{ + Id: agent.Id, + Name: agent.Name, + Status: AgentStatus(agent.Status), + }, nil + } + return nil, fmt.Errorf("agent %s not found", agentId) } -func getAgentInfo(ctx context.Context, agentId string) (*AgentInfo, *goakt.PID, error) { - pid, err := getActorPid(ctx, agentId) - if errors.Is(err, goakt.ErrActorNotFound) { - if agent, e := db.GetAgentState(ctx, agentId); e == nil { - return &AgentInfo{ - Id: agent.Id, - Name: agent.Name, - Status: agent.Status, - }, nil, nil - } - return nil, nil, fmt.Errorf("agent %s not found", agentId) - } else if err != nil { - return nil, nil, err +func GetAgentInfo(ctx context.Context, agentId string) (*AgentInfo, error) { + actorName := getActorName(agentId) + request := &messages.AgentInfoRequest{} + + // We first try to ask the actor for its info. Use a short timeout to avoid blocking indefinitely. + response, err := ask(ctx, actorName, request, 500*time.Millisecond) + if err == nil { + msg := response.(*messages.AgentInfoResponse) + return &AgentInfo{ + Id: agentId, + Name: msg.Name, + Status: AgentStatus(msg.Status), + }, nil + } + + // If the actor is not found, or if the request timed out, we can check the database for the agent state. + // This is useful for agents that are terminated or suspended, or just busy processing another request. + if errors.Is(err, goakt.ErrActorNotFound) || errors.Is(err, goakt.ErrRequestTimeout) { + return getAgentInfoFromDatabase(ctx, agentId) } - actor := pid.Actor().(*wasmAgentActor) - return &AgentInfo{ - Id: actor.agentId, - Name: actor.agentName, - Status: actor.status, - }, pid, nil + return nil, fmt.Errorf("error getting agent info: %w", err) } type agentMessageResponse struct { @@ -164,63 +161,43 @@ type agentMessageResponse struct { Error *string } -func ensureAgentReady(ctx context.Context, agentId string) (*AgentInfo, *goakt.PID, error) { - info, pid, err := getAgentInfo(ctx, agentId) - if pid != nil { - return info, pid, nil - } - if info == nil { - return info, nil, err - } +func newAgentMessageDataResponse(data *string) *agentMessageResponse { + return &agentMessageResponse{Data: data} +} - switch info.Status { - case AgentStatusSuspended: - // the actor is suspended, so we can try to resume it - host := wasmhost.GetWasmHost(ctx) - plugin, ok := plugins.GetPluginFromContext(ctx) - if !ok { - return info, nil, fmt.Errorf("no plugin found in context for agent %s", agentId) - } - pid, err := spawnActorForAgent(host, plugin, agentId, info.Name, false) - return info, pid, err - case AgentStatusTerminated: - return info, nil, fmt.Errorf("agent %s is terminated", agentId) - default: - // this means the agent is running on another node - TODO: handle this somehow - return info, nil, fmt.Errorf("agent %s is %s, but not found in local actor system", agentId, info.Status) - } +func newAgentMessageErrorResponse(errMsg string) *agentMessageResponse { + return &agentMessageResponse{Error: &errMsg} } func SendAgentMessage(ctx context.Context, agentId string, msgName string, data *string, timeout int64) (*agentMessageResponse, error) { + actorName := getActorName(agentId) - _, pid, err := ensureAgentReady(ctx, agentId) - if err != nil { - e := err.Error() - return &agentMessageResponse{Error: &e}, err - } - - msg := &messages.AgentRequestMessage{ + msg := &messages.AgentRequest{ Name: msgName, Data: data, Respond: timeout > 0, } + var err error + var res proto.Message if timeout == 0 { - if err := goakt.Tell(ctx, pid, msg); err != nil { - return nil, fmt.Errorf("error sending message to agent %s: %w", pid.ID(), err) - } - return &agentMessageResponse{}, nil + err = tell(ctx, actorName, msg) + } else { + res, err = ask(ctx, actorName, msg, time.Duration(timeout)) } - res, err := goakt.Ask(ctx, pid, msg, time.Duration(timeout)) - if err != nil { - return nil, fmt.Errorf("error sending message to agent %s: %w", pid.ID(), err) + if errors.Is(err, goakt.ErrActorNotFound) { + return newAgentMessageErrorResponse("agent not found"), nil + } else if err != nil { + return nil, fmt.Errorf("error sending message to agent: %w", err) } - if response, ok := res.(*messages.AgentResponseMessage); ok { - return &agentMessageResponse{Data: response.Data}, nil + if res == nil { + return newAgentMessageDataResponse(nil), nil + } else if response, ok := res.(*messages.AgentResponse); ok { + return newAgentMessageDataResponse(response.Data), nil } else { - return nil, fmt.Errorf("unexpected response type: %T", res) + return nil, fmt.Errorf("unexpected agent response type: %T", res) } } @@ -238,7 +215,7 @@ func PublishAgentEvent(ctx context.Context, agentId, eventName string, eventData return fmt.Errorf("error creating event data value: %w", err) } - event := &messages.AgentEventMessage{ + event := &messages.AgentEvent{ Name: eventName, Data: dataValue, Timestamp: timestamppb.Now(), @@ -257,17 +234,22 @@ func PublishAgentEvent(ctx context.Context, agentId, eventName string, eventData topicActor := _actorSystem.TopicActor() - pid, err := getActorPid(ctx, agentId) + // if the pid is in context, we're being called as a host function + if pid, ok := ctx.Value(pidContextKey{}).(*goakt.PID); ok { + return pid.Tell(ctx, topicActor, pubMsg) + } + + // otherwise, we try to get the actor PID for the agent (we should avoid this) + pid, err := _actorSystem.LocalActor(getActorName(agentId)) if err == nil { + logger.Warn(ctx).Str("event", eventName).Any("data", eventData).Msg("Agent actor not in context. Using lookup to publish event.") return pid.Tell(ctx, topicActor, pubMsg) } - // publish anonymously if the actor is not found + // publish anonymously if the actor is not found (we should avoid this) if errors.Is(err, goakt.ErrActorNotFound) { - // For now, we use the topic actor directly to publish the message. - // See https://github.com/Tochemey/goakt/pull/761 - // TODO: use goakt.Tell after it's fixed - return topicActor.Tell(ctx, topicActor, pubMsg) + logger.Warn(ctx).Str("event", eventName).Any("data", eventData).Msg("Agent actor not found. Publishing event anonymously.") + return goakt.Tell(ctx, topicActor, pubMsg) } return err @@ -281,20 +263,6 @@ func getAgentTopic(agentId string) string { return getActorName(agentId) + ".events" } -func getActorPid(ctx context.Context, agentId string) (*goakt.PID, error) { - if _, err := xid.FromString(agentId); err != nil { - return nil, fmt.Errorf("invalid agent ID format: %s", agentId) - } - - actorName := getActorName(agentId) - _, pid, err := _actorSystem.ActorOf(ctx, actorName) - if err == nil { - return pid, nil - } - - return nil, fmt.Errorf("error getting actor for agent %s: %w", agentId, err) -} - func ListActiveAgents(ctx context.Context) ([]AgentInfo, error) { agents, err := db.QueryActiveAgents(ctx) if err != nil { @@ -306,7 +274,7 @@ func ListActiveAgents(ctx context.Context) ([]AgentInfo, error) { results = append(results, AgentInfo{ Id: agent.Id, Name: agent.Name, - Status: agent.Status, + Status: AgentStatus(agent.Status), }) } diff --git a/runtime/actors/cluster.go b/runtime/actors/cluster.go new file mode 100644 index 000000000..a6d5a3bd0 --- /dev/null +++ b/runtime/actors/cluster.go @@ -0,0 +1,234 @@ +/* + * Copyright 2025 Hypermode Inc. + * Licensed under the terms of the Apache License, Version 2.0 + * See the LICENSE file that accompanied this code for further details. + * + * SPDX-FileCopyrightText: 2025 Hypermode Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package actors + +import ( + "context" + "net/url" + "os" + "strconv" + "strings" + + "github.com/hypermodeinc/modus/runtime/app" + "github.com/hypermodeinc/modus/runtime/logger" + + goakt "github.com/tochemey/goakt/v3/actor" + "github.com/tochemey/goakt/v3/discovery" + "github.com/tochemey/goakt/v3/discovery/kubernetes" + "github.com/tochemey/goakt/v3/discovery/nats" + "github.com/tochemey/goakt/v3/remote" + "github.com/travisjeffery/go-dynaport" +) + +func clusterOptions(ctx context.Context) []goakt.Option { + + clusterMode := clusterMode() + if clusterMode == clusterModeNone { + if !app.IsDevEnvironment() { + logger.Warnf("Cluster mode is disabled, which is not recommended for production environments. Set MODUS_CLUSTER_MODE to enable clustering.") + } + return nil + } + + discoveryPort, remotingPort, peersPort := clusterPorts() + logger.Info(ctx). + Str("cluster_mode", clusterMode.String()). + Int("discovery_port", discoveryPort). + Int("remoting_port", remotingPort). + Int("peers_port", peersPort). + Msg("Clustering enabled.") + + var disco discovery.Provider + switch clusterMode { + case clusterModeNats: + natsUrl := clusterNatsUrl() + clusterHost := clusterHost() + logger.Info(ctx). + Str("cluster_host", clusterHost). + Str("nats_server", natsUrl). + Msg("Using NATS for node discovery.") + + disco = nats.NewDiscovery(&nats.Config{ + NatsSubject: "modus-gossip", + NatsServer: natsUrl, + Host: clusterHost, + DiscoveryPort: discoveryPort, + }) + + case clusterModeKubernetes: + namespace, ok := app.KubernetesNamespace() + if !ok { + logger.Fatal(ctx). + Msg("Kubernetes cluster mode enabled, but a Kubernetes namespace was not found. Ensure running in a Kubernetes environment.") + return nil + } + + logger.Info(ctx). + Str("namespace", namespace). + Msg("Using Kubernetes for node discovery.") + + disco = kubernetes.NewDiscovery(&kubernetes.Config{ + Namespace: namespace, + PodLabels: getPodLabels(), + DiscoveryPortName: "discovery-port", + RemotingPortName: "remoting-port", + PeersPortName: "peers-port", + }) + + default: + panic("Unsupported cluster mode: " + clusterMode.String()) + } + + var remotingHost string + if app.IsDevEnvironment() { + // only bind to localhost in development + remotingHost = "127.0.0.1" + } else { + // otherwise bind to all interfaces + remotingHost = "0.0.0.0" + } + + return []goakt.Option{ + goakt.WithRemote(remote.NewConfig(remotingHost, remotingPort)), + goakt.WithCluster(goakt.NewClusterConfig(). + WithDiscovery(disco). + WithDiscoveryPort(discoveryPort). + WithPeersPort(peersPort). + WithKinds(&wasmAgentActor{}, &subscriptionActor{}), + ), + } +} + +type goaktClusterMode int + +const ( + clusterModeNone goaktClusterMode = iota + clusterModeNats + clusterModeKubernetes +) + +func (c goaktClusterMode) String() string { + switch c { + case clusterModeNone: + return "none" + case clusterModeNats: + return "NATS" + case clusterModeKubernetes: + return "Kubernetes" + default: + return "unknown" + } +} + +func parseClusterMode(mode string) goaktClusterMode { + switch strings.ToLower(mode) { + case "none", "": + return clusterModeNone + case "nats": + return clusterModeNats + case "kubernetes", "k8s": + return clusterModeKubernetes + default: + logger.Warnf("Unknown cluster mode: '%s'. Defaulting to 'none'.", mode) + return clusterModeNone + } +} + +func clusterMode() goaktClusterMode { + return parseClusterMode(os.Getenv("MODUS_CLUSTER_MODE")) +} + +func clusterNatsUrl() string { + const envVar = "MODUS_CLUSTER_NATS_URL" + const defaultNatsUrl = "nats://localhost:4222" + urlStr := os.Getenv(envVar) + if urlStr == "" { + logger.Warnf("%s not set. Using default: %s", envVar, defaultNatsUrl) + return defaultNatsUrl + } + if _, err := url.Parse(urlStr); err != nil { + logger.Warnf("Invalid URL for %s. Using default: %s", envVar, defaultNatsUrl) + return defaultNatsUrl + } + + return urlStr +} + +func clusterHost() string { + const envVar = "MODUS_CLUSTER_HOST" + if host := os.Getenv(envVar); host != "" { + if _, err := url.Parse("http://" + host); err != nil { + logger.Fatalf("Invalid value for %s: %s.", envVar, host) + } + return host + } + + if app.IsDevEnvironment() { + return "localhost" + } else { + // this hack gets the same IP that the remoting system would bind to by default + rc := remote.NewConfig("0.0.0.0", 0) + _ = rc.Sanitize() + return rc.BindAddr() + } +} + +func clusterPorts() (discoveryPort, remotingPort, peersPort int) { + + // Get default ports dynamically + ports := dynaport.Get(3) + discoveryPort = ports[0] + remotingPort = ports[1] + peersPort = ports[2] + + // Override with environment variables if set + discoveryPort = getPortFromEnv("MODUS_CLUSTER_DISCOVERY_PORT", discoveryPort) + remotingPort = getPortFromEnv("MODUS_CLUSTER_REMOTING_PORT", remotingPort) + peersPort = getPortFromEnv("MODUS_CLUSTER_PEERS_PORT", peersPort) + + return +} + +func getPortFromEnv(envVar string, defaultPort int) int { + portStr := os.Getenv(envVar) + if portStr == "" { + return defaultPort + } + + port, err := strconv.Atoi(portStr) + if err != nil || port <= 0 { + logger.Warnf("Invalid value for %s. Using %d instead.", envVar, defaultPort) + return defaultPort + } + + return port +} + +func getPodLabels() map[string]string { + // example value: "app.kubernetes.io/name=modus,app.kubernetes.io/component=runtime" + if labels := os.Getenv("MODUS_CLUSTER_POD_LABELS"); labels != "" { + podLabels := make(map[string]string) + for label := range strings.SplitSeq(labels, ",") { + parts := strings.SplitN(label, "=", 2) + if len(parts) == 2 { + podLabels[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1]) + } else { + logger.Warnf("Invalid pod label format: '%s'. Expected 'key=value'.", label) + } + } + return podLabels + } + + // defaults + return map[string]string{ + "app.kubernetes.io/name": "modus", + "app.kubernetes.io/component": "runtime", + } +} diff --git a/runtime/actors/misc.go b/runtime/actors/misc.go new file mode 100644 index 000000000..139ea10bf --- /dev/null +++ b/runtime/actors/misc.go @@ -0,0 +1,52 @@ +/* + * Copyright 2025 Hypermode Inc. + * Licensed under the terms of the Apache License, Version 2.0 + * See the LICENSE file that accompanied this code for further details. + * + * SPDX-FileCopyrightText: 2025 Hypermode Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package actors + +import ( + "context" + "fmt" + "time" + + goakt "github.com/tochemey/goakt/v3/actor" + + "google.golang.org/protobuf/proto" +) + +// Sends a message to an actor identified by its name. +// Uses either Tell or RemoteTell based on whether the actor is local or remote. +func tell(ctx context.Context, actorName string, message proto.Message) error { + addr, pid, err := _actorSystem.ActorOf(ctx, actorName) + if err != nil { + return err + } else if pid != nil { + return goakt.Tell(ctx, pid, message) + } else if addr != nil { + return goakt.NoSender.RemoteTell(ctx, addr, message) + } + return fmt.Errorf("failed to get address or PID for actor %s", actorName) +} + +// Sends a message to an actor identified by its name, then waits for a response within the timeout duration. +// Uses either Ask or RemoteAsk based on whether the actor is local or remote. +func ask(ctx context.Context, actorName string, message proto.Message, timeout time.Duration) (response proto.Message, err error) { + addr, pid, err := _actorSystem.ActorOf(ctx, actorName) + if err != nil { + return nil, err + } else if pid != nil { + return goakt.Ask(ctx, pid, message, timeout) + } else if addr != nil { + response, err := goakt.NoSender.RemoteAsk(ctx, addr, message, timeout) + if err != nil { + return nil, err + } + return response.UnmarshalNew() + } + return nil, fmt.Errorf("failed to get address or PID for actor %s", actorName) +} diff --git a/runtime/actors/subscriber.go b/runtime/actors/subscriber.go index 301d90eef..8164aa08b 100644 --- a/runtime/actors/subscriber.go +++ b/runtime/actors/subscriber.go @@ -59,22 +59,29 @@ func SubscribeForAgentEvents(ctx context.Context, agentId string, update func(da return fmt.Errorf("failed to spawn subscription actor: %w", err) } - subMsg := &goaktpb.Subscribe{ - Topic: getAgentTopic(agentId), - } + topic := getAgentTopic(agentId) + subscribe := &goaktpb.Subscribe{Topic: topic} - if err := subActor.Tell(ctx, _actorSystem.TopicActor(), subMsg); err != nil { + if err := subActor.Tell(ctx, _actorSystem.TopicActor(), subscribe); err != nil { return fmt.Errorf("failed to subscribe to topic: %w", err) } - // When the context is done, we will stop the subscription actor. + // When the context is done, we will unsubscribe and stop the subscription actor. // For example, the GraphQL subscription is closed or the client disconnects. go func() { <-ctx.Done() - // a new context is needed because the original context is already done - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + // reset cancellation because the original context is already done + ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), time.Second) defer cancel() + + logger.Debug(ctx).Msgf("Unsubscribing from topic %s and shutting down subscription actor %s", topic, subActor.Name()) + + unsubscribe := &goaktpb.Unsubscribe{Topic: topic} + if err := subActor.Tell(ctx, _actorSystem.TopicActor(), unsubscribe); err != nil { + logger.Err(ctx, err).Msg("Failed to unsubscribe from topic") + } + if err := subActor.Shutdown(ctx); err != nil { logger.Err(ctx, err).Msg("Failed to shut down subscription actor") } @@ -98,7 +105,7 @@ func (a *subscriptionActor) PostStop(ac *goakt.Context) error { } func (a *subscriptionActor) Receive(rc *goakt.ReceiveContext) { - if msg, ok := rc.Message().(*messages.AgentEventMessage); ok { + if msg, ok := rc.Message().(*messages.AgentEvent); ok { event := &agentEvent{ Name: msg.Name, Data: msg.Data, @@ -111,7 +118,7 @@ func (a *subscriptionActor) Receive(rc *goakt.ReceiveContext) { } if msg.Name == agentStatusEventName { - status := msg.Data.GetStructValue().Fields["status"].GetStringValue() + status := AgentStatus(msg.Data.GetStructValue().Fields["status"].GetStringValue()) if status == AgentStatusTerminated { rc.Shutdown() } diff --git a/runtime/actors/wasmagent.go b/runtime/actors/wasmagent.go index b07757eb5..c70eaa7f5 100644 --- a/runtime/actors/wasmagent.go +++ b/runtime/actors/wasmagent.go @@ -12,64 +12,194 @@ package actors import ( "context" "fmt" + "strings" "time" "github.com/hypermodeinc/modus/runtime/db" "github.com/hypermodeinc/modus/runtime/logger" "github.com/hypermodeinc/modus/runtime/messages" + "github.com/hypermodeinc/modus/runtime/pluginmanager" "github.com/hypermodeinc/modus/runtime/plugins" "github.com/hypermodeinc/modus/runtime/utils" "github.com/hypermodeinc/modus/runtime/wasmhost" wasm "github.com/tetratelabs/wazero/api" goakt "github.com/tochemey/goakt/v3/actor" + "github.com/tochemey/goakt/v3/goaktpb" ) type wasmAgentActor struct { agentId string agentName string status AgentStatus + pluginName string plugin *plugins.Plugin host wasmhost.WasmHost module wasm.Module buffers utils.OutputBuffers initializing bool - terminating bool } func (a *wasmAgentActor) PreStart(ac *goakt.Context) error { - ctx := a.newContext() + ctx := ac.Context() + a.buffers = utils.NewOutputBuffers() + + wasmExt := ac.Extension(wasmExtensionId).(*wasmExtension) + a.host = wasmExt.host + + if id, ok := strings.CutPrefix(ac.ActorName(), "agent-"); ok { + a.agentId = id + } else { + return fmt.Errorf("actor name %s does not start with 'agent-'", ac.ActorName()) + } + + if info, err := getAgentInfo(ac); err != nil { + return err + } else { + a.agentName = info.AgentName + a.pluginName = info.PluginName + } if err := a.activateAgent(ctx); err != nil { return fmt.Errorf("error activating agent: %w", err) } + return nil +} + +func (a *wasmAgentActor) Receive(rc *goakt.ReceiveContext) { + ctx := a.augmentContext(rc.Context(), rc.Self()) - if a.initializing { - if err := a.startAgent(ctx); err != nil { - return fmt.Errorf("error starting agent: %w", err) + switch msg := rc.Message().(type) { + + case *messages.AgentRequest: + if err := a.handleAgentRequest(ctx, rc, msg); err != nil { + rc.Err(err) + } + + case *goaktpb.PostStart: + if a.initializing { + if err := a.startAgent(ctx); err != nil { + rc.Err(fmt.Errorf("error starting agent: %w", err)) + } + a.initializing = false + } else { + if err := a.resumeAgent(ctx); err != nil { + rc.Err(fmt.Errorf("error resuming agent: %w", err)) + } + } + + case *messages.AgentInfoRequest: + rc.Response(&messages.AgentInfoResponse{ + Name: a.agentName, + Status: string(a.status), + }) + + case *messages.RestartAgent: + if a.status != AgentStatusRunning { + logger.Warn(ctx).Msgf("Agent is not %s, cannot restart now.", a.status) + return + } + if err := a.suspendAgent(ctx); err != nil { + rc.Err(fmt.Errorf("error suspending agent: %w", err)) + return + } + if err := a.deactivateAgent(ctx); err != nil { + rc.Err(fmt.Errorf("error deactivating agent: %w", err)) + return + } + if err := a.activateAgent(ctx); err != nil { + rc.Err(fmt.Errorf("error reactivating agent: %w", err)) + return } - a.initializing = false - } else { if err := a.resumeAgent(ctx); err != nil { - return fmt.Errorf("error resuming agent: %w", err) + rc.Err(fmt.Errorf("error resuming agent: %w", err)) + return } - } - return nil + case *messages.ShutdownAgent: + if a.status == AgentStatusStopping { + logger.Warn(ctx).Msg("Agent is already stopping, cannot shutdown again.") + return + } + if err := a.stopAgent(ctx); err != nil { + rc.Err(fmt.Errorf("error stopping agent: %w", err)) + } + rc.Shutdown() + + default: + logger.Warn(ctx).Msgf("Unhandled message type %T in wasm agent actor.", msg) + rc.Unhandled() + } } func (a *wasmAgentActor) PostStop(ac *goakt.Context) error { - ctx := a.newContext() - defer a.module.Close(ctx) + ctx := ac.Context() - if a.terminating { - if err := a.stopAgent(ctx); err != nil { - return fmt.Errorf("error stopping agent: %w", err) - } - } else { + // suspend the agent if it's not already suspended or terminated + if a.status != AgentStatusSuspended && a.status != AgentStatusTerminated { if err := a.suspendAgent(ctx); err != nil { - return fmt.Errorf("error suspending agent: %w", err) + logger.Err(ctx, err).Msg("Error suspending agent.") + // don't return on error - we'll still try to deactivate the agent + } + } + + // deactivate the agent to clean up resources + if err := a.deactivateAgent(ctx); err != nil { + return fmt.Errorf("error deactivating agent: %w", err) + } + return nil +} + +func (a *wasmAgentActor) handleAgentRequest(ctx context.Context, rc *goakt.ReceiveContext, msg *messages.AgentRequest) error { + if a.status != AgentStatusRunning { + return fmt.Errorf("cannot process message because agent is %s", a.status) + } + + logger.Info(ctx).Str("msg_name", msg.Name).Msg("Received message.") + start := time.Now() + + fnInfo, err := a.host.GetFunctionInfo("_modus_agent_handle_message") + if err != nil { + return err + } + + params := map[string]any{ + "msgName": msg.Name, + "data": msg.Data, + } + + execInfo, err := a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, params) + if err != nil { + return err + } + + if msg.Respond { + result := execInfo.Result() + response := &messages.AgentResponse{} + if result != nil { + switch result := result.(type) { + case string: + response.Data = &result + case *string: + response.Data = result + default: + err := fmt.Errorf("unexpected result type: %T", result) + logger.Err(ctx, err).Msg("Error handling message.") + return err + } } + // responding will cancel the context when finished, so we need to ensure + // that the context used for the remainder of this function is not cancelled. + ctx = context.WithoutCancel(ctx) + rc.Response(response) + } + + duration := time.Since(start) + logger.Info(ctx).Str("msg_name", msg.Name).Dur("duration_ms", duration).Msg("Message handled successfully.") + + // save the state after handling the message to ensure the state is up to date in case of hard termination + if err := a.saveState(ctx); err != nil { + logger.Err(ctx, err).Msg("Error saving agent state.") } return nil @@ -82,7 +212,7 @@ func (a *wasmAgentActor) updateStatus(ctx context.Context, status AgentStatus) e a.status = status - if err := db.UpdateAgentStatus(ctx, a.agentId, status); err != nil { + if err := db.UpdateAgentStatus(ctx, a.agentId, string(status)); err != nil { return fmt.Errorf("error updating agent status in database: %w", err) } @@ -107,7 +237,7 @@ func (a *wasmAgentActor) saveState(ctx context.Context) error { if err := db.WriteAgentState(ctx, db.AgentState{ Id: a.agentId, Name: a.agentName, - Status: a.status, + Status: string(a.status), Data: data, UpdatedAt: time.Now().UTC().Format(utils.TimeFormat), }); err != nil { @@ -134,81 +264,35 @@ func (a *wasmAgentActor) restoreState(ctx context.Context) error { return nil } -func (a *wasmAgentActor) Receive(rc *goakt.ReceiveContext) { - ctx := a.newContext() - - // NOTE: GoAkt will send a goaktpb.PostStart message, but we don't need to do anything with it. - - switch msg := rc.Message().(type) { - - case *messages.AgentRequestMessage: - - logger.Info(ctx).Str("msg_name", msg.Name).Msg("Received message.") - start := time.Now() - - fnInfo, err := a.host.GetFunctionInfo("_modus_agent_handle_message") - if err != nil { - rc.Err(err) - return - } - - params := map[string]any{ - "msgName": msg.Name, - "data": msg.Data, - } - - execInfo, err := a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, params) - if err != nil { - rc.Err(err) - return - } - - if msg.Respond { - result := execInfo.Result() - response := &messages.AgentResponseMessage{} - if result != nil { - switch result := result.(type) { - case string: - response.Data = &result - case *string: - response.Data = result - default: - err := fmt.Errorf("unexpected result type: %T", result) - logger.Err(ctx, err).Msg("Error handling message.") - rc.Err(err) - return - } - } - rc.Response(response) - } - - duration := time.Since(start) - logger.Info(ctx).Str("msg_name", msg.Name).Dur("duration_ms", duration).Msg("Message handled successfully.") - - // save the state after handling the message to ensure the state is up to date in case of hard termination - if err := a.saveState(ctx); err != nil { - logger.Err(ctx, err).Msg("Error saving agent state.") - } - - default: - rc.Unhandled() +func (a *wasmAgentActor) augmentContext(ctx context.Context, pid *goakt.PID) context.Context { + if ctx.Value(utils.WasmHostContextKey) == nil { + ctx = context.WithValue(ctx, utils.WasmHostContextKey, a.host) + } + if ctx.Value(utils.PluginContextKey) == nil { + ctx = context.WithValue(ctx, utils.PluginContextKey, a.plugin) + } + if ctx.Value(utils.AgentIdContextKey) == nil { + ctx = context.WithValue(ctx, utils.AgentIdContextKey, a.agentId) + } + if ctx.Value(utils.AgentNameContextKey) == nil { + ctx = context.WithValue(ctx, utils.AgentNameContextKey, a.agentName) + } + if ctx.Value(pidContextKey{}) == nil { + ctx = context.WithValue(ctx, pidContextKey{}, pid) } -} - -func (a *wasmAgentActor) newContext() context.Context { - ctx := context.Background() - ctx = context.WithValue(ctx, utils.WasmHostContextKey, a.host) - ctx = context.WithValue(ctx, utils.PluginContextKey, a.plugin) - ctx = context.WithValue(ctx, utils.AgentIdContextKey, a.agentId) - ctx = context.WithValue(ctx, utils.AgentNameContextKey, a.agentName) return ctx } func (a *wasmAgentActor) activateAgent(ctx context.Context) error { - a.buffers = utils.NewOutputBuffers() + if plugin, found := pluginmanager.GetPluginByName(a.pluginName); found { + a.plugin = plugin + } else { + return fmt.Errorf("plugin %s not found", a.pluginName) + } + if mod, err := a.host.GetModuleInstance(ctx, a.plugin, a.buffers); err != nil { - return fmt.Errorf("error getting module instance in actor pre-start: %w", err) + return err } else { a.module = mod } @@ -227,6 +311,14 @@ func (a *wasmAgentActor) activateAgent(ctx context.Context) error { return err } +func (a *wasmAgentActor) deactivateAgent(ctx context.Context) error { + if err := a.module.Close(ctx); err != nil { + return err + } + a.module = nil + return nil +} + func (a *wasmAgentActor) startAgent(ctx context.Context) error { logger.Info(ctx).Msg("Starting agent.") if err := a.saveState(ctx); err != nil { @@ -353,3 +445,39 @@ func (a *wasmAgentActor) setAgentState(ctx context.Context, data *string) error _, err = a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, params) return err } + +const agentInfoDependencyId = "agent-info" + +type wasmAgentInfo struct { + AgentName string `json:"agent"` + PluginName string `json:"plugin"` +} + +func (*wasmAgentInfo) ID() string { + return agentInfoDependencyId +} + +func (info *wasmAgentInfo) MarshalBinary() ([]byte, error) { + jsonBytes, err := utils.JsonSerialize(info) + if err != nil { + return nil, fmt.Errorf("error serializing agent info to JSON: %w", err) + } + return jsonBytes, nil +} + +func (info *wasmAgentInfo) UnmarshalBinary(data []byte) error { + if err := utils.JsonDeserialize(data, info); err != nil { + return fmt.Errorf("error deserializing agent info from JSON: %w", err) + } + return nil +} + +func getAgentInfo(ac *goakt.Context) (*wasmAgentInfo, error) { + deps := ac.Dependencies() + for _, dep := range deps { + if dep.ID() == agentInfoDependencyId { + return dep.(*wasmAgentInfo), nil + } + } + return nil, fmt.Errorf("agent info dependency not found") +} diff --git a/runtime/app/app.go b/runtime/app/app.go index 73261b68b..44820f42c 100644 --- a/runtime/app/app.go +++ b/runtime/app/app.go @@ -14,6 +14,7 @@ import ( "path" "path/filepath" "runtime" + "strings" "sync" "github.com/fatih/color" @@ -94,3 +95,13 @@ func ModusHomeDir() string { } return modusHome } + +func KubernetesNamespace() (string, bool) { + if ns := os.Getenv("NAMESPACE"); ns != "" { + return ns, true + } + if data, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil { + return strings.TrimSpace(string(data)), true + } + return "", false +} diff --git a/runtime/db/db.go b/runtime/db/db.go index 7a71ddbee..b7a229435 100644 --- a/runtime/db/db.go +++ b/runtime/db/db.go @@ -110,7 +110,7 @@ var _useModusDB bool func useModusDB() bool { _useModusDBOnce.Do(func() { - s := os.Getenv("MODUS_DB_USE_MODUSDB") + s := os.Getenv("MODUS_USE_MODUSDB") if s != "" { if value, err := strconv.ParseBool(s); err == nil { _useModusDB = value diff --git a/runtime/go.mod b/runtime/go.mod index 656da6723..5c02ee75e 100644 --- a/runtime/go.mod +++ b/runtime/go.mod @@ -1,6 +1,6 @@ module github.com/hypermodeinc/modus/runtime -go 1.24.3 +go 1.24.4 // trunk-ignore-all(osv-scanner/GHSA-9w9f-6mg8-jp7w): not affected by bleve vulnerability @@ -42,7 +42,7 @@ require ( github.com/tetratelabs/wazero v1.9.0 github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 - github.com/tochemey/goakt/v3 v3.5.1 + github.com/tochemey/goakt/v3 v3.6.2 github.com/travisjeffery/go-dynaport v1.0.0 github.com/twpayne/go-geom v1.6.1 github.com/wundergraph/graphql-go-tools/execution v1.3.1 @@ -206,6 +206,9 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mschoch/smat v0.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nats-io/nats.go v1.43.0 // indirect + github.com/nats-io/nkeys v0.4.11 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/panjf2000/ants/v2 v2.11.3 // indirect @@ -221,7 +224,7 @@ require ( github.com/prometheus/statsd_exporter v0.28.0 // indirect github.com/r3labs/sse/v2 v2.8.1 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect - github.com/redis/go-redis/v9 v9.9.0 // indirect + github.com/redis/go-redis/v9 v9.10.0 // indirect github.com/reugn/go-quartz v0.14.0 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect github.com/sagikazarmark/locafero v0.7.0 // indirect @@ -287,7 +290,7 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.33.0 // indirect k8s.io/klog/v2 v2.130.1 // indirect - k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff // indirect + k8s.io/kube-openapi v0.0.0-20250610211856-8b98d1ed966a // indirect k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 // indirect rogchap.com/v8go v0.9.0 // indirect sigs.k8s.io/json v0.0.0-20241014173422-cfa47c3a1cc8 // indirect diff --git a/runtime/go.sum b/runtime/go.sum index 11178d347..14ce86e66 100644 --- a/runtime/go.sum +++ b/runtime/go.sum @@ -755,8 +755,8 @@ github.com/r3labs/sse/v2 v2.8.1 h1:lZH+W4XOLIq88U5MIHOsLec7+R62uhz3bIi2yn0Sg8o= github.com/r3labs/sse/v2 v2.8.1/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/redis/go-redis/v9 v9.9.0 h1:URbPQ4xVQSQhZ27WMQVmZSo3uT3pL+4IdHVcYq2nVfM= -github.com/redis/go-redis/v9 v9.9.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/redis/go-redis/v9 v9.10.0 h1:FxwK3eV8p/CQa0Ch276C7u2d0eNC9kCmAYQ7mCXCzVs= +github.com/redis/go-redis/v9 v9.10.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/reugn/go-quartz v0.14.0 h1:KlIBAsOIw1JI8Rc7/f8VrrHBHOr+BiqrTiB35pRe84M= github.com/reugn/go-quartz v0.14.0/go.mod h1:00DVnBKq2Fxag/HlR9mGXjmHNlMFQ1n/LNM+Fn0jUaE= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -858,8 +858,8 @@ github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDW github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= github.com/tinylib/msgp v1.2.5 h1:WeQg1whrXRFiZusidTQqzETkRpGjFjcIhW6uqWH09po= github.com/tinylib/msgp v1.2.5/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0= -github.com/tochemey/goakt/v3 v3.5.1 h1:WCZUviuQ3pw0lyXx0nKmpGOKOUhXDTdJmFDvWc4P9do= -github.com/tochemey/goakt/v3 v3.5.1/go.mod h1:S+nNYJJZZX5MG0xlontuo9ObXZPHkSuw3LBGOHxOgqY= +github.com/tochemey/goakt/v3 v3.6.2 h1:oAucnRBY9KX1MmGkTR8vmr/eOcxXqrR+RC/NgMHIgVY= +github.com/tochemey/goakt/v3 v3.6.2/go.mod h1:4DZZVaEUZmgyRV+AMl+V1Q0mnGSnVvMfgSqE773Y2Rk= github.com/tochemey/olric v0.2.3 h1:LGmsHLQBSEs3uasZNLT5MdS2pBMNJ71gSrXnYfkb62M= github.com/tochemey/olric v0.2.3/go.mod h1:BAD82xys8R8IAWFV+GC0B8I+J4QsYZvmPS5NT/dhmtI= github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw= @@ -1355,8 +1355,8 @@ k8s.io/client-go v0.33.1 h1:ZZV/Ks2g92cyxWkRRnfUDsnhNn28eFpt26aGc8KbXF4= k8s.io/client-go v0.33.1/go.mod h1:JAsUrl1ArO7uRVFWfcj6kOomSlCv+JpvIsp6usAGefA= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= -k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff h1:/usPimJzUKKu+m+TE36gUyGcf03XZEP0ZIKgKj35LS4= -k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff/go.mod h1:5jIi+8yX4RIb8wk3XwBo5Pq2ccx4FP10ohkbSKCZoK8= +k8s.io/kube-openapi v0.0.0-20250610211856-8b98d1ed966a h1:ZV3Zr+/7s7aVbjNGICQt+ppKWsF1tehxggNfbM7XnG8= +k8s.io/kube-openapi v0.0.0-20250610211856-8b98d1ed966a/go.mod h1:5jIi+8yX4RIb8wk3XwBo5Pq2ccx4FP10ohkbSKCZoK8= k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 h1:hwvWFiBzdWw1FhfY1FooPn3kzWuJ8tmbZBHi4zVsl1Y= k8s.io/utils v0.0.0-20250604170112-4c0f3b243397/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= rogchap.com/v8go v0.9.0 h1:wYbUCO4h6fjTamziHrzyrPnpFNuzPpjZY+nfmZjNaew= diff --git a/runtime/logger/logger.go b/runtime/logger/logger.go index ab12da44d..a6fc75d07 100644 --- a/runtime/logger/logger.go +++ b/runtime/logger/logger.go @@ -59,6 +59,11 @@ func Initialize() *zerolog.Logger { "agent_id", "execution_id", "duration_ms", + "cluster_mode", + "cluster_host", + "discovery_port", + "remoting_port", + "peers_port", } } else { consoleWriter.TimeFormat = "2006-01-02 15:04:05.000 -07:00" @@ -145,3 +150,27 @@ func Err(ctx context.Context, err error) *zerolog.Event { func Fatal(ctx context.Context) *zerolog.Event { return Get(ctx).Fatal() } + +func Debugf(msg string, v ...any) { + log.Debug().Msgf(msg, v...) +} + +func Infof(msg string, v ...any) { + log.Info().Msgf(msg, v...) +} + +func Warnf(msg string, v ...any) { + log.Warn().Msgf(msg, v...) +} + +func Errorf(msg string, v ...any) { + log.Error().Msgf(msg, v...) +} + +func Fatalf(msg string, v ...any) { + log.Fatal().Msgf(msg, v...) +} + +func Errf(err error, msg string, v ...any) { + log.Err(err).Msgf(msg, v...) +} diff --git a/runtime/messages/messages.pb.go b/runtime/messages/messages.pb.go index ecbadb011..c76a9fc78 100644 --- a/runtime/messages/messages.pb.go +++ b/runtime/messages/messages.pb.go @@ -23,7 +23,7 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type AgentRequestMessage struct { +type AgentRequest struct { state protoimpl.MessageState `protogen:"open.v1"` Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Data *string `protobuf:"bytes,2,opt,name=data,proto3,oneof" json:"data,omitempty"` @@ -32,20 +32,20 @@ type AgentRequestMessage struct { sizeCache protoimpl.SizeCache } -func (x *AgentRequestMessage) Reset() { - *x = AgentRequestMessage{} +func (x *AgentRequest) Reset() { + *x = AgentRequest{} mi := &file_messages_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *AgentRequestMessage) String() string { +func (x *AgentRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*AgentRequestMessage) ProtoMessage() {} +func (*AgentRequest) ProtoMessage() {} -func (x *AgentRequestMessage) ProtoReflect() protoreflect.Message { +func (x *AgentRequest) ProtoReflect() protoreflect.Message { mi := &file_messages_proto_msgTypes[0] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -57,53 +57,53 @@ func (x *AgentRequestMessage) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use AgentRequestMessage.ProtoReflect.Descriptor instead. -func (*AgentRequestMessage) Descriptor() ([]byte, []int) { +// Deprecated: Use AgentRequest.ProtoReflect.Descriptor instead. +func (*AgentRequest) Descriptor() ([]byte, []int) { return file_messages_proto_rawDescGZIP(), []int{0} } -func (x *AgentRequestMessage) GetName() string { +func (x *AgentRequest) GetName() string { if x != nil { return x.Name } return "" } -func (x *AgentRequestMessage) GetData() string { +func (x *AgentRequest) GetData() string { if x != nil && x.Data != nil { return *x.Data } return "" } -func (x *AgentRequestMessage) GetRespond() bool { +func (x *AgentRequest) GetRespond() bool { if x != nil { return x.Respond } return false } -type AgentResponseMessage struct { +type AgentResponse struct { state protoimpl.MessageState `protogen:"open.v1"` Data *string `protobuf:"bytes,1,opt,name=data,proto3,oneof" json:"data,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } -func (x *AgentResponseMessage) Reset() { - *x = AgentResponseMessage{} +func (x *AgentResponse) Reset() { + *x = AgentResponse{} mi := &file_messages_proto_msgTypes[1] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *AgentResponseMessage) String() string { +func (x *AgentResponse) String() string { return protoimpl.X.MessageStringOf(x) } -func (*AgentResponseMessage) ProtoMessage() {} +func (*AgentResponse) ProtoMessage() {} -func (x *AgentResponseMessage) ProtoReflect() protoreflect.Message { +func (x *AgentResponse) ProtoReflect() protoreflect.Message { mi := &file_messages_proto_msgTypes[1] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -115,19 +115,19 @@ func (x *AgentResponseMessage) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use AgentResponseMessage.ProtoReflect.Descriptor instead. -func (*AgentResponseMessage) Descriptor() ([]byte, []int) { +// Deprecated: Use AgentResponse.ProtoReflect.Descriptor instead. +func (*AgentResponse) Descriptor() ([]byte, []int) { return file_messages_proto_rawDescGZIP(), []int{1} } -func (x *AgentResponseMessage) GetData() string { +func (x *AgentResponse) GetData() string { if x != nil && x.Data != nil { return *x.Data } return "" } -type AgentEventMessage struct { +type AgentEvent struct { state protoimpl.MessageState `protogen:"open.v1"` Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Data *structpb.Value `protobuf:"bytes,2,opt,name=data,proto3,oneof" json:"data,omitempty"` @@ -136,20 +136,20 @@ type AgentEventMessage struct { sizeCache protoimpl.SizeCache } -func (x *AgentEventMessage) Reset() { - *x = AgentEventMessage{} +func (x *AgentEvent) Reset() { + *x = AgentEvent{} mi := &file_messages_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } -func (x *AgentEventMessage) String() string { +func (x *AgentEvent) String() string { return protoimpl.X.MessageStringOf(x) } -func (*AgentEventMessage) ProtoMessage() {} +func (*AgentEvent) ProtoMessage() {} -func (x *AgentEventMessage) ProtoReflect() protoreflect.Message { +func (x *AgentEvent) ProtoReflect() protoreflect.Message { mi := &file_messages_proto_msgTypes[2] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -161,32 +161,192 @@ func (x *AgentEventMessage) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use AgentEventMessage.ProtoReflect.Descriptor instead. -func (*AgentEventMessage) Descriptor() ([]byte, []int) { +// Deprecated: Use AgentEvent.ProtoReflect.Descriptor instead. +func (*AgentEvent) Descriptor() ([]byte, []int) { return file_messages_proto_rawDescGZIP(), []int{2} } -func (x *AgentEventMessage) GetName() string { +func (x *AgentEvent) GetName() string { if x != nil { return x.Name } return "" } -func (x *AgentEventMessage) GetData() *structpb.Value { +func (x *AgentEvent) GetData() *structpb.Value { if x != nil { return x.Data } return nil } -func (x *AgentEventMessage) GetTimestamp() *timestamppb.Timestamp { +func (x *AgentEvent) GetTimestamp() *timestamppb.Timestamp { if x != nil { return x.Timestamp } return nil } +type AgentInfoRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AgentInfoRequest) Reset() { + *x = AgentInfoRequest{} + mi := &file_messages_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AgentInfoRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AgentInfoRequest) ProtoMessage() {} + +func (x *AgentInfoRequest) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AgentInfoRequest.ProtoReflect.Descriptor instead. +func (*AgentInfoRequest) Descriptor() ([]byte, []int) { + return file_messages_proto_rawDescGZIP(), []int{3} +} + +type AgentInfoResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Status string `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AgentInfoResponse) Reset() { + *x = AgentInfoResponse{} + mi := &file_messages_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AgentInfoResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AgentInfoResponse) ProtoMessage() {} + +func (x *AgentInfoResponse) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AgentInfoResponse.ProtoReflect.Descriptor instead. +func (*AgentInfoResponse) Descriptor() ([]byte, []int) { + return file_messages_proto_rawDescGZIP(), []int{4} +} + +func (x *AgentInfoResponse) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *AgentInfoResponse) GetStatus() string { + if x != nil { + return x.Status + } + return "" +} + +type RestartAgent struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RestartAgent) Reset() { + *x = RestartAgent{} + mi := &file_messages_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RestartAgent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RestartAgent) ProtoMessage() {} + +func (x *RestartAgent) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RestartAgent.ProtoReflect.Descriptor instead. +func (*RestartAgent) Descriptor() ([]byte, []int) { + return file_messages_proto_rawDescGZIP(), []int{5} +} + +type ShutdownAgent struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *ShutdownAgent) Reset() { + *x = ShutdownAgent{} + mi := &file_messages_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *ShutdownAgent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ShutdownAgent) ProtoMessage() {} + +func (x *ShutdownAgent) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ShutdownAgent.ProtoReflect.Descriptor instead. +func (*ShutdownAgent) Descriptor() ([]byte, []int) { + return file_messages_proto_rawDescGZIP(), []int{6} +} + var File_messages_proto protoreflect.FileDescriptor var file_messages_proto_rawDesc = string([]byte{ @@ -195,37 +355,43 @@ var file_messages_proto_rawDesc = string([]byte{ 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, - 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x65, 0x0a, 0x13, 0x41, 0x67, 0x65, - 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x88, 0x01, 0x01, 0x12, 0x18, 0x0a, - 0x07, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, - 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x64, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x64, 0x61, 0x74, 0x61, - 0x22, 0x38, 0x0a, 0x14, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x17, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x88, 0x01, - 0x01, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x22, 0x9b, 0x01, 0x0a, 0x11, 0x41, - 0x67, 0x65, 0x6e, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x2f, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, - 0x74, 0x61, 0x88, 0x01, 0x01, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, - 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, - 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, - 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, - 0x07, 0x0a, 0x05, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x42, 0x8f, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, - 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x42, 0x0d, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x48, 0x02, 0x50, 0x01, 0x5a, 0x2e, 0x67, 0x69, - 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x79, 0x70, 0x65, 0x72, 0x6d, 0x6f, - 0x64, 0x65, 0x69, 0x6e, 0x63, 0x2f, 0x6d, 0x6f, 0x64, 0x75, 0x73, 0x2f, 0x72, 0x75, 0x6e, 0x74, - 0x69, 0x6d, 0x65, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0xa2, 0x02, 0x03, 0x4d, - 0x58, 0x58, 0xaa, 0x02, 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0xca, 0x02, 0x08, - 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0xe2, 0x02, 0x14, 0x4d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x73, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, - 0x02, 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x5e, 0x0a, 0x0c, 0x41, 0x67, 0x65, + 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, + 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, + 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, 0x64, + 0x61, 0x74, 0x61, 0x88, 0x01, 0x01, 0x12, 0x18, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x64, + 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x22, 0x31, 0x0a, 0x0d, 0x41, 0x67, 0x65, + 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x17, 0x0a, 0x04, 0x64, 0x61, + 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x88, 0x01, 0x01, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x22, 0x94, 0x01, 0x0a, + 0x0a, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, + 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, + 0x2f, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x56, 0x61, 0x6c, 0x75, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x88, 0x01, 0x01, + 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, + 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x64, + 0x61, 0x74, 0x61, 0x22, 0x12, 0x0a, 0x10, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x49, 0x6e, 0x66, 0x6f, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x3f, 0x0a, 0x11, 0x41, 0x67, 0x65, 0x6e, 0x74, + 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x12, 0x0a, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x0e, 0x0a, 0x0c, 0x52, 0x65, 0x73, 0x74, + 0x61, 0x72, 0x74, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x22, 0x0f, 0x0a, 0x0d, 0x53, 0x68, 0x75, 0x74, + 0x64, 0x6f, 0x77, 0x6e, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x42, 0x8f, 0x01, 0x0a, 0x0c, 0x63, 0x6f, + 0x6d, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x42, 0x0d, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x48, 0x02, 0x50, 0x01, 0x5a, 0x2e, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x79, 0x70, 0x65, 0x72, 0x6d, + 0x6f, 0x64, 0x65, 0x69, 0x6e, 0x63, 0x2f, 0x6d, 0x6f, 0x64, 0x75, 0x73, 0x2f, 0x72, 0x75, 0x6e, + 0x74, 0x69, 0x6d, 0x65, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0xa2, 0x02, 0x03, + 0x4d, 0x58, 0x58, 0xaa, 0x02, 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0xca, 0x02, + 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0xe2, 0x02, 0x14, 0x4d, 0x65, 0x73, 0x73, + 0x61, 0x67, 0x65, 0x73, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, + 0xea, 0x02, 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, }) var ( @@ -240,17 +406,21 @@ func file_messages_proto_rawDescGZIP() []byte { return file_messages_proto_rawDescData } -var file_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_messages_proto_goTypes = []any{ - (*AgentRequestMessage)(nil), // 0: messages.AgentRequestMessage - (*AgentResponseMessage)(nil), // 1: messages.AgentResponseMessage - (*AgentEventMessage)(nil), // 2: messages.AgentEventMessage - (*structpb.Value)(nil), // 3: google.protobuf.Value - (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp + (*AgentRequest)(nil), // 0: messages.AgentRequest + (*AgentResponse)(nil), // 1: messages.AgentResponse + (*AgentEvent)(nil), // 2: messages.AgentEvent + (*AgentInfoRequest)(nil), // 3: messages.AgentInfoRequest + (*AgentInfoResponse)(nil), // 4: messages.AgentInfoResponse + (*RestartAgent)(nil), // 5: messages.RestartAgent + (*ShutdownAgent)(nil), // 6: messages.ShutdownAgent + (*structpb.Value)(nil), // 7: google.protobuf.Value + (*timestamppb.Timestamp)(nil), // 8: google.protobuf.Timestamp } var file_messages_proto_depIdxs = []int32{ - 3, // 0: messages.AgentEventMessage.data:type_name -> google.protobuf.Value - 4, // 1: messages.AgentEventMessage.timestamp:type_name -> google.protobuf.Timestamp + 7, // 0: messages.AgentEvent.data:type_name -> google.protobuf.Value + 8, // 1: messages.AgentEvent.timestamp:type_name -> google.protobuf.Timestamp 2, // [2:2] is the sub-list for method output_type 2, // [2:2] is the sub-list for method input_type 2, // [2:2] is the sub-list for extension type_name @@ -272,7 +442,7 @@ func file_messages_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_messages_proto_rawDesc), len(file_messages_proto_rawDesc)), NumEnums: 0, - NumMessages: 3, + NumMessages: 7, NumExtensions: 0, NumServices: 0, }, diff --git a/runtime/pluginmanager/loader.go b/runtime/pluginmanager/loader.go index bf124caaf..400ac26b5 100644 --- a/runtime/pluginmanager/loader.go +++ b/runtime/pluginmanager/loader.go @@ -149,8 +149,8 @@ func unloadPlugin(ctx context.Context, filename string) error { span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) defer span.Finish() - p := globalPluginRegistry.GetByFile(filename) - if p == nil { + p, found := globalPluginRegistry.GetByFile(filename) + if !found { return fmt.Errorf("plugin not found: %s", filename) } diff --git a/runtime/pluginmanager/pluginregistry.go b/runtime/pluginmanager/pluginregistry.go index 1e7306c01..363d2c314 100644 --- a/runtime/pluginmanager/pluginregistry.go +++ b/runtime/pluginmanager/pluginregistry.go @@ -30,6 +30,10 @@ func GetRegisteredPlugins() []*plugins.Plugin { return globalPluginRegistry.GetAll() } +func GetPluginByName(name string) (*plugins.Plugin, bool) { + return globalPluginRegistry.GetByName(name) +} + type pluginRegistry struct { idRevIndex map[*plugins.Plugin]string idIndex map[string]*plugins.Plugin @@ -105,24 +109,18 @@ func (pr *pluginRegistry) GetById(id string) *plugins.Plugin { return nil } -func (pr *pluginRegistry) GetByName(name string) *plugins.Plugin { +func (pr *pluginRegistry) GetByName(name string) (*plugins.Plugin, bool) { pr.mutex.RLock() defer pr.mutex.RUnlock() - if plugin, found := pr.nameIndex[name]; found { - return plugin - } - - return nil + plugin, found := pr.nameIndex[name] + return plugin, found } -func (pr *pluginRegistry) GetByFile(filename string) *plugins.Plugin { +func (pr *pluginRegistry) GetByFile(filename string) (*plugins.Plugin, bool) { pr.mutex.RLock() defer pr.mutex.RUnlock() - if plugin, found := pr.fileIndex[filename]; found { - return plugin - } - - return nil + plugin, found := pr.fileIndex[filename] + return plugin, found } diff --git a/runtime/protos/messages.proto b/runtime/protos/messages.proto index 42af26c66..57b36a6fc 100644 --- a/runtime/protos/messages.proto +++ b/runtime/protos/messages.proto @@ -7,18 +7,28 @@ option go_package = "github.com/hypermodeinc/modus/runtime/messages"; import "google/protobuf/struct.proto"; import "google/protobuf/timestamp.proto"; -message AgentRequestMessage { +message AgentRequest { string name = 1; optional string data = 2; bool respond = 3; } -message AgentResponseMessage { +message AgentResponse { optional string data = 1; } -message AgentEventMessage { +message AgentEvent { string name = 1; optional google.protobuf.Value data = 2; google.protobuf.Timestamp timestamp = 3; } + +message AgentInfoRequest {} + +message AgentInfoResponse { + string name = 1; + string status = 2; +} + +message RestartAgent {} +message ShutdownAgent {} diff --git a/runtime/utils/sentry.go b/runtime/utils/sentry.go index 4c6c8c4f3..4b983482e 100644 --- a/runtime/utils/sentry.go +++ b/runtime/utils/sentry.go @@ -155,9 +155,7 @@ func sentryAddExtras(event *sentry.Event) { event.Extra = make(map[string]any) } - // Capture the k8s namespace environment variable. - ns := os.Getenv("NAMESPACE") - if ns != "" { + if ns, ok := app.KubernetesNamespace(); ok { event.Extra["namespace"] = ns } } diff --git a/sdk/go/pkg/agents/agents.go b/sdk/go/pkg/agents/agents.go index 41c72bc2d..61c9229fe 100644 --- a/sdk/go/pkg/agents/agents.go +++ b/sdk/go/pkg/agents/agents.go @@ -33,7 +33,7 @@ type AgentEvent interface { EventName() string } -type AgentStatus = string +type AgentStatus string const ( AgentStatusStarting AgentStatus = "starting" @@ -45,7 +45,7 @@ const ( AgentStatusTerminated AgentStatus = "terminated" ) -type agentEventAction = string +type agentEventAction string const ( agentEventActionInitialize agentEventAction = "initialize" @@ -178,7 +178,7 @@ func handleEvent(action string) { } agent := *activeAgent - switch action { + switch agentEventAction(action) { case agentEventActionInitialize: if err := (agent).OnInitialize(); err != nil { console.Errorf("Error initializing agent %s: %v", agent.Name(), err) diff --git a/sdk/go/pkg/agents/imports_mock.go b/sdk/go/pkg/agents/imports_mock.go index 648ec0c08..e087b202d 100644 --- a/sdk/go/pkg/agents/imports_mock.go +++ b/sdk/go/pkg/agents/imports_mock.go @@ -28,7 +28,7 @@ func hostStartAgent(agentName *string) *AgentInfo { return &AgentInfo{ Id: "abc123", Name: *agentName, - Status: AgentStatusStarting, + Status: string(AgentStatusStarting), } } @@ -51,7 +51,7 @@ func hostStopAgent(agentId *string) *AgentInfo { return &AgentInfo{ Id: "abc123", Name: "Counter", - Status: AgentStatusStopping, + Status: string(AgentStatusStopping), } } return nil @@ -64,7 +64,7 @@ func hostGetAgentInfo(agentId *string) *AgentInfo { return &AgentInfo{ Id: "abc123", Name: "Counter", - Status: AgentStatusRunning, + Status: string(AgentStatusRunning), } } @@ -75,8 +75,8 @@ func hostListAgents() *[]AgentInfo { ListAgentsCallStack.Push() return &[]AgentInfo{ - {Id: "abc123", Name: "Counter", Status: AgentStatusRunning}, - {Id: "def456", Name: "Logger", Status: AgentStatusRunning}, + {Id: "abc123", Name: "Counter", Status: string(AgentStatusRunning)}, + {Id: "def456", Name: "Logger", Status: string(AgentStatusRunning)}, } }