Skip to content
This repository was archived by the owner on Sep 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .trunk/configs/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@
"santhosh",
"schemagen",
"sentryhttp",
"sentryutils",
"sjson",
"somedbtype",
"sqlclient",
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ NOTE: all releases may include dependency updates, not specifically mentioned
## UNRELEASED

- feat: integrate try-as library [#912](https://github.com/hypermodeinc/modus/pull/912)
- feat: improve sentry usage [#931](https://github.com/hypermodeinc/modus/pull/931)

## 2025-07-03 - Runtime v0.18.3

Expand Down
2 changes: 0 additions & 2 deletions runtime/.goreleaser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ builds:
- arm64
mod_timestamp: "{{ .CommitTimestamp }}"
ldflags:
- -s
- -w
- -X github.com/hypermodeinc/modus/runtime/app.version={{.Version}}
- >-
{{- if eq .Os "windows"}}
Expand Down
2 changes: 1 addition & 1 deletion runtime/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
EXECUTABLE := modus_runtime
VERSION := $(shell git describe --tags --always --match 'runtime/*' | sed 's/^runtime\///')
LDFLAGS := -s -w -X github.com/hypermodeinc/modus/runtime/app.version=$(VERSION)
LDFLAGS := -X github.com/hypermodeinc/modus/runtime/app.version=$(VERSION)

ifneq ($(OS), Windows_NT)
OS := $(shell uname -s)
Expand Down
65 changes: 43 additions & 22 deletions runtime/actors/actorsystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/hypermodeinc/modus/runtime/messages"
"github.com/hypermodeinc/modus/runtime/pluginmanager"
"github.com/hypermodeinc/modus/runtime/plugins"
"github.com/hypermodeinc/modus/runtime/sentryutils"
"github.com/hypermodeinc/modus/runtime/utils"
"github.com/hypermodeinc/modus/runtime/wasmhost"

Expand All @@ -29,7 +30,7 @@ import (
var _actorSystem goakt.ActorSystem

func Initialize(ctx context.Context) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

wasmExt := &wasmExtension{
Expand All @@ -48,15 +49,21 @@ func Initialize(ctx context.Context) {

actorSystem, err := goakt.NewActorSystem("modus", opts...)
if err != nil {
logger.Fatal(ctx, err).Msg("Failed to create actor system.")
const msg = "Failed to create actor system."
sentryutils.CaptureError(ctx, err, msg)
logger.Fatal(ctx, err).Msg(msg)
}

if err := startActorSystem(ctx, actorSystem); err != nil {
logger.Fatal(ctx, err).Msg("Failed to start actor system.")
const msg = "Failed to start actor system."
sentryutils.CaptureError(ctx, err, msg)
logger.Fatal(ctx, err).Msg(msg)
}

if err := actorSystem.Inject(&wasmAgentInfo{}); err != nil {
logger.Fatal(ctx, err).Msg("Failed to inject wasm agent info into actor system.")
const msg = "Failed to inject wasm agent info into actor system."
sentryutils.CaptureError(ctx, err, msg)
logger.Fatal(ctx, err).Msg(msg)
}

_actorSystem = actorSystem
Expand Down Expand Up @@ -88,23 +95,27 @@ func startActorSystem(ctx context.Context, actorSystem goakt.ActorSystem) error
}

func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

// restart local actors that are already running, which will reload the plugin
actors := _actorSystem.Actors()
for _, pid := range actors {
if a, ok := pid.Actor().(*wasmAgentActor); ok {
if err := goakt.Tell(ctx, pid, &messages.RestartAgent{}); err != nil {
logger.Error(ctx, err).Str("agent_id", a.agentId).Msg("Failed to send restart agent message to actor.")
const msg = "Failed to send restart agent message to actor."
sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", a.agentId))
logger.Error(ctx, err).Str("agent_id", a.agentId).Msg(msg)
}
}
}

// do this in a goroutine to avoid blocking the cluster engine startup
go func() {
if err := restoreAgentActors(ctx, plugin.Name()); err != nil {
logger.Error(ctx, err).Msg("Failed to restore agent actors.")
const msg = "Failed to restore agent actors."
sentryutils.CaptureError(ctx, err, msg)
logger.Error(ctx, err).Msg(msg)
}
}()

Expand All @@ -113,7 +124,7 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {

// restoreAgentActors spawn actors for agents with state in the database, that are not already running
func restoreAgentActors(ctx context.Context, pluginName string) error {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

logger.Debug(ctx).Msg("Restoring agent actors from database.")
Expand All @@ -133,11 +144,15 @@ func restoreAgentActors(ctx context.Context, pluginName string) error {
for _, agent := range agents {
actorName := getActorName(agent.Id)
if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil {
logger.Error(ctx, err).Msgf("Failed to check if actor %s exists.", actorName)
const msg = "Failed to check if agent actor exists."
sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", agent.Id))
logger.Error(ctx, err).Str("agent_id", agent.Id).Msg(msg)
} else if !exists {
err := spawnActorForAgent(ctx, pluginName, agent.Id, agent.Name, false)
if err != nil {
logger.Error(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id)
const msg = "Failed to spawn actor for agent."
sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", agent.Id))
logger.Error(ctx, err).Str("agent_id", agent.Id).Msg(msg)
}
}
}
Expand All @@ -146,7 +161,7 @@ func restoreAgentActors(ctx context.Context, pluginName string) error {
}

func beforeShutdown(ctx context.Context) error {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

logger.Info(ctx).Msg("Actor system shutting down...")
Expand All @@ -159,29 +174,31 @@ func beforeShutdown(ctx context.Context) error {
if actor.status == AgentStatusRunning {
ctx := actor.augmentContext(ctx, pid)
if err := actor.suspendAgent(ctx); err != nil {
logger.Error(ctx, err).Str("agent_id", actor.agentId).Msg("Failed to suspend agent actor.")
const msg = "Failed to suspend agent actor."
sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", actor.agentId))
logger.Error(ctx, err).Str("agent_id", actor.agentId).Msg(msg)
}
}
}
}

// Then shut down subscription actors. They will have received the suspend message already.
// Then shut down subscription actors. They will have received the suspend message already.
for _, pid := range actors {
if _, ok := pid.Actor().(*subscriptionActor); ok && pid.IsRunning() {
if a, ok := pid.Actor().(*subscriptionActor); ok && pid.IsRunning() {
if err := pid.Shutdown(ctx); err != nil {
logger.Error(ctx, err).Msgf("Failed to shutdown actor %s.", pid.Name())
const msg = "Failed to shut down subscription actor."
sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("agent_id", a.agentId))
logger.Error(ctx, err).Str("agent_id", a.agentId).Msg(msg)
}
}
}

// waitForClusterSync()

// then allow the actor system to continue with its shutdown process
// Then allow the actor system to continue with its shutdown process.
return nil
}

// Waits for the peer sync interval to pass, allowing time for the actor system to synchronize its
// list of actors with the remote nodes in the cluster. Cancels early if the context is done.
// list of actors with the remote nodes in the cluster. Cancels early if the context is done.
func waitForClusterSync(ctx context.Context) {
if clusterEnabled() {
select {
Expand All @@ -193,15 +210,19 @@ func waitForClusterSync(ctx context.Context) {
}

func Shutdown(ctx context.Context) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

if _actorSystem == nil {
logger.Fatal(ctx).Msg("Actor system is not initialized, cannot shutdown.")
const msg = "Actor system is not initialized, cannot shutdown."
sentryutils.CaptureError(ctx, nil, msg)
logger.Fatal(ctx).Msg(msg)
}

if err := _actorSystem.Stop(ctx); err != nil {
logger.Error(ctx, err).Msg("Failed to shutdown actor system.")
const msg = "Failed to shutdown actor system."
sentryutils.CaptureError(ctx, err, msg)
logger.Error(ctx, err).Msg(msg)
}

logger.Info(ctx).Msg("Actor system shutdown complete.")
Expand Down
17 changes: 9 additions & 8 deletions runtime/actors/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/hypermodeinc/modus/runtime/logger"
"github.com/hypermodeinc/modus/runtime/messages"
"github.com/hypermodeinc/modus/runtime/plugins"
"github.com/hypermodeinc/modus/runtime/sentryutils"
"github.com/hypermodeinc/modus/runtime/utils"

goakt "github.com/tochemey/goakt/v3/actor"
Expand Down Expand Up @@ -63,7 +64,7 @@ const (
)

func StartAgent(ctx context.Context, agentName string) (*AgentInfo, error) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

plugin, ok := plugins.GetPluginFromContext(ctx)
Expand All @@ -80,7 +81,7 @@ func StartAgent(ctx context.Context, agentName string) (*AgentInfo, error) {
}

func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName string, initializing bool) error {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

ctx = context.WithoutCancel(ctx)
Expand Down Expand Up @@ -111,7 +112,7 @@ func spawnActorForAgent(ctx context.Context, pluginName, agentId, agentName stri
}

func StopAgent(ctx context.Context, agentId string) (*AgentInfo, error) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

actorName := getActorName(agentId)
Expand All @@ -137,7 +138,7 @@ func StopAgent(ctx context.Context, agentId string) (*AgentInfo, error) {
}

func getAgentInfoFromDatabase(ctx context.Context, agentId string) (*AgentInfo, error) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

if agent, e := db.GetAgentState(ctx, agentId); e == nil {
Expand All @@ -151,7 +152,7 @@ func getAgentInfoFromDatabase(ctx context.Context, agentId string) (*AgentInfo,
}

func GetAgentInfo(ctx context.Context, agentId string) (*AgentInfo, error) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

actorName := getActorName(agentId)
Expand Down Expand Up @@ -199,7 +200,7 @@ func newAgentMessageErrorResponse(errMsg string) *agentMessageResponse {
}

func SendAgentMessage(ctx context.Context, agentId string, msgName string, data *string, timeout int64) (*agentMessageResponse, error) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

actorName := getActorName(agentId)
Expand Down Expand Up @@ -234,7 +235,7 @@ func SendAgentMessage(ctx context.Context, agentId string, msgName string, data
}

func PublishAgentEvent(ctx context.Context, agentId, eventName string, eventData *string) error {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

var data any
Expand Down Expand Up @@ -304,7 +305,7 @@ func getAgentTopic(agentId string) string {
}

func ListActiveAgents(ctx context.Context) ([]AgentInfo, error) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

agents, err := db.QueryActiveAgents(ctx)
Expand Down
19 changes: 11 additions & 8 deletions runtime/actors/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/hypermodeinc/modus/runtime/app"
"github.com/hypermodeinc/modus/runtime/logger"
"github.com/hypermodeinc/modus/runtime/sentryutils"
"github.com/hypermodeinc/modus/runtime/utils"

goakt "github.com/tochemey/goakt/v3/actor"
Expand All @@ -31,7 +32,7 @@ import (
)

func clusterOptions(ctx context.Context) []goakt.Option {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

clusterMode := clusterMode()
Expand All @@ -52,7 +53,9 @@ func clusterOptions(ctx context.Context) []goakt.Option {

disco, err := newDiscoveryProvider(ctx, clusterMode, discoveryPort)
if err != nil {
logger.Fatal(ctx, err).Msg("Failed to create cluster discovery provider.")
const msg = "Failed to create cluster discovery provider."
sentryutils.CaptureError(ctx, err, msg, sentryutils.WithData("cluster_mode", clusterMode.String()))
logger.Fatal(ctx, err).Msg(msg)
}

return []goakt.Option{
Expand Down Expand Up @@ -237,7 +240,7 @@ func getPodLabels() map[string]string {
}

func newDiscoveryProvider(ctx context.Context, clusterMode goaktClusterMode, discoveryPort int) (discovery.Provider, error) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

switch clusterMode {
Expand Down Expand Up @@ -299,21 +302,21 @@ type providerWrapper struct {
}

func (w *providerWrapper) Close() error {
span, _ := utils.NewSentrySpanForCurrentFunc(w.ctx)
span, _ := sentryutils.NewSpanForCurrentFunc(w.ctx)
defer span.Finish()

return w.provider.Close()
}

func (w *providerWrapper) Deregister() error {
span, _ := utils.NewSentrySpanForCurrentFunc(w.ctx)
span, _ := sentryutils.NewSpanForCurrentFunc(w.ctx)
defer span.Finish()

return w.provider.Deregister()
}

func (w *providerWrapper) DiscoverPeers() ([]string, error) {
span, _ := utils.NewSentrySpanForCurrentFunc(w.ctx)
span, _ := sentryutils.NewSpanForCurrentFunc(w.ctx)
defer span.Finish()

return w.provider.DiscoverPeers()
Expand All @@ -324,14 +327,14 @@ func (w *providerWrapper) ID() string {
}

func (w *providerWrapper) Initialize() error {
span, _ := utils.NewSentrySpanForCurrentFunc(w.ctx)
span, _ := sentryutils.NewSpanForCurrentFunc(w.ctx)
defer span.Finish()

return w.provider.Initialize()
}

func (w *providerWrapper) Register() error {
span, _ := utils.NewSentrySpanForCurrentFunc(w.ctx)
span, _ := sentryutils.NewSpanForCurrentFunc(w.ctx)
defer span.Finish()

return w.provider.Register()
Expand Down
6 changes: 3 additions & 3 deletions runtime/actors/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"fmt"
"time"

"github.com/hypermodeinc/modus/runtime/utils"
"github.com/hypermodeinc/modus/runtime/sentryutils"
goakt "github.com/tochemey/goakt/v3/actor"

"google.golang.org/protobuf/proto"
Expand All @@ -23,7 +23,7 @@ import (
// Sends a message to an actor identified by its name.
// Uses either Tell or RemoteTell based on whether the actor is local or remote.
func tell(ctx context.Context, actorName string, message proto.Message) error {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

addr, pid, err := _actorSystem.ActorOf(ctx, actorName)
Expand All @@ -40,7 +40,7 @@ func tell(ctx context.Context, actorName string, message proto.Message) error {
// Sends a message to an actor identified by its name, then waits for a response within the timeout duration.
// Uses either Ask or RemoteAsk based on whether the actor is local or remote.
func ask(ctx context.Context, actorName string, message proto.Message, timeout time.Duration) (response proto.Message, err error) {
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
span, ctx := sentryutils.NewSpanForCurrentFunc(ctx)
defer span.Finish()

addr, pid, err := _actorSystem.ActorOf(ctx, actorName)
Expand Down
Loading
Loading