Skip to content
This repository was archived by the owner on Sep 11, 2025. It is now read-only.

Commit 8fc98dc

Browse files
fix: improve cluster resiliency (#900)
1 parent ec965ba commit 8fc98dc

File tree

9 files changed

+149
-62
lines changed

9 files changed

+149
-62
lines changed

.trunk/configs/cspell.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
"mapdata",
118118
"mapstructure",
119119
"mattn",
120+
"memberlist",
120121
"metagen",
121122
"millis",
122123
"minilm",

.vscode/launch.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
"MODUS_ENV": "dev",
3030
"MODUS_CLUSTER_MODE": "NATS",
3131
"MODUS_CLUSTER_NATS_URL": "nats://localhost:4222",
32-
"MODUS_DEBUG_ACTORS": "true",
32+
// "MODUS_DEBUG_ACTORS": "true",
3333
"MODUS_DEBUG": "true",
3434
"MODUS_USE_MODUSDB": "false",
3535
"MODUS_DB": "postgresql://postgres:postgres@localhost:5432/modus?sslmode=disable" // checkov:skip=CKV_SECRET_4

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
# Change Log
44

5+
## 2025-06-21 - Runtime 0.18.0-alpha.9
6+
7+
- fix: improve cluster resiliency [#900](https://github.com/hypermodeinc/modus/pull/900)
8+
59
## 2025-06-18 - Runtime 0.18.0-alpha.8
610

711
- feat: cluster mode [#895](https://github.com/hypermodeinc/modus/pull/895)

runtime/actors/actorlogger.go

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,61 +13,91 @@ import (
1313
"fmt"
1414
"io"
1515
"log"
16+
"slices"
17+
"strings"
1618

1719
"github.com/hypermodeinc/modus/runtime/logger"
1820
"github.com/hypermodeinc/modus/runtime/utils"
1921
"github.com/rs/zerolog"
2022
actorLog "github.com/tochemey/goakt/v3/log"
2123
)
2224

25+
// some messages are ignored during shutdown because they are expected
26+
var shutdownIgnoredMessages = []string{
27+
"Failed to acquire semaphore: context canceled",
28+
" is down. modus is going to shutdown.",
29+
}
30+
2331
func newActorLogger(logger *zerolog.Logger) *actorLogger {
2432

2533
var minLevel zerolog.Level
2634
if utils.EnvVarFlagEnabled("MODUS_DEBUG_ACTORS") {
2735
minLevel = zerolog.DebugLevel
2836
} else {
29-
// goakt info level is too noisy, so default to show only errors
30-
minLevel = zerolog.ErrorLevel
37+
// goakt info level is too noisy, so default to show warnings and above
38+
minLevel = zerolog.WarnLevel
3139
}
3240

3341
l := logger.Level(minLevel).With().Str("component", "actors").Logger()
34-
return &actorLogger{&l}
42+
return &actorLogger{logger: &l}
3543
}
3644

3745
type actorLogger struct {
38-
logger *zerolog.Logger
46+
logger *zerolog.Logger
47+
paused bool
48+
shuttingDown bool
49+
}
50+
51+
func (al *actorLogger) Pause() {
52+
al.paused = true
53+
}
54+
55+
func (al *actorLogger) Resume() {
56+
al.paused = false
57+
}
58+
59+
func (al *actorLogger) writeToLog(level zerolog.Level, msg string) {
60+
if al.paused {
61+
return
62+
}
63+
if al.shuttingDown && slices.ContainsFunc(shutdownIgnoredMessages, func(s string) bool {
64+
return strings.Contains(msg, s)
65+
}) {
66+
return
67+
}
68+
al.logger.WithLevel(level).Msg(msg)
3969
}
4070

4171
func (al *actorLogger) Debug(v ...any) {
42-
al.logger.Debug().Msg(fmt.Sprint(v...))
72+
al.writeToLog(zerolog.DebugLevel, fmt.Sprint(v...))
4373
}
4474

4575
func (al *actorLogger) Debugf(format string, v ...any) {
46-
al.logger.Debug().Msgf(format, v...)
76+
al.writeToLog(zerolog.DebugLevel, fmt.Sprintf(format, v...))
4777
}
4878

4979
func (al *actorLogger) Info(v ...any) {
50-
al.logger.Info().Msg(fmt.Sprint(v...))
80+
al.writeToLog(zerolog.InfoLevel, fmt.Sprint(v...))
5181
}
5282

5383
func (al *actorLogger) Infof(format string, v ...any) {
54-
al.logger.Info().Msgf(format, v...)
84+
al.writeToLog(zerolog.InfoLevel, fmt.Sprintf(format, v...))
5585
}
5686

5787
func (al *actorLogger) Warn(v ...any) {
58-
al.logger.Warn().Msg(fmt.Sprint(v...))
88+
al.writeToLog(zerolog.WarnLevel, fmt.Sprint(v...))
5989
}
6090

6191
func (al *actorLogger) Warnf(format string, v ...any) {
62-
al.logger.Warn().Msgf(format, v...)
92+
al.writeToLog(zerolog.WarnLevel, fmt.Sprintf(format, v...))
6393
}
6494

6595
func (al *actorLogger) Error(v ...any) {
66-
al.logger.Error().Msg(fmt.Sprint(v...))
96+
al.writeToLog(zerolog.ErrorLevel, fmt.Sprint(v...))
6797
}
6898

6999
func (al *actorLogger) Errorf(format string, v ...any) {
70-
al.logger.Error().Msgf(format, v...)
100+
al.writeToLog(zerolog.ErrorLevel, fmt.Sprintf(format, v...))
71101
}
72102

73103
func (al *actorLogger) Fatal(v ...any) {

runtime/actors/actorsystem.go

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package actors
1111

1212
import (
1313
"context"
14+
"fmt"
1415
"time"
1516

1617
"github.com/hypermodeinc/modus/runtime/db"
@@ -33,6 +34,7 @@ func Initialize(ctx context.Context) {
3334

3435
opts := []goakt.Option{
3536
goakt.WithLogger(newActorLogger(logger.Get(ctx))),
37+
goakt.WithCoordinatedShutdown(beforeShutdown),
3638
goakt.WithPubSub(),
3739
goakt.WithActorInitTimeout(10 * time.Second), // TODO: adjust this value, or make it configurable
3840
goakt.WithActorInitMaxRetries(1), // TODO: adjust this value, or make it configurable
@@ -50,6 +52,8 @@ func Initialize(ctx context.Context) {
5052
_actorSystem = actorSystem
5153
}
5254

55+
waitForClusterSync()
56+
5357
logger.Info(ctx).Msg("Actor system started.")
5458

5559
pluginmanager.RegisterPluginLoadedCallback(loadAgentActors)
@@ -70,54 +74,76 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
7074

7175
// spawn actors for agents with state in the database, that are not already running
7276
// check both locally and on remote nodes in the cluster
77+
logger.Debug(ctx).Msg("Restoring agent actors from database.")
7378
agents, err := db.QueryActiveAgents(ctx)
7479
if err != nil {
75-
logger.Err(ctx, err).Msg("Failed to query agents from database.")
76-
return err
80+
return fmt.Errorf("failed to query active agents: %w", err)
7781
}
82+
inCluster := _actorSystem.InCluster()
7883
for _, agent := range agents {
7984
if !localAgents[agent.Id] {
80-
if _actorSystem.InCluster() {
85+
if inCluster {
8186
actorName := getActorName(agent.Id)
82-
if _, err := _actorSystem.RemoteActor(ctx, actorName); err == nil {
83-
// found actor in cluster, no need to spawn it again
87+
if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil {
88+
logger.Err(ctx, err).Msgf("Failed to check if actor %s exists in cluster.", actorName)
89+
} else if exists {
90+
// if the actor already exists in the cluster, skip spawning it
8491
continue
8592
}
8693
}
87-
go func(f_ctx context.Context, pluginName, agentId, agentName string) {
88-
if err := spawnActorForAgent(f_ctx, pluginName, agentId, agentName, false); err != nil {
89-
logger.Err(f_ctx, err).Msgf("Failed to spawn actor for agent %s.", agentId)
90-
}
91-
}(ctx, plugin.Name(), agent.Id, agent.Name)
94+
if err := spawnActorForAgent(ctx, plugin.Name(), agent.Id, agent.Name, false); err != nil {
95+
logger.Err(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id)
96+
}
9297
}
9398
}
9499

95100
return nil
96101
}
97102

98-
func beforeShutdown(ctx context.Context) {
103+
func beforeShutdown(ctx context.Context) error {
104+
_actorSystem.Logger().(*actorLogger).shuttingDown = true
99105
logger.Info(ctx).Msg("Actor system shutting down...")
106+
actors := _actorSystem.Actors()
100107

101-
// stop all agent actors before shutdown so they can suspend properly
102-
for _, pid := range _actorSystem.Actors() {
103-
if _, ok := pid.Actor().(*wasmAgentActor); ok {
108+
// Suspend all local running agent actors first, which allows them to gracefully stop and persist their state.
109+
// In cluster mode, this will also allow the actor to resume on another node after this node shuts down.
110+
for _, pid := range actors {
111+
if actor, ok := pid.Actor().(*wasmAgentActor); ok && pid.IsRunning() {
112+
if actor.status == AgentStatusRunning {
113+
ctx := actor.augmentContext(ctx, pid)
114+
if err := actor.suspendAgent(ctx); err != nil {
115+
logger.Err(ctx, err).Str("agent_id", actor.agentId).Msg("Failed to suspend agent actor.")
116+
}
117+
}
118+
}
119+
}
104120

105-
// pass the pid so it can be used during shutdown as an event sender
106-
ctx := context.WithValue(ctx, pidContextKey{}, pid)
121+
// Then shut down subscription actors. They will have received the suspend message already.
122+
for _, pid := range actors {
123+
if _, ok := pid.Actor().(*subscriptionActor); ok && pid.IsRunning() {
107124
if err := pid.Shutdown(ctx); err != nil {
108125
logger.Err(ctx, err).Msgf("Failed to shutdown actor %s.", pid.Name())
109126
}
110127
}
111128
}
129+
130+
waitForClusterSync()
131+
132+
// then allow the actor system to continue with its shutdown process
133+
return nil
134+
}
135+
136+
func waitForClusterSync() {
137+
if clusterEnabled() {
138+
time.Sleep(peerSyncInterval() * 2)
139+
}
112140
}
113141

114142
func Shutdown(ctx context.Context) {
115143
if _actorSystem == nil {
116-
return
144+
logger.Fatal(ctx).Msg("Actor system is not initialized, cannot shutdown.")
117145
}
118146

119-
beforeShutdown(ctx)
120-
121147
if err := _actorSystem.Stop(ctx); err != nil {
122148
logger.Err(ctx, err).Msg("Failed to shutdown actor system.")
123149
}

runtime/actors/cluster.go

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import (
1313
"context"
1414
"net/url"
1515
"os"
16-
"strconv"
1716
"strings"
17+
"time"
1818

1919
"github.com/hypermodeinc/modus/runtime/app"
2020
"github.com/hypermodeinc/modus/runtime/logger"
@@ -95,12 +95,18 @@ func clusterOptions(ctx context.Context) []goakt.Option {
9595
remotingHost = "0.0.0.0"
9696
}
9797

98+
readTimeout := time.Duration(getIntFromEnv("MODUS_CLUSTER_READ_TIMEOUT_SECONDS", 2)) * time.Second
99+
writeTimeout := time.Duration(getIntFromEnv("MODUS_CLUSTER_WRITE_TIMEOUT_SECONDS", 2)) * time.Second
100+
98101
return []goakt.Option{
102+
goakt.WithPeerStateLoopInterval(peerSyncInterval()),
99103
goakt.WithRemote(remote.NewConfig(remotingHost, remotingPort)),
100104
goakt.WithCluster(goakt.NewClusterConfig().
101105
WithDiscovery(disco).
102106
WithDiscoveryPort(discoveryPort).
103107
WithPeersPort(peersPort).
108+
WithReadTimeout(readTimeout).
109+
WithWriteTimeout(writeTimeout).
104110
WithKinds(&wasmAgentActor{}, &subscriptionActor{}),
105111
),
106112
}
@@ -145,6 +151,10 @@ func clusterMode() goaktClusterMode {
145151
return parseClusterMode(os.Getenv("MODUS_CLUSTER_MODE"))
146152
}
147153

154+
func clusterEnabled() bool {
155+
return clusterMode() != clusterModeNone
156+
}
157+
148158
func clusterNatsUrl() string {
149159
const envVar = "MODUS_CLUSTER_NATS_URL"
150160
const defaultNatsUrl = "nats://localhost:4222"
@@ -171,7 +181,8 @@ func clusterHost() string {
171181
}
172182

173183
if app.IsDevEnvironment() {
174-
return "localhost"
184+
// Note, forcing IPv4 here avoids memberlist attempting to bind to IPv6 that we're not listening on.
185+
return "127.0.0.1"
175186
} else {
176187
// this hack gets the same IP that the remoting system would bind to by default
177188
rc := remote.NewConfig("0.0.0.0", 0)
@@ -182,33 +193,18 @@ func clusterHost() string {
182193

183194
func clusterPorts() (discoveryPort, remotingPort, peersPort int) {
184195

185-
// Get default ports dynamically
196+
// Get default ports dynamically, but use environment variables if set
186197
ports := dynaport.Get(3)
187-
discoveryPort = ports[0]
188-
remotingPort = ports[1]
189-
peersPort = ports[2]
190-
191-
// Override with environment variables if set
192-
discoveryPort = getPortFromEnv("MODUS_CLUSTER_DISCOVERY_PORT", discoveryPort)
193-
remotingPort = getPortFromEnv("MODUS_CLUSTER_REMOTING_PORT", remotingPort)
194-
peersPort = getPortFromEnv("MODUS_CLUSTER_PEERS_PORT", peersPort)
198+
discoveryPort = getIntFromEnv("MODUS_CLUSTER_DISCOVERY_PORT", ports[0])
199+
remotingPort = getIntFromEnv("MODUS_CLUSTER_REMOTING_PORT", ports[1])
200+
peersPort = getIntFromEnv("MODUS_CLUSTER_PEERS_PORT", ports[2])
195201

196202
return
197203
}
198204

199-
func getPortFromEnv(envVar string, defaultPort int) int {
200-
portStr := os.Getenv(envVar)
201-
if portStr == "" {
202-
return defaultPort
203-
}
204-
205-
port, err := strconv.Atoi(portStr)
206-
if err != nil || port <= 0 {
207-
logger.Warnf("Invalid value for %s. Using %d instead.", envVar, defaultPort)
208-
return defaultPort
209-
}
210-
211-
return port
205+
func peerSyncInterval() time.Duration {
206+
// we use a tight sync interval by default, to ensure quick peer discovery
207+
return time.Duration(getIntFromEnv("MODUS_CLUSTER_PEER_SYNC_MS", 500)) * time.Millisecond
212208
}
213209

214210
func getPodLabels() map[string]string {

runtime/actors/misc.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@ package actors
1212
import (
1313
"context"
1414
"fmt"
15+
"os"
16+
"strconv"
1517
"time"
1618

19+
"github.com/hypermodeinc/modus/runtime/logger"
1720
goakt "github.com/tochemey/goakt/v3/actor"
1821

1922
"google.golang.org/protobuf/proto"
@@ -50,3 +53,19 @@ func ask(ctx context.Context, actorName string, message proto.Message, timeout t
5053
}
5154
return nil, fmt.Errorf("failed to get address or PID for actor %s", actorName)
5255
}
56+
57+
// Retrieves an integer value from an environment variable.
58+
func getIntFromEnv(envVar string, defaultValue int) int {
59+
str := os.Getenv(envVar)
60+
if str == "" {
61+
return defaultValue
62+
}
63+
64+
value, err := strconv.Atoi(str)
65+
if err != nil || value <= 0 {
66+
logger.Warnf("Invalid value for %s. Using %d instead.", envVar, defaultValue)
67+
return defaultValue
68+
}
69+
70+
return value
71+
}

runtime/go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ require (
4242
github.com/tetratelabs/wazero v1.9.0
4343
github.com/tidwall/gjson v1.18.0
4444
github.com/tidwall/sjson v1.2.5
45-
github.com/tochemey/goakt/v3 v3.6.2
45+
github.com/tochemey/goakt/v3 v3.6.4
4646
github.com/travisjeffery/go-dynaport v1.0.0
4747
github.com/twpayne/go-geom v1.6.1
4848
github.com/wundergraph/graphql-go-tools/execution v1.3.2-0.20250618131920-dd0d9cc2a919
@@ -52,9 +52,9 @@ require (
5252
golang.org/x/sys v0.33.0
5353
google.golang.org/grpc v1.73.0
5454
google.golang.org/protobuf v1.36.6
55-
k8s.io/api v0.33.1
56-
k8s.io/apimachinery v0.33.1
57-
k8s.io/client-go v0.33.1
55+
k8s.io/api v0.33.2
56+
k8s.io/apimachinery v0.33.2
57+
k8s.io/client-go v0.33.2
5858
sigs.k8s.io/controller-runtime v0.21.0
5959
)
6060

0 commit comments

Comments (0)