@@ -11,68 +11,30 @@ package actors
1111
1212import (
1313 "context"
14- "fmt"
15- "os"
16- "strconv"
1714 "time"
1815
1916 "github.com/hypermodeinc/modus/runtime/db"
2017 "github.com/hypermodeinc/modus/runtime/logger"
18+ "github.com/hypermodeinc/modus/runtime/messages"
2119 "github.com/hypermodeinc/modus/runtime/pluginmanager"
2220 "github.com/hypermodeinc/modus/runtime/plugins"
23- "github.com/hypermodeinc/modus/runtime/wasmhost"
2421
2522 goakt "github.com/tochemey/goakt/v3/actor"
26- goakt_static "github.com/tochemey/goakt/v3/discovery/static"
27- goakt_remote "github.com/tochemey/goakt/v3/remote"
28- "github.com/travisjeffery/go-dynaport"
2923)
3024
25+ const defaultAskTimeout = 10 * time .Second
26+
3127var _actorSystem goakt.ActorSystem
3228
3329func Initialize (ctx context.Context ) {
3430
3531 opts := []goakt.Option {
3632 goakt .WithLogger (newActorLogger (logger .Get (ctx ))),
37- goakt .WithCoordinatedShutdown (beforeShutdown ),
3833 goakt .WithPubSub (),
3934 goakt .WithActorInitTimeout (10 * time .Second ), // TODO: adjust this value, or make it configurable
4035 goakt .WithActorInitMaxRetries (1 ), // TODO: adjust this value, or make it configurable
41-
42- // for now, keep passivation disabled so that agents can perform long-running tasks without the actor stopping.
43- // TODO: figure out how to deal with this better
44- goakt .WithPassivationDisabled (),
45- }
46-
47- // NOTE: we're not relying on cluster mode yet. The below code block is for future use and testing purposes only.
48- if clusterMode , _ := strconv .ParseBool (os .Getenv ("MODUS_USE_CLUSTER_MODE" )); clusterMode {
49- // TODO: static discovery should really only be used for local development and testing.
50- // In production, we should use a more robust discovery mechanism, such as Kubernetes or NATS.
51- // See https://tochemey.gitbook.io/goakt/features/service-discovery
52-
53- // We just get three random ports for now.
54- // In prod, these will need to be configured so they are consistent across all nodes.
55- ports := dynaport .Get (3 )
56- var gossip_port = ports [0 ]
57- var peers_port = ports [1 ]
58- var remoting_port = ports [2 ]
59-
60- disco := goakt_static .NewDiscovery (& goakt_static.Config {
61- Hosts : []string {
62- fmt .Sprintf ("localhost:%d" , gossip_port ),
63- },
64- })
65-
66- opts = append (opts ,
67- goakt .WithRemote (goakt_remote .NewConfig ("localhost" , remoting_port )),
68- goakt .WithCluster (goakt .NewClusterConfig ().
69- WithDiscovery (disco ).
70- WithDiscoveryPort (gossip_port ).
71- WithPeersPort (peers_port ).
72- WithKinds (& wasmAgentActor {}, & subscriptionActor {}),
73- ),
74- )
7536 }
37+ opts = append (opts , clusterOptions (ctx )... )
7638
7739 if actorSystem , err := goakt .NewActorSystem ("modus" , opts ... ); err != nil {
7840 logger .Fatal (ctx ).Err (err ).Msg ("Failed to create actor system." )
@@ -88,53 +50,73 @@ func Initialize(ctx context.Context) {
8850}
8951
9052func loadAgentActors (ctx context.Context , plugin * plugins.Plugin ) error {
91- // restart actors that are already running, giving them the new plugin instance
53+ // restart local actors that are already running, which will reload the plugin
9254 actors := _actorSystem .Actors ()
93- runningAgents := make (map [string ]bool , len (actors ))
55+ localAgents := make (map [string ]bool , len (actors ))
9456 for _ , pid := range actors {
95- go func (f_ctx context.Context , f_pid * goakt.PID ) {
96- if actor , ok := f_pid .Actor ().(* wasmAgentActor ); ok {
97- runningAgents [actor .agentId ] = true
98- actor .plugin = plugin
99- if err := f_pid .Restart (f_ctx ); err != nil {
100- logger .Err (f_ctx , err ).Msgf ("Failed to restart actor for agent %s." , actor .agentId )
101- }
57+ if a , ok := pid .Actor ().(* wasmAgentActor ); ok {
58+ localAgents [a .agentId ] = true
59+ if err := goakt .Tell (ctx , pid , & messages.RestartAgent {}); err != nil {
60+ logger .Err (ctx , err ).Str ("agent_id" , a .agentId ).Msg ("Failed to send restart agent message to actor." )
10261 }
103- }( ctx , pid )
62+ }
10463 }
10564
10665 // spawn actors for agents with state in the database, that are not already running
107- // TODO: when we scale out to allow more nodes in the cluster, we'll need to decide
108- // which node is responsible for spawning each actor.
66+ // check both locally and on remote nodes in the cluster
10967 agents , err := db .QueryActiveAgents (ctx )
11068 if err != nil {
11169 logger .Err (ctx , err ).Msg ("Failed to query agents from database." )
11270 return err
11371 }
114- host := wasmhost .GetWasmHost (ctx )
11572 for _ , agent := range agents {
116- if ! runningAgents [agent .Id ] {
117- go func (f_ctx context.Context , agentId string , agentName string ) {
118- if _ , err := spawnActorForAgent (host , plugin , agentId , agentName , false ); err != nil {
73+ if ! localAgents [agent .Id ] {
74+ if _actorSystem .InCluster () {
75+ actorName := getActorName (agent .Id )
76+ if _ , err := _actorSystem .RemoteActor (ctx , actorName ); err == nil {
77+ // found actor in cluster, no need to spawn it again
78+ continue
79+ }
80+ }
81+ go func (f_ctx context.Context , pluginName , agentId , agentName string ) {
82+ if err := spawnActorForAgent (f_ctx , pluginName , agentId , agentName , false ); err != nil {
11983 logger .Err (f_ctx , err ).Msgf ("Failed to spawn actor for agent %s." , agentId )
12084 }
121- }(ctx , agent .Id , agent .Name )
85+ }(ctx , plugin . Name (), agent .Id , agent .Name )
12286 }
12387 }
12488
12589 return nil
12690}
12791
128- func beforeShutdown (ctx context.Context ) error {
92+ func beforeShutdown (ctx context.Context ) {
12993 logger .Info (ctx ).Msg ("Actor system shutting down..." )
130- return nil
94+
95+ if _actorSystem .InCluster () {
96+ // TODO: if we're the last node in the cluster, we should continue to shut down agents
97+ return
98+ }
99+
100+ // stop all agent actors before shutdown so they can suspend properly
101+ for _ , pid := range _actorSystem .Actors () {
102+ if _ , ok := pid .Actor ().(* wasmAgentActor ); ok {
103+
104+ // pass the pid so it can be used during shutdown as an event sender
105+ ctx := context .WithValue (ctx , pidContextKey {}, pid )
106+ if err := pid .Shutdown (ctx ); err != nil {
107+ logger .Err (ctx , err ).Msgf ("Failed to shutdown actor %s." , pid .Name ())
108+ }
109+ }
110+ }
131111}
132112
133113func Shutdown (ctx context.Context ) {
134114 if _actorSystem == nil {
135115 return
136116 }
137117
118+ beforeShutdown (ctx )
119+
138120 if err := _actorSystem .Stop (ctx ); err != nil {
139121 logger .Err (ctx , err ).Msg ("Failed to shutdown actor system." )
140122 }
0 commit comments