Skip to content
This repository was archived by the owner on Sep 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .trunk/configs/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"alloc",
"allocs",
"Ammar",
"anypb",
"apikey",
"APPDATA",
"appinfo",
Expand All @@ -24,6 +25,7 @@
"buger",
"buildmode",
"cconf",
"cespare",
"checklinkname",
"chewxy",
"classid",
Expand All @@ -44,6 +46,7 @@
"dsname",
"dspc",
"dynamicmap",
"dynaport",
"envfiles",
"estree",
"euclidian",
Expand All @@ -57,6 +60,7 @@
"gitinfo",
"gjson",
"goakt",
"goaktpb",
"goarch",
"GOBIN",
"goccy",
Expand Down Expand Up @@ -168,6 +172,7 @@
"sslmode",
"stretchr",
"strs",
"structpb",
"subdirs",
"sublicensable",
"submatches",
Expand All @@ -179,9 +184,11 @@
"tetratelabs",
"textgeneration",
"tidwall",
"timestamppb",
"tinygo",
"tochemey",
"toolcalling",
"travisjeffery",
"tseslint",
"tsrv",
"typedarray",
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- tests: implement more extensive testing [#867](https://github.com/hypermodeinc/modus/pull/867)
- feat: remove embedded PostgresDB [#870](https://github.com/hypermodeinc/modus/pull/870)
- feat: delete collections features [#872](https://github.com/hypermodeinc/modus/pull/872)
- feat: stream agent events via subscriptions [#875](https://github.com/hypermodeinc/modus/pull/875)

## 2025-05-29 - Runtime 0.18.0-alpha.4

Expand Down
63 changes: 49 additions & 14 deletions runtime/actors/actorsystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ package actors

import (
"context"
"fmt"
"os"
"strconv"
"time"

"github.com/hypermodeinc/modus/runtime/db"
Expand All @@ -20,31 +23,62 @@ import (
"github.com/hypermodeinc/modus/runtime/wasmhost"

goakt "github.com/tochemey/goakt/v3/actor"
goakt_static "github.com/tochemey/goakt/v3/discovery/static"
goakt_remote "github.com/tochemey/goakt/v3/remote"
"github.com/travisjeffery/go-dynaport"
)

var _actorSystem goakt.ActorSystem

func Initialize(ctx context.Context) {

actorLogger := newActorLogger(logger.Get(ctx))

actorSystem, err := goakt.NewActorSystem("modus",
goakt.WithLogger(actorLogger),
opts := []goakt.Option{
goakt.WithLogger(newActorLogger(logger.Get(ctx))),
goakt.WithCoordinatedShutdown(beforeShutdown),
goakt.WithPassivationDisabled(), // TODO: enable passivation. Requires a persistence store in production for agent state.
goakt.WithActorInitTimeout(10*time.Second), // TODO: adjust this value, or make it configurable
goakt.WithActorInitMaxRetries(1)) // TODO: adjust this value, or make it configurable
goakt.WithPubSub(),
goakt.WithPassivation(time.Second * 10), // TODO: adjust this value, or make it configurable
goakt.WithActorInitTimeout(10 * time.Second), // TODO: adjust this value, or make it configurable
goakt.WithActorInitMaxRetries(1), // TODO: adjust this value, or make it configurable
}

if err != nil {
logger.Fatal(ctx).Err(err).Msg("Failed to create actor system.")
// NOTE: we're not relying on cluster mode yet. The below code block is for future use and testing purposes only.
if clusterMode, _ := strconv.ParseBool(os.Getenv("MODUS_USE_CLUSTER_MODE")); clusterMode {
// TODO: static discovery should really only be used for local development and testing.
// In production, we should use a more robust discovery mechanism, such as Kubernetes or NATS.
// See https://tochemey.gitbook.io/goakt/features/service-discovery

// We just get three random ports for now.
// In prod, these will need to be configured so they are consistent across all nodes.
ports := dynaport.Get(3)
var gossip_port = ports[0]
var peers_port = ports[1]
var remoting_port = ports[2]

disco := goakt_static.NewDiscovery(&goakt_static.Config{
Hosts: []string{
fmt.Sprintf("localhost:%d", gossip_port),
},
})

opts = append(opts,
goakt.WithRemote(goakt_remote.NewConfig("localhost", remoting_port)),
goakt.WithCluster(goakt.NewClusterConfig().
WithDiscovery(disco).
WithDiscoveryPort(gossip_port).
WithPeersPort(peers_port).
WithKinds(&wasmAgentActor{}, &subscriptionActor{}),
),
)
}

if err := actorSystem.Start(ctx); err != nil {
if actorSystem, err := goakt.NewActorSystem("modus", opts...); err != nil {
logger.Fatal(ctx).Err(err).Msg("Failed to create actor system.")
} else if err := actorSystem.Start(ctx); err != nil {
logger.Fatal(ctx).Err(err).Msg("Failed to start actor system.")
} else {
_actorSystem = actorSystem
}

_actorSystem = actorSystem

logger.Info(ctx).Msg("Actor system started.")

pluginmanager.RegisterPluginLoadedCallback(loadAgentActors)
Expand All @@ -64,7 +98,8 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
}

// spawn actors for agents with state in the database, that are not already running
// TODO: when we scale out with GoAkt cluster mode, we'll need to decide which node is responsible for spawning the actor
// TODO: when we scale out to allow more nodes in the cluster, we'll need to decide
// which node is responsible for spawning each actor.
agents, err := db.QueryActiveAgents(ctx)
if err != nil {
logger.Err(ctx, err).Msg("Failed to query agents from database.")
Expand All @@ -73,7 +108,7 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
host := wasmhost.GetWasmHost(ctx)
for _, agent := range agents {
if !runningAgents[agent.Id] {
spawnActorForAgent(host, plugin, agent.Id, agent.Name, true, &agent.Data)
spawnActorForAgentAsync(host, plugin, agent.Id, agent.Name, true, &agent.Data)
}
}

Expand Down
Loading
Loading