Skip to content
This repository was archived by the owner on Sep 11, 2025. It is now read-only.

Commit 5e780f9

Browse files
fix: more cluster resiliency improvements (#901)
1 parent 8fc98dc commit 5e780f9

File tree

11 files changed

+297
-121
lines changed

11 files changed

+297
-121
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
# Change Log
44

5+
## 2025-06-22 - Runtime 0.18.0-alpha.10
6+
7+
- fix: more cluster resiliency improvements [#901](https://github.com/hypermodeinc/modus/pull/901)
8+
59
## 2025-06-21 - Runtime 0.18.0-alpha.9
610

711
- fix: improve cluster resiliency [#900](https://github.com/hypermodeinc/modus/pull/900)

runtime/actors/actorlogger.go

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,13 @@ import (
1313
"fmt"
1414
"io"
1515
"log"
16-
"slices"
17-
"strings"
1816

1917
"github.com/hypermodeinc/modus/runtime/logger"
2018
"github.com/hypermodeinc/modus/runtime/utils"
2119
"github.com/rs/zerolog"
2220
actorLog "github.com/tochemey/goakt/v3/log"
2321
)
2422

25-
// some messages are ignored during shutdown because they are expected
26-
var shutdownIgnoredMessages = []string{
27-
"Failed to acquire semaphore: context canceled",
28-
" is down. modus is going to shutdown.",
29-
}
30-
3123
func newActorLogger(logger *zerolog.Logger) *actorLogger {
3224

3325
var minLevel zerolog.Level
@@ -43,28 +35,10 @@ func newActorLogger(logger *zerolog.Logger) *actorLogger {
4335
}
4436

4537
type actorLogger struct {
46-
logger *zerolog.Logger
47-
paused bool
48-
shuttingDown bool
49-
}
50-
51-
func (al *actorLogger) Pause() {
52-
al.paused = true
53-
}
54-
55-
func (al *actorLogger) Resume() {
56-
al.paused = false
38+
logger *zerolog.Logger
5739
}
5840

5941
func (al *actorLogger) writeToLog(level zerolog.Level, msg string) {
60-
if al.paused {
61-
return
62-
}
63-
if al.shuttingDown && slices.ContainsFunc(shutdownIgnoredMessages, func(s string) bool {
64-
return strings.Contains(msg, s)
65-
}) {
66-
return
67-
}
6842
al.logger.WithLevel(level).Msg(msg)
6943
}
7044

runtime/actors/actorsystem.go

Lines changed: 67 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/hypermodeinc/modus/runtime/messages"
2020
"github.com/hypermodeinc/modus/runtime/pluginmanager"
2121
"github.com/hypermodeinc/modus/runtime/plugins"
22+
"github.com/hypermodeinc/modus/runtime/utils"
2223
"github.com/hypermodeinc/modus/runtime/wasmhost"
2324

2425
goakt "github.com/tochemey/goakt/v3/actor"
@@ -27,6 +28,8 @@ import (
2728
var _actorSystem goakt.ActorSystem
2829

2930
func Initialize(ctx context.Context) {
31+
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
32+
defer span.Finish()
3033

3134
wasmExt := &wasmExtension{
3235
host: wasmhost.GetWasmHost(ctx),
@@ -42,24 +45,47 @@ func Initialize(ctx context.Context) {
4245
}
4346
opts = append(opts, clusterOptions(ctx)...)
4447

45-
if actorSystem, err := goakt.NewActorSystem("modus", opts...); err != nil {
48+
actorSystem, err := goakt.NewActorSystem("modus", opts...)
49+
if err != nil {
4650
logger.Fatal(ctx).Err(err).Msg("Failed to create actor system.")
47-
} else if err := actorSystem.Start(ctx); err != nil {
51+
}
52+
53+
if err := startActorSystem(ctx, actorSystem); err != nil {
4854
logger.Fatal(ctx).Err(err).Msg("Failed to start actor system.")
49-
} else if err := actorSystem.Inject(&wasmAgentInfo{}); err != nil {
55+
}
56+
57+
if err := actorSystem.Inject(&wasmAgentInfo{}); err != nil {
5058
logger.Fatal(ctx).Err(err).Msg("Failed to inject wasm agent info into actor system.")
51-
} else {
52-
_actorSystem = actorSystem
5359
}
5460

55-
waitForClusterSync()
61+
_actorSystem = actorSystem
5662

5763
logger.Info(ctx).Msg("Actor system started.")
5864

5965
pluginmanager.RegisterPluginLoadedCallback(loadAgentActors)
6066
}
6167

68+
func startActorSystem(ctx context.Context, actorSystem goakt.ActorSystem) error {
69+
maxRetries := getIntFromEnv("MODUS_ACTOR_SYSTEM_START_MAX_RETRIES", 5)
70+
retryInterval := getDurationFromEnv("MODUS_ACTOR_SYSTEM_START_RETRY_INTERVAL_SECONDS", 2, time.Second)
71+
72+
for i := range maxRetries {
73+
if err := actorSystem.Start(ctx); err != nil {
74+
logger.Warn(ctx).Err(err).Int("attempt", i+1).Msgf("Failed to start actor system, retrying in %s...", retryInterval)
75+
time.Sleep(retryInterval)
76+
retryInterval *= 2 // Exponential backoff
77+
continue
78+
}
79+
return nil
80+
}
81+
82+
return fmt.Errorf("failed to start actor system after %d retries", maxRetries)
83+
}
84+
6285
func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
86+
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
87+
defer span.Finish()
88+
6389
// restart local actors that are already running, which will reload the plugin
6490
actors := _actorSystem.Actors()
6591
localAgents := make(map[string]bool, len(actors))
@@ -72,36 +98,44 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
7298
}
7399
}
74100

75-
// spawn actors for agents with state in the database, that are not already running
76-
// check both locally and on remote nodes in the cluster
77-
logger.Debug(ctx).Msg("Restoring agent actors from database.")
78-
agents, err := db.QueryActiveAgents(ctx)
79-
if err != nil {
80-
return fmt.Errorf("failed to query active agents: %w", err)
81-
}
82-
inCluster := _actorSystem.InCluster()
83-
for _, agent := range agents {
84-
if !localAgents[agent.Id] {
85-
if inCluster {
86-
actorName := getActorName(agent.Id)
87-
if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil {
88-
logger.Err(ctx, err).Msgf("Failed to check if actor %s exists in cluster.", actorName)
89-
} else if exists {
90-
// if the actor already exists in the cluster, skip spawning it
91-
continue
101+
// do this next part in a goroutine to avoid blocking the cluster engine startup
102+
go func() {
103+
waitForClusterSync()
104+
105+
// spawn actors for agents with state in the database, that are not already running
106+
// check both locally and on remote nodes in the cluster
107+
logger.Debug(ctx).Msg("Restoring agent actors from database.")
108+
agents, err := db.QueryActiveAgents(ctx)
109+
if err != nil {
110+
logger.Err(ctx, err).Msg("Failed to query active agents from database.")
111+
return
112+
}
113+
inCluster := _actorSystem.InCluster()
114+
for _, agent := range agents {
115+
if !localAgents[agent.Id] {
116+
if inCluster {
117+
actorName := getActorName(agent.Id)
118+
if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil {
119+
logger.Err(ctx, err).Msgf("Failed to check if actor %s exists in cluster.", actorName)
120+
} else if exists {
121+
// if the actor already exists in the cluster, skip spawning it
122+
continue
123+
}
124+
}
125+
if err := spawnActorForAgent(ctx, plugin.Name(), agent.Id, agent.Name, false); err != nil {
126+
logger.Err(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id)
92127
}
93-
}
94-
if err := spawnActorForAgent(ctx, plugin.Name(), agent.Id, agent.Name, false); err != nil {
95-
logger.Err(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id)
96128
}
97129
}
98-
}
130+
}()
99131

100132
return nil
101133
}
102134

103135
func beforeShutdown(ctx context.Context) error {
104-
_actorSystem.Logger().(*actorLogger).shuttingDown = true
136+
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
137+
defer span.Finish()
138+
105139
logger.Info(ctx).Msg("Actor system shutting down...")
106140
actors := _actorSystem.Actors()
107141

@@ -127,19 +161,22 @@ func beforeShutdown(ctx context.Context) error {
127161
}
128162
}
129163

130-
waitForClusterSync()
164+
// waitForClusterSync()
131165

132166
// then allow the actor system to continue with its shutdown process
133167
return nil
134168
}
135169

136170
func waitForClusterSync() {
137171
if clusterEnabled() {
138-
time.Sleep(peerSyncInterval() * 2)
172+
time.Sleep(nodesSyncInterval())
139173
}
140174
}
141175

142176
func Shutdown(ctx context.Context) {
177+
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
178+
defer span.Finish()
179+
143180
if _actorSystem == nil {
144181
logger.Fatal(ctx).Msg("Actor system is not initialized, cannot shutdown.")
145182
}

runtime/actors/agents.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ const (
6363
)
6464

6565
func StartAgent(ctx context.Context, agentName string) (*AgentInfo, error) {
66+
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
67+
defer span.Finish()
68+
6669
plugin, ok := plugins.GetPluginFromContext(ctx)
6770
if !ok {
6871
return nil, fmt.Errorf("no plugin found in context")
@@ -77,6 +80,8 @@ func StartAgent(ctx context.Context, agentName string) (*AgentInfo, error) {
7780
}
7881

7982
func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName string, initializing bool) error {
83+
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
84+
defer span.Finish()
8085

8186
ctx = context.WithoutCancel(ctx)
8287
ctx = context.WithValue(ctx, utils.AgentIdContextKey, agentId)
@@ -99,6 +104,9 @@ func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName stri
99104
}
100105

101106
func StopAgent(ctx context.Context, agentId string) (*AgentInfo, error) {
107+
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
108+
defer span.Finish()
109+
102110
actorName := getActorName(agentId)
103111
if err := tell(ctx, actorName, &messages.ShutdownAgent{}); err != nil {
104112
if !errors.Is(err, goakt.ErrActorNotFound) {
@@ -122,6 +130,9 @@ func StopAgent(ctx context.Context, agentId string) (*AgentInfo, error) {
122130
}
123131

124132
func getAgentInfoFromDatabase(ctx context.Context, agentId string) (*AgentInfo, error) {
133+
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
134+
defer span.Finish()
135+
125136
if agent, e := db.GetAgentState(ctx, agentId); e == nil {
126137
return &AgentInfo{
127138
Id: agent.Id,
@@ -133,6 +144,9 @@ func getAgentInfoFromDatabase(ctx context.Context, agentId string) (*AgentInfo,
133144
}
134145

135146
func GetAgentInfo(ctx context.Context, agentId string) (*AgentInfo, error) {
147+
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
148+
defer span.Finish()
149+
136150
actorName := getActorName(agentId)
137151
request := &messages.AgentInfoRequest{}
138152

@@ -170,6 +184,9 @@ func newAgentMessageErrorResponse(errMsg string) *agentMessageResponse {
170184
}
171185

172186
func SendAgentMessage(ctx context.Context, agentId string, msgName string, data *string, timeout int64) (*agentMessageResponse, error) {
187+
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
188+
defer span.Finish()
189+
173190
actorName := getActorName(agentId)
174191

175192
msg := &messages.AgentRequest{
@@ -202,6 +219,8 @@ func SendAgentMessage(ctx context.Context, agentId string, msgName string, data
202219
}
203220

204221
func PublishAgentEvent(ctx context.Context, agentId, eventName string, eventData *string) error {
222+
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
223+
defer span.Finish()
205224

206225
var data any
207226
if eventData != nil {
@@ -264,6 +283,9 @@ func getAgentTopic(agentId string) string {
264283
}
265284

266285
func ListActiveAgents(ctx context.Context) ([]AgentInfo, error) {
286+
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
287+
defer span.Finish()
288+
267289
agents, err := db.QueryActiveAgents(ctx)
268290
if err != nil {
269291
return nil, fmt.Errorf("error listing active agents: %w", err)
@@ -281,11 +303,14 @@ func ListActiveAgents(ctx context.Context) ([]AgentInfo, error) {
281303
return results, nil
282304
}
283305

284-
func ListLocalAgents() []AgentInfo {
306+
func ListLocalAgents(ctx context.Context) []AgentInfo {
285307
if _actorSystem == nil {
286308
return nil
287309
}
288310

311+
span, _ := utils.NewSentrySpanForCurrentFunc(ctx)
312+
defer span.Finish()
313+
289314
actors := _actorSystem.Actors()
290315
results := make([]AgentInfo, 0, len(actors))
291316

0 commit comments

Comments
 (0)