Skip to content
This repository was archived by the owner on Sep 11, 2025. It is now read-only.

Commit 27331c1

Browse files
feat: stream agent events via subscriptions (#875)
1 parent 3bebc5f commit 27331c1

File tree

40 files changed

+1366
-654
lines changed

40 files changed

+1366
-654
lines changed

.trunk/configs/cspell.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
"alloc",
88
"allocs",
99
"Ammar",
10+
"anypb",
1011
"apikey",
1112
"APPDATA",
1213
"appinfo",
@@ -24,6 +25,7 @@
2425
"buger",
2526
"buildmode",
2627
"cconf",
28+
"cespare",
2729
"checklinkname",
2830
"chewxy",
2931
"classid",
@@ -44,6 +46,7 @@
4446
"dsname",
4547
"dspc",
4648
"dynamicmap",
49+
"dynaport",
4750
"envfiles",
4851
"estree",
4952
"euclidian",
@@ -57,6 +60,7 @@
5760
"gitinfo",
5861
"gjson",
5962
"goakt",
63+
"goaktpb",
6064
"goarch",
6165
"GOBIN",
6266
"goccy",
@@ -168,6 +172,7 @@
168172
"sslmode",
169173
"stretchr",
170174
"strs",
175+
"structpb",
171176
"subdirs",
172177
"sublicensable",
173178
"submatches",
@@ -179,9 +184,11 @@
179184
"tetratelabs",
180185
"textgeneration",
181186
"tidwall",
187+
"timestamppb",
182188
"tinygo",
183189
"tochemey",
184190
"toolcalling",
191+
"travisjeffery",
185192
"tseslint",
186193
"tsrv",
187194
"typedarray",

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
- tests: implement more extensive testing [#867](https://github.com/hypermodeinc/modus/pull/867)
88
- feat: remove embedded PostgresDB [#870](https://github.com/hypermodeinc/modus/pull/870)
99
- feat: delete collections features [#872](https://github.com/hypermodeinc/modus/pull/872)
10+
- feat: stream agent events via subscriptions [#875](https://github.com/hypermodeinc/modus/pull/875)
1011

1112
## 2025-05-29 - Runtime 0.18.0-alpha.4
1213

runtime/actors/actorsystem.go

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ package actors
1111

1212
import (
1313
"context"
14+
"fmt"
15+
"os"
16+
"strconv"
1417
"time"
1518

1619
"github.com/hypermodeinc/modus/runtime/db"
@@ -20,31 +23,62 @@ import (
2023
"github.com/hypermodeinc/modus/runtime/wasmhost"
2124

2225
goakt "github.com/tochemey/goakt/v3/actor"
26+
goakt_static "github.com/tochemey/goakt/v3/discovery/static"
27+
goakt_remote "github.com/tochemey/goakt/v3/remote"
28+
"github.com/travisjeffery/go-dynaport"
2329
)
2430

2531
var _actorSystem goakt.ActorSystem
2632

2733
func Initialize(ctx context.Context) {
2834

29-
actorLogger := newActorLogger(logger.Get(ctx))
30-
31-
actorSystem, err := goakt.NewActorSystem("modus",
32-
goakt.WithLogger(actorLogger),
35+
opts := []goakt.Option{
36+
goakt.WithLogger(newActorLogger(logger.Get(ctx))),
3337
goakt.WithCoordinatedShutdown(beforeShutdown),
34-
goakt.WithPassivationDisabled(), // TODO: enable passivation. Requires a persistence store in production for agent state.
35-
goakt.WithActorInitTimeout(10*time.Second), // TODO: adjust this value, or make it configurable
36-
goakt.WithActorInitMaxRetries(1)) // TODO: adjust this value, or make it configurable
38+
goakt.WithPubSub(),
39+
goakt.WithPassivation(time.Second * 10), // TODO: adjust this value, or make it configurable
40+
goakt.WithActorInitTimeout(10 * time.Second), // TODO: adjust this value, or make it configurable
41+
goakt.WithActorInitMaxRetries(1), // TODO: adjust this value, or make it configurable
42+
}
3743

38-
if err != nil {
39-
logger.Fatal(ctx).Err(err).Msg("Failed to create actor system.")
44+
// NOTE: we're not relying on cluster mode yet. The below code block is for future use and testing purposes only.
45+
if clusterMode, _ := strconv.ParseBool(os.Getenv("MODUS_USE_CLUSTER_MODE")); clusterMode {
46+
// TODO: static discovery should really only be used for local development and testing.
47+
// In production, we should use a more robust discovery mechanism, such as Kubernetes or NATS.
48+
// See https://tochemey.gitbook.io/goakt/features/service-discovery
49+
50+
// We just get three random ports for now.
51+
// In prod, these will need to be configured so they are consistent across all nodes.
52+
ports := dynaport.Get(3)
53+
var gossip_port = ports[0]
54+
var peers_port = ports[1]
55+
var remoting_port = ports[2]
56+
57+
disco := goakt_static.NewDiscovery(&goakt_static.Config{
58+
Hosts: []string{
59+
fmt.Sprintf("localhost:%d", gossip_port),
60+
},
61+
})
62+
63+
opts = append(opts,
64+
goakt.WithRemote(goakt_remote.NewConfig("localhost", remoting_port)),
65+
goakt.WithCluster(goakt.NewClusterConfig().
66+
WithDiscovery(disco).
67+
WithDiscoveryPort(gossip_port).
68+
WithPeersPort(peers_port).
69+
WithKinds(&wasmAgentActor{}, &subscriptionActor{}),
70+
),
71+
)
4072
}
4173

42-
if err := actorSystem.Start(ctx); err != nil {
74+
if actorSystem, err := goakt.NewActorSystem("modus", opts...); err != nil {
75+
logger.Fatal(ctx).Err(err).Msg("Failed to create actor system.")
76+
} else if err := actorSystem.Start(ctx); err != nil {
4377
logger.Fatal(ctx).Err(err).Msg("Failed to start actor system.")
78+
} else {
79+
_actorSystem = actorSystem
4480
}
4581

46-
_actorSystem = actorSystem
47-
4882
logger.Info(ctx).Msg("Actor system started.")
4983

5084
pluginmanager.RegisterPluginLoadedCallback(loadAgentActors)
@@ -64,7 +98,8 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
6498
}
6599

66100
// spawn actors for agents with state in the database, that are not already running
67-
// TODO: when we scale out with GoAkt cluster mode, we'll need to decide which node is responsible for spawning the actor
101+
// TODO: when we scale out to allow more nodes in the cluster, we'll need to decide
102+
// which node is responsible for spawning each actor.
68103
agents, err := db.QueryActiveAgents(ctx)
69104
if err != nil {
70105
logger.Err(ctx, err).Msg("Failed to query agents from database.")
@@ -73,7 +108,7 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
73108
host := wasmhost.GetWasmHost(ctx)
74109
for _, agent := range agents {
75110
if !runningAgents[agent.Id] {
76-
spawnActorForAgent(host, plugin, agent.Id, agent.Name, true, &agent.Data)
111+
spawnActorForAgentAsync(host, plugin, agent.Id, agent.Name, true, &agent.Data)
77112
}
78113
}
79114

0 commit comments

Comments
 (0)