Skip to content
This repository was archived by the owner on Sep 11, 2025. It is now read-only.

Commit d9c393a

Browse files
fix: adjust cluster sync settings and delays (#902)
1 parent 5e780f9 commit d9c393a

File tree

7 files changed

+83
-30
lines changed

7 files changed

+83
-30
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
# Change Log
44

5+
## 2025-06-23 - Runtime 0.18.0-alpha.11
6+
7+
- fix: adjust cluster sync settings and delays [#902](https://github.com/hypermodeinc/modus/pull/902)
8+
59
## 2025-06-22 - Runtime 0.18.0-alpha.10
610

711
- fix: more cluster resiliency improvements [#901](https://github.com/hypermodeinc/modus/pull/901)

runtime/actors/actorlogger.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,14 @@ type actorLogger struct {
3838
logger *zerolog.Logger
3939
}
4040

41+
var ignoredMessages = map[string]bool{
42+
"Failed to acquire semaphore: context canceled": true, // normal during shutdown
43+
}
44+
4145
func (al *actorLogger) writeToLog(level zerolog.Level, msg string) {
42-
al.logger.WithLevel(level).Msg(msg)
46+
if !ignoredMessages[msg] {
47+
al.logger.WithLevel(level).Msg(msg)
48+
}
4349
}
4450

4551
func (al *actorLogger) Debug(v ...any) {

runtime/actors/actorsystem.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ func startActorSystem(ctx context.Context, actorSystem goakt.ActorSystem) error
7676
retryInterval *= 2 // Exponential backoff
7777
continue
7878
}
79+
80+
// important: wait for the actor system to sync with the cluster before proceeding
81+
waitForClusterSync(ctx)
82+
7983
return nil
8084
}
8185

@@ -98,12 +102,10 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
98102
}
99103
}
100104

101-
// do this next part in a goroutine to avoid blocking the cluster engine startup
105+
// spawn actors for agents with state in the database, that are not already running
106+
// check both locally and on remote nodes in the cluster
107+
// do this in a goroutine to avoid blocking the cluster engine startup
102108
go func() {
103-
waitForClusterSync()
104-
105-
// spawn actors for agents with state in the database, that are not already running
106-
// check both locally and on remote nodes in the cluster
107109
logger.Debug(ctx).Msg("Restoring agent actors from database.")
108110
agents, err := db.QueryActiveAgents(ctx)
109111
if err != nil {
@@ -167,9 +169,15 @@ func beforeShutdown(ctx context.Context) error {
167169
return nil
168170
}
169171

170-
func waitForClusterSync() {
172+
// Waits for the peer sync interval to pass, allowing time for the actor system to synchronize its
173+
// list of actors with the remote nodes in the cluster. Cancels early if the context is done.
174+
func waitForClusterSync(ctx context.Context) {
171175
if clusterEnabled() {
172-
time.Sleep(nodesSyncInterval())
176+
select {
177+
case <-time.After(peerSyncInterval()):
178+
case <-ctx.Done():
179+
logger.Warn(context.WithoutCancel(ctx)).Msg("Context cancelled while waiting for cluster sync.")
180+
}
173181
}
174182
}
175183

runtime/actors/agents.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,13 @@ func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName stri
100100
PluginName: pluginName,
101101
}),
102102
)
103+
104+
// Important: Wait for the actor system to sync with the cluster before proceeding.
105+
// This ensures consistency across the cluster, so we don't accidentally spawn the same actor multiple times.
106+
// GoAkt does not resolve such inconsistencies automatically, so we need to handle this manually.
107+
// A short sync time should not be noticeable by the user.
108+
waitForClusterSync(ctx)
109+
103110
return err
104111
}
105112

runtime/actors/cluster.go

Lines changed: 47 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -55,27 +55,15 @@ func clusterOptions(ctx context.Context) []goakt.Option {
5555
logger.Fatal(ctx).Err(err).Msg("Failed to create cluster discovery provider.")
5656
}
5757

58-
var remotingHost string
59-
if app.IsDevEnvironment() {
60-
// only bind to localhost in development
61-
remotingHost = "127.0.0.1"
62-
} else {
63-
// otherwise bind to all interfaces
64-
remotingHost = "0.0.0.0"
65-
}
66-
67-
readTimeout := getDurationFromEnv("MODUS_CLUSTER_READ_TIMEOUT_SECONDS", 2, time.Second)
68-
writeTimeout := getDurationFromEnv("MODUS_CLUSTER_WRITE_TIMEOUT_SECONDS", 2, time.Second)
69-
7058
return []goakt.Option{
71-
goakt.WithRemote(remote.NewConfig(remotingHost, remotingPort)),
59+
goakt.WithRemote(remote.NewConfig(remotingHost(), remotingPort)),
7260
goakt.WithCluster(goakt.NewClusterConfig().
7361
WithDiscovery(disco).
7462
WithDiscoveryPort(discoveryPort).
7563
WithPeersPort(peersPort).
76-
WithReadTimeout(readTimeout).
77-
WithWriteTimeout(writeTimeout).
78-
// WithPartitionCount(3).
64+
WithReadTimeout(readTimeout()).
65+
WithWriteTimeout(writeTimeout()).
66+
WithPartitionCount(partitionCount()).
7967
WithClusterStateSyncInterval(nodesSyncInterval()).
8068
WithPeersStateSyncInterval(peerSyncInterval()).
8169
WithKinds(&wasmAgentActor{}, &subscriptionActor{}),
@@ -162,6 +150,18 @@ func clusterHost() string {
162150
}
163151
}
164152

153+
// remotingHost returns the host address to bind the remoting system to.
154+
func remotingHost() string {
155+
// only bind to localhost in development
156+
if app.IsDevEnvironment() {
157+
return "127.0.0.1"
158+
}
159+
160+
// otherwise bind to all interfaces
161+
return "0.0.0.0"
162+
}
163+
164+
// clusterPorts returns the ports used for discovery, remoting, and peer communication in the cluster.
165165
func clusterPorts() (discoveryPort, remotingPort, peersPort int) {
166166

167167
// Get default ports dynamically, but use environment variables if set
@@ -173,19 +173,47 @@ func clusterPorts() (discoveryPort, remotingPort, peersPort int) {
173173
return
174174
}
175175

176-
// peerSyncInterval returns the interval at which the cluster peers sync their list of actors across the cluster.
176+
// peerSyncInterval returns the interval at which the actor system will sync its list of actors to other nodes across the cluster.
177177
// We use a tight sync interval of 1 second by default, to ensure quick peer discovery as agents are added or removed.
178+
//
179+
// This value is also used for a sleep both on system startup and when spawning a new agent actor,
180+
// so it needs to be low enough to not be noticed by the user.
178181
func peerSyncInterval() time.Duration {
179182
return getDurationFromEnv("MODUS_CLUSTER_PEER_SYNC_SECONDS", 1, time.Second)
180183
}
181184

182-
// nodesSyncInterval returns the interval at which the cluster syncs the list of active nodes across the cluster.
183-
// On each interval, discovery will be triggered to find new nodes and update the cluster state.
185+
// nodesSyncInterval returns the interval at which the cluster forces a resync of the list of active nodes across the cluster.
186+
// This matters only with regard to nodes going down unexpectedly, as other nodes in the cluster will not be aware of the change until the next sync.
187+
// It does not affect anything if a node is gracefully shut down, as that will be communicated immediately during the shutdown process.
188+
//
189+
// On each interval, the node will sync its list of nodes with the cluster, and update its local state accordingly.
184190
// The default is 10 seconds, which is a reasonable balance between responsiveness and network overhead.
185191
func nodesSyncInterval() time.Duration {
186192
return getDurationFromEnv("MODUS_CLUSTER_NODES_SYNC_SECONDS", 10, time.Second)
187193
}
188194

195+
// partitionCount returns the number of partitions the cluster will use for actor distribution.
196+
// It must be a prime number to work properly with the actor system's hashing algorithm.
197+
// It must be greater than the number of nodes in the cluster, but not too large to avoid excessive overhead.
198+
// In testing, 23 is the highest that works well with the other default timing constraints.
199+
// We'll use a slightly lower default of 13, which is still a prime number and should work well for most clusters.
200+
// The GoAkt default is 271, but this has been found to lead to other errors in practice.
201+
func partitionCount() uint64 {
202+
return uint64(getIntFromEnv("MODUS_CLUSTER_PARTITION_COUNT", 13))
203+
}
204+
205+
// readTimeout returns the duration to wait for a cluster read operation before timing out.
206+
// The default is 1 second, which should usually not need to be changed.
207+
func readTimeout() time.Duration {
208+
return getDurationFromEnv("MODUS_CLUSTER_READ_TIMEOUT_SECONDS", 1, time.Second)
209+
}
210+
211+
// writeTimeout returns the duration to wait for a cluster write operation before timing out.
212+
// The default is 1 second, which should usually not need to be changed.
213+
func writeTimeout() time.Duration {
214+
return getDurationFromEnv("MODUS_CLUSTER_WRITE_TIMEOUT_SECONDS", 1, time.Second)
215+
}
216+
189217
func getPodLabels() map[string]string {
190218
// example value: "app.kubernetes.io/name=modus,app.kubernetes.io/component=runtime"
191219
if labels := os.Getenv("MODUS_CLUSTER_POD_LABELS"); labels != "" {

runtime/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ require (
4242
github.com/tetratelabs/wazero v1.9.0
4343
github.com/tidwall/gjson v1.18.0
4444
github.com/tidwall/sjson v1.2.5
45-
github.com/tochemey/goakt/v3 v3.6.5
45+
github.com/tochemey/goakt/v3 v3.6.6-0.20250623081618-c24f07caefce
4646
github.com/travisjeffery/go-dynaport v1.0.0
4747
github.com/twpayne/go-geom v1.6.1
4848
github.com/wundergraph/graphql-go-tools/execution v1.3.2-0.20250618131920-dd0d9cc2a919

runtime/go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -860,8 +860,8 @@ github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDW
860860
github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg=
861861
github.com/tinylib/msgp v1.2.5 h1:WeQg1whrXRFiZusidTQqzETkRpGjFjcIhW6uqWH09po=
862862
github.com/tinylib/msgp v1.2.5/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0=
863-
github.com/tochemey/goakt/v3 v3.6.5 h1:Iun9Tv6L4xLmof0OzY3HH5zUR03AowiZDCTxB+dVaSo=
864-
github.com/tochemey/goakt/v3 v3.6.5/go.mod h1:6HHDMI+BXFPt+Pq59svoGdxXTme/UsqX868FG1gICO4=
863+
github.com/tochemey/goakt/v3 v3.6.6-0.20250623081618-c24f07caefce h1:mxxNUEmw4KRbXgIWQgnv/VyeP95kSJi3kh+vBh//OyM=
864+
github.com/tochemey/goakt/v3 v3.6.6-0.20250623081618-c24f07caefce/go.mod h1:6HHDMI+BXFPt+Pq59svoGdxXTme/UsqX868FG1gICO4=
865865
github.com/tochemey/olric v0.2.3 h1:LGmsHLQBSEs3uasZNLT5MdS2pBMNJ71gSrXnYfkb62M=
866866
github.com/tochemey/olric v0.2.3/go.mod h1:BAD82xys8R8IAWFV+GC0B8I+J4QsYZvmPS5NT/dhmtI=
867867
github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw=

0 commit comments

Comments
 (0)