Skip to content
This repository was archived by the owner on Sep 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

# Change Log

## 2025-06-23 - Runtime 0.18.0-alpha.11

- fix: adjust cluster sync settings and delays [#902](https://github.com/hypermodeinc/modus/pull/902)

## 2025-06-22 - Runtime 0.18.0-alpha.10

- fix: more cluster resiliency improvements [#901](https://github.com/hypermodeinc/modus/pull/901)
Expand Down
8 changes: 7 additions & 1 deletion runtime/actors/actorlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,14 @@ type actorLogger struct {
logger *zerolog.Logger
}

// ignoredMessages is the set of exact log message strings that writeToLog drops
// instead of forwarding to the underlying logger. Matching is by full string
// equality on the message, so any variation in wording will not be suppressed.
var ignoredMessages = map[string]bool{
"Failed to acquire semaphore: context canceled": true, // normal during shutdown
}

func (al *actorLogger) writeToLog(level zerolog.Level, msg string) {
al.logger.WithLevel(level).Msg(msg)
if !ignoredMessages[msg] {
al.logger.WithLevel(level).Msg(msg)
}
}

func (al *actorLogger) Debug(v ...any) {
Expand Down
22 changes: 15 additions & 7 deletions runtime/actors/actorsystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func startActorSystem(ctx context.Context, actorSystem goakt.ActorSystem) error
retryInterval *= 2 // Exponential backoff
continue
}

// important: wait for the actor system to sync with the cluster before proceeding
waitForClusterSync(ctx)

return nil
}

Expand All @@ -98,12 +102,10 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
}
}

// do this next part in a goroutine to avoid blocking the cluster engine startup
// spawn actors for agents with state in the database, that are not already running
// check both locally and on remote nodes in the cluster
// do this in a goroutine to avoid blocking the cluster engine startup
go func() {
waitForClusterSync()

// spawn actors for agents with state in the database, that are not already running
// check both locally and on remote nodes in the cluster
logger.Debug(ctx).Msg("Restoring agent actors from database.")
agents, err := db.QueryActiveAgents(ctx)
if err != nil {
Expand Down Expand Up @@ -167,9 +169,15 @@ func beforeShutdown(ctx context.Context) error {
return nil
}

func waitForClusterSync() {
// Waits for the peer sync interval to pass, allowing time for the actor system to synchronize its
// list of actors with the remote nodes in the cluster. Cancels early if the context is done.
func waitForClusterSync(ctx context.Context) {
if clusterEnabled() {
time.Sleep(nodesSyncInterval())
select {
case <-time.After(peerSyncInterval()):
case <-ctx.Done():
logger.Warn(context.WithoutCancel(ctx)).Msg("Context cancelled while waiting for cluster sync.")
}
}
}

Expand Down
7 changes: 7 additions & 0 deletions runtime/actors/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName stri
PluginName: pluginName,
}),
)

// Important: Wait for the actor system to sync with the cluster before proceeding.
// This ensures consistency across the cluster, so we don't accidentally spawn the same actor multiple times.
// GoAkt does not resolve such inconsistencies automatically, so we need to handle this manually.
// A short sync time should not be noticeable by the user.
waitForClusterSync(ctx)

return err
}

Expand Down
66 changes: 47 additions & 19 deletions runtime/actors/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,27 +55,15 @@ func clusterOptions(ctx context.Context) []goakt.Option {
logger.Fatal(ctx).Err(err).Msg("Failed to create cluster discovery provider.")
}

var remotingHost string
if app.IsDevEnvironment() {
// only bind to localhost in development
remotingHost = "127.0.0.1"
} else {
// otherwise bind to all interfaces
remotingHost = "0.0.0.0"
}

readTimeout := getDurationFromEnv("MODUS_CLUSTER_READ_TIMEOUT_SECONDS", 2, time.Second)
writeTimeout := getDurationFromEnv("MODUS_CLUSTER_WRITE_TIMEOUT_SECONDS", 2, time.Second)

return []goakt.Option{
goakt.WithRemote(remote.NewConfig(remotingHost, remotingPort)),
goakt.WithRemote(remote.NewConfig(remotingHost(), remotingPort)),
goakt.WithCluster(goakt.NewClusterConfig().
WithDiscovery(disco).
WithDiscoveryPort(discoveryPort).
WithPeersPort(peersPort).
WithReadTimeout(readTimeout).
WithWriteTimeout(writeTimeout).
// WithPartitionCount(3).
WithReadTimeout(readTimeout()).
WithWriteTimeout(writeTimeout()).
WithPartitionCount(partitionCount()).
WithClusterStateSyncInterval(nodesSyncInterval()).
WithPeersStateSyncInterval(peerSyncInterval()).
WithKinds(&wasmAgentActor{}, &subscriptionActor{}),
Expand Down Expand Up @@ -162,6 +150,18 @@ func clusterHost() string {
}
}

// remotingHost reports the host address the remoting system binds to:
// the loopback address in a development environment, and the wildcard
// address (all interfaces) everywhere else.
func remotingHost() string {
	const (
		loopbackOnly  = "127.0.0.1"
		allInterfaces = "0.0.0.0"
	)

	// outside of development, listen on every interface
	if !app.IsDevEnvironment() {
		return allInterfaces
	}

	// in development, stay on localhost
	return loopbackOnly
}

// clusterPorts returns the ports used for discovery, remoting, and peer communication in the cluster.
func clusterPorts() (discoveryPort, remotingPort, peersPort int) {

// Get default ports dynamically, but use environment variables if set
Expand All @@ -173,19 +173,47 @@ func clusterPorts() (discoveryPort, remotingPort, peersPort int) {
return
}

// peerSyncInterval returns the interval at which the cluster peers sync their list of actors across the cluster.
// peerSyncInterval returns the interval at which the actor system will sync its list of actors to other nodes across the cluster.
// We use a tight sync interval of 1 second by default, to ensure quick peer discovery as agents are added or removed.
//
// This value is also used for a sleep both on system startup and when spawning a new agent actor,
// so it needs to be low enough to not be noticed by the user.
func peerSyncInterval() time.Duration {
	// The value comes from the environment, expressed in whole seconds,
	// with 1 supplied as the default.
	const (
		envKey         = "MODUS_CLUSTER_PEER_SYNC_SECONDS"
		defaultSeconds = 1
	)
	return getDurationFromEnv(envKey, defaultSeconds, time.Second)
}

// nodesSyncInterval returns the interval at which the cluster syncs the list of active nodes across the cluster.
// On each interval, discovery will be triggered to find new nodes and update the cluster state.
// nodesSyncInterval returns the interval at which the cluster forces a resync of the list of active nodes across the cluster.
// This matters only with regard to nodes going down unexpectedly, as other nodes in the cluster will not be aware of the change until the next sync.
// It does not affect anything if a node is gracefully shut down, as that will be communicated immediately during the shutdown process.
//
// On each interval, the node will sync its list of nodes with the cluster, and update its local state accordingly.
// The default is 10 seconds, which is a reasonable balance between responsiveness and network overhead.
func nodesSyncInterval() time.Duration {
	// The value comes from the environment, expressed in whole seconds,
	// with 10 supplied as the default.
	const (
		envKey         = "MODUS_CLUSTER_NODES_SYNC_SECONDS"
		defaultSeconds = 10
	)
	return getDurationFromEnv(envKey, defaultSeconds, time.Second)
}

// partitionCount returns the number of partitions the cluster will use for actor distribution.
// It must be a prime number to work properly with the actor system's hashing algorithm.
// It must be greater than the number of nodes in the cluster, but not too large to avoid excessive overhead.
// In testing, 23 is the highest that works well with the other default timing constraints.
// We'll use a slightly lower default of 13, which is still a prime number and should work well for most clusters.
// The GoAkt default is 271, but this has been found to lead to other errors in practice.
//
// The value is read from MODUS_CLUSTER_PARTITION_COUNT. A zero or negative setting is rejected
// and the default is used instead, because converting a negative integer to uint64 would wrap
// around to an enormous partition count rather than fail.
func partitionCount() uint64 {
	const defaultPartitionCount = 13

	n := getIntFromEnv("MODUS_CLUSTER_PARTITION_COUNT", defaultPartitionCount)
	if n <= 0 {
		// Guard the unsigned conversion below against misconfiguration.
		return defaultPartitionCount
	}
	return uint64(n)
}

// readTimeout reports how long a cluster read operation may take before it times out.
// It is configured in seconds through MODUS_CLUSTER_READ_TIMEOUT_SECONDS; the default
// of 1 second should rarely need changing.
func readTimeout() time.Duration {
	const (
		envKey         = "MODUS_CLUSTER_READ_TIMEOUT_SECONDS"
		defaultSeconds = 1
	)
	return getDurationFromEnv(envKey, defaultSeconds, time.Second)
}

// writeTimeout reports how long a cluster write operation may take before it times out.
// It is configured in seconds through MODUS_CLUSTER_WRITE_TIMEOUT_SECONDS; the default
// of 1 second should rarely need changing.
func writeTimeout() time.Duration {
	const (
		envKey         = "MODUS_CLUSTER_WRITE_TIMEOUT_SECONDS"
		defaultSeconds = 1
	)
	return getDurationFromEnv(envKey, defaultSeconds, time.Second)
}

func getPodLabels() map[string]string {
// example value: "app.kubernetes.io/name=modus,app.kubernetes.io/component=runtime"
if labels := os.Getenv("MODUS_CLUSTER_POD_LABELS"); labels != "" {
Expand Down
2 changes: 1 addition & 1 deletion runtime/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ require (
github.com/tetratelabs/wazero v1.9.0
github.com/tidwall/gjson v1.18.0
github.com/tidwall/sjson v1.2.5
github.com/tochemey/goakt/v3 v3.6.5
github.com/tochemey/goakt/v3 v3.6.6-0.20250623081618-c24f07caefce
github.com/travisjeffery/go-dynaport v1.0.0
github.com/twpayne/go-geom v1.6.1
github.com/wundergraph/graphql-go-tools/execution v1.3.2-0.20250618131920-dd0d9cc2a919
Expand Down
4 changes: 2 additions & 2 deletions runtime/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -860,8 +860,8 @@ github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDW
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
github.com/tinylib/msgp v1.2.5 h1:WeQg1whrXRFiZusidTQqzETkRpGjFjcIhW6uqWH09po=
github.com/tinylib/msgp v1.2.5/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0=
github.com/tochemey/goakt/v3 v3.6.5 h1:Iun9Tv6L4xLmof0OzY3HH5zUR03AowiZDCTxB+dVaSo=
github.com/tochemey/goakt/v3 v3.6.5/go.mod h1:6HHDMI+BXFPt+Pq59svoGdxXTme/UsqX868FG1gICO4=
github.com/tochemey/goakt/v3 v3.6.6-0.20250623081618-c24f07caefce h1:mxxNUEmw4KRbXgIWQgnv/VyeP95kSJi3kh+vBh//OyM=
github.com/tochemey/goakt/v3 v3.6.6-0.20250623081618-c24f07caefce/go.mod h1:6HHDMI+BXFPt+Pq59svoGdxXTme/UsqX868FG1gICO4=
github.com/tochemey/olric v0.2.3 h1:LGmsHLQBSEs3uasZNLT5MdS2pBMNJ71gSrXnYfkb62M=
github.com/tochemey/olric v0.2.3/go.mod h1:BAD82xys8R8IAWFV+GC0B8I+J4QsYZvmPS5NT/dhmtI=
github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw=
Expand Down
Loading