@@ -12,14 +12,14 @@ package actors
1212import (
1313 "context"
1414 "fmt"
15- "sync"
1615 "time"
1716
1817 "github.com/hypermodeinc/modus/runtime/logger"
1918 "github.com/hypermodeinc/modus/runtime/messages"
2019 "github.com/hypermodeinc/modus/runtime/plugins"
2120 "github.com/hypermodeinc/modus/runtime/utils"
2221 "github.com/hypermodeinc/modus/runtime/wasmhost"
22+ "github.com/puzpuzpuz/xsync/v4"
2323
2424 wasm "github.com/tetratelabs/wazero/api"
2525 goakt "github.com/tochemey/goakt/v3/actor"
@@ -46,10 +46,16 @@ func Activate(ctx context.Context, plugin *plugins.Plugin) error {
4646 return fmt .Errorf ("error registering agents: %w" , err )
4747 }
4848
49- for _ , agent := range agentsRegistered {
50- if err := agent .SpawnActor (ctx , plugin ); err != nil {
51- return err
49+ agentRegistry .Range (func (key string , agent * Agent ) bool {
50+ if agent .Pid == nil {
51+ err = agent .spawnActor (ctx , plugin )
52+ } else {
53+ err = agent .reloadModule (ctx , plugin )
5254 }
55+ return err != nil
56+ })
57+ if err != nil {
58+ return err
5359 }
5460
5561 return nil
@@ -61,7 +67,7 @@ type Agent struct {
6167 Name string
6268}
6369
64- func (agent * Agent ) SpawnActor (ctx context.Context , plugin * plugins.Plugin ) error {
70+ func (agent * Agent ) spawnActor (ctx context.Context , plugin * plugins.Plugin ) error {
6571 host := wasmhost .GetWasmHost (ctx )
6672 buffers := utils .NewOutputBuffers ()
6773 mod , err := host .GetModuleInstance (ctx , plugin , buffers )
@@ -79,21 +85,49 @@ func (agent *Agent) SpawnActor(ctx context.Context, plugin *plugins.Plugin) erro
7985 return nil
8086}
8187
82- var agentsRegistered = make ([] * Agent , 0 )
83- var mu sync. RWMutex
88+ func ( agent * Agent ) reloadModule ( ctx context. Context , plugin * plugins. Plugin ) error {
89+ actor := agent . Pid . Actor ().( * WasmAgentActor )
8490
85- func getAgent (id int32 ) (* Agent , error ) {
86- mu .RLock ()
87- defer mu .RUnlock ()
91+ logger .Info (ctx ).Str ("agent" , agent .Name ).Int32 ("agentId" , agent .Id ).Bool ("user_visible" , true ).Msg ("Reloading module for agent." )
92+
93+ // get the current state and close the module
94+ state , err := actor .getAgentState (ctx )
95+ if err != nil {
96+ return fmt .Errorf ("error getting agent state: %w" , err )
97+ }
98+ actor .module .Close (ctx )
99+
100+ // create a new module instance and assign it to the actor
101+ host := wasmhost .GetWasmHost (ctx )
102+ buffers := utils .NewOutputBuffers ()
103+ mod , err := host .GetModuleInstance (ctx , plugin , buffers )
104+ if err != nil {
105+ return err
106+ }
107+ actor .module = mod
108+
109+ // restore the state in the new module
110+ if err := actor .setAgentState (ctx , * state ); err != nil {
111+ return fmt .Errorf ("error setting agent state: %w" , err )
112+ }
88113
89- if id < 1 || int (id ) > len (agentsRegistered ) {
114+ return nil
115+ }
116+
117+ var agentRegistry = xsync .NewMap [string , * Agent ]()
118+
119+ func getAgent (ctx context.Context , id int32 ) (* Agent , error ) {
120+ if key , err := getAgentKey (ctx , id ); err != nil {
121+ return nil , err
122+ } else if agent , ok := agentRegistry .Load (key ); ok {
123+ return agent , nil
124+ } else {
90125 return nil , fmt .Errorf ("agent with id %d not found" , id )
91126 }
92- return agentsRegistered [id - 1 ], nil
93127}
94128
95- func getActorForAgent (agentId int32 ) (* WasmAgentActor , error ) {
96- agent , err := getAgent (agentId )
129+ func getActorForAgent (ctx context. Context , agentId int32 ) (* WasmAgentActor , error ) {
130+ agent , err := getAgent (ctx , agentId )
97131 if err != nil {
98132 return nil , err
99133 }
@@ -108,19 +142,42 @@ func getActorForAgent(agentId int32) (*WasmAgentActor, error) {
108142 return wasmActor , nil
109143}
110144
145+ func getAgentKey (ctx context.Context , agentId int32 ) (string , error ) {
146+ if plugin , ok := plugins .GetPluginFromContext (ctx ); ! ok {
147+ return "" , fmt .Errorf ("no plugin found in context" )
148+ } else {
149+ return fmt .Sprintf ("%s:%d" , plugin .Name (), agentId ), nil
150+ }
151+ }
152+
111153func RegisterAgent (ctx context.Context , agentId int32 , name string ) error {
112- mu .Lock ()
113- defer mu .Unlock ()
154+ key , err := getAgentKey (ctx , agentId )
155+ if err != nil {
156+ return err
157+ }
114158
115- agentsRegistered = append ( agentsRegistered , & Agent {
159+ agentRegistry . LoadOrStore ( key , & Agent {
116160 Id : agentId ,
117161 Name : name ,
118162 })
163+
164+ // actual, found := agentRegistry.LoadAndStore(key, &Agent{
165+ // Id: agentId,
166+ // Name: name,
167+ // })
168+
169+ // // If the actor already exists, we need to stop it before spawning a new one.
170+ // if found {
171+ // if err := actual.Pid.Shutdown(ctx); err != nil {
172+ // return fmt.Errorf("error shutting down existing agent %d: %w", agentId, err)
173+ // }
174+ // }
175+
119176 return nil
120177}
121178
122179func SendAgentMessage (ctx context.Context , agentId int32 , msgName string , data * string , timeout int64 ) (* string , error ) {
123- agent , err := getAgent (agentId )
180+ agent , err := getAgent (ctx , agentId )
124181 if err != nil {
125182 return nil , err
126183 }
@@ -154,7 +211,7 @@ func SendAgentMessage(ctx context.Context, agentId int32, msgName string, data *
154211}
155212
156213func GetAgentState (ctx context.Context , agentId int32 ) (* string , error ) {
157- actor , err := getActorForAgent (agentId )
214+ actor , err := getActorForAgent (ctx , agentId )
158215 if err != nil {
159216 return nil , err
160217 }
@@ -166,7 +223,7 @@ func GetAgentState(ctx context.Context, agentId int32) (*string, error) {
166223}
167224
168225func SetAgentState (ctx context.Context , agentId int32 , data string ) error {
169- actor , err := getActorForAgent (agentId )
226+ actor , err := getActorForAgent (ctx , agentId )
170227 if err != nil {
171228 return err
172229 }
0 commit comments