diff --git a/.trunk/configs/cspell.json b/.trunk/configs/cspell.json index ea39cc23b..ca1706466 100644 --- a/.trunk/configs/cspell.json +++ b/.trunk/configs/cspell.json @@ -7,6 +7,7 @@ "alloc", "allocs", "Ammar", + "anypb", "apikey", "APPDATA", "appinfo", @@ -24,6 +25,7 @@ "buger", "buildmode", "cconf", + "cespare", "checklinkname", "chewxy", "classid", @@ -44,6 +46,7 @@ "dsname", "dspc", "dynamicmap", + "dynaport", "envfiles", "estree", "euclidian", @@ -57,6 +60,7 @@ "gitinfo", "gjson", "goakt", + "goaktpb", "goarch", "GOBIN", "goccy", @@ -168,6 +172,7 @@ "sslmode", "stretchr", "strs", + "structpb", "subdirs", "sublicensable", "submatches", @@ -179,9 +184,11 @@ "tetratelabs", "textgeneration", "tidwall", + "timestamppb", "tinygo", "tochemey", "toolcalling", + "travisjeffery", "tseslint", "tsrv", "typedarray", diff --git a/CHANGELOG.md b/CHANGELOG.md index f25651f90..d4cd227bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - tests: implement more extensive testing [#867](https://github.com/hypermodeinc/modus/pull/867) - feat: remove embedded PostgresDB [#870](https://github.com/hypermodeinc/modus/pull/870) - feat: delete collections features [#872](https://github.com/hypermodeinc/modus/pull/872) +- feat: stream agent events via subscriptions [#875](https://github.com/hypermodeinc/modus/pull/875) ## 2025-05-29 - Runtime 0.18.0-alpha.4 diff --git a/runtime/actors/actorsystem.go b/runtime/actors/actorsystem.go index b4a894ad4..53aebaddc 100644 --- a/runtime/actors/actorsystem.go +++ b/runtime/actors/actorsystem.go @@ -11,6 +11,9 @@ package actors import ( "context" + "fmt" + "os" + "strconv" "time" "github.com/hypermodeinc/modus/runtime/db" @@ -20,31 +23,62 @@ import ( "github.com/hypermodeinc/modus/runtime/wasmhost" goakt "github.com/tochemey/goakt/v3/actor" + goakt_static "github.com/tochemey/goakt/v3/discovery/static" + goakt_remote "github.com/tochemey/goakt/v3/remote" + "github.com/travisjeffery/go-dynaport" ) var _actorSystem goakt.ActorSystem func Initialize(ctx context.Context) { - actorLogger := newActorLogger(logger.Get(ctx)) - - actorSystem, err := goakt.NewActorSystem("modus", - goakt.WithLogger(actorLogger), + opts := []goakt.Option{ + goakt.WithLogger(newActorLogger(logger.Get(ctx))), goakt.WithCoordinatedShutdown(beforeShutdown), - goakt.WithPassivationDisabled(), // TODO: enable passivation. Requires a persistence store in production for agent state. - goakt.WithActorInitTimeout(10*time.Second), // TODO: adjust this value, or make it configurable - goakt.WithActorInitMaxRetries(1)) // TODO: adjust this value, or make it configurable + goakt.WithPubSub(), + goakt.WithPassivation(time.Second * 10), // TODO: adjust this value, or make it configurable + goakt.WithActorInitTimeout(10 * time.Second), // TODO: adjust this value, or make it configurable + goakt.WithActorInitMaxRetries(1), // TODO: adjust this value, or make it configurable + } - if err != nil { - logger.Fatal(ctx).Err(err).Msg("Failed to create actor system.") + // NOTE: we're not relying on cluster mode yet. The below code block is for future use and testing purposes only. + if clusterMode, _ := strconv.ParseBool(os.Getenv("MODUS_USE_CLUSTER_MODE")); clusterMode { + // TODO: static discovery should really only be used for local development and testing. + // In production, we should use a more robust discovery mechanism, such as Kubernetes or NATS. + // See https://tochemey.gitbook.io/goakt/features/service-discovery + + // We just get three random ports for now. + // In prod, these will need to be configured so they are consistent across all nodes. + ports := dynaport.Get(3) + var gossip_port = ports[0] + var peers_port = ports[1] + var remoting_port = ports[2] + + disco := goakt_static.NewDiscovery(&goakt_static.Config{ + Hosts: []string{ + fmt.Sprintf("localhost:%d", gossip_port), + }, + }) + + opts = append(opts, + goakt.WithRemote(goakt_remote.NewConfig("localhost", remoting_port)), + goakt.WithCluster(goakt.NewClusterConfig(). + WithDiscovery(disco). + WithDiscoveryPort(gossip_port). + WithPeersPort(peers_port). + WithKinds(&wasmAgentActor{}, &subscriptionActor{}), + ), + ) } - if err := actorSystem.Start(ctx); err != nil { + if actorSystem, err := goakt.NewActorSystem("modus", opts...); err != nil { + logger.Fatal(ctx).Err(err).Msg("Failed to create actor system.") + } else if err := actorSystem.Start(ctx); err != nil { logger.Fatal(ctx).Err(err).Msg("Failed to start actor system.") + } else { + _actorSystem = actorSystem } - _actorSystem = actorSystem - logger.Info(ctx).Msg("Actor system started.") pluginmanager.RegisterPluginLoadedCallback(loadAgentActors) @@ -64,7 +98,8 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error { } // spawn actors for agents with state in the database, that are not already running - // TODO: when we scale out with GoAkt cluster mode, we'll need to decide which node is responsible for spawning the actor + // TODO: when we scale out to allow more nodes in the cluster, we'll need to decide + // which node is responsible for spawning each actor. agents, err := db.QueryActiveAgents(ctx) if err != nil { logger.Err(ctx, err).Msg("Failed to query agents from database.") @@ -73,7 +108,7 @@ func loadAgentActors(ctx context.Context, plugin *plugins.Plugin) error { host := wasmhost.GetWasmHost(ctx) for _, agent := range agents { if !runningAgents[agent.Id] { - spawnActorForAgent(host, plugin, agent.Id, agent.Name, true, &agent.Data) + spawnActorForAgentAsync(host, plugin, agent.Id, agent.Name, true, &agent.Data) } } diff --git a/runtime/actors/agents.go b/runtime/actors/agents.go index b11f51a6a..537b40834 100644 --- a/runtime/actors/agents.go +++ b/runtime/actors/agents.go @@ -11,8 +11,8 @@ package actors import ( "context" + "errors" "fmt" - "strings" "time" "github.com/hypermodeinc/modus/runtime/db" @@ -21,10 +21,13 @@ import ( "github.com/hypermodeinc/modus/runtime/plugins" "github.com/hypermodeinc/modus/runtime/utils" "github.com/hypermodeinc/modus/runtime/wasmhost" - "github.com/rs/xid" - wasm "github.com/tetratelabs/wazero/api" + "github.com/rs/xid" goakt "github.com/tochemey/goakt/v3/actor" + "github.com/tochemey/goakt/v3/goaktpb" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" ) type AgentInfo struct { @@ -45,6 +48,8 @@ const ( AgentStatusTerminated AgentStatus = "terminated" ) +const agentStatusEventName = "agentStatusUpdated" + func StartAgent(ctx context.Context, agentName string) (*AgentInfo, error) { plugin, ok := plugins.GetPluginFromContext(ctx) if !ok { @@ -53,7 +58,7 @@ func StartAgent(ctx context.Context, agentName string) (*AgentInfo, error) { agentId := xid.New().String() host := wasmhost.GetWasmHost(ctx) - spawnActorForAgent(host, plugin, agentId, agentName, false, nil) + spawnActorForAgentAsync(host, plugin, agentId, agentName, false, nil) info := &AgentInfo{ Id: agentId, @@ -64,35 +69,42 @@ func StartAgent(ctx context.Context, agentName string) (*AgentInfo, error) { return info, nil } -func spawnActorForAgent(host wasmhost.WasmHost, plugin *plugins.Plugin, agentId, agentName string, resuming bool, initialState *string) { - // The actor needs to spawn in its own context, so we don't pass one in to this function. - // If we did, then when the original context was cancelled or completed, the actor initialization would be cancelled too. - +func spawnActorForAgentAsync(host wasmhost.WasmHost, plugin *plugins.Plugin, agentId, agentName string, resuming bool, initialState *string) { // We spawn the actor in a goroutine to avoid blocking while the actor is being spawned. // This allows many agents to be spawned in parallel, if needed. + // Errors are logged but not returned, as the actor system will handle them. go func() { - ctx := context.Background() - ctx = context.WithValue(ctx, utils.WasmHostContextKey, host) - ctx = context.WithValue(ctx, utils.PluginContextKey, plugin) - ctx = context.WithValue(ctx, utils.AgentIdContextKey, agentId) - ctx = context.WithValue(ctx, utils.AgentNameContextKey, agentName) - - actor := newWasmAgentActor(agentId, agentName, host, plugin) - actorName := getActorName(agentId) - - if resuming { - actor.status = AgentStatusResuming - actor.initialState = initialState - } else { - actor.status = AgentStatusStarting - } - - if _, err := _actorSystem.Spawn(ctx, actorName, actor); err != nil { - logger.Err(ctx, err).Msg("Error spawning actor for agent.") - } + _, _ = spawnActorForAgent(host, plugin, agentId, agentName, resuming, initialState) }() } +func spawnActorForAgent(host wasmhost.WasmHost, plugin *plugins.Plugin, agentId, agentName string, resuming bool, initialState *string) (*goakt.PID, error) { + // The actor needs to spawn in its own context, so we don't pass one in to this function. + // If we did, then when the original context was cancelled or completed, the actor initialization would be cancelled too. + ctx := context.Background() + ctx = context.WithValue(ctx, utils.WasmHostContextKey, host) + ctx = context.WithValue(ctx, utils.PluginContextKey, plugin) + ctx = context.WithValue(ctx, utils.AgentIdContextKey, agentId) + ctx = context.WithValue(ctx, utils.AgentNameContextKey, agentName) + + actor := newWasmAgentActor(agentId, agentName, host, plugin) + actorName := getActorName(agentId) + + // note, don't call updateStatus here, because we can't publish an event yet + if resuming { + actor.status = AgentStatusResuming + actor.initialState = initialState + } else { + actor.status = AgentStatusStarting + } + + pid, err := _actorSystem.Spawn(ctx, actorName, actor) + if err != nil { + logger.Err(ctx, err).Msg("Error spawning actor for agent.") + } + return pid, err +} + func StopAgent(ctx context.Context, agentId string) (*AgentInfo, error) { pid, err := getActorPid(ctx, agentId) if err != nil { @@ -110,7 +122,7 @@ func StopAgent(ctx context.Context, agentId string) (*AgentInfo, error) { // it was found, so we can stop it actor := pid.Actor().(*wasmAgentActor) - actor.status = AgentStatusStopping + actor.updateStatus(ctx, AgentStatusStopping) if err := pid.Shutdown(ctx); err != nil { return nil, fmt.Errorf("error stopping agent %s: %w", agentId, err) } @@ -148,14 +160,16 @@ func GetAgentInfo(ctx context.Context, agentId string) (*AgentInfo, error) { } type agentMessageResponse struct { - Data *string + Data *string + Error *string } func SendAgentMessage(ctx context.Context, agentId string, msgName string, data *string, timeout int64) (*agentMessageResponse, error) { pid, err := getActorPid(ctx, agentId) if err != nil { - return nil, err + e := err.Error() + return &agentMessageResponse{Error: &e}, err } msg := &messages.AgentRequestMessage{ @@ -177,412 +191,142 @@ func SendAgentMessage(ctx context.Context, agentId string, msgName string, data } if response, ok := res.(*messages.AgentResponseMessage); ok { - return &agentMessageResponse{response.Data}, nil + return &agentMessageResponse{Data: response.Data}, nil } else { return nil, fmt.Errorf("unexpected response type: %T", res) } } -func getActorName(agentId string) string { - return "agent-" + agentId -} +func PublishAgentEvent(ctx context.Context, agentId, eventName string, eventData *string) error { -func getActorPid(ctx context.Context, agentId string) (*goakt.PID, error) { - - addr, pid, err := _actorSystem.ActorOf(ctx, getActorName(agentId)) - if err != nil { - if strings.HasSuffix(err.Error(), " not found") { - return nil, fmt.Errorf("agent %s not found", agentId) - } else { - return nil, fmt.Errorf("error getting actor for agent %s: %w", agentId, err) + var data any + if eventData != nil { + if err := utils.JsonDeserialize([]byte(*eventData), &data); err != nil { + return fmt.Errorf("error deserializing event data: %w", err) } } - _ = addr // TODO: this will be used when we implement remote actors with clustering - - return pid, nil -} - -func ListActiveAgents(ctx context.Context) ([]AgentInfo, error) { - agents, err := db.QueryActiveAgents(ctx) + dataValue, err := structpb.NewValue(data) if err != nil { - return nil, fmt.Errorf("error listing active agents: %w", err) - } - - results := make([]AgentInfo, 0, len(agents)) - for _, agent := range agents { - results = append(results, AgentInfo{ - Id: agent.Id, - Name: agent.Name, - Status: agent.Status, - }) - } - - return results, nil -} - -func ListLocalAgents() []AgentInfo { - if _actorSystem == nil { - return nil - } - - actors := _actorSystem.Actors() - results := make([]AgentInfo, 0, len(actors)) - - for _, pid := range actors { - if actor, ok := pid.Actor().(*wasmAgentActor); ok { - results = append(results, AgentInfo{ - Id: actor.agentId, - Name: actor.agentName, - Status: actor.status, - }) - } + return fmt.Errorf("error creating event data value: %w", err) } - return results -} - -type wasmAgentActor struct { - agentId string - agentName string - status AgentStatus - plugin *plugins.Plugin - host wasmhost.WasmHost - module wasm.Module - buffers utils.OutputBuffers - initialState *string -} - -func newWasmAgentActor(agentId, agentName string, host wasmhost.WasmHost, plugin *plugins.Plugin) *wasmAgentActor { - return &wasmAgentActor{ - agentId: agentId, - agentName: agentName, - host: host, - plugin: plugin, - } -} - -func (a *wasmAgentActor) PreStart(ac *goakt.Context) error { - ctx := a.newContext() - - switch a.status { - case AgentStatusStarting: - logger.Info(ctx).Msg("Starting agent.") - case AgentStatusResuming, AgentStatusSuspended: - a.status = AgentStatusResuming - logger.Info(ctx).Msg("Resuming agent.") - default: - return fmt.Errorf("invalid agent status for actor PreStart: %s", a.status) + event := &messages.AgentEventMessage{ + Name: eventName, + Data: dataValue, + Timestamp: timestamppb.Now(), } - if err := a.saveState(ctx); err != nil { - logger.Err(ctx, err).Msg("Error saving agent state.") + eventMsg, err := anypb.New(event) + if err != nil { + return fmt.Errorf("error creating event message: %w", err) } - start := time.Now() - - a.buffers = utils.NewOutputBuffers() - if mod, err := a.host.GetModuleInstance(ctx, a.plugin, a.buffers); err != nil { - return err - } else { - a.module = mod + pubMsg := &goaktpb.Publish{ + Id: xid.New().String(), + Topic: getAgentTopic(agentId), + Message: eventMsg, } - if err := a.activateAgent(ctx); err != nil { - logger.Err(ctx, err).Msg("Error activating agent.") + pid, err := getActorPid(ctx, agentId) + if err != nil { return err } - if a.status == AgentStatusResuming { - if err := a.setAgentState(ctx, a.initialState); err != nil { - logger.Err(ctx, err).Msg("Error resuming agent state.") - } - a.initialState = nil - } - - duration := time.Since(start) - if a.status == AgentStatusResuming { - logger.Info(ctx).Msg("Agent resumed successfully.") - } else { - logger.Info(ctx).Dur("duration_ms", duration).Msg("Agent started successfully.") - } - - a.status = AgentStatusRunning - - if err := a.saveState(ctx); err != nil { - logger.Err(ctx, err).Msg("Error saving agent state.") - } - - return nil + topicActor := _actorSystem.TopicActor() + return pid.Tell(ctx, topicActor, pubMsg) } -func (a *wasmAgentActor) PostStop(ac *goakt.Context) error { - ctx := a.newContext() - defer a.module.Close(ctx) - - switch a.status { - case AgentStatusRunning, AgentStatusSuspending: - a.status = AgentStatusSuspending - logger.Info(ctx).Msg("Suspending agent.") - case AgentStatusStopping: - logger.Info(ctx).Msg("Stopping agent.") - - default: - return fmt.Errorf("invalid agent status for actor PostStop: %s", a.status) - } - - if err := a.saveState(ctx); err != nil { - logger.Err(ctx, err).Msg("Error saving agent state.") - } - - start := time.Now() - - if err := a.shutdownAgent(ctx); err != nil { - logger.Err(ctx, err).Msg("Error shutting down agent.") - return err - } - - duration := time.Since(start) - switch a.status { - case AgentStatusSuspending: - a.status = AgentStatusSuspended - if err := a.saveState(ctx); err != nil { - return err - } - logger.Info(ctx).Msg("Agent suspended successfully.") - case AgentStatusStopping: - a.status = AgentStatusTerminated - if err := a.saveState(ctx); err != nil { - return err - } - logger.Info(ctx).Dur("duration_ms", duration).Msg("Agent terminated successfully.") - default: - return fmt.Errorf("invalid agent status for actor PostStop: %s", a.status) - } +func getActorName(agentId string) string { + return "agent-" + agentId +} - return nil +func getAgentTopic(agentId string) string { + return getActorName(agentId) + ".events" } -func (a *wasmAgentActor) saveState(ctx context.Context) error { - var data string - if a.module != nil { - if d, err := a.getAgentState(ctx); err != nil { - return fmt.Errorf("error getting state from agent: %w", err) - } else if d != nil { - data = *d - } +func getActorPid(ctx context.Context, agentId string) (*goakt.PID, error) { + if _, err := xid.FromString(agentId); err != nil { + return nil, fmt.Errorf("invalid agent ID format: %s", agentId) } - if err := db.WriteAgentState(ctx, db.AgentState{ - Id: a.agentId, - Name: a.agentName, - Status: a.status, - Data: data, - UpdatedAt: time.Now().UTC().Format(time.RFC3339), - }); err != nil { - return fmt.Errorf("error saving state to database: %w", err) + actorName := getActorName(agentId) + _, pid, err := _actorSystem.ActorOf(ctx, actorName) + if err == nil { + return pid, nil } - return nil -} - -func (a *wasmAgentActor) Receive(rc *goakt.ReceiveContext) { - ctx := a.newContext() - - switch msg := rc.Message().(type) { - case *messages.AgentRequestMessage: + if errors.Is(err, goakt.ErrActorNotFound) { + // the actor might have been suspended or terminated, so check the database + if agent, dbErr := db.GetAgentState(ctx, agentId); dbErr == nil { + switch agent.Status { - logger.Info(ctx).Str("msg_name", msg.Name).Msg("Received message.") - start := time.Now() - - fnInfo, err := a.host.GetFunctionInfo("_modus_agent_handle_message") - if err != nil { - rc.Err(err) - return - } + case AgentStatusSuspended: + // the actor is suspended, so we can resume it (if we're in the right context) + if host, ok := wasmhost.TryGetWasmHost(ctx); ok { + plugin, ok := plugins.GetPluginFromContext(ctx) + if !ok { + return nil, fmt.Errorf("no plugin found in context for agent %s", agentId) + } + return spawnActorForAgent(host, plugin, agentId, agent.Name, true, &agent.Data) + } - params := map[string]any{ - "msgName": msg.Name, - "data": msg.Data, - } + return nil, fmt.Errorf("agent %s is suspended", agentId) - execInfo, err := a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, params) - if err != nil { - rc.Err(err) - return - } + case AgentStatusTerminated: + return nil, fmt.Errorf("agent %s is terminated", agentId) - if msg.Respond { - result := execInfo.Result() - response := &messages.AgentResponseMessage{} - if result != nil { - switch result := result.(type) { - case string: - response.Data = &result - case *string: - response.Data = result - default: - err := fmt.Errorf("unexpected result type: %T", result) - logger.Err(ctx, err).Msg("Error handling message.") - rc.Err(err) - return + default: + // try one more time in case the actor was activated just after the initial check + if _, pid, err := _actorSystem.ActorOf(ctx, actorName); err == nil { + return pid, nil } - } - rc.Response(response) - } - - duration := time.Since(start) - logger.Info(ctx).Str("msg_name", msg.Name).Dur("duration_ms", duration).Msg("Message handled successfully.") - // save the state after handling the message to ensure the state is up to date in case of hard termination - if err := a.saveState(ctx); err != nil { - logger.Err(ctx, err).Msg("Error saving agent state.") + // this means the agent is running on another node - TODO: handle this somehow + return nil, fmt.Errorf("agent %s is %s, but not found in local actor system", agentId, agent.Status) + } } - - default: - rc.Unhandled() + return nil, fmt.Errorf("agent %s not found", agentId) } -} -func (a *wasmAgentActor) newContext() context.Context { - ctx := context.Background() - ctx = context.WithValue(ctx, utils.WasmHostContextKey, a.host) - ctx = context.WithValue(ctx, utils.PluginContextKey, a.plugin) - ctx = context.WithValue(ctx, utils.AgentIdContextKey, a.agentId) - ctx = context.WithValue(ctx, utils.AgentNameContextKey, a.agentName) - return ctx + return nil, fmt.Errorf("error getting actor for agent %s: %w", agentId, err) } -func (a *wasmAgentActor) activateAgent(ctx context.Context) error { - - fnInfo, err := a.host.GetFunctionInfo("_modus_agent_activate") - if err != nil { - return err - } - - params := map[string]any{ - "name": a.agentName, - "id": a.agentId, - "reloading": a.status == AgentStatusResuming, - } - - execInfo, err := a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, params) - - _ = execInfo // TODO - return err -} - -func (a *wasmAgentActor) shutdownAgent(ctx context.Context) error { - - fnInfo, err := a.host.GetFunctionInfo("_modus_agent_shutdown") - if err != nil { - return err - } - - params := map[string]any{ - "suspending": a.status == AgentStatusSuspending, - } - - execInfo, err := a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, params) - - _ = execInfo // TODO - return err -} - -func (a *wasmAgentActor) getAgentState(ctx context.Context) (*string, error) { - - fnInfo, err := a.host.GetFunctionInfo("_modus_agent_get_state") - if err != nil { - return nil, err - } - - execInfo, err := a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, nil) - if err != nil { - return nil, err - } - - result := execInfo.Result() - if result == nil { - return nil, nil - } - - switch state := result.(type) { - case string: - return &state, nil - case *string: - return state, nil - default: - return nil, fmt.Errorf("unexpected result type: %T", result) - } -} - -func (a *wasmAgentActor) setAgentState(ctx context.Context, data *string) error { - fnInfo, err := a.host.GetFunctionInfo("_modus_agent_set_state") +func ListActiveAgents(ctx context.Context) ([]AgentInfo, error) { + agents, err := db.QueryActiveAgents(ctx) if err != nil { - return err - } - - params := map[string]any{ - "data": data, + return nil, fmt.Errorf("error listing active agents: %w", err) } - execInfo, err := a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, params) - if err != nil { - return err + results := make([]AgentInfo, 0, len(agents)) + for _, agent := range agents { + results = append(results, AgentInfo{ + Id: agent.Id, + Name: agent.Name, + Status: agent.Status, + }) } - _ = execInfo // TODO - return err + return results, nil } -func (a *wasmAgentActor) reloadModule(ctx context.Context, plugin *plugins.Plugin) error { - - // the context may not have these values set - ctx = context.WithValue(ctx, utils.PluginContextKey, a.plugin) - ctx = context.WithValue(ctx, utils.AgentIdContextKey, a.agentId) - ctx = context.WithValue(ctx, utils.AgentNameContextKey, a.agentName) - - logger.Info(ctx).Msg("Reloading module for agent.") - - a.status = AgentStatusSuspending - if err := a.shutdownAgent(ctx); err != nil { - logger.Err(ctx, err).Msg("Error shutting down agent.") - return err - } - - // get the current state and close the module instance - state, err := a.getAgentState(ctx) - if err != nil { - logger.Err(ctx, err).Msg("Error getting agent state.") - return err - } - a.module.Close(ctx) - a.status = AgentStatusSuspended - - // create a new module instance and assign it to the actor - a.plugin = plugin - a.buffers = utils.NewOutputBuffers() - mod, err := a.host.GetModuleInstance(ctx, a.plugin, a.buffers) - if err != nil { - return err +func ListLocalAgents() []AgentInfo { + if _actorSystem == nil { + return nil } - a.module = mod - // activate the agent in the new module instance - a.status = AgentStatusResuming - if err := a.activateAgent(ctx); err != nil { - logger.Err(ctx, err).Msg("Error reloading agent.") - return err - } + actors := _actorSystem.Actors() + results := make([]AgentInfo, 0, len(actors)) - // restore the state in the new module instance - if err := a.setAgentState(ctx, state); err != nil { - logger.Err(ctx, err).Msg("Error setting agent state.") - return err + for _, pid := range actors { + if actor, ok := pid.Actor().(*wasmAgentActor); ok { + results = append(results, AgentInfo{ + Id: actor.agentId, + Name: actor.agentName, + Status: actor.status, + }) + } } - a.status = AgentStatusRunning - logger.Info(ctx).Msg("Agent reloaded module successfully.") - - return nil + return results } diff --git a/runtime/actors/subscriber.go b/runtime/actors/subscriber.go new file mode 100644 index 000000000..655d75916 --- /dev/null +++ b/runtime/actors/subscriber.go @@ -0,0 +1,124 @@ +/* + * Copyright 2025 Hypermode Inc. + * Licensed under the terms of the Apache License, Version 2.0 + * See the LICENSE file that accompanied this code for further details. + * + * SPDX-FileCopyrightText: 2025 Hypermode Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package actors + +import ( + "context" + "fmt" + "time" + + "github.com/hypermodeinc/modus/runtime/logger" + "github.com/hypermodeinc/modus/runtime/messages" + "github.com/hypermodeinc/modus/runtime/utils" + + "github.com/rs/xid" + goakt "github.com/tochemey/goakt/v3/actor" + "github.com/tochemey/goakt/v3/goaktpb" +) + +type agentEvent struct { + Name string `json:"name"` + Data any `json:"data"` + Timestamp string `json:"timestamp"` +} + +func SubscribeForAgentEvents(ctx context.Context, agentId string, update func(data []byte), done func()) error { + + if a, err := GetAgentInfo(ctx, agentId); err != nil { + return err + } else if a.Status == AgentStatusStopping || a.Status == AgentStatusTerminated { + return fmt.Errorf("agent %s is %s, cannot subscribe to events", agentId, a.Status) + } + + if update == nil { + update = func(data []byte) {} + } + if done == nil { + done = func() {} + } + actor := &subscriptionActor{ + update: update, + done: done, + } + + // Spawn a subscription actor that is bound to the graphql subscription on this node. + // It needs to be long-lived, because it will need to stay alive as long as the client is connected. + // It cannot be relocated to another node, because it is bound to http request of the GraphQL subscription on this node. + // It cannot be spawned as a child of the agent actor, because the subscription needs to be maintained even if the agent actor is suspended. + + actorName := "subscription-" + xid.New().String() + subActor, err := _actorSystem.Spawn(ctx, actorName, actor, goakt.WithLongLived(), goakt.WithRelocationDisabled()) + if err != nil { + return fmt.Errorf("failed to spawn subscription actor: %w", err) + } + + subMsg := &goaktpb.Subscribe{ + Topic: getAgentTopic(agentId), + } + + if err := subActor.Tell(ctx, _actorSystem.TopicActor(), subMsg); err != nil { + return fmt.Errorf("failed to subscribe to topic: %w", err) + } + + // When the context is done, we will stop the subscription actor. + // For example, the GraphQL subscription is closed or the client disconnects. + go func() { + <-ctx.Done() + + // a new context is needed because the original context is already done + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if err := subActor.Shutdown(ctx); err != nil { + logger.Err(ctx, err).Msg("Failed to shut down subscription actor") + } + }() + + return nil +} + +type subscriptionActor struct { + update func(data []byte) + done func() +} + +func (a *subscriptionActor) PreStart(ac *goakt.Context) error { + return nil +} + +func (a *subscriptionActor) PostStop(ac *goakt.Context) error { + a.done() + return nil +} + +func (a *subscriptionActor) Receive(rc *goakt.ReceiveContext) { + if msg, ok := rc.Message().(*messages.AgentEventMessage); ok { + event := &agentEvent{ + Name: msg.Name, + Data: msg.Data, + Timestamp: msg.Timestamp.AsTime().Format(time.RFC3339), + } + if data, err := utils.JsonSerialize(event); err != nil { + rc.Err(fmt.Errorf("failed to serialize agent event message: %w", err)) + } else { + a.update(data) + } + + if msg.Name == agentStatusEventName { + status := msg.Data.GetStructValue().Fields["status"].GetStringValue() + if status == AgentStatusTerminated { + rc.Shutdown() + } + } + + return + } + + rc.Unhandled() +} diff --git a/runtime/actors/wasmagent.go b/runtime/actors/wasmagent.go new file mode 100644 index 000000000..e86d71b27 --- /dev/null +++ b/runtime/actors/wasmagent.go @@ -0,0 +1,378 @@ +/* + * Copyright 2025 Hypermode Inc. + * Licensed under the terms of the Apache License, Version 2.0 + * See the LICENSE file that accompanied this code for further details. + * + * SPDX-FileCopyrightText: 2025 Hypermode Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package actors + +import ( + "context" + "fmt" + "time" + + "github.com/hypermodeinc/modus/runtime/db" + "github.com/hypermodeinc/modus/runtime/logger" + "github.com/hypermodeinc/modus/runtime/messages" + "github.com/hypermodeinc/modus/runtime/plugins" + "github.com/hypermodeinc/modus/runtime/utils" + "github.com/hypermodeinc/modus/runtime/wasmhost" + + wasm "github.com/tetratelabs/wazero/api" + goakt "github.com/tochemey/goakt/v3/actor" + "github.com/tochemey/goakt/v3/goaktpb" +) + +type wasmAgentActor struct { + agentId string + agentName string + status AgentStatus + plugin *plugins.Plugin + host wasmhost.WasmHost + module wasm.Module + buffers utils.OutputBuffers + initialState *string +} + +func newWasmAgentActor(agentId, agentName string, host wasmhost.WasmHost, plugin *plugins.Plugin) *wasmAgentActor { + return &wasmAgentActor{ + agentId: agentId, + agentName: agentName, + host: host, + plugin: plugin, + } +} + +func (a *wasmAgentActor) PreStart(ac *goakt.Context) error { + ctx := a.newContext() + + switch a.status { + case AgentStatusStarting: + logger.Info(ctx).Msg("Starting agent.") + case AgentStatusResuming: + logger.Info(ctx).Msg("Resuming agent.") + case AgentStatusSuspended: + // note, don't call updateStatus here, because we can't publish an event yet + a.status = AgentStatusResuming + logger.Info(ctx).Msg("Resuming agent.") + default: + return fmt.Errorf("invalid agent status for actor PreStart: %s", a.status) + } + + if err := a.saveState(ctx); err != nil { + logger.Err(ctx, err).Msg("Error saving agent state.") + } + + a.buffers = utils.NewOutputBuffers() + if mod, err := a.host.GetModuleInstance(ctx, a.plugin, a.buffers); err != nil { + return err + } else { + a.module = mod + } + + return nil +} + +func (a *wasmAgentActor) PostStop(ac *goakt.Context) error { + ctx := a.newContext() + defer a.module.Close(ctx) + + switch a.status { + case AgentStatusRunning: + a.updateStatus(ctx, AgentStatusSuspending) + logger.Info(ctx).Msg("Suspending agent.") + case AgentStatusSuspending: + logger.Info(ctx).Msg("Suspending agent.") + case AgentStatusStopping: + logger.Info(ctx).Msg("Stopping agent.") + + default: + return fmt.Errorf("invalid agent status for actor PostStop: %s", a.status) + } + + if err := a.saveState(ctx); err != nil { + logger.Err(ctx, err).Msg("Error saving agent state.") + } + + start := time.Now() + + if err := a.shutdownAgent(ctx); err != nil { + logger.Err(ctx, err).Msg("Error shutting down agent.") + return err + } + + duration := time.Since(start) + switch a.status { + case AgentStatusSuspending: + a.updateStatus(ctx, AgentStatusSuspended) + if err := a.saveState(ctx); err != nil { + return err + } + logger.Info(ctx).Msg("Agent suspended successfully.") + case AgentStatusStopping: + a.updateStatus(ctx, AgentStatusTerminated) + if err := a.saveState(ctx); err != nil { + return err + } + logger.Info(ctx).Dur("duration_ms", duration).Msg("Agent terminated successfully.") + default: + return fmt.Errorf("invalid agent status for actor PostStop: %s", a.status) + } + + return nil +} + +func (a *wasmAgentActor) updateStatus(ctx context.Context, status AgentStatus) { + // set the agent status (this is the important part) + a.status = status + + // try to publish an event, but it's ok if it fails + data := fmt.Sprintf(`{"status":"%s"}`, a.status) + _ = PublishAgentEvent(ctx, a.agentId, agentStatusEventName, &data) +} + +func (a *wasmAgentActor) saveState(ctx context.Context) error { + var data string + if a.module != nil { + if d, err := a.getAgentState(ctx); err != nil { + return fmt.Errorf("error getting state from agent: %w", err) + } else if d != nil { + data = *d + } + } + + if err := db.WriteAgentState(ctx, db.AgentState{ + Id: a.agentId, + Name: a.agentName, + Status: a.status, + Data: data, + UpdatedAt: time.Now().UTC().Format(time.RFC3339), + }); err != nil { + return fmt.Errorf("error saving state to database: %w", err) + } + + return nil +} + +func (a *wasmAgentActor) Receive(rc *goakt.ReceiveContext) { + ctx := a.newContext() + + switch msg := rc.Message().(type) { + case *goaktpb.PostStart: + + if err := a.activateAgent(ctx); err != nil { + logger.Err(ctx, err).Msg("Error activating agent.") + rc.Err(err) + return + } + + if a.status == AgentStatusResuming { + if err := a.setAgentState(ctx, a.initialState); err != nil { + logger.Warn(ctx).Err(err).Msg("Unable to restore agent state.") + } + a.initialState = nil + } + + if a.status == AgentStatusResuming { + logger.Info(ctx).Msg("Agent resumed successfully.") + } else { + logger.Info(ctx).Msg("Agent started successfully.") + } + + a.updateStatus(ctx, AgentStatusRunning) + if err := a.saveState(ctx); err != nil { + logger.Err(ctx, err).Msg("Error saving agent state.") + rc.Err(err) + return + } + + case *messages.AgentRequestMessage: + + logger.Info(ctx).Str("msg_name", msg.Name).Msg("Received message.") + start := time.Now() + + fnInfo, err := a.host.GetFunctionInfo("_modus_agent_handle_message") + if err != nil { + rc.Err(err) + return + } + + params := map[string]any{ + "msgName": msg.Name, + "data": msg.Data, + } + + execInfo, err := a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, params) + if err != nil { + rc.Err(err) + return + } + + if msg.Respond { + result := execInfo.Result() + response := &messages.AgentResponseMessage{} + if result != nil { + switch result := result.(type) { + case string: + response.Data = &result + case *string: + response.Data = result + default: + err := fmt.Errorf("unexpected result type: %T", result) + logger.Err(ctx, err).Msg("Error handling message.") + rc.Err(err) + return + } + } + rc.Response(response) + } + + duration := time.Since(start) + logger.Info(ctx).Str("msg_name", msg.Name).Dur("duration_ms", duration).Msg("Message handled successfully.") + + // save the state after handling the message to ensure the state is up to date in case of hard termination + if err := a.saveState(ctx); err != nil { + logger.Err(ctx, err).Msg("Error saving agent state.") + } + + default: + rc.Unhandled() + } +} + +func (a *wasmAgentActor) newContext() context.Context { + ctx := context.Background() + ctx = context.WithValue(ctx, utils.WasmHostContextKey, a.host) + ctx = context.WithValue(ctx, utils.PluginContextKey, a.plugin) + ctx = context.WithValue(ctx, utils.AgentIdContextKey, a.agentId) + ctx = context.WithValue(ctx, utils.AgentNameContextKey, a.agentName) + return ctx +} + +func (a *wasmAgentActor) activateAgent(ctx context.Context) error { + + fnInfo, err := a.host.GetFunctionInfo("_modus_agent_activate") + if err != nil { + return err + } + + params := map[string]any{ + "name": a.agentName, + "id": a.agentId, + "reloading": a.status == AgentStatusResuming, + } + + _, err = a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, params) + return err +} + +func (a *wasmAgentActor) shutdownAgent(ctx context.Context) error { + + fnInfo, err := a.host.GetFunctionInfo("_modus_agent_shutdown") + if err != nil { + return err + } + + params := map[string]any{ + "suspending": a.status == AgentStatusSuspending, + } + + _, err = a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, params) + return err +} + +func (a *wasmAgentActor) getAgentState(ctx context.Context) (*string, error) { + + fnInfo, err := a.host.GetFunctionInfo("_modus_agent_get_state") + if err != nil { + return nil, err + } + + execInfo, err := a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, nil) + if err != nil { + return nil, err + } + + result := execInfo.Result() + if result == nil { + return nil, nil + } + + switch state := result.(type) { + case string: + return &state, nil + case *string: + return state, nil + default: + return nil, fmt.Errorf("unexpected result type: %T", result) + } +} + +func (a *wasmAgentActor) setAgentState(ctx context.Context, data *string) error { + fnInfo, err := a.host.GetFunctionInfo("_modus_agent_set_state") + if err != nil { + return err + } + + params := map[string]any{ + "data": data, + } + + _, err = a.host.CallFunctionInModule(ctx, a.module, a.buffers, fnInfo, params) + return err +} + +func (a *wasmAgentActor) reloadModule(ctx context.Context, plugin *plugins.Plugin) error { + + // the context may not have these values set + ctx = context.WithValue(ctx, utils.PluginContextKey, a.plugin) + ctx = context.WithValue(ctx, utils.AgentIdContextKey, a.agentId) + ctx = context.WithValue(ctx, utils.AgentNameContextKey, a.agentName) + + logger.Info(ctx).Msg("Reloading module for agent.") + + a.updateStatus(ctx, AgentStatusSuspending) + if err := a.shutdownAgent(ctx); err != nil { + logger.Err(ctx, err).Msg("Error shutting down agent.") + return err + } + + // get the current state and close the module instance + state, err := a.getAgentState(ctx) + if err != nil { + logger.Err(ctx, err).Msg("Error getting agent state.") + return err + } + a.module.Close(ctx) + a.updateStatus(ctx, AgentStatusSuspended) + + // create a new module instance and assign it to the actor + a.plugin = plugin + a.buffers = utils.NewOutputBuffers() + mod, err := a.host.GetModuleInstance(ctx, a.plugin, a.buffers) + if err != nil { + return err + } + a.module = mod + + // activate the agent in the new module instance + a.updateStatus(ctx, AgentStatusResuming) + if err := a.activateAgent(ctx); err != nil { + logger.Err(ctx, err).Msg("Error reloading agent.") + return err + } + + // restore the state in the new module instance + if err := a.setAgentState(ctx, state); err != nil { + logger.Err(ctx, err).Msg("Error setting agent state.") + return err + } + + a.updateStatus(ctx, AgentStatusRunning) + logger.Info(ctx).Msg("Agent reloaded module successfully.") + + return nil +} diff --git a/runtime/app/app.go b/runtime/app/app.go index 1c893ccf3..73261b68b 100644 --- a/runtime/app/app.go +++ b/runtime/app/app.go @@ -20,10 +20,10 @@ import ( ) var mu = &sync.RWMutex{} -var config *AppConfig +var config *AppConfig = NewAppConfig() var shuttingDown = false -func init() { +func Initialize() { // Set the global color mode SetOutputColorMode() diff --git a/runtime/explorer/explorer.go b/runtime/explorer/explorer.go index 835a8d618..052a44d07 100644 --- a/runtime/explorer/explorer.go +++ b/runtime/explorer/explorer.go @@ -55,7 +55,7 @@ func endpointsHandler(w http.ResponseWriter, r *http.Request) { } } - utils.WriteJsonContentHeader(w) + w.Header().Set("Content-Type", "application/json") j, _ := utils.JsonSerialize(endpoints) _, _ = w.Write(j) } @@ -70,7 +70,7 @@ func inferenceHistoryHandler(w http.ResponseWriter, r *http.Request) { return } - utils.WriteJsonContentHeader(w) + w.Header().Set("Content-Type", "application/json") j, err := utils.JsonSerialize(inferences) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/runtime/go.mod b/runtime/go.mod index dd7d4d835..726d818a2 100644 --- a/runtime/go.mod +++ b/runtime/go.mod @@ -11,6 +11,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.80.0 github.com/aws/aws-sdk-go-v2/service/sts v1.33.19 github.com/buger/jsonparser v1.1.1 + github.com/cespare/xxhash/v2 v2.3.0 github.com/dgraph-io/dgo/v240 v240.2.0 github.com/docker/docker v28.2.2+incompatible github.com/docker/go-connections v0.5.0 @@ -41,10 +42,11 @@ require ( github.com/tetratelabs/wazero v1.9.0 github.com/tidwall/gjson v1.18.0 github.com/tidwall/sjson v1.2.5 - github.com/tochemey/goakt/v3 v3.5.0 + github.com/tochemey/goakt/v3 v3.5.1 + github.com/travisjeffery/go-dynaport v1.0.0 github.com/twpayne/go-geom v1.6.1 - github.com/wundergraph/graphql-go-tools/execution v1.2.0 - github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.183 + github.com/wundergraph/graphql-go-tools/execution v1.3.1 + github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.186 github.com/xo/dburl v0.23.8 golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 golang.org/x/sys v0.33.0 @@ -94,7 +96,6 @@ require ( github.com/bufbuild/protocompile v0.14.1 // indirect github.com/buraksezer/consistent v0.10.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/chewxy/math32 v1.11.1 // indirect github.com/containerd/errdefs v1.0.0 // indirect github.com/containerd/errdefs/pkg v0.3.0 // indirect @@ -124,7 +125,7 @@ require ( github.com/flowchartsman/retry v1.2.0 // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect github.com/go-jose/go-jose/v4 v4.0.5 // indirect - github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect github.com/gobwas/httphead v0.1.0 // indirect @@ -168,7 +169,7 @@ require ( github.com/jcmturner/gofork v1.7.6 // indirect github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/jensneuse/byte-template v0.0.0-20200214152254-4f3cf06e5c68 // indirect + github.com/jensneuse/byte-template v0.0.0-20231025215717-69252eb3ed56 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kingledion/go-tools v0.6.0 // indirect github.com/klauspost/compress v1.18.0 // indirect @@ -250,13 +251,13 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.38.0 // indirect - golang.org/x/mod v0.24.0 // indirect + golang.org/x/crypto v0.39.0 // indirect + golang.org/x/mod v0.25.0 // indirect golang.org/x/net v0.40.0 // indirect - golang.org/x/sync v0.14.0 // indirect + golang.org/x/sync v0.15.0 // indirect golang.org/x/term v0.32.0 // indirect - golang.org/x/text v0.25.0 // indirect - golang.org/x/time v0.11.0 // indirect + golang.org/x/text v0.26.0 // indirect + golang.org/x/time v0.12.0 // indirect golang.org/x/tools v0.33.0 // indirect google.golang.org/api v0.219.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect diff --git a/runtime/go.sum b/runtime/go.sum index c2b47777c..77c217d49 100644 --- a/runtime/go.sum +++ b/runtime/go.sum @@ -295,8 +295,8 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= -github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU= @@ -440,6 +440,8 @@ github.com/hashicorp/go-msgpack/v2 v2.1.3 h1:cB1w4Zrk0O3jQBTcFMKqYQWRFfsSQ/TYKNy github.com/hashicorp/go-msgpack/v2 v2.1.3/go.mod h1:SjlwKKFnwBXvxD/I1bEcfJIBbEJ+MCUn39TxymNR5ZU= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-plugin v1.6.3 h1:xgHB+ZUSYeuJi96WtxEjzi23uh7YQpznjGh0U0UUrwg= +github.com/hashicorp/go-plugin v1.6.3/go.mod h1:MRobyh+Wc/nYy1V4KAXUiYfzxoYhs7V1mlH1Z7iY2h0= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= @@ -469,6 +471,8 @@ github.com/hashicorp/memberlist v0.5.3 h1:tQ1jOCypD0WvMemw/ZhhtH+PWpzcftQvgCorLu github.com/hashicorp/memberlist v0.5.3/go.mod h1:h60o12SZn/ua/j0B6iKAZezA4eDaGsIuPO70eOaJ6WE= github.com/hashicorp/vault/api v1.16.0 h1:nbEYGJiAPGzT9U4oWgaaB0g+Rj8E59QuHKyA5LhwQN4= github.com/hashicorp/vault/api v1.16.0/go.mod h1:KhuUhzOD8lDSk29AtzNjgAu2kxRA9jL9NAbkFlqvkBA= +github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE= +github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= github.com/hypermodeinc/dgraph/v24 v24.1.2 h1:fRj+yH36D9Hv79X9fiY31B5ud9g28dbKlOngcEv0JXI= @@ -509,8 +513,8 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jensneuse/abstractlogger v0.0.4 h1:sa4EH8fhWk3zlTDbSncaWKfwxYM8tYSlQ054ETLyyQY= github.com/jensneuse/abstractlogger v0.0.4/go.mod h1:6WuamOHuykJk8zED/R0LNiLhWR6C7FIAo43ocUEB3mo= -github.com/jensneuse/byte-template v0.0.0-20200214152254-4f3cf06e5c68 h1:E80wOd3IFQcoBxLkAUpUQ3BoGrZ4DxhQdP21+HH1s6A= -github.com/jensneuse/byte-template v0.0.0-20200214152254-4f3cf06e5c68/go.mod h1:0D5r/VSW6D/o65rKLL9xk7sZxL2+oku2HvFPYeIMFr4= +github.com/jensneuse/byte-template v0.0.0-20231025215717-69252eb3ed56 h1:wo26fh6a6Za0cOMZIopD2sfH/kq83SJ89ixUWl7pCWc= +github.com/jensneuse/byte-template v0.0.0-20231025215717-69252eb3ed56/go.mod h1:0D5r/VSW6D/o65rKLL9xk7sZxL2+oku2HvFPYeIMFr4= github.com/jensneuse/diffview v1.0.0 h1:4b6FQJ7y3295JUHU3tRko6euyEboL825ZsXeZZM47Z4= github.com/jensneuse/diffview v1.0.0/go.mod h1:i6IacuD8LnEaPuiyzMHA+Wfz5mAuycMOf3R/orUY9y4= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= @@ -629,8 +633,8 @@ github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI= github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= github.com/nats-io/nats-server/v2 v2.11.4 h1:oQhvy6He6ER926sGqIKBKuYHH4BGnUQCNb0Y5Qa+M54= github.com/nats-io/nats-server/v2 v2.11.4/go.mod h1:jFnKKwbNeq6IfLHq+OMnl7vrFRihQ/MkhRbiWfjLdjU= -github.com/nats-io/nats.go v1.42.0 h1:ynIMupIOvf/ZWH/b2qda6WGKGNSjwOUutTpWRvAmhaM= -github.com/nats-io/nats.go v1.42.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nats.go v1.43.0 h1:uRFZ2FEoRvP64+UUhaTokyS18XBCR/xM2vQZKO4i8ug= +github.com/nats-io/nats.go v1.43.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -638,6 +642,8 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS github.com/neo4j/neo4j-go-driver/v5 v5.28.1 h1:RKWQW7wTgYAY2fU9S+9LaJ9OwRPbRc0I17tlT7nDmAY= github.com/neo4j/neo4j-go-driver/v5 v5.28.1/go.mod h1:Vff8OwT7QpLm7L2yYr85XNWe9Rbqlbeb9asNXJTHO4k= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw= +github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= @@ -819,8 +825,8 @@ github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDW github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= github.com/tinylib/msgp v1.2.5 h1:WeQg1whrXRFiZusidTQqzETkRpGjFjcIhW6uqWH09po= github.com/tinylib/msgp v1.2.5/go.mod h1:ykjzy2wzgrlvpDCRc4LA8UXy6D8bzMSuAF3WD57Gok0= -github.com/tochemey/goakt/v3 v3.5.0 h1:cjFWryLQDjKqVpUisopUL6/+A3xgKfYw8yVLbe1nJiI= -github.com/tochemey/goakt/v3 v3.5.0/go.mod h1:DENJmmolABUcOFlUOhY+8eaJj6v8nNcsVp2SEwMHnZo= +github.com/tochemey/goakt/v3 v3.5.1 h1:WCZUviuQ3pw0lyXx0nKmpGOKOUhXDTdJmFDvWc4P9do= +github.com/tochemey/goakt/v3 v3.5.1/go.mod h1:S+nNYJJZZX5MG0xlontuo9ObXZPHkSuw3LBGOHxOgqY= github.com/tochemey/olric v0.2.3 h1:LGmsHLQBSEs3uasZNLT5MdS2pBMNJ71gSrXnYfkb62M= github.com/tochemey/olric v0.2.3/go.mod h1:BAD82xys8R8IAWFV+GC0B8I+J4QsYZvmPS5NT/dhmtI= github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw= @@ -851,10 +857,10 @@ github.com/wundergraph/cosmo/composition-go v0.0.0-20241020204711-78f240a77c99 h github.com/wundergraph/cosmo/composition-go v0.0.0-20241020204711-78f240a77c99/go.mod h1:fUuOAUAXUFB/mlSkAaImGeE4A841AKR5dTMWhV4ibxI= github.com/wundergraph/cosmo/router v0.0.0-20240729154441-b20b00e892c6 h1:oXnHjPyl2Wes+mnCQbR0F1v0WAUOf974PdANFMA4CmI= github.com/wundergraph/cosmo/router v0.0.0-20240729154441-b20b00e892c6/go.mod h1:0FOIMzc1cY8c4rUczyhmI2es84HGoEJV2MIbDbiEWzg= -github.com/wundergraph/graphql-go-tools/execution v1.2.0 h1:9PXcNSN2n231q/YZZS3kFJ2kpd67LA8QMgGlUFson84= -github.com/wundergraph/graphql-go-tools/execution v1.2.0/go.mod h1:sv2LtqCiTCdiK0P6x3KUYLb9C1V8RW9H/9eqEdfgktY= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.183 h1:NsUlVUR0sZWKSi5Sbgsx6c527+R0wqXj5WDithbg4TQ= -github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.183/go.mod h1:8N4c4JRDknFdph84ZCcHxgNBjnW7f3aiIycmpG/IhCc= +github.com/wundergraph/graphql-go-tools/execution v1.3.1 h1:0W2ft+0jSAo//8se+nt9JAgGIdesw+J9QyP2cxO9V04= +github.com/wundergraph/graphql-go-tools/execution v1.3.1/go.mod h1:W/qne08dAC/E0MrwOFBkeFujKTNFv0w8XHfJb+9Y5AM= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.186 h1:q0lNSN6RRwINn7QLNvghMt0gqbeA5aOGiuOobaxz3ww= +github.com/wundergraph/graphql-go-tools/v2 v2.0.0-rc.186/go.mod h1:DaBrBCMgKGd3t7zg7z11jKm+0mVJiesr/IQCRG9qgP0= github.com/xdg/scram v1.0.5 h1:TuS0RFmt5Is5qm9Tm2SoD89OPqe4IRiFtyFY4iwWXsw= github.com/xdg/scram v1.0.5/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.3 h1:cmL5Enob4W83ti/ZHuZLuKD/xqJfus4fVPwE+/BDm+4= @@ -927,8 +933,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= -golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= +golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= +golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -966,8 +972,8 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.24.0 h1:ZfthKaKaT4NrhGVZHO1/WDTwGES4De8KtWO0SIbNJMU= -golang.org/x/mod v0.24.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= +golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w= +golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1030,8 +1036,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= -golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= +golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1100,13 +1106,13 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= -golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= +golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= +golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= -golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= +golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/runtime/graphql/datasource/configuration.go b/runtime/graphql/datasource/configuration.go index 4bd5e9e2e..7d803191e 100644 --- a/runtime/graphql/datasource/configuration.go +++ b/runtime/graphql/datasource/configuration.go @@ -11,10 +11,12 @@ package datasource import ( "github.com/hypermodeinc/modus/runtime/wasmhost" + "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan" ) -type HypDSConfig struct { +type ModusDataSourceConfig struct { WasmHost wasmhost.WasmHost FieldsToFunctions map[string]string MapTypes []string + Metadata *plan.DataSourceMetadata } diff --git a/runtime/graphql/datasource/eventsds.go b/runtime/graphql/datasource/eventsds.go new file mode 100644 index 000000000..08fc671b7 --- /dev/null +++ b/runtime/graphql/datasource/eventsds.go @@ -0,0 +1,71 @@ +/* + * Copyright 2025 Hypermode Inc. + * Licensed under the terms of the Apache License, Version 2.0 + * See the LICENSE file that accompanied this code for further details. + * + * SPDX-FileCopyrightText: 2025 Hypermode Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +package datasource + +import ( + "fmt" + + "github.com/cespare/xxhash/v2" + "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" + + "github.com/hypermodeinc/modus/runtime/actors" + "github.com/hypermodeinc/modus/runtime/utils" +) + +type eventsDataSource struct{} + +func (s *eventsDataSource) Start(rc *resolve.Context, input []byte, updater resolve.SubscriptionUpdater) error { + err := startEventsDataSource(rc, input, updater) + if err != nil { + // We should be able to return the error, but there appears to be a bug in the GraphQL-Go-Tools library + // where it doesn't handle the error correctly and causes a panic. + // TODO: investigate this further. + errMsg, _ := utils.JsonSerialize(err.Error()) + updater.Update(fmt.Appendf(nil, `{"errors":[{"message":%s}]}`, errMsg)) + updater.Done() + } + return nil +} + +func startEventsDataSource(rc *resolve.Context, input []byte, updater resolve.SubscriptionUpdater) error { + var ci callInfo + if err := utils.JsonDeserialize(input, &ci); err != nil { + return fmt.Errorf("error parsing input: %w", err) + } + + // Current there is only one hardcoded subscription for agent events. + if subName := ci.FieldInfo.Name; subName != "agentEvent" { + return fmt.Errorf("unknown subscription: %s", subName) + } + + var agentId string + if value, ok := ci.Parameters["agentId"]; !ok { + return fmt.Errorf("missing required parameter 'agentId'") + } else if id, ok := value.(string); !ok { + return fmt.Errorf("invalid type for 'agentId', expected string, got %T", value) + } else { + agentId = id + } + + fieldName := ci.FieldInfo.AliasOrName() + return actors.SubscribeForAgentEvents( + rc.Context(), + agentId, + func(data []byte) { + updater.Update(fmt.Appendf(nil, `{"data":{"%s":%s}}`, fieldName, data)) + }, + updater.Done, + ) +} + +func (s *eventsDataSource) UniqueRequestID(rc *resolve.Context, input []byte, xxh *xxhash.Digest) error { + _, err := xxh.Write(input) + return err +} diff --git a/runtime/graphql/datasource/factory.go b/runtime/graphql/datasource/factory.go index 2352663a7..cf198b8a8 100644 --- a/runtime/graphql/datasource/factory.go +++ b/runtime/graphql/datasource/factory.go @@ -17,26 +17,26 @@ import ( "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan" ) -func NewHypDSFactory(ctx context.Context) plan.PlannerFactory[HypDSConfig] { - return &hypDSFactory{ +func NewModusDataSourceFactory(ctx context.Context) plan.PlannerFactory[ModusDataSourceConfig] { + return &modusDataSourceFactory{ ctx: ctx, } } -type hypDSFactory struct { +type modusDataSourceFactory struct { ctx context.Context } -func (f *hypDSFactory) Planner(logger abstractlogger.Logger) plan.DataSourcePlanner[HypDSConfig] { - return &HypDSPlanner{ +func (f *modusDataSourceFactory) Planner(logger abstractlogger.Logger) plan.DataSourcePlanner[ModusDataSourceConfig] { + return &modusDataSourcePlanner{ ctx: f.ctx, } } -func (f *hypDSFactory) Context() context.Context { +func (f *modusDataSourceFactory) Context() context.Context { return f.ctx } -func (f *hypDSFactory) UpstreamSchema(dataSourceConfig plan.DataSourceConfiguration[HypDSConfig]) (*ast.Document, bool) { +func (f *modusDataSourceFactory) UpstreamSchema(dataSourceConfig plan.DataSourceConfiguration[ModusDataSourceConfig]) (*ast.Document, bool) { return nil, false } diff --git a/runtime/graphql/datasource/source.go b/runtime/graphql/datasource/functionsds.go similarity index 92% rename from runtime/graphql/datasource/source.go rename to runtime/graphql/datasource/functionsds.go index 9a8242f89..c35ff0b28 100644 --- a/runtime/graphql/datasource/source.go +++ b/runtime/graphql/datasource/functionsds.go @@ -27,45 +27,37 @@ import ( "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" ) -const DataSourceName = "ModusDataSource" - type callInfo struct { FieldInfo fieldInfo `json:"field"` - FunctionName string `json:"function"` + FunctionName string `json:"function,omitempty"` Parameters map[string]any `json:"data"` } -type ModusDataSource struct { +type functionsDataSource struct { WasmHost wasmhost.WasmHost } -func (ds *ModusDataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer) error { - - // Parse the input to get the function call info +func (ds *functionsDataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer) error { var ci callInfo - err := utils.JsonDeserialize(input, &ci) - if err != nil { + if err := utils.JsonDeserialize(input, &ci); err != nil { return fmt.Errorf("error parsing input: %w", err) } - // Load the data - result, gqlErrors, err := ds.callFunction(ctx, &ci) + result, gqlErrors, fnErr := ds.callFunction(ctx, &ci) - // Write the response - err = writeGraphQLResponse(ctx, out, result, gqlErrors, err, &ci) - if err != nil { - logger.Error(ctx).Err(err).Msg("Error creating GraphQL response.") + if err := writeGraphQLResponse(ctx, out, result, gqlErrors, fnErr, &ci); err != nil { + return fmt.Errorf("error creating GraphQL response: %w", err) } - return err + return nil } -func (*ModusDataSource) LoadWithFiles(ctx context.Context, input []byte, files []*httpclient.FileUpload, out *bytes.Buffer) (err error) { +func (*functionsDataSource) LoadWithFiles(ctx context.Context, input []byte, files []*httpclient.FileUpload, out *bytes.Buffer) (err error) { // See https://github.com/wundergraph/graphql-go-tools/pull/758 panic("not implemented") } -func (ds *ModusDataSource) callFunction(ctx context.Context, callInfo *callInfo) (any, []resolve.GraphQLError, error) { +func (ds *functionsDataSource) callFunction(ctx context.Context, callInfo *callInfo) (any, []resolve.GraphQLError, error) { // Handle special case for __typename on root Query or Mutation if callInfo.FieldInfo.Name == "__typename" { diff --git a/runtime/graphql/datasource/planner.go b/runtime/graphql/datasource/planner.go index 06b146023..1f977ef24 100644 --- a/runtime/graphql/datasource/planner.go +++ b/runtime/graphql/datasource/planner.go @@ -12,21 +12,22 @@ package datasource import ( "bytes" "context" - "fmt" "slices" + "strings" "github.com/hypermodeinc/modus/runtime/logger" "github.com/hypermodeinc/modus/runtime/utils" + "github.com/tidwall/gjson" "github.com/wundergraph/graphql-go-tools/v2/pkg/ast" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan" "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve" ) -type HypDSPlanner struct { +type modusDataSourcePlanner struct { id int ctx context.Context - config HypDSConfig + config plan.DataSourceConfiguration[ModusDataSourceConfig] visitor *plan.Visitor variables resolve.Variables fields map[int]fieldInfo @@ -46,6 +47,7 @@ type fieldInfo struct { Fields []fieldInfo `json:"fields,omitempty"` IsMapType bool `json:"isMapType,omitempty"` fieldRefs []int `json:"-"` + depth int `json:"-"` } func (t *fieldInfo) AliasOrName() string { @@ -55,23 +57,23 @@ func (t *fieldInfo) AliasOrName() string { return t.Name } -func (p *HypDSPlanner) SetID(id int) { +func (p *modusDataSourcePlanner) SetID(id int) { p.id = id } -func (p *HypDSPlanner) ID() (id int) { +func (p *modusDataSourcePlanner) ID() (id int) { return p.id } -func (p *HypDSPlanner) UpstreamSchema(dataSourceConfig plan.DataSourceConfiguration[HypDSConfig]) (*ast.Document, bool) { +func (p *modusDataSourcePlanner) UpstreamSchema(dataSourceConfig plan.DataSourceConfiguration[ModusDataSourceConfig]) (*ast.Document, bool) { return nil, false } -func (p *HypDSPlanner) DownstreamResponseFieldAlias(downstreamFieldRef int) (alias string, exists bool) { +func (p *modusDataSourcePlanner) DownstreamResponseFieldAlias(downstreamFieldRef int) (alias string, exists bool) { return } -func (p *HypDSPlanner) DataSourcePlanningBehavior() plan.DataSourcePlanningBehavior { +func (p *modusDataSourcePlanner) DataSourcePlanningBehavior() plan.DataSourcePlanningBehavior { return plan.DataSourcePlanningBehavior{ // This needs to be true, so we can distinguish results for multiple function calls in the same operation. // Example: @@ -88,31 +90,39 @@ func (p *HypDSPlanner) DataSourcePlanningBehavior() plan.DataSourcePlanningBehav } } -func (p *HypDSPlanner) Register(visitor *plan.Visitor, configuration plan.DataSourceConfiguration[HypDSConfig], dspc plan.DataSourcePlannerConfiguration) error { +func (p *modusDataSourcePlanner) Register(visitor *plan.Visitor, configuration plan.DataSourceConfiguration[ModusDataSourceConfig], dspc plan.DataSourcePlannerConfiguration) error { p.visitor = visitor visitor.Walker.RegisterEnterDocumentVisitor(p) visitor.Walker.RegisterEnterFieldVisitor(p) visitor.Walker.RegisterLeaveDocumentVisitor(p) - p.config = HypDSConfig(configuration.CustomConfiguration()) + p.config = configuration + return nil } -func (p *HypDSPlanner) EnterDocument(operation, definition *ast.Document) { +func (p *modusDataSourcePlanner) EnterDocument(operation, definition *ast.Document) { p.fields = make(map[int]fieldInfo, len(operation.Fields)) } -func (p *HypDSPlanner) EnterField(ref int) { +func (p *modusDataSourcePlanner) EnterField(ref int) { + ds := p.config.(plan.DataSource) + config := p.config.CustomConfiguration() // Capture information about every field in the operation. f := p.captureField(ref) p.fields[ref] = *f - // Capture only the fields that represent function calls. - if p.currentNodeIsFunctionCall() { - + // Capture input data from only the fields that represent function calls or event subscriptions. + // These fields are identified by their depth (3), and the fact that they have + // a parent type that is a root node in the data source configuration. + // + // The depth is 3 because the tree structure of the GraphQL operation looks like this: + // 0: root node (query, mutation, or subscription) + // 1: selection set node + // 2: this field's node + if f.depth == 3 && ds.HasRootNode(f.ParentType, f.Name) { p.template.fieldInfo = f - p.template.functionName = p.config.FieldsToFunctions[f.Name] - + p.template.functionName = config.FieldsToFunctions[f.Name] if err := p.captureInputData(ref); err != nil { logger.Err(p.ctx, err).Msg("Error capturing input data.") return @@ -120,12 +130,12 @@ func (p *HypDSPlanner) EnterField(ref int) { } } -func (p *HypDSPlanner) LeaveDocument(operation, definition *ast.Document) { +func (p *modusDataSourcePlanner) LeaveDocument(operation, definition *ast.Document) { // Stitch the captured fields together to form a tree. p.stitchFields(p.template.fieldInfo) } -func (p *HypDSPlanner) stitchFields(f *fieldInfo) { +func (p *modusDataSourcePlanner) stitchFields(f *fieldInfo) { if f == nil || len(f.fieldRefs) == 0 { return } @@ -138,30 +148,15 @@ func (p *HypDSPlanner) stitchFields(f *fieldInfo) { } } -func (p *HypDSPlanner) currentNodeIsFunctionCall() bool { - if p.visitor.Walker.CurrentKind != ast.NodeKindField { - return false - } - - enclosingTypeDef := p.visitor.Walker.EnclosingTypeDefinition - if enclosingTypeDef.Kind != ast.NodeKindObjectTypeDefinition { - return false - } - - // TODO: This works, but it's a hack. We should find a better way to determine if the field is a function call. - // The previous approach of root node testing worked for queries, but not for mutations. - // The enclosing type name should not be relevant. - enclosingTypeName := p.visitor.Definition.ObjectTypeDefinitionNameString(enclosingTypeDef.Ref) - return enclosingTypeName == "Query" || enclosingTypeName == "Mutation" -} - -func (p *HypDSPlanner) captureField(ref int) *fieldInfo { +func (p *modusDataSourcePlanner) captureField(ref int) *fieldInfo { operation := p.visitor.Operation definition := p.visitor.Definition walker := p.visitor.Walker + config := p.config.CustomConfiguration() f := &fieldInfo{ ref: ref, + depth: walker.Depth, Name: operation.FieldNameString(ref), Alias: operation.FieldAliasString(ref), } @@ -170,7 +165,7 @@ func (p *HypDSPlanner) captureField(ref int) *fieldInfo { if ok { f.TypeName = definition.FieldDefinitionTypeNameString(def) f.ParentType = walker.EnclosingTypeDefinition.NameString(definition) - f.IsMapType = slices.Contains(p.config.MapTypes, f.TypeName) + f.IsMapType = slices.Contains(config.MapTypes, f.TypeName) } if operation.FieldHasSelections(ref) { @@ -183,7 +178,7 @@ func (p *HypDSPlanner) captureField(ref int) *fieldInfo { return f } -func (p *HypDSPlanner) captureInputData(fieldRef int) error { +func (p *modusDataSourcePlanner) captureInputData(fieldRef int) error { operation := p.visitor.Operation variables := resolve.NewVariables() var buf bytes.Buffer @@ -224,29 +219,48 @@ func (p *HypDSPlanner) captureInputData(fieldRef int) error { return nil } -func (p *HypDSPlanner) ConfigureFetch() resolve.FetchConfiguration { +func (p *modusDataSourcePlanner) getInputTemplate() (string, error) { + fieldInfoJson, err := utils.JsonSerialize(p.template.fieldInfo) if err != nil { - logger.Error(p.ctx).Err(err).Msg("Error serializing json while configuring graphql fetch.") - return resolve.FetchConfiguration{} + return "", err } - functionNameJson, err := utils.JsonSerialize(p.template.functionName) + // Note: we have to build the rest of the template manually, because the data field may + // contain placeholders for variables, such as $$0$$ which are not valid in JSON. + // They are replaced with the actual values when the input is rendered. + + b := &strings.Builder{} + b.WriteString(`{"field":`) + b.Write(fieldInfoJson) + + if len(p.template.functionName) > 0 { + b.WriteString(`,"function":`) + b.Write(gjson.AppendJSONString(nil, p.template.functionName)) + } + + b.WriteString(`,"data":`) + b.Write(p.template.data) + + b.WriteByte('}') + + return b.String(), nil +} + +func (p *modusDataSourcePlanner) ConfigureFetch() resolve.FetchConfiguration { + input, err := p.getInputTemplate() if err != nil { - logger.Error(p.ctx).Err(err).Msg("Error serializing json while configuring graphql fetch.") + logger.Error(p.ctx).Err(err).Msg("Error creating input template for Modus data source.") return resolve.FetchConfiguration{} } - // Note: we have to build the rest of the template manually, because the data field may - // contain placeholders for variables, such as $$0$$ which are not valid in JSON. - // They are replaced with the actual values by the time Load is called. - inputTemplate := fmt.Sprintf(`{"field":%s,"function":%s,"data":%s}`, fieldInfoJson, functionNameJson, p.template.data) + config := p.config.CustomConfiguration() return resolve.FetchConfiguration{ - Input: inputTemplate, + Input: input, Variables: p.variables, - DataSource: &ModusDataSource{ - WasmHost: p.config.WasmHost, + DataSource: &functionsDataSource{ + WasmHost: config.WasmHost, }, PostProcessing: resolve.PostProcessingConfiguration{ SelectResponseDataPath: []string{"data"}, @@ -255,6 +269,20 @@ func (p *HypDSPlanner) ConfigureFetch() resolve.FetchConfiguration { } } -func (p *HypDSPlanner) ConfigureSubscription() plan.SubscriptionConfiguration { - panic("subscription not implemented") +func (p *modusDataSourcePlanner) ConfigureSubscription() plan.SubscriptionConfiguration { + input, err := p.getInputTemplate() + if err != nil { + logger.Error(p.ctx).Err(err).Msg("Error creating input template for Modus data source.") + return plan.SubscriptionConfiguration{} + } + + return plan.SubscriptionConfiguration{ + Input: input, + Variables: p.variables, + DataSource: &eventsDataSource{}, + PostProcessing: resolve.PostProcessingConfiguration{ + SelectResponseDataPath: []string{"data"}, + SelectResponseErrorsPath: []string{"errors"}, + }, + } } diff --git a/runtime/graphql/engine/engine.go b/runtime/graphql/engine/engine.go index a61962f7a..21d536837 100644 --- a/runtime/graphql/engine/engine.go +++ b/runtime/graphql/engine/engine.go @@ -72,7 +72,7 @@ func Activate(ctx context.Context, plugin *plugins.Plugin) error { return nil } -func generateSchema(ctx context.Context, md *metadata.Metadata) (*gql.Schema, *datasource.HypDSConfig, error) { +func generateSchema(ctx context.Context, md *metadata.Metadata) (*gql.Schema, *datasource.ModusDataSourceConfig, error) { span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) defer span.Finish() @@ -94,7 +94,7 @@ func generateSchema(ctx context.Context, md *metadata.Metadata) (*gql.Schema, *d return nil, nil, err } - cfg := &datasource.HypDSConfig{ + cfg := &datasource.ModusDataSourceConfig{ WasmHost: wasmhost.GetWasmHost(ctx), FieldsToFunctions: generated.FieldsToFunctions, MapTypes: generated.MapTypes, @@ -103,7 +103,7 @@ func generateSchema(ctx context.Context, md *metadata.Metadata) (*gql.Schema, *d return schema, cfg, nil } -func getDatasourceConfig(ctx context.Context, schema *gql.Schema, cfg *datasource.HypDSConfig) (plan.DataSourceConfiguration[datasource.HypDSConfig], error) { +func getDatasourceConfig(ctx context.Context, schema *gql.Schema, cfg *datasource.ModusDataSourceConfig) (plan.DataSourceConfiguration[datasource.ModusDataSourceConfig], error) { span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) defer span.Finish() @@ -113,6 +113,9 @@ func getDatasourceConfig(ctx context.Context, schema *gql.Schema, cfg *datasourc mutationTypeName := schema.MutationTypeName() mutationFieldNames := getTypeFields(ctx, schema, mutationTypeName) + subscriptionTypeName := schema.SubscriptionTypeName() + subscriptionFieldNames := getTypeFields(ctx, schema, subscriptionTypeName) + rootNodes := []plan.TypeField{ { TypeName: queryTypeName, @@ -122,16 +125,24 @@ func getDatasourceConfig(ctx context.Context, schema *gql.Schema, cfg *datasourc TypeName: mutationTypeName, FieldNames: mutationFieldNames, }, + { + TypeName: subscriptionTypeName, + FieldNames: subscriptionFieldNames, + }, } childNodes := []plan.TypeField{} childNodes = append(childNodes, getChildNodes(queryFieldNames, schema, queryTypeName)...) childNodes = append(childNodes, getChildNodes(mutationFieldNames, schema, mutationTypeName)...) + childNodes = append(childNodes, getChildNodes(subscriptionFieldNames, schema, subscriptionTypeName)...) + + metadata := &plan.DataSourceMetadata{RootNodes: rootNodes, ChildNodes: childNodes} + cfg.Metadata = metadata return plan.NewDataSourceConfiguration( - datasource.DataSourceName, - datasource.NewHypDSFactory(ctx), - &plan.DataSourceMetadata{RootNodes: rootNodes, ChildNodes: childNodes}, + "Modus", + datasource.NewModusDataSourceFactory(ctx), + metadata, *cfg, ) } @@ -154,7 +165,7 @@ func getChildNodes(fieldNames []string, schema *gql.Schema, typeName string) []p return childNodes } -func makeEngine(ctx context.Context, schema *gql.Schema, datasourceConfig plan.DataSourceConfiguration[datasource.HypDSConfig]) (*engine.ExecutionEngine, error) { +func makeEngine(ctx context.Context, schema *gql.Schema, datasourceConfig plan.DataSourceConfiguration[datasource.ModusDataSourceConfig]) (*engine.ExecutionEngine, error) { span, ctx := utils.NewSentrySpanForCurrentFunc(ctx) defer span.Finish() diff --git a/runtime/graphql/graphql.go b/runtime/graphql/graphql.go index a576f9f34..2c8704b0c 100644 --- a/runtime/graphql/graphql.go +++ b/runtime/graphql/graphql.go @@ -91,7 +91,7 @@ func handleGraphQLRequest(w http.ResponseWriter, r *http.Request) { if engine == nil { msg := "There is no active GraphQL schema. Please load a Modus plugin." logger.Warn(ctx).Msg(msg) - utils.WriteJsonContentHeader(w) + w.Header().Set("Content-Type", "application/json") if ok, _ := gqlRequest.IsIntrospectionQuery(); ok { fmt.Fprint(w, `{"data":{"__schema":{"types":[]}}}`) } else { @@ -125,10 +125,51 @@ func handleGraphQLRequest(w http.ResponseWriter, r *http.Request) { options = append(options, eng.WithRequestTraceOptions(traceOpts)) } - // Execute the GraphQL operation + // Prepare the result writer + streaming := false resultWriter := gql.NewEngineResultWriter() - if err := engine.Execute(ctx, &gqlRequest, &resultWriter, options...); err != nil { + if operationType, err := gqlRequest.OperationType(); err != nil { + msg := "Failed to determine operation type from GraphQL request." + logger.Err(ctx, err).Msg(msg) + http.Error(w, msg, http.StatusBadRequest) + return + } else if operationType == gql.OperationTypeSubscription { + if !isSSERequest(r) { + msg := "Subscriptions use SSE (Server-Sent Events). Requests must accept 'text/event-stream' for SSE responses." + logger.Warn(ctx).Msg(msg) + http.Error(w, msg, http.StatusBadRequest) + return + } + + flusher := w.(http.Flusher) + streaming = true + + // We're following the GraphQL SSE draft spec for subscriptions. References: + // https://the-guild.dev/graphql/sse + // https://github.com/enisdenjo/graphql-sse/blob/master/PROTOCOL.md + // + // Clients should be implemented similar to these examples: + // https://the-guild.dev/graphql/sse/recipes#client-usage + + h := w.Header() + h.Set("Content-Type", "text/event-stream") + h.Set("Cache-Control", "no-cache") + h.Set("Connection", "keep-alive") + + // note: this event isn't in the graphql-sse draft spec, but is helpful to indicate the connection is established, + // when testing, and doesn't interfere with the graphql events. + fmt.Fprint(w, "event: ack\ndata: \n\n") + flusher.Flush() + + resultWriter.SetFlushCallback(func(data []byte) { + // graphql subscription data and errors are pushed over a "next" event, per the graphql-sse draft spec. + fmt.Fprintf(w, "event: next\ndata: %s\n\n", data) + flusher.Flush() + }) + } + // Execute the GraphQL operation + if err := engine.Execute(ctx, &gqlRequest, &resultWriter, options...); err != nil { if report, ok := err.(operationreport.Report); ok { if len(report.InternalErrors) > 0 { // Log internal errors, but don't return them to the client @@ -141,8 +182,15 @@ func handleGraphQLRequest(w http.ResponseWriter, r *http.Request) { if requestErrors := graphqlerrors.RequestErrorsFromError(err); len(requestErrors) > 0 { // TODO: we should capture metrics here - utils.WriteJsonContentHeader(w) - _, _ = requestErrors.WriteResponse(w) + + if streaming { + fmt.Fprint(w, "event: next\ndata: ") + _, _ = requestErrors.WriteResponse(w) + fmt.Fprint(w, "\n\nevent: complete\ndata: \n\n") + } else { + w.Header().Set("Content-Type", "application/json") + _, _ = requestErrors.WriteResponse(w) + } // NOTE: We only log these in dev, to avoid a bad actor spamming the logs in prod. if app.IsDevEnvironment() { @@ -159,12 +207,17 @@ func handleGraphQLRequest(w http.ResponseWriter, r *http.Request) { return } + if streaming { + // In case the connection is still open, we send a final "complete" event, per the graphql-sse draft spec. + fmt.Fprint(w, "event: complete\ndata: \n\n") + return + } + if response, err := addOutputToResponse(resultWriter.Bytes(), xsync.ToPlainMap(output)); err != nil { msg := "Failed to add function output to response." logger.Err(ctx, err).Msg(msg) http.Error(w, fmt.Sprintf("%s\n%v", msg, err), http.StatusInternalServerError) } else { - utils.WriteJsonContentHeader(w) // An introspection query will always return a Query type, but if only mutations were defined, // the fields of the Query type will be null. That will fail the introspection query, so we need @@ -179,6 +232,7 @@ func handleGraphQLRequest(w http.ResponseWriter, r *http.Request) { } } + w.Header().Set("Content-Type", "application/json") _, _ = w.Write(response) } } @@ -236,3 +290,14 @@ func addOutputToResponse(response []byte, output map[string]wasmhost.ExecutionIn return response, nil } + +func isSSERequest(r *http.Request) bool { + for _, accept := range r.Header.Values("Accept") { + for value := range strings.SplitSeq(accept, ",") { + if strings.EqualFold(strings.TrimSpace(strings.SplitN(value, ";", 2)[0]), "text/event-stream") { + return true + } + } + } + return false +} diff --git a/runtime/graphql/schemagen/schemagen.go b/runtime/graphql/schemagen/schemagen.go index 02ff328b6..c48d499d5 100644 --- a/runtime/graphql/schemagen/schemagen.go +++ b/runtime/graphql/schemagen/schemagen.go @@ -70,6 +70,22 @@ func GetGraphQLSchema(ctx context.Context, md *metadata.Metadata) (*GraphQLSchem fieldsToFunctions[f.Name] = f.Function } + // TODO: the subscription probably should only be added if there is at least one agent defined. + // For now it is hardcoded to always be present. + buf.WriteString(` +scalar JSON + +type AgentEvent { + name: String! + data: JSON + timestamp: String +} + +type Subscription { + agentEvent(agentId: String!): AgentEvent! +} +`) + return &GraphQLSchema{ Schema: buf.String(), FieldsToFunctions: fieldsToFunctions, diff --git a/runtime/graphql/schemagen/schemagen_as_test.go b/runtime/graphql/schemagen/schemagen_as_test.go index fefc02240..5ec3d89b1 100644 --- a/runtime/graphql/schemagen/schemagen_as_test.go +++ b/runtime/graphql/schemagen/schemagen_as_test.go @@ -322,6 +322,18 @@ type StringStringPair { key: String! value: String! } + +scalar JSON + +type AgentEvent { + name: String! + data: JSON + timestamp: String +} + +type Subscription { + agentEvent(agentId: String!): AgentEvent! +} `[1:] require.Nil(t, err) diff --git a/runtime/graphql/schemagen/schemagen_go_test.go b/runtime/graphql/schemagen/schemagen_go_test.go index 0d90bd3ac..f88d6672b 100644 --- a/runtime/graphql/schemagen/schemagen_go_test.go +++ b/runtime/graphql/schemagen/schemagen_go_test.go @@ -364,6 +364,18 @@ type StringStringPair { key: String! value: String! } + +scalar JSON + +type AgentEvent { + name: String! + data: JSON + timestamp: String +} + +type Subscription { + agentEvent(agentId: String!): AgentEvent! +} `[1:] require.Nil(t, err) diff --git a/runtime/hostfunctions/agents.go b/runtime/hostfunctions/agents.go index 6d760db42..c3cbccb5f 100644 --- a/runtime/hostfunctions/agents.go +++ b/runtime/hostfunctions/agents.go @@ -44,4 +44,10 @@ func init() { withMessageDetail(func(agentId string, msgName string, data *string, timeout int64) string { return fmt.Sprintf("AgentId: %s, MsgName: %s", agentId, msgName) })) + + registerHostFunction(module_name, "publishEvent", actors.PublishAgentEvent, + withErrorMessage("Error publishing agent event."), + withMessageDetail(func(agentId, eventName string, eventData *string) string { + return fmt.Sprintf("AgentId: %s, EventName: %s", agentId, eventName) + })) } diff --git a/runtime/httpserver/health.go b/runtime/httpserver/health.go index 813b3750b..a0a54ae76 100644 --- a/runtime/httpserver/health.go +++ b/runtime/httpserver/health.go @@ -16,7 +16,6 @@ import ( "github.com/hypermodeinc/modus/runtime/actors" "github.com/hypermodeinc/modus/runtime/app" - "github.com/hypermodeinc/modus/runtime/utils" ) var healthHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -24,10 +23,10 @@ var healthHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request ver := app.VersionNumber() agents := actors.ListLocalAgents() - // custom format the JSON response for easy readability - + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - utils.WriteJsonContentHeader(w) + + // custom format the JSON response for easy readability _, _ = w.Write([]byte(`{ "status": "ok", "environment": "` + env + `", diff --git a/runtime/integration_tests/postgresql_integration_test.go b/runtime/integration_tests/postgresql_integration_test.go index 5be92d5cf..dd757ccd9 100644 --- a/runtime/integration_tests/postgresql_integration_test.go +++ b/runtime/integration_tests/postgresql_integration_test.go @@ -109,6 +109,7 @@ func updateManifest(t *testing.T, jsonManifest []byte) func() { func TestMain(m *testing.M) { // setup config cfg := app.NewAppConfig(). + WithEnvironment("dev"). WithAppPath(testPluginsPath). WithRefreshInterval(refreshPluginInterval). WithPort(httpListenPort) diff --git a/runtime/main.go b/runtime/main.go index 24d97eae7..da85ee50d 100644 --- a/runtime/main.go +++ b/runtime/main.go @@ -22,6 +22,9 @@ import ( func main() { + // Initialize the app configuration (command-line flags, etc.) + app.Initialize() + // Create the main background context ctx := context.Background() diff --git a/runtime/messages/messages.pb.go b/runtime/messages/messages.pb.go index a2083e52b..ecbadb011 100644 --- a/runtime/messages/messages.pb.go +++ b/runtime/messages/messages.pb.go @@ -9,6 +9,8 @@ package messages import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + structpb "google.golang.org/protobuf/types/known/structpb" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" unsafe "unsafe" @@ -125,31 +127,105 @@ func (x *AgentResponseMessage) GetData() string { return "" } +type AgentEventMessage struct { + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Data *structpb.Value `protobuf:"bytes,2,opt,name=data,proto3,oneof" json:"data,omitempty"` + Timestamp *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *AgentEventMessage) Reset() { + *x = AgentEventMessage{} + mi := &file_messages_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *AgentEventMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*AgentEventMessage) ProtoMessage() {} + +func (x *AgentEventMessage) ProtoReflect() protoreflect.Message { + mi := &file_messages_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use AgentEventMessage.ProtoReflect.Descriptor instead. +func (*AgentEventMessage) Descriptor() ([]byte, []int) { + return file_messages_proto_rawDescGZIP(), []int{2} +} + +func (x *AgentEventMessage) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *AgentEventMessage) GetData() *structpb.Value { + if x != nil { + return x.Data + } + return nil +} + +func (x *AgentEventMessage) GetTimestamp() *timestamppb.Timestamp { + if x != nil { + return x.Timestamp + } + return nil +} + var File_messages_proto protoreflect.FileDescriptor var file_messages_proto_rawDesc = string([]byte{ 0x0a, 0x0e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x12, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x22, 0x65, 0x0a, 0x13, 0x41, 0x67, - 0x65, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x88, 0x01, 0x01, 0x12, 0x18, - 0x0a, 0x07, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, - 0x07, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x64, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x64, 0x61, 0x74, - 0x61, 0x22, 0x38, 0x0a, 0x14, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x17, 0x0a, 0x04, 0x64, 0x61, 0x74, - 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x88, - 0x01, 0x01, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x42, 0x8f, 0x01, 0x0a, 0x0c, - 0x63, 0x6f, 0x6d, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x42, 0x0d, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x48, 0x02, 0x50, 0x01, 0x5a, - 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x79, 0x70, 0x65, - 0x72, 0x6d, 0x6f, 0x64, 0x65, 0x69, 0x6e, 0x63, 0x2f, 0x6d, 0x6f, 0x64, 0x75, 0x73, 0x2f, 0x72, - 0x75, 0x6e, 0x74, 0x69, 0x6d, 0x65, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0xa2, - 0x02, 0x03, 0x4d, 0x58, 0x58, 0xaa, 0x02, 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, - 0xca, 0x02, 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0xe2, 0x02, 0x14, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, - 0x74, 0x61, 0xea, 0x02, 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, + 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x65, 0x0a, 0x13, 0x41, 0x67, 0x65, + 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x17, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x88, 0x01, 0x01, 0x12, 0x18, 0x0a, + 0x07, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, + 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x64, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x64, 0x61, 0x74, 0x61, + 0x22, 0x38, 0x0a, 0x14, 0x41, 0x67, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x17, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x88, 0x01, + 0x01, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x22, 0x9b, 0x01, 0x0a, 0x11, 0x41, + 0x67, 0x65, 0x6e, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x2f, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, + 0x74, 0x61, 0x88, 0x01, 0x01, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, + 0x07, 0x0a, 0x05, 0x5f, 0x64, 0x61, 0x74, 0x61, 0x42, 0x8f, 0x01, 0x0a, 0x0c, 0x63, 0x6f, 0x6d, + 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x42, 0x0d, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x48, 0x02, 0x50, 0x01, 0x5a, 0x2e, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x79, 0x70, 0x65, 0x72, 0x6d, 0x6f, + 0x64, 0x65, 0x69, 0x6e, 0x63, 0x2f, 0x6d, 0x6f, 0x64, 0x75, 0x73, 0x2f, 0x72, 0x75, 0x6e, 0x74, + 0x69, 0x6d, 0x65, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0xa2, 0x02, 0x03, 0x4d, + 0x58, 0x58, 0xaa, 0x02, 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0xca, 0x02, 0x08, + 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0xe2, 0x02, 0x14, 0x4d, 0x65, 0x73, 0x73, 0x61, + 0x67, 0x65, 0x73, 0x5c, 0x47, 0x50, 0x42, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0xea, + 0x02, 0x08, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, }) var ( @@ -164,17 +240,22 @@ func file_messages_proto_rawDescGZIP() []byte { return file_messages_proto_rawDescData } -var file_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_messages_proto_goTypes = []any{ - (*AgentRequestMessage)(nil), // 0: messages.AgentRequestMessage - (*AgentResponseMessage)(nil), // 1: messages.AgentResponseMessage + (*AgentRequestMessage)(nil), // 0: messages.AgentRequestMessage + (*AgentResponseMessage)(nil), // 1: messages.AgentResponseMessage + (*AgentEventMessage)(nil), // 2: messages.AgentEventMessage + (*structpb.Value)(nil), // 3: google.protobuf.Value + (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp } var file_messages_proto_depIdxs = []int32{ - 0, // [0:0] is the sub-list for method output_type - 0, // [0:0] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 3, // 0: messages.AgentEventMessage.data:type_name -> google.protobuf.Value + 4, // 1: messages.AgentEventMessage.timestamp:type_name -> google.protobuf.Timestamp + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_messages_proto_init() } @@ -184,13 +265,14 @@ func file_messages_proto_init() { } file_messages_proto_msgTypes[0].OneofWrappers = []any{} file_messages_proto_msgTypes[1].OneofWrappers = []any{} + file_messages_proto_msgTypes[2].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_messages_proto_rawDesc), len(file_messages_proto_rawDesc)), NumEnums: 0, - NumMessages: 2, + NumMessages: 3, NumExtensions: 0, NumServices: 0, }, diff --git a/runtime/protos/messages.proto b/runtime/protos/messages.proto index 3b6573ae4..42af26c66 100644 --- a/runtime/protos/messages.proto +++ b/runtime/protos/messages.proto @@ -4,6 +4,9 @@ package messages; option go_package = "github.com/hypermodeinc/modus/runtime/messages"; +import "google/protobuf/struct.proto"; +import "google/protobuf/timestamp.proto"; + message AgentRequestMessage { string name = 1; optional string data = 2; @@ -13,3 +16,9 @@ message AgentRequestMessage { message AgentResponseMessage { optional string data = 1; } + +message AgentEventMessage { + string name = 1; + optional google.protobuf.Value data = 2; + google.protobuf.Timestamp timestamp = 3; +} diff --git a/runtime/services/services.go b/runtime/services/services.go index a126932ba..58abf6b65 100644 --- a/runtime/services/services.go +++ b/runtime/services/services.go @@ -53,11 +53,11 @@ func Start(ctx context.Context) context.Context { storage.Initialize(ctx) db.Initialize(ctx) db.InitModusDb(ctx) - manifestdata.MonitorManifestFile(ctx) + actors.Initialize(ctx) + graphql.Initialize() envfiles.MonitorEnvFiles(ctx) + manifestdata.MonitorManifestFile(ctx) pluginmanager.Initialize(ctx) - graphql.Initialize() - actors.Initialize(ctx) return ctx } diff --git a/runtime/utils/http.go b/runtime/utils/http.go index 0e8095129..fe0afb031 100644 --- a/runtime/utils/http.go +++ b/runtime/utils/http.go @@ -128,7 +128,3 @@ func PostHttp[TResult any](ctx context.Context, url string, payload any, beforeS EndTime: endTime, }, err } - -func WriteJsonContentHeader(w http.ResponseWriter) { - w.Header().Set("Content-Type", "application/json") -} diff --git a/runtime/wasmhost/wasmhost.go b/runtime/wasmhost/wasmhost.go index 2afcf438b..37a781e0d 100644 --- a/runtime/wasmhost/wasmhost.go +++ b/runtime/wasmhost/wasmhost.go @@ -79,7 +79,7 @@ func NewWasmHost(ctx context.Context, registrations ...func(WasmHost) error) Was } func GetWasmHost(ctx context.Context) WasmHost { - host, ok := ctx.Value(utils.WasmHostContextKey).(WasmHost) + host, ok := TryGetWasmHost(ctx) if !ok { logger.Fatal(ctx).Msg("WASM Host not found in context.") return nil @@ -87,6 +87,11 @@ func GetWasmHost(ctx context.Context) WasmHost { return host } +func TryGetWasmHost(ctx context.Context) (WasmHost, bool) { + host, ok := ctx.Value(utils.WasmHostContextKey).(WasmHost) + return host, ok +} + func (host *wasmHost) Close(ctx context.Context) { if err := host.runtime.Close(ctx); err != nil { logger.Err(ctx, err).Msg("Failed to cleanly close the WASM runtime.") diff --git a/sdk/assemblyscript/examples/agents/assembly/counterAgent.ts b/sdk/assemblyscript/examples/agents/assembly/counterAgent.ts index aac4fb960..e486f9334 100644 --- a/sdk/assemblyscript/examples/agents/assembly/counterAgent.ts +++ b/sdk/assemblyscript/examples/agents/assembly/counterAgent.ts @@ -4,7 +4,7 @@ * See the LICENSE file that accompanied this code for further details. */ -import { Agent } from "@hypermode/modus-sdk-as"; +import { Agent, AgentEvent } from "@hypermode/modus-sdk-as"; /** * This is a very simple agent that is used to demonstrate how Modus Agents work. @@ -50,9 +50,7 @@ export class CounterAgent extends Agent { // If you don't need to do anything special when the agent starts, then you can omit it. // It can be used to initialize state, retrieve data, etc. // This is a good place to set up any listeners or subscriptions. - onInitialize(): void { - console.info("Counter agent started"); - } + onInitialize(): void {} // When the agent is suspended, this method is automatically called. Implementing it is optional. // If you don't need to do anything special when the agent is suspended, then you can omit it. @@ -64,15 +62,11 @@ export class CounterAgent extends Agent { // Note that the agent may be suspended and resumed multiple times during its lifetime, // but the Modus Runtime will automatically save and restore the state of the agent, // so you don't need to worry about that here. - onSuspend(): void { - console.info("Counter agent suspended"); - } + onSuspend(): void {} // When the agent is resumed, this method is automatically called. Implementing it is optional. // If you don't need to do anything special when the agent is resumed, then you can omit it. - onResume(): void { - console.info("Counter agent resumed"); - } + onResume(): void {} // When the agent is terminated, this method is automatically called. Implementing it is optional. // It can be used to send final data somewhere, such as a database or an API. @@ -80,9 +74,7 @@ export class CounterAgent extends Agent { // Note that resources are automatically cleaned up when the agent is terminated, // so you don't need to worry about that here. // Once an agent is terminated, it cannot be resumed. - onTerminate(): void { - console.info("Counter agent terminated"); - } + onTerminate(): void {} // This method is called when the agent receives a message. // This is how agents update their state and share data. @@ -105,6 +97,10 @@ export class CounterAgent extends Agent { } else { this.count++; } + + // publish an event to subscribers + this.publishEvent(new CountUpdated(this.count)); + return this.count.toString(); } } @@ -112,3 +108,11 @@ export class CounterAgent extends Agent { return null; } } + + +@json +class CountUpdated extends AgentEvent { + constructor(public count: i32) { + super("countUpdated"); + } +} diff --git a/sdk/assemblyscript/examples/agents/package-lock.json b/sdk/assemblyscript/examples/agents/package-lock.json index c6c390e24..d1daeb080 100644 --- a/sdk/assemblyscript/examples/agents/package-lock.json +++ b/sdk/assemblyscript/examples/agents/package-lock.json @@ -36,7 +36,7 @@ }, "devDependencies": { "@eslint/js": "^9.28.0", - "@types/node": "^22.15.29", + "@types/node": "^22.15.30", "as-test": "^0.4.4", "assemblyscript": "^0.28.2", "assemblyscript-prettier": "^3.0.1", @@ -107,9 +107,9 @@ } }, "node_modules/@eslint/config-helpers": { - "version": "0.2.1", - "resolved": "https://registry.npmjs.org/@eslint/config-helpers/-/config-helpers-0.2.1.tgz", - "integrity": "sha512-RI17tsD2frtDu/3dmI7QRrD4bedNKPM08ziRYaC5AhkGrzIAJelm9kJU1TznK+apx6V+cqRz8tfpEeG3oIyjxw==", + "version": "0.2.2", + "resolved": "https://registry.npmjs.org/@eslint/config-helpers/-/config-helpers-0.2.2.tgz", + "integrity": "sha512-+GPzk8PlG0sPpzdU5ZvIRMPidzAnZDl/s9L+y13iodqvb8leL53bTannOrQ/Im7UkpsmFU5Ily5U60LWixnmLg==", "dev": true, "license": "Apache-2.0", "engines": { @@ -243,9 +243,9 @@ } }, "node_modules/@humanwhocodes/retry": { - "version": "0.4.2", - "resolved": "https://registry.npmjs.org/@humanwhocodes/retry/-/retry-0.4.2.tgz", - "integrity": "sha512-xeO57FpIu4p1Ri3Jq/EXq4ClRm86dVF2z/+kvFnyqVYRavTZmaFaUBbWCOuuTh0o/g7DSsk6kc2vrS4Vl5oPOQ==", + "version": "0.4.3", + "resolved": "https://registry.npmjs.org/@humanwhocodes/retry/-/retry-0.4.3.tgz", + "integrity": "sha512-bV0Tgo9K4hfPCek+aMAn81RppFKv2ySDQeMoSZuvTASywNTnVJCArCZE2FWqpvIatKu7VMRLWlR1EazvVhDyhQ==", "dev": true, "license": "Apache-2.0", "engines": { @@ -299,9 +299,9 @@ } }, "node_modules/@types/estree": { - "version": "1.0.6", - "resolved": "https://registry.npmjs.org/@types/estree/-/estree-1.0.6.tgz", - "integrity": "sha512-AYnb1nQyY49te+VRAVgmzfcgjYS91mY5P0TKUDCLEM+gNnA+3T6rWITXRLYCpahpqSQbN5cE+gHpnPyXjHWxcw==", + "version": "1.0.8", + "resolved": "https://registry.npmjs.org/@types/estree/-/estree-1.0.8.tgz", + "integrity": "sha512-dWHzHa2WqEXI/O1E9OjrocMTKJl2mSrEolh1Iomrv6U+JuNwaHXsXx9bLu5gG7BUWFIN0skIQJQ/L1rIex4X6w==", "dev": true, "license": "MIT" }, @@ -691,21 +691,14 @@ "url": "https://opencollective.com/assemblyscript" } }, - "node_modules/assemblyscript/node_modules/binaryen": { - "version": "123.0.0-nightly.20250530", - "resolved": "https://registry.npmjs.org/binaryen/-/binaryen-123.0.0-nightly.20250530.tgz", - "integrity": "sha512-d1zPHBN5YlOd3Ff+OUxvVExuFeh8heSnqe+X3bjItFxGLvn4VGBKmrvv7pgy/cRhrIUGuPW138iaWfDhwjyDqg==", + "node_modules/assemblyscript-prettier/node_modules/binaryen": { + "version": "116.0.0-nightly.20240114", + "resolved": "https://registry.npmjs.org/binaryen/-/binaryen-116.0.0-nightly.20240114.tgz", + "integrity": "sha512-0GZrojJnuhoe+hiwji7QFaL3tBlJoA+KFUN7ouYSDGZLSo9CKM8swQX8n/UcbR0d1VuZKU+nhogNzv423JEu5A==", "dev": true, "license": "Apache-2.0", "bin": { - "wasm-as": "bin/wasm-as", - "wasm-ctor-eval": "bin/wasm-ctor-eval", - "wasm-dis": "bin/wasm-dis", - "wasm-merge": "bin/wasm-merge", - "wasm-metadce": "bin/wasm-metadce", "wasm-opt": "bin/wasm-opt", - "wasm-reduce": "bin/wasm-reduce", - "wasm-shell": "bin/wasm-shell", "wasm2js": "bin/wasm2js" } }, @@ -717,13 +710,20 @@ "license": "MIT" }, "node_modules/binaryen": { - "version": "116.0.0-nightly.20240114", - "resolved": "https://registry.npmjs.org/binaryen/-/binaryen-116.0.0-nightly.20240114.tgz", - "integrity": "sha512-0GZrojJnuhoe+hiwji7QFaL3tBlJoA+KFUN7ouYSDGZLSo9CKM8swQX8n/UcbR0d1VuZKU+nhogNzv423JEu5A==", + "version": "123.0.0-nightly.20250530", + "resolved": "https://registry.npmjs.org/binaryen/-/binaryen-123.0.0-nightly.20250530.tgz", + "integrity": "sha512-d1zPHBN5YlOd3Ff+OUxvVExuFeh8heSnqe+X3bjItFxGLvn4VGBKmrvv7pgy/cRhrIUGuPW138iaWfDhwjyDqg==", "dev": true, "license": "Apache-2.0", "bin": { + "wasm-as": "bin/wasm-as", + "wasm-ctor-eval": "bin/wasm-ctor-eval", + "wasm-dis": "bin/wasm-dis", + "wasm-merge": "bin/wasm-merge", + "wasm-metadce": "bin/wasm-metadce", "wasm-opt": "bin/wasm-opt", + "wasm-reduce": "bin/wasm-reduce", + "wasm-shell": "bin/wasm-shell", "wasm2js": "bin/wasm2js" } }, @@ -761,6 +761,23 @@ "node": ">=6" } }, + "node_modules/chalk": { + "version": "4.1.2", + "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", + "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", + "dev": true, + "license": "MIT", + "dependencies": { + "ansi-styles": "^4.1.0", + "supports-color": "^7.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/chalk/chalk?sponsor=1" + } + }, "node_modules/color-convert": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/color-convert/-/color-convert-2.0.1.tgz", @@ -804,9 +821,9 @@ } }, "node_modules/debug": { - "version": "4.4.0", - "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.0.tgz", - "integrity": "sha512-6WTZ/IxCY/T6BALoZHaE4ctp9xm+Z5kY/pzYaCHRFeyVhojxlrm+46y68HA6hr0TcwEssoxNiDEUJQjfPZ/RYA==", + "version": "4.4.1", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.1.tgz", + "integrity": "sha512-KcKCqiftBJcZr++7ykoDIEwSa3XWowTfNPo92BYxjXiyYEVrUQh2aLyhxBCwww+heortUFxEJYcRzosstTEBYQ==", "dev": true, "license": "MIT", "dependencies": { @@ -932,23 +949,6 @@ "url": "https://opencollective.com/eslint" } }, - "node_modules/eslint/node_modules/chalk": { - "version": "4.1.2", - "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", - "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", - "dev": true, - "license": "MIT", - "dependencies": { - "ansi-styles": "^4.1.0", - "supports-color": "^7.1.0" - }, - "engines": { - "node": ">=10" - }, - "funding": { - "url": "https://github.com/chalk/chalk?sponsor=1" - } - }, "node_modules/espree": { "version": "10.3.0", "resolved": "https://registry.npmjs.org/espree/-/espree-10.3.0.tgz", @@ -1346,9 +1346,9 @@ "license": "MIT" }, "node_modules/long": { - "version": "5.3.1", - "resolved": "https://registry.npmjs.org/long/-/long-5.3.1.tgz", - "integrity": "sha512-ka87Jz3gcx/I7Hal94xaN2tZEOPoUOEVftkQqZx2EeQRN7LGdfLlI3FvZ+7WDplm+vK2Urx9ULrvSowtdCieng==", + "version": "5.3.2", + "resolved": "https://registry.npmjs.org/long/-/long-5.3.2.tgz", + "integrity": "sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA==", "dev": true, "license": "Apache-2.0" }, diff --git a/sdk/assemblyscript/src/assembly/__tests__/agent.spec.ts b/sdk/assemblyscript/src/assembly/__tests__/agent.spec.ts index 7b7fd3def..651e6a2d9 100644 --- a/sdk/assemblyscript/src/assembly/__tests__/agent.spec.ts +++ b/sdk/assemblyscript/src/assembly/__tests__/agent.spec.ts @@ -52,6 +52,12 @@ mockImport( }, ); +mockImport( + "modus_agents.publishEvent", + // eslint-disable-next-line @typescript-eslint/no-unused-vars + (agentId: string, eventName: string, eventData: string | null): void => {}, +); + it("should serialize an AgentStatus using type aliases", () => { const status: AgentStatus = AgentStatus.Resuming; expect(JSON.stringify(status)).toBe('"' + AgentStatus.Resuming + '"'); @@ -85,7 +91,7 @@ it("should list current agents", () => { ]; const agents = listAgents(); expect(agents.length).toBe(1); - const agent = agents[0]!; + const agent = agents[0]; expect(agent.id).toBe("d1086e837bkp4ltjm150"); expect(agent.name).toBe("TaskManager"); expect(agent.status).toBe("running"); diff --git a/sdk/assemblyscript/src/assembly/agent.ts b/sdk/assemblyscript/src/assembly/agent.ts index 712b8cd8e..3d940c806 100644 --- a/sdk/assemblyscript/src/assembly/agent.ts +++ b/sdk/assemblyscript/src/assembly/agent.ts @@ -9,6 +9,7 @@ import { AgentStatus } from "./enums"; import * as utils from "./utils"; +import { JSON } from "json-as"; const agents = new Map(); let activeAgent: Agent | null = null; @@ -87,6 +88,14 @@ export abstract class Agent { onReceiveMessage(msgName: string, data: string | null): string | null { return null; } + + /** + * Publishes an event from this agent to any subscribers. + */ + publishEvent(event: AgentEvent): void { + const data = JSON.stringify(event); + hostPublishEvent(this.id, event.eventName, data); + } } /** @@ -114,6 +123,14 @@ declare function hostGetAgentInfo(agentId: string): AgentInfo; @external("modus_agents", "listAgents") declare function hostListAgents(): AgentInfo[]; +// @ts-expect-error: decorator +@external("modus_agents", "publishEvent") +declare function hostPublishEvent( + agentId: string, + eventName: string, + eventData: string | null, +): void; + /** * Starts an agent with the given name. * This can be called from any user code, such as function or another agent's methods. @@ -271,3 +288,27 @@ export class AgentInfo { this.status = status; } } + +/** + * Base class for agent events. + * Custom agent events should extend this class. + */ +@json +export class AgentEvent { + /** + * The name of the event. + */ + @omit + readonly eventName: string; + + /** + * Creates a new agent event. + * @param eventName The name of the event. + */ + constructor(eventName: string) { + if (eventName == "") { + throw new Error("Event name cannot be empty."); + } + this.eventName = eventName; + } +} diff --git a/sdk/assemblyscript/src/assembly/agents.ts b/sdk/assemblyscript/src/assembly/agents.ts index 5317c5285..9c414ee23 100644 --- a/sdk/assemblyscript/src/assembly/agents.ts +++ b/sdk/assemblyscript/src/assembly/agents.ts @@ -51,6 +51,10 @@ export function sendMessage( if (response == null) { throw new Error("Failed to send message to agent."); } + if (response.error) { + throw new Error(response.error!); + } + return response.data; } @@ -67,5 +71,6 @@ export function sendMessageAsync( } class MessageResponse { - data!: string | null; + data: string | null = null; + error: string | null = null; } diff --git a/sdk/assemblyscript/src/assembly/index.ts b/sdk/assemblyscript/src/assembly/index.ts index 98f5cf51c..a77cf5508 100644 --- a/sdk/assemblyscript/src/assembly/index.ts +++ b/sdk/assemblyscript/src/assembly/index.ts @@ -43,6 +43,6 @@ export { localtime }; export * from "./dynamicmap"; export * from "./enums"; -export { Agent, AgentInfo } from "./agent"; +export { Agent, AgentInfo, AgentEvent } from "./agent"; import * as agents from "./agents"; export { agents }; diff --git a/sdk/assemblyscript/src/plugin.asconfig.json b/sdk/assemblyscript/src/plugin.asconfig.json index d8ac8a170..a2194f96b 100644 --- a/sdk/assemblyscript/src/plugin.asconfig.json +++ b/sdk/assemblyscript/src/plugin.asconfig.json @@ -16,7 +16,8 @@ "Date=wasi_Date" ], "exportStart": "_initialize", - "exportRuntime": true + "exportRuntime": true, + "disableWarning": [228] }, "targets": { "debug": { diff --git a/sdk/go/examples/agents/counterAgent.go b/sdk/go/examples/agents/counterAgent.go index 2b2923391..0525a6c6e 100644 --- a/sdk/go/examples/agents/counterAgent.go +++ b/sdk/go/examples/agents/counterAgent.go @@ -7,7 +7,6 @@ package main import ( - "fmt" "strconv" "github.com/hypermodeinc/modus/sdk/go/pkg/agents" @@ -33,7 +32,7 @@ type CounterAgent struct { // Agents are identified by a name. Each agent in your project must have a unique name. // The name is used to register the agent with the host, and to send messages to it. // It should be a short, descriptive name that reflects the purpose of the agent. -func (c *CounterAgent) Name() string { +func (a *CounterAgent) Name() string { return "Counter" } @@ -44,20 +43,20 @@ func (c *CounterAgent) Name() string { // This method should return the current state of the agent as a string. // Any format is fine, but it should be consistent and easy to parse. -func (c *CounterAgent) GetState() *string { - s := strconv.Itoa(c.count) +func (a *CounterAgent) GetState() *string { + s := strconv.Itoa(a.count) return &s } // This method should set the state of the agent from a string. // The string should be in the same format as the one returned by GetState. // Be sure to consider data compatibility when changing the format of the state. -func (c *CounterAgent) SetState(data *string) { +func (a *CounterAgent) SetState(data *string) { if data == nil { return } if n, err := strconv.Atoi(*data); err == nil { - c.count = n + a.count = n } } @@ -65,8 +64,7 @@ func (c *CounterAgent) SetState(data *string) { // If you don't need to do anything special when the agent starts, then you can omit it. // It can be used to initialize state, retrieve data, etc. // This is a good place to set up any listeners or subscriptions. -func (c *CounterAgent) OnInitialize() error { - fmt.Println("Counter agent started") +func (a *CounterAgent) OnInitialize() error { return nil } @@ -80,15 +78,13 @@ func (c *CounterAgent) OnInitialize() error { // Note that the agent may be suspended and resumed multiple times during its lifetime, // but the Modus Runtime will automatically save and restore the state of the agent, // so you don't need to worry about that here. -func (c *CounterAgent) OnSuspend() error { - fmt.Println("Counter agent suspended") +func (a *CounterAgent) OnSuspend() error { return nil } // When the agent is resumed, this method is automatically called. Implementing it is optional. // If you don't need to do anything special when the agent is resumed, then you can omit it. -func (c *CounterAgent) OnResume() error { - fmt.Println("Counter agent resumed") +func (a *CounterAgent) OnResume() error { return nil } @@ -98,27 +94,46 @@ func (c *CounterAgent) OnResume() error { // Note that resources are automatically cleaned up when the agent is terminated, // so you don't need to worry about that here. // Once an agent is terminated, it cannot be resumed. -func (c *CounterAgent) OnTerminate() error { - fmt.Println("Counter agent terminated") +func (a *CounterAgent) OnTerminate() error { return nil } // This method is called when the agent receives a message. // This is how agents update their state and share data. -func (c *CounterAgent) OnReceiveMessage(msgName string, data *string) (*string, error) { +func (a *CounterAgent) OnReceiveMessage(msgName string, data *string) (*string, error) { switch msgName { case "count": - s := strconv.Itoa(c.count) + // just return the current count + s := strconv.Itoa(a.count) return &s, nil + case "increment": + + // increment the count by 1, or by the specified quantity if provided if data == nil { - c.count++ + a.count++ } else if n, err := strconv.Atoi(*data); err == nil { - c.count += n + a.count += n } - s := strconv.Itoa(c.count) + + // publish an event to subscribers + if err := a.PublishEvent(countUpdated{Count: a.count}); err != nil { + return nil, err + } + + // return the new count as a string + s := strconv.Itoa(a.count) return &s, nil } return nil, nil } + +// This defines the event that is published when the count is updated. +type countUpdated struct { + Count int `json:"count"` +} + +func (e countUpdated) EventName() string { + return "countUpdated" +} diff --git a/sdk/go/pkg/agents/agents.go b/sdk/go/pkg/agents/agents.go index b5816163b..2a1852e41 100644 --- a/sdk/go/pkg/agents/agents.go +++ b/sdk/go/pkg/agents/agents.go @@ -15,14 +15,23 @@ import ( "time" "github.com/hypermodeinc/modus/sdk/go/pkg/console" + "github.com/hypermodeinc/modus/sdk/go/pkg/utils" ) +// AgentInfo contains information about an agent. type AgentInfo struct { Id string Name string Status string } +// AgentEvent is an interface that represents an event emitted by an agent. +// Custom agent events should implement this interface. +type AgentEvent interface { + // EventName returns the type of the event. + EventName() string +} + type AgentStatus = string const ( @@ -260,8 +269,21 @@ func (a *AgentBase) OnReceiveMessage(msgName string, data *string) (*string, err return nil, nil } +// Publishes an event from this agent to any subscribers. +func (a *AgentBase) PublishEvent(event AgentEvent) error { + bytes, err := utils.JsonSerialize(event) + if err != nil { + return fmt.Errorf("failed to serialize event data: %w", err) + } + data := string(bytes) + name := event.EventName() + hostPublishEvent(activeAgentId, &name, &data) + return nil +} + type MessageResponse struct { - data *string + data *string + error *string } type message struct { @@ -339,6 +361,9 @@ func sendMessage(agentId string, m message) (*string, error) { if response == nil { return nil, errors.New("failed to send message to agent") } + if response.error != nil { + return nil, errors.New(*response.error) + } return response.data, nil } diff --git a/sdk/go/pkg/agents/imports_mock.go b/sdk/go/pkg/agents/imports_mock.go index 78c9f4708..648ec0c08 100644 --- a/sdk/go/pkg/agents/imports_mock.go +++ b/sdk/go/pkg/agents/imports_mock.go @@ -20,6 +20,7 @@ var SendMessageCallStack = testutils.NewCallStack() var StopAgentCallStack = testutils.NewCallStack() var GetAgentInfoCallStack = testutils.NewCallStack() var ListAgentsCallStack = testutils.NewCallStack() +var PublishEventCallStack = testutils.NewCallStack() func hostStartAgent(agentName *string) *AgentInfo { StartAgentCallStack.Push(agentName) @@ -78,3 +79,7 @@ func hostListAgents() *[]AgentInfo { {Id: "def456", Name: "Logger", Status: AgentStatusRunning}, } } + +func hostPublishEvent(agentId, eventName, eventData *string) { + PublishEventCallStack.Push(agentId, eventName, eventData) +} diff --git a/sdk/go/pkg/agents/imports_wasi.go b/sdk/go/pkg/agents/imports_wasi.go index 606de29c0..b45704c42 100644 --- a/sdk/go/pkg/agents/imports_wasi.go +++ b/sdk/go/pkg/agents/imports_wasi.go @@ -80,3 +80,7 @@ func hostListAgents() *[]AgentInfo { return (*[]AgentInfo)(ptr) } + +//go:noescape +//go:wasmimport modus_agents publishEvent +func hostPublishEvent(agentId, eventName, eventData *string)