Skip to content
This repository was archived by the owner on Sep 11, 2025. It is now read-only.

Commit dfe5648

Browse files
fix: resolve deadlocks related to open inbound http connections (#891)
1 parent 6aaedc0 commit dfe5648

File tree

5 files changed

+57
-16
lines changed

5 files changed

+57
-16
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
- feat: add kubernetes secrets provider and API to read secrets [#885](https://github.com/hypermodeinc/modus/pull/885)
88
- feat: "start" and "stop" are now mutation prefixes [#889](https://github.com/hypermodeinc/modus/pull/889)
99
- fix: start agents synchronously, and add examples setting data on start [#890](https://github.com/hypermodeinc/modus/pull/890)
10+
- fix: resolve deadlocks related to open inbound http connections [#891](https://github.com/hypermodeinc/modus/pull/891)
1011

1112
## 2025-06-10 - Runtime 0.18.0-alpha.6
1213

runtime/actors/agents.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,8 @@ func PublishAgentEvent(ctx context.Context, agentId, eventName string, eventData
257257

258258
topicActor := _actorSystem.TopicActor()
259259

260-
if pid, err := getActorPid(ctx, agentId); err == nil {
260+
pid, err := getActorPid(ctx, agentId)
261+
if err == nil {
261262
return pid.Tell(ctx, topicActor, pubMsg)
262263
}
263264

runtime/graphql/graphql.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/hypermodeinc/modus/runtime/utils"
2727
"github.com/hypermodeinc/modus/runtime/wasmhost"
2828
"github.com/puzpuzpuz/xsync/v4"
29+
"github.com/rs/xid"
2930

3031
"github.com/tidwall/gjson"
3132
"github.com/tidwall/sjson"
@@ -38,6 +39,16 @@ import (
3839

3940
var GraphQLRequestHandler = http.HandlerFunc(handleGraphQLRequest)
4041

42+
var cancelFuncs = xsync.NewMap[string, context.CancelFunc]()
43+
44+
func CancelSubscriptions() {
45+
cancelFuncs.Range(func(key string, cancel context.CancelFunc) bool {
46+
cancel()
47+
return true
48+
})
49+
cancelFuncs.Clear()
50+
}
51+
4152
func Initialize() {
4253
// The GraphQL engine's Activate function should be called when a plugin is loaded.
4354
pluginmanager.RegisterPluginLoadedCallback(engine.Activate)
@@ -162,6 +173,12 @@ func handleGraphQLRequest(w http.ResponseWriter, r *http.Request) {
162173
fmt.Fprintf(w, "event: next\ndata: %s\n\n", data)
163174
flusher.Flush()
164175
})
176+
177+
id := xid.New().String()
178+
var cancel context.CancelFunc
179+
ctx, cancel = context.WithCancel(ctx)
180+
cancelFuncs.Store(id, cancel)
181+
defer cancelFuncs.Delete(id)
165182
}
166183

167184
// Execute the GraphQL operation

runtime/httpserver/dynamicMux.go

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,54 +10,75 @@
1010
package httpserver
1111

1212
import (
13+
"maps"
1314
"net/http"
1415
"strings"
15-
"sync"
16+
17+
"github.com/puzpuzpuz/xsync/v4"
1618
)
1719

1820
type dynamicMux struct {
1921
mux *http.ServeMux
20-
routes map[string]http.Handler
21-
mu sync.RWMutex
22+
routes *xsync.Map[string, http.Handler]
2223
}
2324

2425
func newDynamicMux(routes map[string]http.Handler) *dynamicMux {
26+
27+
mapRoutes := xsync.NewMap[string, http.Handler](xsync.WithPresize(len(routes)))
28+
for path, handler := range routes {
29+
mapRoutes.Store(path, handler)
30+
}
31+
2532
return &dynamicMux{
2633
mux: http.NewServeMux(),
27-
routes: routes,
34+
routes: mapRoutes,
2835
}
2936
}
3037

3138
func (dm *dynamicMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
32-
dm.mu.RLock()
33-
defer dm.mu.RUnlock()
3439

35-
if handler, ok := dm.routes[r.URL.Path]; ok {
40+
if handler, ok := dm.routes.Load(r.URL.Path); ok {
3641
handler.ServeHTTP(w, r)
3742
return
3843
}
3944

4045
path := strings.TrimSuffix(r.URL.Path, "/")
4146
if path != r.URL.Path {
42-
if _, ok := dm.routes[path]; ok {
47+
if _, ok := dm.routes.Load(path); ok {
4348
http.Redirect(w, r, path, http.StatusMovedPermanently)
4449
return
4550
}
4651
}
4752

48-
for route, handler := range dm.routes {
53+
var found bool
54+
dm.routes.Range(func(route string, handler http.Handler) bool {
4955
if len(route) > 1 && strings.HasSuffix(route, "/") &&
5056
(strings.HasPrefix(r.URL.Path, route) || route == r.URL.Path+"/") {
5157
handler.ServeHTTP(w, r)
52-
return
58+
found = true
59+
return false
60+
} else {
61+
return true
5362
}
54-
}
63+
})
5564

56-
http.Error(w, "Not Found", http.StatusNotFound)
65+
if !found {
66+
http.Error(w, "Not Found", http.StatusNotFound)
67+
}
5768
}
5869

5970
func (dm *dynamicMux) ReplaceRoutes(routes map[string]http.Handler) {
60-
dm.mu.Lock()
61-
defer dm.mu.Unlock()
62-
dm.routes = routes
71+
temp := maps.Clone(routes)
72+
dm.routes.Range(func(route string, _ http.Handler) bool {
73+
if handler, ok := temp[route]; ok {
74+
dm.routes.Store(route, handler)
75+
delete(temp, route)
76+
} else {
77+
dm.routes.Delete(route)
78+
}
79+
return true
80+
})
81+
for route, handler := range temp {
82+
dm.routes.Store(route, handler)
83+
}
6384
}

runtime/httpserver/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ func startHttpServer(ctx context.Context, mux http.Handler, addresses ...string)
108108
for _, server := range servers {
109109
shutdownCtx, shutdownRelease := context.WithTimeout(ctx, shutdownTimeout)
110110
defer shutdownRelease()
111+
server.RegisterOnShutdown(graphql.CancelSubscriptions)
111112
if err := server.Shutdown(shutdownCtx); err != nil {
112113
logger.Fatal(ctx).Err(err).Msg("HTTP server shutdown error.")
113114
}

0 commit comments

Comments
 (0)