|
| 1 | +// Copyright Envoy AI Gateway Authors |
| 2 | +// SPDX-License-Identifier: Apache-2.0 |
| 3 | +// The full text of the Apache license is available in the LICENSE file at |
| 4 | +// the root of the repo. |
| 5 | + |
| 6 | +package mcpproxy |
| 7 | + |
| 8 | +import ( |
| 9 | + "context" |
| 10 | + "fmt" |
| 11 | + "maps" |
| 12 | + "regexp" |
| 13 | + "slices" |
| 14 | + "strings" |
| 15 | + "sync" |
| 16 | + |
| 17 | + "github.com/envoyproxy/ai-gateway/internal/filterapi" |
| 18 | +) |
| 19 | + |
| 20 | +type ( |
| 21 | + // ProxyConfig holds the main MCP proxy configuration. |
| 22 | + ProxyConfig struct { |
| 23 | + *mcpProxyConfig |
| 24 | + toolChangeSignaler changeSignaler // signals tool changes to active sessions. |
| 25 | + } |
| 26 | + |
| 27 | + mcpProxyConfig struct { |
| 28 | + backendListenerAddr string |
| 29 | + routes map[filterapi.MCPRouteName]*mcpProxyConfigRoute // route name -> backends of that route. |
| 30 | + } |
| 31 | + |
| 32 | + mcpProxyConfigRoute struct { |
| 33 | + backends map[filterapi.MCPBackendName]filterapi.MCPBackend |
| 34 | + toolSelectors map[filterapi.MCPBackendName]*toolSelector |
| 35 | + } |
| 36 | + |
| 37 | + // toolSelector filters tools using include patterns with exact matches or regular expressions. |
| 38 | + toolSelector struct { |
| 39 | + include map[string]struct{} |
| 40 | + includeRegexps []*regexp.Regexp |
| 41 | + } |
| 42 | + |
| 43 | + // changeSignaler is an interface for signaling configuration changes to multiple |
| 44 | + // watchers. |
| 45 | + changeSignaler interface { |
| 46 | + // Watch returns a channel that receives a signal when the configuration changes. |
| 47 | + Watch() <-chan struct{} |
| 48 | + // Signal all watchers that the configuration has changed. |
| 49 | + Signal() |
| 50 | + } |
| 51 | + |
| 52 | + multiWatcherSignaler struct { |
| 53 | + mu sync.Mutex |
| 54 | + notify chan struct{} |
| 55 | + } |
| 56 | +) |
| 57 | + |
| 58 | +func (m *mcpProxyConfig) sameTools(other *mcpProxyConfig) bool { |
| 59 | + if m == nil || other == nil { |
| 60 | + return m == other |
| 61 | + } |
| 62 | + return maps.EqualFunc(m.routes, other.routes, func(a, b *mcpProxyConfigRoute) bool { |
| 63 | + return a.sameTools(b) |
| 64 | + }) |
| 65 | +} |
| 66 | + |
| 67 | +func (m *mcpProxyConfigRoute) sameTools(other *mcpProxyConfigRoute) bool { |
| 68 | + if m == nil || other == nil { |
| 69 | + return m == other |
| 70 | + } |
| 71 | + if !equalKeys(m.backends, other.backends) { |
| 72 | + return false |
| 73 | + } |
| 74 | + return maps.EqualFunc(m.toolSelectors, other.toolSelectors, func(a, b *toolSelector) bool { |
| 75 | + return a.sameTools(b) |
| 76 | + }) |
| 77 | +} |
| 78 | + |
| 79 | +var sortRegexpAsString = func(a, b *regexp.Regexp) int { return strings.Compare(a.String(), b.String()) } |
| 80 | + |
| 81 | +func equalKeys[K comparable, V any](m1, m2 map[K]V) bool { |
| 82 | + return maps.EqualFunc(m1, m2, func(_, _ V) bool { return true }) |
| 83 | +} |
| 84 | + |
| 85 | +func (t *toolSelector) sameTools(other *toolSelector) bool { |
| 86 | + if t == nil || other == nil { |
| 87 | + return t == other |
| 88 | + } |
| 89 | + if !equalKeys(t.include, other.include) { |
| 90 | + return false |
| 91 | + } |
| 92 | + slices.SortFunc(t.includeRegexps, sortRegexpAsString) |
| 93 | + slices.SortFunc(other.includeRegexps, sortRegexpAsString) |
| 94 | + return slices.EqualFunc(t.includeRegexps, other.includeRegexps, |
| 95 | + func(a, b *regexp.Regexp) bool { |
| 96 | + return a.String() == b.String() |
| 97 | + }) |
| 98 | +} |
| 99 | + |
| 100 | +func (t *toolSelector) allows(tool string) bool { |
| 101 | + // Check include filters - if no filter, allow all; if filter exists, allow only matches |
| 102 | + if len(t.include) > 0 { |
| 103 | + _, ok := t.include[tool] |
| 104 | + return ok |
| 105 | + } |
| 106 | + if len(t.includeRegexps) > 0 { |
| 107 | + for _, re := range t.includeRegexps { |
| 108 | + if re.MatchString(tool) { |
| 109 | + return true |
| 110 | + } |
| 111 | + } |
| 112 | + return false |
| 113 | + } |
| 114 | + // No filters, allow all |
| 115 | + return true |
| 116 | +} |
| 117 | + |
| 118 | +// LoadConfig implements [extproc.ConfigReceiver.LoadConfig] which will be called |
| 119 | +// when the configuration is updated on the file system. |
| 120 | +func (p *ProxyConfig) LoadConfig(_ context.Context, config *filterapi.Config) error { |
| 121 | + newConfig := &mcpProxyConfig{} |
| 122 | + mcpConfig := config.MCPConfig |
| 123 | + if config.MCPConfig == nil { |
| 124 | + return nil |
| 125 | + } |
| 126 | + |
| 127 | + // Talk to the backend MCP listener on the local Envoy instance. |
| 128 | + newConfig.backendListenerAddr = mcpConfig.BackendListenerAddr |
| 129 | + |
| 130 | + // Build a map of routes to backends. |
| 131 | + // Each route has its own set of backends. For a given downstream request, |
| 132 | + // the MCP proxy initializes sessions only with the backends tied to that route. |
| 133 | + newConfig.routes = make(map[filterapi.MCPRouteName]*mcpProxyConfigRoute, len(mcpConfig.Routes)) |
| 134 | + |
| 135 | + for _, route := range mcpConfig.Routes { |
| 136 | + r := &mcpProxyConfigRoute{ |
| 137 | + backends: make(map[filterapi.MCPBackendName]filterapi.MCPBackend, len(route.Backends)), |
| 138 | + toolSelectors: make(map[filterapi.MCPBackendName]*toolSelector, len(route.Backends)), |
| 139 | + } |
| 140 | + for _, backend := range route.Backends { |
| 141 | + r.backends[backend.Name] = backend |
| 142 | + if s := backend.ToolSelector; s != nil { |
| 143 | + ts := &toolSelector{ |
| 144 | + include: make(map[string]struct{}), |
| 145 | + } |
| 146 | + for _, tool := range s.Include { |
| 147 | + ts.include[tool] = struct{}{} |
| 148 | + } |
| 149 | + for _, expr := range s.IncludeRegex { |
| 150 | + re, err := regexp.Compile(expr) |
| 151 | + if err != nil { |
| 152 | + return fmt.Errorf("failed to compile include regex %q for backend %q in route %q: %w", expr, backend.Name, route.Name, err) |
| 153 | + } |
| 154 | + ts.includeRegexps = append(ts.includeRegexps, re) |
| 155 | + } |
| 156 | + r.toolSelectors[backend.Name] = ts |
| 157 | + } |
| 158 | + } |
| 159 | + newConfig.routes[route.Name] = r |
| 160 | + } |
| 161 | + |
| 162 | + toolsChanged := !p.sameTools(newConfig) |
| 163 | + p.mcpProxyConfig = newConfig // This is racy, but we don't care. |
| 164 | + if toolsChanged { |
| 165 | + p.toolChangeSignaler.Signal() |
| 166 | + } |
| 167 | + |
| 168 | + return nil |
| 169 | +} |
| 170 | + |
| 171 | +// newMultiWatcherSignaler creates a new multi-watcher signaler. |
| 172 | +func newMultiWatcherSignaler() *multiWatcherSignaler { |
| 173 | + return &multiWatcherSignaler{ |
| 174 | + notify: make(chan struct{}), |
| 175 | + } |
| 176 | +} |
| 177 | + |
| 178 | +// Watch returns a channel that is closed when the configuration changes. |
| 179 | +func (m *multiWatcherSignaler) Watch() <-chan struct{} { |
| 180 | + m.mu.Lock() |
| 181 | + defer m.mu.Unlock() |
| 182 | + return m.notify |
| 183 | +} |
| 184 | + |
| 185 | +// Signal notifies all watchers of a configuration change. |
| 186 | +func (m *multiWatcherSignaler) Signal() { |
| 187 | + m.mu.Lock() |
| 188 | + defer m.mu.Unlock() |
| 189 | + close(m.notify) // Wake everyone |
| 190 | + m.notify = make(chan struct{}) // Create a new channel for future updates |
| 191 | +} |
0 commit comments