This repository was archived by the owner on Sep 11, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathactorsystem.go
More file actions
207 lines (170 loc) · 6.5 KB
/
actorsystem.go
File metadata and controls
207 lines (170 loc) · 6.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
/*
* Copyright 2025 Hypermode Inc.
* Licensed under the terms of the Apache License, Version 2.0
* See the LICENSE file that accompanied this code for further details.
*
* SPDX-FileCopyrightText: 2025 Hypermode Inc. <hello@hypermode.com>
* SPDX-License-Identifier: Apache-2.0
*/
package actors
import (
"context"
"fmt"
"time"
"github.com/hypermodeinc/modus/runtime/db"
"github.com/hypermodeinc/modus/runtime/logger"
"github.com/hypermodeinc/modus/runtime/messages"
"github.com/hypermodeinc/modus/runtime/pluginmanager"
"github.com/hypermodeinc/modus/runtime/plugins"
"github.com/hypermodeinc/modus/runtime/utils"
"github.com/hypermodeinc/modus/runtime/wasmhost"
goakt "github.com/tochemey/goakt/v3/actor"
)
// _actorSystem holds the process-wide goakt actor system. It is set once by
// Initialize and read by the other functions in this file; it is nil until then.
var _actorSystem goakt.ActorSystem
// Initialize creates and starts the global actor system, registers the wasm
// extension, injects the wasm agent info dependency, and hooks agent-actor
// loading into the plugin manager. Any failure during setup is fatal.
func Initialize(ctx context.Context) {
	span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
	defer span.Finish()

	ext := &wasmExtension{host: wasmhost.GetWasmHost(ctx)}

	options := append([]goakt.Option{
		goakt.WithLogger(newActorLogger(logger.Get(ctx))),
		goakt.WithCoordinatedShutdown(beforeShutdown),
		goakt.WithPubSub(),
		goakt.WithActorInitTimeout(10 * time.Second), // TODO: adjust this value, or make it configurable
		goakt.WithActorInitMaxRetries(1),             // TODO: adjust this value, or make it configurable
		goakt.WithExtensions(ext),
	}, clusterOptions(ctx)...)

	sys, err := goakt.NewActorSystem("modus", options...)
	if err != nil {
		logger.Fatal(ctx).Err(err).Msg("Failed to create actor system.")
	}

	if err := startActorSystem(ctx, sys); err != nil {
		logger.Fatal(ctx).Err(err).Msg("Failed to start actor system.")
	}

	if err := sys.Inject(&wasmAgentInfo{}); err != nil {
		logger.Fatal(ctx).Err(err).Msg("Failed to inject wasm agent info into actor system.")
	}

	_actorSystem = sys
	logger.Info(ctx).Msg("Actor system started.")

	// spawn/restart agent actors whenever a plugin finishes loading
	pluginmanager.RegisterPluginLoadedCallback(loadAgentActors)
}
// startActorSystem starts the given actor system, retrying with exponential
// backoff on failure. Retry count and initial interval come from the
// MODUS_ACTOR_SYSTEM_START_MAX_RETRIES and
// MODUS_ACTOR_SYSTEM_START_RETRY_INTERVAL_SECONDS environment variables.
// On success it blocks for one cluster sync interval before returning nil.
// Fixes over the previous version: no pointless sleep after the final failed
// attempt, the backoff wait respects ctx cancellation, and the last start
// error is wrapped into the returned error instead of being discarded.
func startActorSystem(ctx context.Context, actorSystem goakt.ActorSystem) error {
	maxRetries := getIntFromEnv("MODUS_ACTOR_SYSTEM_START_MAX_RETRIES", 5)
	retryInterval := getDurationFromEnv("MODUS_ACTOR_SYSTEM_START_RETRY_INTERVAL_SECONDS", 2, time.Second)
	var lastErr error
	for i := range maxRetries {
		lastErr = actorSystem.Start(ctx)
		if lastErr == nil {
			// important: wait for the actor system to sync with the cluster before proceeding
			waitForClusterSync(ctx)
			return nil
		}
		if i == maxRetries-1 {
			break // final attempt failed; don't sleep before returning
		}
		logger.Warn(ctx).Err(lastErr).Int("attempt", i+1).Msgf("Failed to start actor system, retrying in %s...", retryInterval)
		select {
		case <-time.After(retryInterval):
		case <-ctx.Done():
			return fmt.Errorf("cancelled while waiting to retry actor system start: %w", ctx.Err())
		}
		retryInterval *= 2 // Exponential backoff
	}
	return fmt.Errorf("failed to start actor system after %d retries: %w", maxRetries, lastErr)
}
// loadAgentActors is the plugin-loaded callback registered in Initialize.
// It restarts any agent actors already running locally (so they pick up the
// newly loaded plugin), then asynchronously respawns actors for agents that
// have persisted state in the database but are not yet running anywhere.
// It always returns nil; individual failures are logged, not propagated.
func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
	span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
	defer span.Finish()
	// restart local actors that are already running, which will reload the plugin
	actors := _actorSystem.Actors()
	// localAgents records which agent ids are already hosted on this node, so
	// the database-restore pass below can skip them.
	localAgents := make(map[string]bool, len(actors))
	for _, pid := range actors {
		if a, ok := pid.Actor().(*wasmAgentActor); ok {
			localAgents[a.agentId] = true
			if err := goakt.Tell(ctx, pid, &messages.RestartAgent{}); err != nil {
				logger.Err(ctx, err).Str("agent_id", a.agentId).Msg("Failed to send restart agent message to actor.")
			}
		}
	}
	// spawn actors for agents with state in the database, that are not already running
	// check both locally and on remote nodes in the cluster
	// do this in a goroutine to avoid blocking the cluster engine startup
	// NOTE(review): the goroutine captures ctx from the callback; if the caller
	// cancels it after this function returns, the restore work is cut short —
	// confirm the intended ctx lifetime.
	go func() {
		logger.Debug(ctx).Msg("Restoring agent actors from database.")
		agents, err := db.QueryActiveAgents(ctx)
		if err != nil {
			logger.Err(ctx, err).Msg("Failed to query active agents from database.")
			return
		}
		inCluster := _actorSystem.InCluster()
		for _, agent := range agents {
			if !localAgents[agent.Id] {
				if inCluster {
					actorName := getActorName(agent.Id)
					// cluster-wide existence check; a lookup error falls through
					// to spawning, which presumably is safe to attempt — verify.
					if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil {
						logger.Err(ctx, err).Msgf("Failed to check if actor %s exists in cluster.", actorName)
					} else if exists {
						// if the actor already exists in the cluster, skip spawning it
						continue
					}
				}
				if err := spawnActorForAgent(ctx, plugin.Name(), agent.Id, agent.Name, false); err != nil {
					logger.Err(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id)
				}
			}
		}
	}()
	return nil
}
// beforeShutdown is the coordinated-shutdown hook registered with the actor
// system in Initialize. It runs before the system tears down its actors:
// first suspending running agent actors (persisting their state), then
// shutting down subscription actors. It always returns nil so the actor
// system's own shutdown proceeds regardless of individual failures.
func beforeShutdown(ctx context.Context) error {
	span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
	defer span.Finish()
	logger.Info(ctx).Msg("Actor system shutting down...")
	actors := _actorSystem.Actors()
	// Suspend all local running agent actors first, which allows them to gracefully stop and persist their state.
	// In cluster mode, this will also allow the actor to resume on another node after this node shuts down.
	for _, pid := range actors {
		if actor, ok := pid.Actor().(*wasmAgentActor); ok && pid.IsRunning() {
			// only suspend agents that are actively running; other statuses are left as-is
			if actor.status == AgentStatusRunning {
				// shadow ctx with the actor-specific augmented context for the suspend call
				ctx := actor.augmentContext(ctx, pid)
				if err := actor.suspendAgent(ctx); err != nil {
					logger.Err(ctx, err).Str("agent_id", actor.agentId).Msg("Failed to suspend agent actor.")
				}
			}
		}
	}
	// Then shut down subscription actors. They will have received the suspend message already.
	for _, pid := range actors {
		if _, ok := pid.Actor().(*subscriptionActor); ok && pid.IsRunning() {
			if err := pid.Shutdown(ctx); err != nil {
				logger.Err(ctx, err).Msgf("Failed to shutdown actor %s.", pid.Name())
			}
		}
	}
	// waitForClusterSync()
	// then allow the actor system to continue with its shutdown process
	return nil
}
// waitForClusterSync blocks for one peer sync interval so the actor system can
// synchronize its actor list with remote nodes in the cluster. It returns
// immediately when clustering is disabled, and returns early (with a warning)
// if ctx is cancelled first.
func waitForClusterSync(ctx context.Context) {
	if !clusterEnabled() {
		return
	}
	timer := time.NewTimer(peerSyncInterval())
	defer timer.Stop()
	select {
	case <-timer.C:
	case <-ctx.Done():
		logger.Warn(context.WithoutCancel(ctx)).Msg("Context cancelled while waiting for cluster sync.")
	}
}
// Shutdown stops the global actor system. It logs fatally if Initialize was
// never called, and logs (without aborting) if the stop itself fails.
func Shutdown(ctx context.Context) {
	span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
	defer span.Finish()

	if _actorSystem == nil {
		logger.Fatal(ctx).Msg("Actor system is not initialized, cannot shutdown.")
	}

	err := _actorSystem.Stop(ctx)
	if err != nil {
		logger.Err(ctx, err).Msg("Failed to shutdown actor system.")
	}

	logger.Info(ctx).Msg("Actor system shutdown complete.")
}
// wasmExtensionId is the identifier under which the wasm extension is
// registered with the actor system.
const wasmExtensionId = "wasm"

// wasmExtension makes the wasm host available to actors as an actor-system
// extension (registered via goakt.WithExtensions in Initialize).
type wasmExtension struct {
	host wasmhost.WasmHost
}

// ID returns the stable identifier for this extension.
func (w *wasmExtension) ID() string {
	return wasmExtensionId
}