1 change: 1 addition & 0 deletions .trunk/configs/cspell.json
@@ -117,6 +117,7 @@
"mapdata",
"mapstructure",
"mattn",
"memberlist",
"metagen",
"millis",
"minilm",
2 changes: 1 addition & 1 deletion .vscode/launch.json
@@ -29,7 +29,7 @@
"MODUS_ENV": "dev",
"MODUS_CLUSTER_MODE": "NATS",
"MODUS_CLUSTER_NATS_URL": "nats://localhost:4222",
"MODUS_DEBUG_ACTORS": "true",
// "MODUS_DEBUG_ACTORS": "true",
"MODUS_DEBUG": "true",
"MODUS_USE_MODUSDB": "false",
"MODUS_DB": "postgresql://postgres:postgres@localhost:5432/modus?sslmode=disable" // checkov:skip=CKV_SECRET_4
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,10 @@

# Change Log

## 2025-06-21 - Runtime 0.18.0-alpha.9

- fix: improve cluster resiliency [#900](https://github.com/hypermodeinc/modus/pull/900)

## 2025-06-18 - Runtime 0.18.0-alpha.8

- feat: cluster mode [#895](https://github.com/hypermodeinc/modus/pull/895)
54 changes: 42 additions & 12 deletions runtime/actors/actorlogger.go
@@ -13,61 +13,91 @@ import (
"fmt"
"io"
"log"
"slices"
"strings"

"github.com/hypermodeinc/modus/runtime/logger"
"github.com/hypermodeinc/modus/runtime/utils"
"github.com/rs/zerolog"
actorLog "github.com/tochemey/goakt/v3/log"
)

// some messages are ignored during shutdown because they are expected
var shutdownIgnoredMessages = []string{
"Failed to acquire semaphore: context canceled",
" is down. modus is going to shutdown.",
}

func newActorLogger(logger *zerolog.Logger) *actorLogger {

var minLevel zerolog.Level
if utils.EnvVarFlagEnabled("MODUS_DEBUG_ACTORS") {
minLevel = zerolog.DebugLevel
} else {
// goakt info level is too noisy, so default to show only errors
minLevel = zerolog.ErrorLevel
// goakt info level is too noisy, so default to show warnings and above
minLevel = zerolog.WarnLevel
}

l := logger.Level(minLevel).With().Str("component", "actors").Logger()
return &actorLogger{&l}
return &actorLogger{logger: &l}
}

type actorLogger struct {
logger *zerolog.Logger
logger *zerolog.Logger
paused bool
shuttingDown bool
}

func (al *actorLogger) Pause() {
al.paused = true
}

func (al *actorLogger) Resume() {
al.paused = false
}

func (al *actorLogger) writeToLog(level zerolog.Level, msg string) {
if al.paused {
return
}
if al.shuttingDown && slices.ContainsFunc(shutdownIgnoredMessages, func(s string) bool {
return strings.Contains(msg, s)
}) {
return
}
al.logger.WithLevel(level).Msg(msg)
}

func (al *actorLogger) Debug(v ...any) {
al.logger.Debug().Msg(fmt.Sprint(v...))
al.writeToLog(zerolog.DebugLevel, fmt.Sprint(v...))
}

func (al *actorLogger) Debugf(format string, v ...any) {
al.logger.Debug().Msgf(format, v...)
al.writeToLog(zerolog.DebugLevel, fmt.Sprintf(format, v...))
}

func (al *actorLogger) Info(v ...any) {
al.logger.Info().Msg(fmt.Sprint(v...))
al.writeToLog(zerolog.InfoLevel, fmt.Sprint(v...))
}

func (al *actorLogger) Infof(format string, v ...any) {
al.logger.Info().Msgf(format, v...)
al.writeToLog(zerolog.InfoLevel, fmt.Sprintf(format, v...))
}

func (al *actorLogger) Warn(v ...any) {
al.logger.Warn().Msg(fmt.Sprint(v...))
al.writeToLog(zerolog.WarnLevel, fmt.Sprint(v...))
}

func (al *actorLogger) Warnf(format string, v ...any) {
al.logger.Warn().Msgf(format, v...)
al.writeToLog(zerolog.WarnLevel, fmt.Sprintf(format, v...))
}

func (al *actorLogger) Error(v ...any) {
al.logger.Error().Msg(fmt.Sprint(v...))
al.writeToLog(zerolog.ErrorLevel, fmt.Sprint(v...))
}

func (al *actorLogger) Errorf(format string, v ...any) {
al.logger.Error().Msgf(format, v...)
al.writeToLog(zerolog.ErrorLevel, fmt.Sprintf(format, v...))
}

func (al *actorLogger) Fatal(v ...any) {
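To make the new filtering behavior in `writeToLog` easier to follow, here is a minimal standalone sketch of the substring match against `shutdownIgnoredMessages` (the `shouldDrop` helper and the sample messages are illustrative, not part of this PR):

```go
package main

import (
	"fmt"
	"slices"
	"strings"
)

// Same list as in actorlogger.go: messages that are expected (and therefore noise)
// while the actor system is shutting down.
var shutdownIgnoredMessages = []string{
	"Failed to acquire semaphore: context canceled",
	" is down. modus is going to shutdown.",
}

// shouldDrop mirrors the check added to writeToLog: only drop a message when the
// logger is in the shutting-down state and the message contains a known substring.
func shouldDrop(shuttingDown bool, msg string) bool {
	return shuttingDown && slices.ContainsFunc(shutdownIgnoredMessages, func(s string) bool {
		return strings.Contains(msg, s)
	})
}

func main() {
	fmt.Println(shouldDrop(true, "node-2 is down. modus is going to shutdown."))  // true
	fmt.Println(shouldDrop(false, "node-2 is down. modus is going to shutdown.")) // false: not shutting down
	fmt.Println(shouldDrop(true, "some unrelated error"))                         // false: no match
}
```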
64 changes: 45 additions & 19 deletions runtime/actors/actorsystem.go
@@ -11,6 +11,7 @@ package actors

import (
"context"
"fmt"
"time"

"github.com/hypermodeinc/modus/runtime/db"
@@ -33,6 +34,7 @@ func Initialize(ctx context.Context) {

opts := []goakt.Option{
goakt.WithLogger(newActorLogger(logger.Get(ctx))),
goakt.WithCoordinatedShutdown(beforeShutdown),
goakt.WithPubSub(),
goakt.WithActorInitTimeout(10 * time.Second), // TODO: adjust this value, or make it configurable
goakt.WithActorInitMaxRetries(1), // TODO: adjust this value, or make it configurable
@@ -50,6 +52,8 @@ func Initialize(ctx context.Context) {
_actorSystem = actorSystem
}

waitForClusterSync()

logger.Info(ctx).Msg("Actor system started.")

pluginmanager.RegisterPluginLoadedCallback(loadAgentActors)
@@ -70,54 +74,76 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {

// spawn actors for agents with state in the database, that are not already running
// check both locally and on remote nodes in the cluster
logger.Debug(ctx).Msg("Restoring agent actors from database.")
agents, err := db.QueryActiveAgents(ctx)
if err != nil {
logger.Err(ctx, err).Msg("Failed to query agents from database.")
return err
return fmt.Errorf("failed to query active agents: %w", err)
}
inCluster := _actorSystem.InCluster()
for _, agent := range agents {
if !localAgents[agent.Id] {
if _actorSystem.InCluster() {
if inCluster {
actorName := getActorName(agent.Id)
if _, err := _actorSystem.RemoteActor(ctx, actorName); err == nil {
// found actor in cluster, no need to spawn it again
if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil {
logger.Err(ctx, err).Msgf("Failed to check if actor %s exists in cluster.", actorName)
} else if exists {
// if the actor already exists in the cluster, skip spawning it
continue
}
}
go func(f_ctx context.Context, pluginName, agentId, agentName string) {
if err := spawnActorForAgent(f_ctx, pluginName, agentId, agentName, false); err != nil {
logger.Err(f_ctx, err).Msgf("Failed to spawn actor for agent %s.", agentId)
}
}(ctx, plugin.Name(), agent.Id, agent.Name)
if err := spawnActorForAgent(ctx, plugin.Name(), agent.Id, agent.Name, false); err != nil {
logger.Err(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id)
}
}
}

return nil
}

func beforeShutdown(ctx context.Context) {
func beforeShutdown(ctx context.Context) error {
_actorSystem.Logger().(*actorLogger).shuttingDown = true
logger.Info(ctx).Msg("Actor system shutting down...")
actors := _actorSystem.Actors()

// stop all agent actors before shutdown so they can suspend properly
for _, pid := range _actorSystem.Actors() {
if _, ok := pid.Actor().(*wasmAgentActor); ok {
// Suspend all local running agent actors first, which allows them to gracefully stop and persist their state.
// In cluster mode, this will also allow the actor to resume on another node after this node shuts down.
for _, pid := range actors {
if actor, ok := pid.Actor().(*wasmAgentActor); ok && pid.IsRunning() {
if actor.status == AgentStatusRunning {
ctx := actor.augmentContext(ctx, pid)
if err := actor.suspendAgent(ctx); err != nil {
logger.Err(ctx, err).Str("agent_id", actor.agentId).Msg("Failed to suspend agent actor.")
}
}
}
}

// pass the pid so it can be used during shutdown as an event sender
ctx := context.WithValue(ctx, pidContextKey{}, pid)
// Then shut down subscription actors. They will have received the suspend message already.
for _, pid := range actors {
if _, ok := pid.Actor().(*subscriptionActor); ok && pid.IsRunning() {
if err := pid.Shutdown(ctx); err != nil {
logger.Err(ctx, err).Msgf("Failed to shutdown actor %s.", pid.Name())
}
}
}

waitForClusterSync()

// then allow the actor system to continue with its shutdown process
return nil
}

func waitForClusterSync() {
if clusterEnabled() {
time.Sleep(peerSyncInterval() * 2)
}
}

func Shutdown(ctx context.Context) {
if _actorSystem == nil {
return
logger.Fatal(ctx).Msg("Actor system is not initialized, cannot shutdown.")
}

beforeShutdown(ctx)

if err := _actorSystem.Stop(ctx); err != nil {
logger.Err(ctx, err).Msg("Failed to shutdown actor system.")
}
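The reordered `beforeShutdown` is the core of the resiliency fix, so here is a minimal standalone sketch of its three phases (the `agentActor` and `subscriptionActor` types and methods below are illustrative placeholders, not the goakt or Modus APIs): suspend agent actors so they can persist state, then stop subscription actors, then wait for cluster peers to observe the changes.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type agentActor struct{ id string }

// suspend stands in for suspendAgent: persist state so the agent can later resume,
// possibly on another node in the cluster.
func (a *agentActor) suspend(ctx context.Context) error {
	fmt.Printf("suspending agent %s\n", a.id)
	return nil
}

type subscriptionActor struct{ topic string }

func (s *subscriptionActor) shutdown(ctx context.Context) error {
	fmt.Printf("stopping subscription %s\n", s.topic)
	return nil
}

func main() {
	ctx := context.Background()
	actors := []any{
		&agentActor{id: "agent-1"},
		&subscriptionActor{topic: "agent-1/events"},
		&agentActor{id: "agent-2"},
	}

	// Phase 1: suspend agent actors first, so each can persist its state.
	for _, a := range actors {
		if agent, ok := a.(*agentActor); ok {
			if err := agent.suspend(ctx); err != nil {
				fmt.Println("suspend failed:", err)
			}
		}
	}

	// Phase 2: shut down subscription actors, which have already seen the suspend events.
	for _, a := range actors {
		if sub, ok := a.(*subscriptionActor); ok {
			if err := sub.shutdown(ctx); err != nil {
				fmt.Println("shutdown failed:", err)
			}
		}
	}

	// Phase 3: give cluster peers time to observe the changes
	// (waitForClusterSync sleeps for twice the peer sync interval).
	peerSyncInterval := 500 * time.Millisecond
	time.Sleep(2 * peerSyncInterval)
}
```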
44 changes: 20 additions & 24 deletions runtime/actors/cluster.go
@@ -13,8 +13,8 @@ import (
"context"
"net/url"
"os"
"strconv"
"strings"
"time"

"github.com/hypermodeinc/modus/runtime/app"
"github.com/hypermodeinc/modus/runtime/logger"
@@ -95,12 +95,18 @@ func clusterOptions(ctx context.Context) []goakt.Option {
remotingHost = "0.0.0.0"
}

readTimeout := time.Duration(getIntFromEnv("MODUS_CLUSTER_READ_TIMEOUT_SECONDS", 2)) * time.Second
writeTimeout := time.Duration(getIntFromEnv("MODUS_CLUSTER_WRITE_TIMEOUT_SECONDS", 2)) * time.Second

return []goakt.Option{
goakt.WithPeerStateLoopInterval(peerSyncInterval()),
goakt.WithRemote(remote.NewConfig(remotingHost, remotingPort)),
goakt.WithCluster(goakt.NewClusterConfig().
WithDiscovery(disco).
WithDiscoveryPort(discoveryPort).
WithPeersPort(peersPort).
WithReadTimeout(readTimeout).
WithWriteTimeout(writeTimeout).
WithKinds(&wasmAgentActor{}, &subscriptionActor{}),
),
}
@@ -145,6 +151,10 @@ func clusterMode() goaktClusterMode {
return parseClusterMode(os.Getenv("MODUS_CLUSTER_MODE"))
}

func clusterEnabled() bool {
return clusterMode() != clusterModeNone
}

func clusterNatsUrl() string {
const envVar = "MODUS_CLUSTER_NATS_URL"
const defaultNatsUrl = "nats://localhost:4222"
@@ -171,7 +181,8 @@ func clusterHost() string {
}

if app.IsDevEnvironment() {
return "localhost"
// Note, forcing IPv4 here avoids memberlist attempting to bind to IPv6 that we're not listening on.
return "127.0.0.1"
} else {
// this hack gets the same IP that the remoting system would bind to by default
rc := remote.NewConfig("0.0.0.0", 0)
@@ -182,33 +193,18 @@

func clusterPorts() (discoveryPort, remotingPort, peersPort int) {

// Get default ports dynamically
// Get default ports dynamically, but use environment variables if set
ports := dynaport.Get(3)
discoveryPort = ports[0]
remotingPort = ports[1]
peersPort = ports[2]

// Override with environment variables if set
discoveryPort = getPortFromEnv("MODUS_CLUSTER_DISCOVERY_PORT", discoveryPort)
remotingPort = getPortFromEnv("MODUS_CLUSTER_REMOTING_PORT", remotingPort)
peersPort = getPortFromEnv("MODUS_CLUSTER_PEERS_PORT", peersPort)
discoveryPort = getIntFromEnv("MODUS_CLUSTER_DISCOVERY_PORT", ports[0])
remotingPort = getIntFromEnv("MODUS_CLUSTER_REMOTING_PORT", ports[1])
peersPort = getIntFromEnv("MODUS_CLUSTER_PEERS_PORT", ports[2])

return
}

func getPortFromEnv(envVar string, defaultPort int) int {
portStr := os.Getenv(envVar)
if portStr == "" {
return defaultPort
}

port, err := strconv.Atoi(portStr)
if err != nil || port <= 0 {
logger.Warnf("Invalid value for %s. Using %d instead.", envVar, defaultPort)
return defaultPort
}

return port
func peerSyncInterval() time.Duration {
// we use a tight sync interval by default, to ensure quick peer discovery
return time.Duration(getIntFromEnv("MODUS_CLUSTER_PEER_SYNC_MS", 500)) * time.Millisecond
}

func getPodLabels() map[string]string {
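For reference, a small standalone sketch (assuming the defaults shown in this diff; the inline parsing below is a simplified stand-in for the `getIntFromEnv` helper in `runtime/actors/misc.go`) of how the new cluster timing knobs resolve to durations:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

func main() {
	// Simplified stand-in for getIntFromEnv: fall back to the default when the
	// variable is unset, non-numeric, or not positive.
	intFromEnv := func(name string, def int) int {
		if v, err := strconv.Atoi(os.Getenv(name)); err == nil && v > 0 {
			return v
		}
		return def
	}

	// Defaults from this diff: 2s read/write timeouts, 500ms peer sync interval.
	readTimeout := time.Duration(intFromEnv("MODUS_CLUSTER_READ_TIMEOUT_SECONDS", 2)) * time.Second
	writeTimeout := time.Duration(intFromEnv("MODUS_CLUSTER_WRITE_TIMEOUT_SECONDS", 2)) * time.Second
	peerSync := time.Duration(intFromEnv("MODUS_CLUSTER_PEER_SYNC_MS", 500)) * time.Millisecond

	fmt.Println(readTimeout, writeTimeout, peerSync) // 2s 2s 500ms when nothing is set
	fmt.Println(2 * peerSync)                        // waitForClusterSync sleeps twice the interval: 1s
}
```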
19 changes: 19 additions & 0 deletions runtime/actors/misc.go
@@ -12,8 +12,11 @@ package actors
import (
"context"
"fmt"
"os"
"strconv"
"time"

"github.com/hypermodeinc/modus/runtime/logger"
goakt "github.com/tochemey/goakt/v3/actor"

"google.golang.org/protobuf/proto"
@@ -50,3 +53,19 @@ func ask(ctx context.Context, actorName string, message proto.Message, timeout t
}
return nil, fmt.Errorf("failed to get address or PID for actor %s", actorName)
}

// Retrieves an integer value from an environment variable.
func getIntFromEnv(envVar string, defaultValue int) int {
str := os.Getenv(envVar)
if str == "" {
return defaultValue
}

value, err := strconv.Atoi(str)
if err != nil || value <= 0 {
logger.Warnf("Invalid value for %s. Using %d instead.", envVar, defaultValue)
return defaultValue
}

return value
}
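A short usage sketch for the new helper (the function body is reproduced from this diff, with `logger.Warnf` swapped for `log.Printf` so the snippet runs standalone; the `MODUS_EXAMPLE_*` variables are hypothetical):

```go
package main

import (
	"fmt"
	"log"
	"os"
	"strconv"
)

// getIntFromEnv as introduced in runtime/actors/misc.go, with the runtime logger
// replaced by the standard library logger for this standalone example.
func getIntFromEnv(envVar string, defaultValue int) int {
	str := os.Getenv(envVar)
	if str == "" {
		return defaultValue
	}

	value, err := strconv.Atoi(str)
	if err != nil || value <= 0 {
		log.Printf("Invalid value for %s. Using %d instead.", envVar, defaultValue)
		return defaultValue
	}

	return value
}

func main() {
	os.Setenv("MODUS_EXAMPLE_PORT", "8686")   // hypothetical: a valid value is used as-is
	os.Setenv("MODUS_EXAMPLE_RETRIES", "abc") // hypothetical: an invalid value falls back after a warning

	fmt.Println(getIntFromEnv("MODUS_EXAMPLE_PORT", 4000)) // 8686
	fmt.Println(getIntFromEnv("MODUS_EXAMPLE_RETRIES", 3)) // 3
	fmt.Println(getIntFromEnv("MODUS_EXAMPLE_UNSET", 42))  // 42
}
```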
8 changes: 4 additions & 4 deletions runtime/go.mod
@@ -42,7 +42,7 @@ require (
github.com/tetratelabs/wazero v1.9.0
github.com/tidwall/gjson v1.18.0
github.com/tidwall/sjson v1.2.5
github.com/tochemey/goakt/v3 v3.6.2
github.com/tochemey/goakt/v3 v3.6.4
github.com/travisjeffery/go-dynaport v1.0.0
github.com/twpayne/go-geom v1.6.1
github.com/wundergraph/graphql-go-tools/execution v1.3.2-0.20250618131920-dd0d9cc2a919
@@ -52,9 +52,9 @@ require (
golang.org/x/sys v0.33.0
google.golang.org/grpc v1.73.0
google.golang.org/protobuf v1.36.6
k8s.io/api v0.33.1
k8s.io/apimachinery v0.33.1
k8s.io/client-go v0.33.1
k8s.io/api v0.33.2
k8s.io/apimachinery v0.33.2
k8s.io/client-go v0.33.2
sigs.k8s.io/controller-runtime v0.21.0
)
