Skip to content
This repository was archived by the owner on Sep 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .trunk/configs/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@
"sandboxed",
"santhosh",
"schemagen",
"sentryhttp",
"sjson",
"somedbtype",
"sqlclient",
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ NOTE: all releases may include dependency updates, not specifically mentioned
- feat: integrate try-as library [#912](https://github.com/hypermodeinc/modus/pull/912)
- fix: check topic actor status before publishing events [#918](https://github.com/hypermodeinc/modus/pull/918)
- feat: update health endpoint [#924](https://github.com/hypermodeinc/modus/pull/924)
- feat: improve sentry and logging [#925](https://github.com/hypermodeinc/modus/pull/925)

## 2025-06-25 - Runtime v0.18.2

Expand Down
26 changes: 13 additions & 13 deletions runtime/actors/actorsystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ func Initialize(ctx context.Context) {

actorSystem, err := goakt.NewActorSystem("modus", opts...)
if err != nil {
logger.Fatal(ctx).Err(err).Msg("Failed to create actor system.")
logger.Fatal(ctx, err).Msg("Failed to create actor system.")
}

if err := startActorSystem(ctx, actorSystem); err != nil {
logger.Fatal(ctx).Err(err).Msg("Failed to start actor system.")
logger.Fatal(ctx, err).Msg("Failed to start actor system.")
}

if err := actorSystem.Inject(&wasmAgentInfo{}); err != nil {
logger.Fatal(ctx).Err(err).Msg("Failed to inject wasm agent info into actor system.")
logger.Fatal(ctx, err).Msg("Failed to inject wasm agent info into actor system.")
}

_actorSystem = actorSystem
Expand All @@ -67,12 +67,12 @@ func Initialize(ctx context.Context) {
}

func startActorSystem(ctx context.Context, actorSystem goakt.ActorSystem) error {
maxRetries := getIntFromEnv("MODUS_ACTOR_SYSTEM_START_MAX_RETRIES", 5)
retryInterval := getDurationFromEnv("MODUS_ACTOR_SYSTEM_START_RETRY_INTERVAL_SECONDS", 2, time.Second)
maxRetries := utils.GetIntFromEnv("MODUS_ACTOR_SYSTEM_START_MAX_RETRIES", 5)
retryInterval := utils.GetDurationFromEnv("MODUS_ACTOR_SYSTEM_START_RETRY_INTERVAL_SECONDS", 2, time.Second)

for i := range maxRetries {
if err := actorSystem.Start(ctx); err != nil {
logger.Warn(ctx).Err(err).Int("attempt", i+1).Msgf("Failed to start actor system, retrying in %s...", retryInterval)
logger.Warn(ctx, err).Int("attempt", i+1).Msgf("Failed to start actor system, retrying in %s...", retryInterval)
time.Sleep(retryInterval)
retryInterval *= 2 // Exponential backoff
continue
Expand All @@ -96,15 +96,15 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error {
for _, pid := range actors {
if a, ok := pid.Actor().(*wasmAgentActor); ok {
if err := goakt.Tell(ctx, pid, &messages.RestartAgent{}); err != nil {
logger.Err(ctx, err).Str("agent_id", a.agentId).Msg("Failed to send restart agent message to actor.")
logger.Error(ctx, err).Str("agent_id", a.agentId).Msg("Failed to send restart agent message to actor.")
}
}
}

// do this in a goroutine to avoid blocking the cluster engine startup
go func() {
if err := restoreAgentActors(ctx, plugin.Name()); err != nil {
logger.Err(ctx, err).Msg("Failed to restore agent actors.")
logger.Error(ctx, err).Msg("Failed to restore agent actors.")
}
}()

Expand Down Expand Up @@ -133,11 +133,11 @@ func restoreAgentActors(ctx context.Context, pluginName string) error {
for _, agent := range agents {
actorName := getActorName(agent.Id)
if exists, err := _actorSystem.ActorExists(ctx, actorName); err != nil {
logger.Err(ctx, err).Msgf("Failed to check if actor %s exists.", actorName)
logger.Error(ctx, err).Msgf("Failed to check if actor %s exists.", actorName)
} else if !exists {
err := spawnActorForAgent(ctx, pluginName, agent.Id, agent.Name, false)
if err != nil {
logger.Err(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id)
logger.Error(ctx, err).Msgf("Failed to spawn actor for agent %s.", agent.Id)
}
}
}
Expand All @@ -159,7 +159,7 @@ func beforeShutdown(ctx context.Context) error {
if actor.status == AgentStatusRunning {
ctx := actor.augmentContext(ctx, pid)
if err := actor.suspendAgent(ctx); err != nil {
logger.Err(ctx, err).Str("agent_id", actor.agentId).Msg("Failed to suspend agent actor.")
logger.Error(ctx, err).Str("agent_id", actor.agentId).Msg("Failed to suspend agent actor.")
}
}
}
Expand All @@ -169,7 +169,7 @@ func beforeShutdown(ctx context.Context) error {
for _, pid := range actors {
if _, ok := pid.Actor().(*subscriptionActor); ok && pid.IsRunning() {
if err := pid.Shutdown(ctx); err != nil {
logger.Err(ctx, err).Msgf("Failed to shutdown actor %s.", pid.Name())
logger.Error(ctx, err).Msgf("Failed to shutdown actor %s.", pid.Name())
}
}
}
Expand Down Expand Up @@ -201,7 +201,7 @@ func Shutdown(ctx context.Context) {
}

if err := _actorSystem.Stop(ctx); err != nil {
logger.Err(ctx, err).Msg("Failed to shutdown actor system.")
logger.Error(ctx, err).Msg("Failed to shutdown actor system.")
}

logger.Info(ctx).Msg("Actor system shutdown complete.")
Expand Down
18 changes: 9 additions & 9 deletions runtime/actors/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func clusterOptions(ctx context.Context) []goakt.Option {

disco, err := newDiscoveryProvider(ctx, clusterMode, discoveryPort)
if err != nil {
logger.Fatal(ctx).Err(err).Msg("Failed to create cluster discovery provider.")
logger.Fatal(ctx, err).Msg("Failed to create cluster discovery provider.")
}

return []goakt.Option{
Expand Down Expand Up @@ -166,9 +166,9 @@ func clusterPorts() (discoveryPort, remotingPort, peersPort int) {

// Get default ports dynamically, but use environment variables if set
ports := dynaport.Get(3)
discoveryPort = getIntFromEnv("MODUS_CLUSTER_DISCOVERY_PORT", ports[0])
remotingPort = getIntFromEnv("MODUS_CLUSTER_REMOTING_PORT", ports[1])
peersPort = getIntFromEnv("MODUS_CLUSTER_PEERS_PORT", ports[2])
discoveryPort = utils.GetIntFromEnv("MODUS_CLUSTER_DISCOVERY_PORT", ports[0])
remotingPort = utils.GetIntFromEnv("MODUS_CLUSTER_REMOTING_PORT", ports[1])
peersPort = utils.GetIntFromEnv("MODUS_CLUSTER_PEERS_PORT", ports[2])

return
}
Expand All @@ -179,7 +179,7 @@ func clusterPorts() (discoveryPort, remotingPort, peersPort int) {
// This value is also used for a sleep both on system startup and when spawning a new agent actor,
// so it needs to be low enough to not be noticed by the user.
func peerSyncInterval() time.Duration {
return getDurationFromEnv("MODUS_CLUSTER_PEER_SYNC_SECONDS", 1, time.Second)
return utils.GetDurationFromEnv("MODUS_CLUSTER_PEER_SYNC_SECONDS", 1, time.Second)
}

// nodesSyncInterval returns the interval at which the cluster forces a resync of the list of active nodes across the cluster.
Expand All @@ -189,7 +189,7 @@ func peerSyncInterval() time.Duration {
// On each interval, the node will sync its list of nodes with the cluster, and update its local state accordingly.
// The default is 10 seconds, which is a reasonable balance between responsiveness and network overhead.
func nodesSyncInterval() time.Duration {
return getDurationFromEnv("MODUS_CLUSTER_NODES_SYNC_SECONDS", 10, time.Second)
return utils.GetDurationFromEnv("MODUS_CLUSTER_NODES_SYNC_SECONDS", 10, time.Second)
}

// partitionCount returns the number of partitions the cluster will use for actor distribution.
Expand All @@ -199,19 +199,19 @@ func nodesSyncInterval() time.Duration {
// We'll use a slightly lower default of 13, which is still a prime number and should work well for most clusters.
// The GoAkt default is 271, but this has been found to lead to other errors in practice.
func partitionCount() uint64 {
return uint64(getIntFromEnv("MODUS_CLUSTER_PARTITION_COUNT", 13))
return uint64(utils.GetIntFromEnv("MODUS_CLUSTER_PARTITION_COUNT", 13))
}

// readTimeout returns the duration to wait for a cluster read operation before timing out.
// The default is 1 second, which should usually not need to be changed.
func readTimeout() time.Duration {
return getDurationFromEnv("MODUS_CLUSTER_READ_TIMEOUT_SECONDS", 1, time.Second)
return utils.GetDurationFromEnv("MODUS_CLUSTER_READ_TIMEOUT_SECONDS", 1, time.Second)
}

// writeTimeout returns the duration to wait for a cluster write operation before timing out.
// The default is 1 second, which should usually not need to be changed.
func writeTimeout() time.Duration {
return getDurationFromEnv("MODUS_CLUSTER_WRITE_TIMEOUT_SECONDS", 1, time.Second)
return utils.GetDurationFromEnv("MODUS_CLUSTER_WRITE_TIMEOUT_SECONDS", 1, time.Second)
}

func getPodLabels() map[string]string {
Expand Down
30 changes: 0 additions & 30 deletions runtime/actors/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,8 @@ package actors
import (
"context"
"fmt"
"os"
"strconv"
"time"

"github.com/hypermodeinc/modus/runtime/logger"
"github.com/hypermodeinc/modus/runtime/utils"
goakt "github.com/tochemey/goakt/v3/actor"

Expand Down Expand Up @@ -60,30 +57,3 @@ func ask(ctx context.Context, actorName string, message proto.Message, timeout t
}
return nil, fmt.Errorf("failed to get address or PID for actor %s", actorName)
}

// Retrieves an integer value from an environment variable.
func getIntFromEnv(envVar string, defaultValue int) int {
str := os.Getenv(envVar)
if str == "" {
return defaultValue
}

value, err := strconv.Atoi(str)
if err != nil || value <= 0 {
logger.Warnf("Invalid value for %s. Using %d instead.", envVar, defaultValue)
return defaultValue
}

return value
}

// Retrieves a duration value from an environment variable.
func getDurationFromEnv(envVar string, defaultValue int, unit time.Duration) time.Duration {
intVal := getIntFromEnv(envVar, defaultValue)
if intVal <= 0 {
duration := time.Duration(defaultValue) * unit
logger.Warnf("Invalid value for %s. Using %s instead.", envVar, duration)
return duration
}
return time.Duration(intVal) * unit
}
4 changes: 2 additions & 2 deletions runtime/actors/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ func SubscribeForAgentEvents(ctx context.Context, agentId string, update func(da

unsubscribe := &goaktpb.Unsubscribe{Topic: topic}
if err := subActor.Tell(ctx, _actorSystem.TopicActor(), unsubscribe); err != nil {
logger.Err(ctx, err).Msg("Failed to unsubscribe from topic")
logger.Error(ctx, err).Msg("Failed to unsubscribe from topic")
}

if err := subActor.Shutdown(ctx); err != nil {
logger.Err(ctx, err).Msg("Failed to shut down subscription actor")
logger.Error(ctx, err).Msg("Failed to shut down subscription actor")
}
}()

Expand Down
15 changes: 12 additions & 3 deletions runtime/actors/wasmagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"strings"
"time"

"github.com/getsentry/sentry-go"
"github.com/hypermodeinc/modus/runtime/db"
"github.com/hypermodeinc/modus/runtime/logger"
"github.com/hypermodeinc/modus/runtime/messages"
Expand All @@ -37,12 +38,16 @@ type wasmAgentActor struct {
host wasmhost.WasmHost
module wasm.Module
buffers utils.OutputBuffers
sentryHub *sentry.Hub
initializing bool
}

func (a *wasmAgentActor) PreStart(ac *goakt.Context) error {
ctx := ac.Context()

a.sentryHub = sentry.CurrentHub().Clone()
ctx = sentry.SetHubOnContext(ctx, a.sentryHub)

span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
defer span.Finish()

Expand Down Expand Up @@ -139,13 +144,14 @@ func (a *wasmAgentActor) Receive(rc *goakt.ReceiveContext) {

func (a *wasmAgentActor) PostStop(ac *goakt.Context) error {
ctx := ac.Context()
ctx = sentry.SetHubOnContext(ctx, a.sentryHub)
span, ctx := utils.NewSentrySpanForCurrentFunc(ctx)
defer span.Finish()

// suspend the agent if it's not already suspended or terminated
if a.status != AgentStatusSuspended && a.status != AgentStatusTerminated {
if err := a.suspendAgent(ctx); err != nil {
logger.Err(ctx, err).Msg("Error suspending agent.")
logger.Error(ctx, err).Msg("Error suspending agent.")
// don't return on error - we'll still try to deactivate the agent
}
}
Expand Down Expand Up @@ -194,7 +200,7 @@ func (a *wasmAgentActor) handleAgentRequest(ctx context.Context, rc *goakt.Recei
response.Data = result
default:
err := fmt.Errorf("unexpected result type: %T", result)
logger.Err(ctx, err).Msg("Error handling message.")
logger.Error(ctx, err).Msg("Error handling message.")
return err
}
}
Expand All @@ -209,7 +215,7 @@ func (a *wasmAgentActor) handleAgentRequest(ctx context.Context, rc *goakt.Recei

// save the state after handling the message to ensure the state is up to date in case of hard termination
if err := a.saveState(ctx); err != nil {
logger.Err(ctx, err).Msg("Error saving agent state.")
logger.Error(ctx, err).Msg("Error saving agent state.")
}

return nil
Expand Down Expand Up @@ -299,6 +305,9 @@ func (a *wasmAgentActor) augmentContext(ctx context.Context, pid *goakt.PID) con
if ctx.Value(pidContextKey{}) == nil {
ctx = context.WithValue(ctx, pidContextKey{}, pid)
}
if !sentry.HasHubOnContext(ctx) {
ctx = sentry.SetHubOnContext(ctx, a.sentryHub)
}
return ctx
}

Expand Down
2 changes: 1 addition & 1 deletion runtime/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func SetShuttingDown() {
}

// GetRootSourcePath returns the root path of the source code.
// It is used to trim the paths in stack traces when included in telemetry.
// It is used to trim the paths in stack traces when included in utils.
func GetRootSourcePath() string {
_, filename, _, ok := runtime.Caller(0)
if !ok {
Expand Down
6 changes: 3 additions & 3 deletions runtime/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ func Stop(ctx context.Context) {

func logDbWarningOrError(ctx context.Context, err error, msg string) {
if _, ok := err.(*pgconn.ConnectError); ok {
logger.Warn(ctx).Err(err).Msgf("Database connection error. %s", msg)
logger.Warn(ctx, err).Msgf("Database connection error. %s", msg)
} else if errors.Is(err, errDbNotConfigured) {
if !useModusDB() {
logger.Warn(ctx).Msgf("Database has not been configured. %s", msg)
}
} else {
// not really an error, but we log it as such
// but user-visible so it doesn't flag in Sentry
logger.Err(ctx, err).Bool("user_visible", true).Msg(msg)
logger.Error(ctx, err).Bool("user_visible", true).Msg(msg)
}
}

Expand All @@ -71,7 +71,7 @@ func Initialize(ctx context.Context) {
// this will initialize the pool and start the worker
_, err := globalRuntimePostgresWriter.GetPool(ctx)
if err != nil {
logger.Warn(ctx).Err(err).Msg("Metadata database is not available.")
logger.Warn(ctx, err).Msg("Metadata database is not available.")
}
go globalRuntimePostgresWriter.worker(ctx)
}
Expand Down
10 changes: 5 additions & 5 deletions runtime/db/modusdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func InitModusDb(ctx context.Context) {
}

if eng, err := modusgraph.NewEngine(modusgraph.NewDefaultConfig(dataDir)); err != nil {
logger.Fatal(ctx).Err(err).Msg("Failed to initialize the local modusGraph database.")
logger.Fatal(ctx, err).Msg("Failed to initialize the local modusGraph database.")
} else {
GlobalModusDbEngine = eng
}
Expand All @@ -58,15 +58,15 @@ func addToGitIgnore(ctx context.Context, rootPath, contents string) {
// if .gitignore file does not exist, create it and add contents to it
if _, err := os.Stat(gitIgnorePath); errors.Is(err, os.ErrNotExist) {
if err := os.WriteFile(gitIgnorePath, []byte(contents+"\n"), 0644); err != nil {
logger.Err(ctx, err).Msg("Failed to create .gitignore file.")
logger.Error(ctx, err).Msg("Failed to create .gitignore file.")
}
return
}

// check if contents are already in the .gitignore file
file, err := os.Open(gitIgnorePath)
if err != nil {
logger.Err(ctx, err).Msg("Failed to open .gitignore file.")
logger.Error(ctx, err).Msg("Failed to open .gitignore file.")
return
}
defer file.Close()
Expand All @@ -80,11 +80,11 @@ func addToGitIgnore(ctx context.Context, rootPath, contents string) {
// contents are not in the file, so append them
file, err = os.OpenFile(gitIgnorePath, os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
logger.Err(ctx, err).Msg("Failed to open .gitignore file.")
logger.Error(ctx, err).Msg("Failed to open .gitignore file.")
return
}
defer file.Close()
if _, err := file.WriteString("\n" + contents + "\n"); err != nil {
logger.Err(ctx, err).Msg("Failed to append " + contents + " to .gitignore file.")
logger.Error(ctx, err).Msg("Failed to append " + contents + " to .gitignore file.")
}
}
4 changes: 2 additions & 2 deletions runtime/dgraphclient/dgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (dc *dgraphConnector) execute(ctx context.Context, req *Request) (*Response
tx := dc.dgClient.NewReadOnlyTxn()
defer func() {
if err := tx.Discard(ctx); err != nil {
logger.Warn(ctx).Err(err).Msg("Error discarding transaction.")
logger.Warn(ctx, err).Msg("Error discarding transaction.")
return
}

Expand All @@ -86,7 +86,7 @@ func (dc *dgraphConnector) execute(ctx context.Context, req *Request) (*Response
tx := dc.dgClient.NewTxn()
defer func() {
if err := tx.Discard(ctx); err != nil {
logger.Warn(ctx).Err(err).Msg("Error discarding transaction.")
logger.Warn(ctx, err).Msg("Error discarding transaction.")
return
}

Expand Down
Loading
Loading