Skip to content
This repository was archived by the owner on Sep 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

# Change Log

## 2025-06-22 - Runtime 0.18.0-alpha.10

- fix: more cluster resiliency improvements [#901](https://github.com/hypermodeinc/modus/pull/901)

## 2025-06-21 - Runtime 0.18.0-alpha.9

- fix: improve cluster resiliency [#900](https://github.com/hypermodeinc/modus/pull/900)
Expand Down
28 changes: 1 addition & 27 deletions runtime/actors/actorlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,13 @@ import (
"fmt"
"io"
"log"
"slices"
"strings"

"github.com/hypermodeinc/modus/runtime/logger"
"github.com/hypermodeinc/modus/runtime/utils"
"github.com/rs/zerolog"
actorLog "github.com/tochemey/goakt/v3/log"
)

// some messages are ignored during shutdown because they are expected
var shutdownIgnoredMessages = []string{
"Failed to acquire semaphore: context canceled",
" is down. modus is going to shutdown.",
}

func newActorLogger(logger *zerolog.Logger) *actorLogger {

var minLevel zerolog.Level
Expand All @@ -43,28 +35,10 @@ func newActorLogger(logger *zerolog.Logger) *actorLogger {
}

type actorLogger struct {
logger *zerolog.Logger
paused bool
shuttingDown bool
}

func (al *actorLogger) Pause() {
al.paused = true
}

func (al *actorLogger) Resume() {
al.paused = false
logger *zerolog.Logger
}

func (al *actorLogger) writeToLog(level zerolog.Level, msg string) {
if al.paused {
return
}
if al.shuttingDown && slices.ContainsFunc(shutdownIgnoredMessages, func(s string) bool {
return strings.Contains(msg, s)
}) {
return
}
al.logger.WithLevel(level).Msg(msg)
}

Expand Down
97 changes: 67 additions & 30 deletions runtime/actors/actorsystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/hypermodeinc/modus/runtime/messages"
"github.com/hypermodeinc/modus/runtime/pluginmanager"
"github.com/hypermodeinc/modus/runtime/plugins"
"github.com/hypermodeinc/modus/runtime/utils"
"github.com/hypermodeinc/modus/runtime/wasmhost"

goakt "github.com/tochemey/goakt/v3/actor"
Expand All @@ -27,6 +28,8 @@ import (
var _actorSystem goakt.ActorSystem

func Initialize(ctx context.Context) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
defer span.Finish()

wasmExt := &wasmExtension{
host: wasmhost.GetWasmHost(ctx),
Expand All @@ -42,24 +45,47 @@ func Initialize(ctx context.Context) {
}
opts = append(opts, clusterOptions(ctx)...)

if actorSystem, err := goakt.NewActorSystem("modus", opts...); err != nil {
actorSystem, err := goakt.NewActorSystem("modus", opts...)
if err != nil {
logger.Fatal(ctx).Err(err).Msg("Failed to create actor system.")
} else if err := actorSystem.Start(ctx); err != nil {
}

if err := startActorSystem(ctx, actorSystem); err != nil {
logger.Fatal(ctx).Err(err).Msg("Failed to start actor system.")
} else if err := actorSystem.Inject(&wasmAgentInfo{}); err != nil {
}

if err := actorSystem.Inject(&wasmAgentInfo{}); err != nil {
logger.Fatal(ctx).Err(err).Msg("Failed to inject wasm agent info into actor system.")
} else {
_actorSystem = actorSystem
}

waitForClusterSync()
_actorSystem = actorSystem

logger.Info(ctx).Msg("Actor system started.")

pluginmanager.RegisterPluginLoadedCallback(loadAgentActors)
}

func startActorSystem(ctx context.Context, actorSystem goakt.ActorSystem) error {
maxRetries := getIntFromEnv("MODUS_ACTOR_SYSTEM_START_MAX_RETRIES", 5)
retryInterval := getDurationFromEnv("MODUS_ACTOR_SYSTEM_START_RETRY_INTERVAL_SECONDS", 2, time.Second)

for i := range maxRetries {
if err := actorSystem.Start(ctx); err != nil {
logger.Warn(ctx).Err(err).Int("attempt", i+1).Msgf("Failed to start actor system, retrying in %s...", retryInterval)
time.Sleep(retryInterval)
retryInterval *= 2 // Exponential backoff
continue
}
return nil
}

return fmt.Errorf("failed to start actor system after %d retries", maxRetries)
}

func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
defer span.Finish()

// restart local actors that are already running, which will reload the plugin
actors := _actorSystem.Actors()
localAgents := make(map[string]bool, len(actors))
Expand All @@ -72,36 +98,44 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
}
}

// spawn actors for agents with state in the database, that are not already running
// check both locally and on remote nodes in the cluster
logger.Debug(ctx).Msg("Restoring agent actors from database.")
agents, err := db.QueryActiveAgents(ctx)
if err != nil {
return fmt.Errorf("failed to query active agents: %w", err)
}
inCluster := _actorSystem.InCluster()
for _, agent := range agents {
if !localAgents[agent.Id] {
if inCluster {
actorName := getActorName(agent.Id)
if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil {
logger.Err(ctx, err).Msgf("Failed to check if actor %s exists in cluster.", actorName)
} else if exists {
// if the actor already exists in the cluster, skip spawning it
continue
// do this next part in a goroutine to avoid blocking the cluster engine startup
go func() {
waitForClusterSync()

// spawn actors for agents with state in the database, that are not already running
// check both locally and on remote nodes in the cluster
logger.Debug(ctx).Msg("Restoring agent actors from database.")
agents, err := db.QueryActiveAgents(ctx)
if err != nil {
logger.Err(ctx, err).Msg("Failed to query active agents from database.")
return
}
inCluster := _actorSystem.InCluster()
for _, agent := range agents {
if !localAgents[agent.Id] {
if inCluster {
actorName := getActorName(agent.Id)
if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil {
logger.Err(ctx, err).Msgf("Failed to check if actor %s exists in cluster.", actorName)
} else if exists {
// if the actor already exists in the cluster, skip spawning it
continue
}
}
if err := spawnActorForAgent(ctx, plugin.Name(), agent.Id, agent.Name, false); err != nil {
logger.Err(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id)
}
}
if err := spawnActorForAgent(ctx, plugin.Name(), agent.Id, agent.Name, false); err != nil {
logger.Err(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id)
}
}
}
}()

return nil
}

func beforeShutdown(ctx context.Context) error {
_actorSystem.Logger().(*actorLogger).shuttingDown = true
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
defer span.Finish()

logger.Info(ctx).Msg("Actor system shutting down...")
actors := _actorSystem.Actors()

Expand All @@ -127,19 +161,22 @@ func beforeShutdown(ctx context.Context) error {
}
}

waitForClusterSync()
// waitForClusterSync()

// then allow the actor system to continue with its shutdown process
return nil
}

func waitForClusterSync() {
if clusterEnabled() {
time.Sleep(peerSyncInterval() * 2)
time.Sleep(nodesSyncInterval())
}
}

func Shutdown(ctx context.Context) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
defer span.Finish()

if _actorSystem == nil {
logger.Fatal(ctx).Msg("Actor system is not initialized, cannot shutdown.")
}
Expand Down
27 changes: 26 additions & 1 deletion runtime/actors/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ const (
)

func StartAgent(ctx context.Context, agentName string) (*AgentInfo, error) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
defer span.Finish()

plugin, ok := plugins.GetPluginFromContext(ctx)
if !ok {
return nil, fmt.Errorf("no plugin found in context")
Expand All @@ -77,6 +80,8 @@ func StartAgent(ctx context.Context, agentName string) (*AgentInfo, error) {
}

func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName string, initializing bool) error {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
defer span.Finish()

ctx = context.WithoutCancel(ctx)
ctx = context.WithValue(ctx, utils.AgentIdContextKey, agentId)
Expand All @@ -99,6 +104,9 @@ func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName stri
}

func StopAgent(ctx context.Context, agentId string) (*AgentInfo, error) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
defer span.Finish()

actorName := getActorName(agentId)
if err := tell(ctx, actorName, &messages.ShutdownAgent{}); err != nil {
if !errors.Is(err, goakt.ErrActorNotFound) {
Expand All @@ -122,6 +130,9 @@ func StopAgent(ctx context.Context, agentId string) (*AgentInfo, error) {
}

func getAgentInfoFromDatabase(ctx context.Context, agentId string) (*AgentInfo, error) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
defer span.Finish()

if agent, e := db.GetAgentState(ctx, agentId); e == nil {
return &AgentInfo{
Id: agent.Id,
Expand All @@ -133,6 +144,9 @@ func getAgentInfoFromDatabase(ctx context.Context, agentId string) (*AgentInfo,
}

func GetAgentInfo(ctx context.Context, agentId string) (*AgentInfo, error) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
defer span.Finish()

actorName := getActorName(agentId)
request := &messages.AgentInfoRequest{}

Expand Down Expand Up @@ -170,6 +184,9 @@ func newAgentMessageErrorResponse(errMsg string) *agentMessageResponse {
}

func SendAgentMessage(ctx context.Context, agentId string, msgName string, data *string, timeout int64) (*agentMessageResponse, error) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
defer span.Finish()

actorName := getActorName(agentId)

msg := &messages.AgentRequest{
Expand Down Expand Up @@ -202,6 +219,8 @@ func SendAgentMessage(ctx context.Context, agentId string, msgName string, data
}

func PublishAgentEvent(ctx context.Context, agentId, eventName string, eventData *string) error {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
defer span.Finish()

var data any
if eventData != nil {
Expand Down Expand Up @@ -264,6 +283,9 @@ func getAgentTopic(agentId string) string {
}

func ListActiveAgents(ctx context.Context) ([]AgentInfo, error) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
defer span.Finish()

agents, err := db.QueryActiveAgents(ctx)
if err != nil {
return nil, fmt.Errorf("error listing active agents: %w", err)
Expand All @@ -281,11 +303,14 @@ func ListActiveAgents(ctx context.Context) ([]AgentInfo, error) {
return results, nil
}

func ListLocalAgents() []AgentInfo {
func ListLocalAgents(ctx context.Context) []AgentInfo {
if _actorSystem == nil {
return nil
}

span, _ := utils.NewSentrySpanForCurrentFunc(ctx)
defer span.Finish()

actors := _actorSystem.Actors()
results := make([]AgentInfo, 0, len(actors))

Expand Down
Loading
Loading