Skip to content

Commit 68db6ac

Browse files
authored
mcp: send a tools changed notification when mcp routes change tools (#1630)
**Description** Sends a tool list changed notification when the MCPRoutes change. We were already notifying when the upstream MCP servers changed the notifications, but now we're also notifying when the routes change or the filtered tools change (without upstream MCP server changes). **Related Issues/PRs (if applicable)** Fixes #1619 **Special notes for reviewers (if applicable)** N/A --------- Signed-off-by: Ignasi Barrera <[email protected]>
1 parent e011e5c commit 68db6ac

File tree

13 files changed

+750
-170
lines changed

13 files changed

+750
-170
lines changed

internal/mcpproxy/config.go

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
// Copyright Envoy AI Gateway Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
// The full text of the Apache license is available in the LICENSE file at
4+
// the root of the repo.
5+
6+
package mcpproxy
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"maps"
12+
"regexp"
13+
"slices"
14+
"strings"
15+
"sync"
16+
17+
"github.com/envoyproxy/ai-gateway/internal/filterapi"
18+
)
19+
20+
type (
21+
// ProxyConfig holds the main MCP proxy configuration.
22+
ProxyConfig struct {
23+
*mcpProxyConfig
24+
toolChangeSignaler changeSignaler // signals tool changes to active sessions.
25+
}
26+
27+
mcpProxyConfig struct {
28+
backendListenerAddr string
29+
routes map[filterapi.MCPRouteName]*mcpProxyConfigRoute // route name -> backends of that route.
30+
}
31+
32+
mcpProxyConfigRoute struct {
33+
backends map[filterapi.MCPBackendName]filterapi.MCPBackend
34+
toolSelectors map[filterapi.MCPBackendName]*toolSelector
35+
}
36+
37+
// toolSelector filters tools using include patterns with exact matches or regular expressions.
38+
toolSelector struct {
39+
include map[string]struct{}
40+
includeRegexps []*regexp.Regexp
41+
}
42+
43+
// changeSignaler is an interface for signaling configuration changes to multiple
44+
// watchers.
45+
changeSignaler interface {
46+
// Watch returns a channel that is closed then the configuration changes.
47+
// The channel should be obtained by calling this method every time when used in a loop,
48+
// because it will be closed and recreated after each signal is sent.
49+
Watch() <-chan struct{}
50+
// Signal all watchers that the configuration has changed.
51+
Signal()
52+
}
53+
54+
multiWatcherSignaler struct {
55+
mu sync.Mutex
56+
notify chan struct{}
57+
}
58+
)
59+
60+
func (m *mcpProxyConfig) sameTools(other *mcpProxyConfig) bool {
61+
if m == nil || other == nil {
62+
return m == other
63+
}
64+
return maps.EqualFunc(m.routes, other.routes, func(a, b *mcpProxyConfigRoute) bool {
65+
return a.sameTools(b)
66+
})
67+
}
68+
69+
func (m *mcpProxyConfigRoute) sameTools(other *mcpProxyConfigRoute) bool {
70+
if m == nil || other == nil {
71+
return m == other
72+
}
73+
if !equalKeys(m.backends, other.backends) {
74+
return false
75+
}
76+
return maps.EqualFunc(m.toolSelectors, other.toolSelectors, func(a, b *toolSelector) bool {
77+
return a.sameTools(b)
78+
})
79+
}
80+
81+
var sortRegexpAsString = func(a, b *regexp.Regexp) int { return strings.Compare(a.String(), b.String()) }
82+
83+
func equalKeys[K comparable, V any](m1, m2 map[K]V) bool {
84+
return maps.EqualFunc(m1, m2, func(_, _ V) bool { return true })
85+
}
86+
87+
func (t *toolSelector) sameTools(other *toolSelector) bool {
88+
if t == nil || other == nil {
89+
return t == other
90+
}
91+
if !equalKeys(t.include, other.include) {
92+
return false
93+
}
94+
slices.SortFunc(t.includeRegexps, sortRegexpAsString)
95+
slices.SortFunc(other.includeRegexps, sortRegexpAsString)
96+
return slices.EqualFunc(t.includeRegexps, other.includeRegexps,
97+
func(a, b *regexp.Regexp) bool {
98+
return a.String() == b.String()
99+
})
100+
}
101+
102+
func (t *toolSelector) allows(tool string) bool {
103+
// Check include filters - if no filter, allow all; if filter exists, allow only matches
104+
if len(t.include) > 0 {
105+
_, ok := t.include[tool]
106+
return ok
107+
}
108+
if len(t.includeRegexps) > 0 {
109+
for _, re := range t.includeRegexps {
110+
if re.MatchString(tool) {
111+
return true
112+
}
113+
}
114+
return false
115+
}
116+
// No filters, allow all
117+
return true
118+
}
119+
120+
// LoadConfig implements [extproc.ConfigReceiver.LoadConfig] which will be called
121+
// when the configuration is updated on the file system.
122+
func (p *ProxyConfig) LoadConfig(_ context.Context, config *filterapi.Config) error {
123+
newConfig := &mcpProxyConfig{}
124+
mcpConfig := config.MCPConfig
125+
if config.MCPConfig == nil {
126+
return nil
127+
}
128+
129+
// Talk to the backend MCP listener on the local Envoy instance.
130+
newConfig.backendListenerAddr = mcpConfig.BackendListenerAddr
131+
132+
// Build a map of routes to backends.
133+
// Each route has its own set of backends. For a given downstream request,
134+
// the MCP proxy initializes sessions only with the backends tied to that route.
135+
newConfig.routes = make(map[filterapi.MCPRouteName]*mcpProxyConfigRoute, len(mcpConfig.Routes))
136+
137+
for _, route := range mcpConfig.Routes {
138+
r := &mcpProxyConfigRoute{
139+
backends: make(map[filterapi.MCPBackendName]filterapi.MCPBackend, len(route.Backends)),
140+
toolSelectors: make(map[filterapi.MCPBackendName]*toolSelector, len(route.Backends)),
141+
}
142+
for _, backend := range route.Backends {
143+
r.backends[backend.Name] = backend
144+
if s := backend.ToolSelector; s != nil {
145+
ts := &toolSelector{
146+
include: make(map[string]struct{}),
147+
}
148+
for _, tool := range s.Include {
149+
ts.include[tool] = struct{}{}
150+
}
151+
for _, expr := range s.IncludeRegex {
152+
re, err := regexp.Compile(expr)
153+
if err != nil {
154+
return fmt.Errorf("failed to compile include regex %q for backend %q in route %q: %w", expr, backend.Name, route.Name, err)
155+
}
156+
ts.includeRegexps = append(ts.includeRegexps, re)
157+
}
158+
r.toolSelectors[backend.Name] = ts
159+
}
160+
}
161+
newConfig.routes[route.Name] = r
162+
}
163+
164+
toolsChanged := !p.sameTools(newConfig)
165+
p.mcpProxyConfig = newConfig // This is racy, but we don't care.
166+
if toolsChanged {
167+
p.toolChangeSignaler.Signal()
168+
}
169+
170+
return nil
171+
}
172+
173+
// newMultiWatcherSignaler creates a new multi-watcher signaler.
174+
func newMultiWatcherSignaler() *multiWatcherSignaler {
175+
return &multiWatcherSignaler{
176+
notify: make(chan struct{}),
177+
}
178+
}
179+
180+
// Watch returns a channel that is closed then the configuration changes.
181+
// The channel should be obtained by calling this method every time when used in a loop,
182+
// because it will be closed and recreated after each signal is sent.
183+
func (m *multiWatcherSignaler) Watch() <-chan struct{} {
184+
m.mu.Lock()
185+
defer m.mu.Unlock()
186+
return m.notify
187+
}
188+
189+
// Signal notifies all watchers of a configuration change.
190+
func (m *multiWatcherSignaler) Signal() {
191+
m.mu.Lock()
192+
defer m.mu.Unlock()
193+
close(m.notify) // Wake everyone
194+
m.notify = make(chan struct{}) // Create a new channel for future updates
195+
}

0 commit comments

Comments
 (0)