diff --git a/CHANGELOG.md b/CHANGELOG.md index 3dfe76659..b74299719 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ # Change Log +## 2025-06-23 - Runtime 0.18.0-alpha.11 + +- fix: adjust cluster sync settings and delays [#902](https://github.com/hypermodeinc/modus/pull/902) + ## 2025-06-22 - Runtime 0.18.0-alpha.10 - fix: more cluster resiliency improvements [#901](https://github.com/hypermodeinc/modus/pull/901) diff --git a/runtime/actors/actorlogger.go b/runtime/actors/actorlogger.go index 38f7a152a..61418fa56 100644 --- a/runtime/actors/actorlogger.go +++ b/runtime/actors/actorlogger.go @@ -38,8 +38,14 @@ type actorLogger struct { logger *zerolog.Logger } +var ignoredMessages = map[string]bool{ + "Failed to acquire semaphore: context canceled": true, // normal during shutdown +} + func (al *actorLogger) writeToLog(level zerolog.Level, msg string) { - al.logger.WithLevel(level).Msg(msg) + if !ignoredMessages[msg] { + al.logger.WithLevel(level).Msg(msg) + } } func (al *actorLogger) Debug(v ...any) { diff --git a/runtime/actors/actorsystem.go b/runtime/actors/actorsystem.go index cb5fe46cf..5217e50d8 100644 --- a/runtime/actors/actorsystem.go +++ b/runtime/actors/actorsystem.go @@ -76,6 +76,10 @@ func startActorSystem(ctx context.Context, actorSystem goakt.ActorSystem) error retryInterval *= 2 // Exponential backoff continue } + + // important: wait for the actor system to sync with the cluster before proceeding + waitForClusterSync(ctx) + return nil } @@ -98,12 +102,10 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error { } } - // do this next part in a goroutine to avoid blocking the cluster engine startup + // spawn actors for agents with state in the database, that are not already running + // check both locally and on remote nodes in the cluster + // do this in a goroutine to avoid blocking the cluster engine startup go func() { - waitForClusterSync() - - // spawn actors for agents with state in the database, that are not already running - // check both locally and on remote nodes in the cluster logger.Debug(ctx).Msg("Restoring agent actors from database.") agents, err := db.QueryActiveAgents(ctx) if err != nil { @@ -167,9 +169,15 @@ func beforeShutdown(ctx context.Context) error { return nil } -func waitForClusterSync() { +// Waits for the peer sync interval to pass, allowing time for the actor system to synchronize its +// list of actors with the remote nodes in the cluster. Cancels early if the context is done. +func waitForClusterSync(ctx context.Context) { if clusterEnabled() { - time.Sleep(nodesSyncInterval()) + select { + case <-time.After(peerSyncInterval()): + case <-ctx.Done(): + logger.Warn(context.WithoutCancel(ctx)).Msg("Context cancelled while waiting for cluster sync.") + } } } diff --git a/runtime/actors/agents.go b/runtime/actors/agents.go index b92b8fa06..67897c508 100644 --- a/runtime/actors/agents.go +++ b/runtime/actors/agents.go @@ -100,6 +100,13 @@ func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName stri PluginName: pluginName, }), ) + + // Important: Wait for the actor system to sync with the cluster before proceeding. + // This ensures consistency across the cluster, so we don't accidentally spawn the same actor multiple times. + // GoAkt does not resolve such inconsistencies automatically, so we need to handle this manually. + // A short sync time should not be noticeable by the user. + waitForClusterSync(ctx) + return err } diff --git a/runtime/actors/cluster.go b/runtime/actors/cluster.go index 568920dbb..585990f78 100644 --- a/runtime/actors/cluster.go +++ b/runtime/actors/cluster.go @@ -55,27 +55,15 @@ func clusterOptions(ctx context.Context) []goakt.Option { logger.Fatal(ctx).Err(err).Msg("Failed to create cluster discovery provider.") } - var remotingHost string - if app.IsDevEnvironment() { - // only bind to localhost in development - remotingHost = "127.0.0.1" - } else { - // otherwise bind to all interfaces - remotingHost = "0.0.0.0" - } - - readTimeout := getDurationFromEnv("MODUS_CLUSTER_READ_TIMEOUT_SECONDS", 2, time.Second) - writeTimeout := getDurationFromEnv("MODUS_CLUSTER_WRITE_TIMEOUT_SECONDS", 2, time.Second) - return []goakt.Option{ - goakt.WithRemote(remote.NewConfig(remotingHost, remotingPort)), + goakt.WithRemote(remote.NewConfig(remotingHost(), remotingPort)), goakt.WithCluster(goakt.NewClusterConfig(). WithDiscovery(disco). WithDiscoveryPort(discoveryPort). WithPeersPort(peersPort). - WithReadTimeout(readTimeout). - WithWriteTimeout(writeTimeout). - // WithPartitionCount(3). + WithReadTimeout(readTimeout()). + WithWriteTimeout(writeTimeout()). + WithPartitionCount(partitionCount()). WithClusterStateSyncInterval(nodesSyncInterval()). WithPeersStateSyncInterval(peerSyncInterval()). WithKinds(&wasmAgentActor{}, &subscriptionActor{}), @@ -162,6 +150,18 @@ func clusterHost() string { } } +// remotingHost returns the host address to bind the remoting system to. +func remotingHost() string { + // only bind to localhost in development + if app.IsDevEnvironment() { + return "127.0.0.1" + } + + // otherwise bind to all interfaces + return "0.0.0.0" +} + +// clusterPorts returns the ports used for discovery, remoting, and peer communication in the cluster. func clusterPorts() (discoveryPort, remotingPort, peersPort int) { // Get default ports dynamically, but use environment variables if set @@ -173,19 +173,47 @@ func clusterPorts() (discoveryPort, remotingPort, peersPort int) { return } -// peerSyncInterval returns the interval at which the cluster peers sync their list of actors across the cluster. +// peerSyncInterval returns the interval at which the actor system will sync its list of actors to other nodes across the cluster. // We use a tight sync interval of 1 second by default, to ensure quick peer discovery as agents are added or removed. +// +// This value is also used for a sleep both on system startup and when spawning a new agent actor, +// so it needs to be low enough to not be noticed by the user. func peerSyncInterval() time.Duration { return getDurationFromEnv("MODUS_CLUSTER_PEER_SYNC_SECONDS", 1, time.Second) } -// nodesSyncInterval returns the interval at which the cluster syncs the list of active nodes across the cluster. -// On each interval, discovery will be triggered to find new nodes and update the cluster state. +// nodesSyncInterval returns the interval at which the cluster forces a resync of the list of active nodes across the cluster. +// This matters only with regard to nodes going down unexpectedly, as other nodes in the cluster will not be aware of the change until the next sync. +// It does not affect anything if a node is gracefully shut down, as that will be communicated immediately during the shutdown process. +// +// On each interval, the node will sync its list of nodes with the cluster, and update its local state accordingly. // The default is 10 seconds, which is a reasonable balance between responsiveness and network overhead. func nodesSyncInterval() time.Duration { return getDurationFromEnv("MODUS_CLUSTER_NODES_SYNC_SECONDS", 10, time.Second) } +// partitionCount returns the number of partitions the cluster will use for actor distribution. +// It must be a prime number to work properly with the actor system's hashing algorithm. +// It must be greater than the number of nodes in the cluster, but not too large to avoid excessive overhead. +// In testing, 23 is the highest that works well with the other default timing constraints. +// We'll use a slightly lower default of 13, which is still a prime number and should work well for most clusters. +// The GoAkt default is 271, but this has been found to lead to other errors in practice. +func partitionCount() uint64 { + return uint64(getIntFromEnv("MODUS_CLUSTER_PARTITION_COUNT", 13)) +} + +// readTimeout returns the duration to wait for a cluster read operation before timing out. +// The default is 1 second, which should usually not need to be changed. +func readTimeout() time.Duration { + return getDurationFromEnv("MODUS_CLUSTER_READ_TIMEOUT_SECONDS", 1, time.Second) +} + +// writeTimeout returns the duration to wait for a cluster write operation before timing out. +// The default is 1 second, which should usually not need to be changed. +func writeTimeout() time.Duration { + return getDurationFromEnv("MODUS_CLUSTER_WRITE_TIMEOUT_SECONDS", 1, time.Second) +} + func getPodLabels() map[string]string { // example value: "app.kubernetes.io/name=modus,app.kubernetes.io/component=runtime" if labels := os.Getenv("MODUS_CLUSTER_POD_LABELS"); labels != "" { diff --git a/runtime/go.mod b/runtime/go.mod index 4e1f84bb9..8a8928eff 100644 --- a/runtime/go.mod +++ b/runtime/go.mod @@ -42,7 +42,7 @@ require ( github.com/tetratelabs/wazero v1.9.0 github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 - github.com/tochemey/goakt/v3 v3.6.5 + github.com/tochemey/goakt/v3 v3.6.6-0.20250623081618-c24f07caefce github.com/travisjeffery/go-dynaport v1.0.0 github.com/twpayne/go-geom v1.6.1 github.com/wundergraph/graphql-go-tools/execution v1.3.2-0.20250618131920-dd0d9cc2a919 diff --git a/runtime/go.sum b/runtime/go.sum index 487023a93..7dd7fda0d 100644 --- a/runtime/go.sum +++ b/runtime/go.sum @@ -860,8 +860,8 @@ github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDW github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= github.com/tinylib/msgp v1.2.5 h1:WeQg1whrXRFiZusidTQqzETkRpGjFjcIhW6uqWH09po= github.com/tinylib/msgp v1.2.5/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0= -github.com/tochemey/goakt/v3 v3.6.5 h1:Iun9Tv6L4xLmof0OzY3HH5zUR03AowiZDCTxB+dVaSo= -github.com/tochemey/goakt/v3 v3.6.5/go.mod h1:6HHDMI+BXFPt+Pq59svoGdxXTme/UsqX868FG1gICO4= +github.com/tochemey/goakt/v3 v3.6.6-0.20250623081618-c24f07caefce h1:mxxNUEmw4KRbXgIWQgnv/VyeP95kSJi3kh+vBh//OyM= +github.com/tochemey/goakt/v3 v3.6.6-0.20250623081618-c24f07caefce/go.mod h1:6HHDMI+BXFPt+Pq59svoGdxXTme/UsqX868FG1gICO4= github.com/tochemey/olric v0.2.3 h1:LGmsHLQBSEs3uasZNLT5MdS2pBMNJ71gSrXnYfkb62M= github.com/tochemey/olric v0.2.3/go.mod h1:BAD82xys8R8IAWFV+GC0B8I+J4QsYZvmPS5NT/dhmtI= github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw=