Skip to content
This repository was archived by the owner on Sep 11, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- feat: add kubernetes secrets provider and API to read secrets [#885](https://github.com/hypermodeinc/modus/pull/885)
- feat: "start" and "stop" are now mutation prefixes [#889](https://github.com/hypermodeinc/modus/pull/889)
- fix: start agents synchronously, and add examples setting data on start [#890](https://github.com/hypermodeinc/modus/pull/890)
- fix: resolve deadlocks related to open inbound http connections [#891](https://github.com/hypermodeinc/modus/pull/891)

## 2025-06-10 - Runtime 0.18.0-alpha.6

Expand Down
3 changes: 2 additions & 1 deletion runtime/actors/agents.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ func PublishAgentEvent(ctx context.Context, agentId, eventName string, eventData

topicActor := _actorSystem.TopicActor()

if pid, err := getActorPid(ctx, agentId); err == nil {
pid, err := getActorPid(ctx, agentId)
if err == nil {
return pid.Tell(ctx, topicActor, pubMsg)
}

Expand Down
17 changes: 17 additions & 0 deletions runtime/graphql/graphql.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/hypermodeinc/modus/runtime/utils"
"github.com/hypermodeinc/modus/runtime/wasmhost"
"github.com/puzpuzpuz/xsync/v4"
"github.com/rs/xid"

"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
Expand All @@ -38,6 +39,16 @@ import (

var GraphQLRequestHandler = http.HandlerFunc(handleGraphQLRequest)

var cancelFuncs = xsync.NewMap[string, context.CancelFunc]()

func CancelSubscriptions() {
cancelFuncs.Range(func(key string, cancel context.CancelFunc) bool {
cancel()
return true
})
cancelFuncs.Clear()
}

func Initialize() {
// The GraphQL engine's Activate function should be called when a plugin is loaded.
pluginmanager.RegisterPluginLoadedCallback(engine.Activate)
Expand Down Expand Up @@ -162,6 +173,12 @@ func handleGraphQLRequest(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "event: next\ndata: %s\n\n", data)
flusher.Flush()
})

id := xid.New().String()
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
cancelFuncs.Store(id, cancel)
defer cancelFuncs.Delete(id)
}

// Execute the GraphQL operation
Expand Down
51 changes: 36 additions & 15 deletions runtime/httpserver/dynamicMux.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,54 +10,75 @@
package httpserver

import (
"maps"
"net/http"
"strings"
"sync"

"github.com/puzpuzpuz/xsync/v4"
)

type dynamicMux struct {
mux *http.ServeMux
routes map[string]http.Handler
mu sync.RWMutex
routes *xsync.Map[string, http.Handler]
}

func newDynamicMux(routes map[string]http.Handler) *dynamicMux {

mapRoutes := xsync.NewMap[string, http.Handler](xsync.WithPresize(len(routes)))
for path, handler := range routes {
mapRoutes.Store(path, handler)
}

return &dynamicMux{
mux: http.NewServeMux(),
routes: routes,
routes: mapRoutes,
}
}

func (dm *dynamicMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
dm.mu.RLock()
defer dm.mu.RUnlock()

if handler, ok := dm.routes[r.URL.Path]; ok {
if handler, ok := dm.routes.Load(r.URL.Path); ok {
handler.ServeHTTP(w, r)
return
}

path := strings.TrimSuffix(r.URL.Path, "/")
if path != r.URL.Path {
if _, ok := dm.routes[path]; ok {
if _, ok := dm.routes.Load(path); ok {
http.Redirect(w, r, path, http.StatusMovedPermanently)
return
}
}

for route, handler := range dm.routes {
var found bool
dm.routes.Range(func(route string, handler http.Handler) bool {
if len(route) > 1 && strings.HasSuffix(route, "/") &&
(strings.HasPrefix(r.URL.Path, route) || route == r.URL.Path+"/") {
handler.ServeHTTP(w, r)
return
found = true
return false
} else {
return true
}
}
})

http.Error(w, "Not Found", http.StatusNotFound)
if !found {
http.Error(w, "Not Found", http.StatusNotFound)
}
}

func (dm *dynamicMux) ReplaceRoutes(routes map[string]http.Handler) {
dm.mu.Lock()
defer dm.mu.Unlock()
dm.routes = routes
temp := maps.Clone(routes)
dm.routes.Range(func(route string, _ http.Handler) bool {
if handler, ok := temp[route]; ok {
dm.routes.Store(route, handler)
delete(temp, route)
} else {
dm.routes.Delete(route)
}
return true
})
for route, handler := range temp {
dm.routes.Store(route, handler)
}
}
1 change: 1 addition & 0 deletions runtime/httpserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func startHttpServer(ctx context.Context, mux http.Handler, addresses ...string)
for _, server := range servers {
shutdownCtx, shutdownRelease := context.WithTimeout(ctx, shutdownTimeout)
defer shutdownRelease()
server.RegisterOnShutdown(graphql.CancelSubscriptions)
if err := server.Shutdown(shutdownCtx); err != nil {
logger.Fatal(ctx).Err(err).Msg("HTTP server shutdown error.")
}
Expand Down
Loading