diff --git a/.trunk/configs/cspell.json b/.trunk/configs/cspell.json index 22706888a..8468436b0 100644 --- a/.trunk/configs/cspell.json +++ b/.trunk/configs/cspell.json @@ -117,6 +117,7 @@ "mapdata", "mapstructure", "mattn", + "memberlist", "metagen", "millis", "minilm", diff --git a/.vscode/launch.json b/.vscode/launch.json index 1d700c767..61dc41cfe 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -29,7 +29,7 @@ "MODUS_ENV": "dev", "MODUS_CLUSTER_MODE": "NATS", "MODUS_CLUSTER_NATS_URL": "nats://localhost:4222", - "MODUS_DEBUG_ACTORS": "true", + // "MODUS_DEBUG_ACTORS": "true", "MODUS_DEBUG": "true", "MODUS_USE_MODUSDB": "false", "MODUS_DB": "postgresql://postgres:postgres@localhost:5432/modus?sslmode=disable" // checkov:skip=CKV_SECRET_4 diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f3b868e2..e043cadc4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ # Change Log +## 2025-06-21 - Runtime 0.18.0-alpha.9 + +- fix: improve cluster resiliency [#900](https://github.com/hypermodeinc/modus/pull/900) + ## 2025-06-18 - Runtime 0.18.0-alpha.8 - feat: cluster mode [#895](https://github.com/hypermodeinc/modus/pull/895) diff --git a/runtime/actors/actorlogger.go b/runtime/actors/actorlogger.go index 1df5b7f39..d940890d6 100644 --- a/runtime/actors/actorlogger.go +++ b/runtime/actors/actorlogger.go @@ -13,6 +13,8 @@ import ( "fmt" "io" "log" + "slices" + "strings" "github.com/hypermodeinc/modus/runtime/logger" "github.com/hypermodeinc/modus/runtime/utils" @@ -20,54 +22,82 @@ import ( actorLog "github.com/tochemey/goakt/v3/log" ) +// some messages are ignored during shutdown because they are expected +var shutdownIgnoredMessages = []string{ + "Failed to acquire semaphore: context canceled", + " is down. modus is going to shutdown.", +} + func newActorLogger(logger *zerolog.Logger) *actorLogger { var minLevel zerolog.Level if utils.EnvVarFlagEnabled("MODUS_DEBUG_ACTORS") { minLevel = zerolog.DebugLevel } else { - // goakt info level is too noisy, so default to show only errors - minLevel = zerolog.ErrorLevel + // goakt info level is too noisy, so default to show warnings and above + minLevel = zerolog.WarnLevel } l := logger.Level(minLevel).With().Str("component", "actors").Logger() - return &actorLogger{&l} + return &actorLogger{logger: &l} } type actorLogger struct { - logger *zerolog.Logger + logger *zerolog.Logger + paused bool + shuttingDown bool +} + +func (al *actorLogger) Pause() { + al.paused = true +} + +func (al *actorLogger) Resume() { + al.paused = false +} + +func (al *actorLogger) writeToLog(level zerolog.Level, msg string) { + if al.paused { + return + } + if al.shuttingDown && slices.ContainsFunc(shutdownIgnoredMessages, func(s string) bool { + return strings.Contains(msg, s) + }) { + return + } + al.logger.WithLevel(level).Msg(msg) } func (al *actorLogger) Debug(v ...any) { - al.logger.Debug().Msg(fmt.Sprint(v...)) + al.writeToLog(zerolog.DebugLevel, fmt.Sprint(v...)) } func (al *actorLogger) Debugf(format string, v ...any) { - al.logger.Debug().Msgf(format, v...) + al.writeToLog(zerolog.DebugLevel, fmt.Sprintf(format, v...)) } func (al *actorLogger) Info(v ...any) { - al.logger.Info().Msg(fmt.Sprint(v...)) + al.writeToLog(zerolog.InfoLevel, fmt.Sprint(v...)) } func (al *actorLogger) Infof(format string, v ...any) { - al.logger.Info().Msgf(format, v...) + al.writeToLog(zerolog.InfoLevel, fmt.Sprintf(format, v...)) } func (al *actorLogger) Warn(v ...any) { - al.logger.Warn().Msg(fmt.Sprint(v...)) + al.writeToLog(zerolog.WarnLevel, fmt.Sprint(v...)) } func (al *actorLogger) Warnf(format string, v ...any) { - al.logger.Warn().Msgf(format, v...) + al.writeToLog(zerolog.WarnLevel, fmt.Sprintf(format, v...)) } func (al *actorLogger) Error(v ...any) { - al.logger.Error().Msg(fmt.Sprint(v...)) + al.writeToLog(zerolog.ErrorLevel, fmt.Sprint(v...)) } func (al *actorLogger) Errorf(format string, v ...any) { - al.logger.Error().Msgf(format, v...) + al.writeToLog(zerolog.ErrorLevel, fmt.Sprintf(format, v...)) } func (al *actorLogger) Fatal(v ...any) { diff --git a/runtime/actors/actorsystem.go b/runtime/actors/actorsystem.go index df4f4d183..10f1d82e9 100644 --- a/runtime/actors/actorsystem.go +++ b/runtime/actors/actorsystem.go @@ -11,6 +11,7 @@ package actors import ( "context" + "fmt" "time" "github.com/hypermodeinc/modus/runtime/db" @@ -33,6 +34,7 @@ func Initialize(ctx context.Context) { opts := []goakt.Option{ goakt.WithLogger(newActorLogger(logger.Get(ctx))), + goakt.WithCoordinatedShutdown(beforeShutdown), goakt.WithPubSub(), goakt.WithActorInitTimeout(10 * time.Second), // TODO: adjust this value, or make it configurable goakt.WithActorInitMaxRetries(1), // TODO: adjust this value, or make it configurable @@ -50,6 +52,8 @@ func Initialize(ctx context.Context) { _actorSystem = actorSystem } + waitForClusterSync() + logger.Info(ctx).Msg("Actor system started.") pluginmanager.RegisterPluginLoadedCallback(loadAgentActors) @@ -70,54 +74,76 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error { // spawn actors for agents with state in the database, that are not already running // check both locally and on remote nodes in the cluster + logger.Debug(ctx).Msg("Restoring agent actors from database.") agents, err := db.QueryActiveAgents(ctx) if err != nil { - logger.Err(ctx, err).Msg("Failed to query agents from database.") - return err + return fmt.Errorf("failed to query active agents: %w", err) } + inCluster := _actorSystem.InCluster() for _, agent := range agents { if !localAgents[agent.Id] { - if _actorSystem.InCluster() { + if inCluster { actorName := getActorName(agent.Id) - if _, err := _actorSystem.RemoteActor(ctx, actorName); err == nil { - // found actor in cluster, no need to spawn it again + if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil { + logger.Err(ctx, err).Msgf("Failed to check if actor %s exists in cluster.", actorName) + } else if exists { + // if the actor already exists in the cluster, skip spawning it continue } } - go func(f_ctx context.Context, pluginName, agentId, agentName string) { - if err := spawnActorForAgent(f_ctx, pluginName, agentId, agentName, false); err != nil { - logger.Err(f_ctx, err).Msgf("Failed to spawn actor for agent %s.", agentId) - } - }(ctx, plugin.Name(), agent.Id, agent.Name) + if err := spawnActorForAgent(ctx, plugin.Name(), agent.Id, agent.Name, false); err != nil { + logger.Err(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id) + } } } return nil } -func beforeShutdown(ctx context.Context) { +func beforeShutdown(ctx context.Context) error { + _actorSystem.Logger().(*actorLogger).shuttingDown = true logger.Info(ctx).Msg("Actor system shutting down...") + actors := _actorSystem.Actors() - // stop all agent actors before shutdown so they can suspend properly - for _, pid := range _actorSystem.Actors() { - if _, ok := pid.Actor().(*wasmAgentActor); ok { + // Suspend all local running agent actors first, which allows them to gracefully stop and persist their state. + // In cluster mode, this will also allow the actor to resume on another node after this node shuts down. + for _, pid := range actors { + if actor, ok := pid.Actor().(*wasmAgentActor); ok && pid.IsRunning() { + if actor.status == AgentStatusRunning { + ctx := actor.augmentContext(ctx, pid) + if err := actor.suspendAgent(ctx); err != nil { + logger.Err(ctx, err).Str("agent_id", actor.agentId).Msg("Failed to suspend agent actor.") + } + } + } + } - // pass the pid so it can be used during shutdown as an event sender - ctx := context.WithValue(ctx, pidContextKey{}, pid) + // Then shut down subscription actors. They will have received the suspend message already. + for _, pid := range actors { + if _, ok := pid.Actor().(*subscriptionActor); ok && pid.IsRunning() { if err := pid.Shutdown(ctx); err != nil { logger.Err(ctx, err).Msgf("Failed to shutdown actor %s.", pid.Name()) } } } + + waitForClusterSync() + + // then allow the actor system to continue with its shutdown process + return nil +} + +func waitForClusterSync() { + if clusterEnabled() { + time.Sleep(peerSyncInterval() * 2) + } } func Shutdown(ctx context.Context) { if _actorSystem == nil { - return + logger.Fatal(ctx).Msg("Actor system is not initialized, cannot shutdown.") } - beforeShutdown(ctx) - if err := _actorSystem.Stop(ctx); err != nil { logger.Err(ctx, err).Msg("Failed to shutdown actor system.") } diff --git a/runtime/actors/cluster.go b/runtime/actors/cluster.go index a6d5a3bd0..b831fdf4e 100644 --- a/runtime/actors/cluster.go +++ b/runtime/actors/cluster.go @@ -13,8 +13,8 @@ import ( "context" "net/url" "os" - "strconv" "strings" + "time" "github.com/hypermodeinc/modus/runtime/app" "github.com/hypermodeinc/modus/runtime/logger" @@ -95,12 +95,18 @@ func clusterOptions(ctx context.Context) []goakt.Option { remotingHost = "0.0.0.0" } + readTimeout := time.Duration(getIntFromEnv("MODUS_CLUSTER_READ_TIMEOUT_SECONDS", 2)) * time.Second + writeTimeout := time.Duration(getIntFromEnv("MODUS_CLUSTER_WRITE_TIMEOUT_SECONDS", 2)) * time.Second + return []goakt.Option{ + goakt.WithPeerStateLoopInterval(peerSyncInterval()), goakt.WithRemote(remote.NewConfig(remotingHost, remotingPort)), goakt.WithCluster(goakt.NewClusterConfig(). WithDiscovery(disco). WithDiscoveryPort(discoveryPort). WithPeersPort(peersPort). + WithReadTimeout(readTimeout). + WithWriteTimeout(writeTimeout). WithKinds(&wasmAgentActor{}, &subscriptionActor{}), ), } @@ -145,6 +151,10 @@ func clusterMode() goaktClusterMode { return parseClusterMode(os.Getenv("MODUS_CLUSTER_MODE")) } +func clusterEnabled() bool { + return clusterMode() != clusterModeNone +} + func clusterNatsUrl() string { const envVar = "MODUS_CLUSTER_NATS_URL" const defaultNatsUrl = "nats://localhost:4222" @@ -171,7 +181,8 @@ func clusterHost() string { } if app.IsDevEnvironment() { - return "localhost" + // Note, forcing IPv4 here avoids memberlist attempting to bind to IPv6 that we're not listening on. + return "127.0.0.1" } else { // this hack gets the same IP that the remoting system would bind to by default rc := remote.NewConfig("0.0.0.0", 0) @@ -182,33 +193,18 @@ func clusterHost() string { func clusterPorts() (discoveryPort, remotingPort, peersPort int) { - // Get default ports dynamically + // Get default ports dynamically, but use environment variables if set ports := dynaport.Get(3) - discoveryPort = ports[0] - remotingPort = ports[1] - peersPort = ports[2] - - // Override with environment variables if set - discoveryPort = getPortFromEnv("MODUS_CLUSTER_DISCOVERY_PORT", discoveryPort) - remotingPort = getPortFromEnv("MODUS_CLUSTER_REMOTING_PORT", remotingPort) - peersPort = getPortFromEnv("MODUS_CLUSTER_PEERS_PORT", peersPort) + discoveryPort = getIntFromEnv("MODUS_CLUSTER_DISCOVERY_PORT", ports[0]) + remotingPort = getIntFromEnv("MODUS_CLUSTER_REMOTING_PORT", ports[1]) + peersPort = getIntFromEnv("MODUS_CLUSTER_PEERS_PORT", ports[2]) return } -func getPortFromEnv(envVar string, defaultPort int) int { - portStr := os.Getenv(envVar) - if portStr == "" { - return defaultPort - } - - port, err := strconv.Atoi(portStr) - if err != nil || port <= 0 { - logger.Warnf("Invalid value for %s. Using %d instead.", envVar, defaultPort) - return defaultPort - } - - return port +func peerSyncInterval() time.Duration { + // we use a tight sync interval by default, to ensure quick peer discovery + return time.Duration(getIntFromEnv("MODUS_CLUSTER_PEER_SYNC_MS", 500)) * time.Millisecond } func getPodLabels() map[string]string { diff --git a/runtime/actors/misc.go b/runtime/actors/misc.go index 139ea10bf..e76506f29 100644 --- a/runtime/actors/misc.go +++ b/runtime/actors/misc.go @@ -12,8 +12,11 @@ package actors import ( "context" "fmt" + "os" + "strconv" "time" + "github.com/hypermodeinc/modus/runtime/logger" goakt "github.com/tochemey/goakt/v3/actor" "google.golang.org/protobuf/proto" @@ -50,3 +53,19 @@ func ask(ctx context.Context, actorName string, message proto.Message, timeout t } return nil, fmt.Errorf("failed to get address or PID for actor %s", actorName) } + +// Retrieves an integer value from an environment variable. +func getIntFromEnv(envVar string, defaultValue int) int { + str := os.Getenv(envVar) + if str == "" { + return defaultValue + } + + value, err := strconv.Atoi(str) + if err != nil || value <= 0 { + logger.Warnf("Invalid value for %s. Using %d instead.", envVar, defaultValue) + return defaultValue + } + + return value +} diff --git a/runtime/go.mod b/runtime/go.mod index 7f9e8ae83..b7e9e3fbc 100644 --- a/runtime/go.mod +++ b/runtime/go.mod @@ -42,7 +42,7 @@ require ( github.com/tetratelabs/wazero v1.9.0 github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 - github.com/tochemey/goakt/v3 v3.6.2 + github.com/tochemey/goakt/v3 v3.6.4 github.com/travisjeffery/go-dynaport v1.0.0 github.com/twpayne/go-geom v1.6.1 github.com/wundergraph/graphql-go-tools/execution v1.3.2-0.20250618131920-dd0d9cc2a919 @@ -52,9 +52,9 @@ require ( golang.org/x/sys v0.33.0 google.golang.org/grpc v1.73.0 google.golang.org/protobuf v1.36.6 - k8s.io/api v0.33.1 - k8s.io/apimachinery v0.33.1 - k8s.io/client-go v0.33.1 + k8s.io/api v0.33.2 + k8s.io/apimachinery v0.33.2 + k8s.io/client-go v0.33.2 sigs.k8s.io/controller-runtime v0.21.0 ) diff --git a/runtime/go.sum b/runtime/go.sum index 7c42da23e..a2be3e935 100644 --- a/runtime/go.sum +++ b/runtime/go.sum @@ -860,8 +860,16 @@ github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDW github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= github.com/tinylib/msgp v1.2.5 h1:WeQg1whrXRFiZusidTQqzETkRpGjFjcIhW6uqWH09po= github.com/tinylib/msgp v1.2.5/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0= -github.com/tochemey/goakt/v3 v3.6.2 h1:oAucnRBY9KX1MmGkTR8vmr/eOcxXqrR+RC/NgMHIgVY= -github.com/tochemey/goakt/v3 v3.6.2/go.mod h1:4DZZVaEUZmgyRV+AMl+V1Q0mnGSnVvMfgSqE773Y2Rk= +github.com/tochemey/goakt/v3 v3.6.3-0.20250618223233-7f41e6a9633c h1:LG2v6TCSPGqhbzSEfaDpcF8xi2q+qQLOycvTDczf0f4= +github.com/tochemey/goakt/v3 v3.6.3-0.20250618223233-7f41e6a9633c/go.mod h1:k/i5dX6WqSMDzbFi/JY5+KRiL7TZQ1by/DeeSGZwWig= +github.com/tochemey/goakt/v3 v3.6.3-0.20250619184937-bee36bdbb97f h1:urkE4+wMItr8SgZy0vcbUjTdQzcnn++7Fsmp8fE1AC8= +github.com/tochemey/goakt/v3 v3.6.3-0.20250619184937-bee36bdbb97f/go.mod h1:k/i5dX6WqSMDzbFi/JY5+KRiL7TZQ1by/DeeSGZwWig= +github.com/tochemey/goakt/v3 v3.6.3-0.20250619212444-09365547acb0 h1:jlX5bfpRPZmL/mC5kSziVfoz345PcY/gCV4ubUoo2FU= +github.com/tochemey/goakt/v3 v3.6.3-0.20250619212444-09365547acb0/go.mod h1:k/i5dX6WqSMDzbFi/JY5+KRiL7TZQ1by/DeeSGZwWig= +github.com/tochemey/goakt/v3 v3.6.3 h1:OuIf65TMKmrjU/Hmkpdlc3Criv2ImFrUbypJKkoVU2M= +github.com/tochemey/goakt/v3 v3.6.3/go.mod h1:k/i5dX6WqSMDzbFi/JY5+KRiL7TZQ1by/DeeSGZwWig= +github.com/tochemey/goakt/v3 v3.6.4 h1:VyFTP1Ng1It0YQp/rYbdUPAxYN/VJWAm3w2rJQlfQVc= +github.com/tochemey/goakt/v3 v3.6.4/go.mod h1:6HHDMI+BXFPt+Pq59svoGdxXTme/UsqX868FG1gICO4= github.com/tochemey/olric v0.2.3 h1:LGmsHLQBSEs3uasZNLT5MdS2pBMNJ71gSrXnYfkb62M= github.com/tochemey/olric v0.2.3/go.mod h1:BAD82xys8R8IAWFV+GC0B8I+J4QsYZvmPS5NT/dhmtI= github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw= @@ -1349,12 +1357,15 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= k8s.io/api v0.33.1 h1:tA6Cf3bHnLIrUK4IqEgb2v++/GYUtqiu9sRVk3iBXyw= k8s.io/api v0.33.1/go.mod h1:87esjTn9DRSRTD4fWMXamiXxJhpOIREjWOSjsW1kEHw= +k8s.io/api v0.33.2/go.mod h1:fhrbphQJSM2cXzCWgqU29xLDuks4mu7ti9vveEnpSXs= k8s.io/apiextensions-apiserver v0.33.0 h1:d2qpYL7Mngbsc1taA4IjJPRJ9ilnsXIrndH+r9IimOs= k8s.io/apiextensions-apiserver v0.33.0/go.mod h1:VeJ8u9dEEN+tbETo+lFkwaaZPg6uFKLGj5vyNEwwSzc= k8s.io/apimachinery v0.33.1 h1:mzqXWV8tW9Rw4VeW9rEkqvnxj59k1ezDUl20tFK/oM4= k8s.io/apimachinery v0.33.1/go.mod h1:BHW0YOu7n22fFv/JkYOEfkUYNRN0fj0BlvMFWA7b+SM= +k8s.io/apimachinery v0.33.2/go.mod h1:BHW0YOu7n22fFv/JkYOEfkUYNRN0fj0BlvMFWA7b+SM= k8s.io/client-go v0.33.1 h1:ZZV/Ks2g92cyxWkRRnfUDsnhNn28eFpt26aGc8KbXF4= k8s.io/client-go v0.33.1/go.mod h1:JAsUrl1ArO7uRVFWfcj6kOomSlCv+JpvIsp6usAGefA= +k8s.io/client-go v0.33.2/go.mod h1:9mCgT4wROvL948w6f6ArJNb7yQd7QsvqavDeZHvNmHo= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20250610211856-8b98d1ed966a h1:ZV3Zr+/7s7aVbjNGICQt+ppKWsF1tehxggNfbM7XnG8=