Skip to content

Commit 963af6e

Browse files
JAORMXclaude
andcommitted
Add registry export controller for automatic MCP resource discovery
Implements Phase 1 of operator registry export (PR #2591). The controller watches MCPServer, MCPRemoteProxy, and VirtualMCPServer resources for registry export annotations and aggregates them into a ConfigMap per namespace in the upstream MCP registry format. Key features: - Annotation-based export using `toolhive.stacklok.dev/registry-url` - Per-namespace ConfigMap with `{namespace}-registry-export` naming - Feature toggle via Helm value `operator.features.registryExport` or env var `ENABLE_REGISTRY_EXPORT` (disabled by default) - Stable checksums computed only on server entries (not timestamps) - Deterministic output with sorted entries - Efficient ConfigMap watches using label predicate filter 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 2b448fe commit 963af6e

File tree

11 files changed

+990
-1
lines changed

11 files changed

+990
-1
lines changed
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
// Package controllers provides Kubernetes controllers for the ToolHive operator.
2+
package controllers
3+
4+
import (
5+
"context"
6+
"fmt"
7+
8+
upstreamv0 "github.com/modelcontextprotocol/registry/pkg/api/v0"
9+
"github.com/modelcontextprotocol/registry/pkg/model"
10+
corev1 "k8s.io/api/core/v1"
11+
ctrl "sigs.k8s.io/controller-runtime"
12+
"sigs.k8s.io/controller-runtime/pkg/builder"
13+
"sigs.k8s.io/controller-runtime/pkg/client"
14+
"sigs.k8s.io/controller-runtime/pkg/handler"
15+
"sigs.k8s.io/controller-runtime/pkg/log"
16+
"sigs.k8s.io/controller-runtime/pkg/predicate"
17+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
18+
19+
mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
20+
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/registryexport"
21+
"github.com/stacklok/toolhive/pkg/env"
22+
)
23+
24+
// RegistryExportReconciler reconciles MCP resources to export them to the registry.
25+
// It watches MCPServer, MCPRemoteProxy, and VirtualMCPServer resources for the
26+
// registry-url annotation and aggregates them into a ConfigMap per namespace.
27+
type RegistryExportReconciler struct {
28+
client.Client
29+
Generator *registryexport.Generator
30+
ConfigMapMgr *registryexport.ConfigMapManager
31+
}
32+
33+
// +kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=mcpservers,verbs=get;list;watch
34+
// +kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=mcpremoteproxies,verbs=get;list;watch
35+
// +kubebuilder:rbac:groups=toolhive.stacklok.dev,resources=virtualmcpservers,verbs=get;list;watch
36+
// +kubebuilder:rbac:groups="",resources=configmaps,verbs=get;list;watch;create;update;patch;delete
37+
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
38+
39+
// Reconcile handles reconciliation for registry export.
40+
// It is triggered by changes to any MCP resource in a namespace and aggregates
41+
// all annotated resources into a single ConfigMap.
42+
func (r *RegistryExportReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
43+
ctxLogger := log.FromContext(ctx)
44+
ctxLogger.Info("Reconciling registry export", "namespace", req.Namespace)
45+
46+
// Collect all exportable resources in the namespace
47+
entries, err := r.collectExportableResources(ctx, req.Namespace)
48+
if err != nil {
49+
ctxLogger.Error(err, "Failed to collect exportable resources")
50+
return ctrl.Result{}, err
51+
}
52+
53+
// If no resources to export, delete the ConfigMap if it exists
54+
if len(entries) == 0 {
55+
ctxLogger.Info("No exportable resources found, deleting ConfigMap if exists")
56+
if err := r.ConfigMapMgr.DeleteConfigMap(ctx, req.Namespace); err != nil {
57+
ctxLogger.Error(err, "Failed to delete ConfigMap")
58+
return ctrl.Result{}, err
59+
}
60+
return ctrl.Result{}, nil
61+
}
62+
63+
// Build and update the registry ConfigMap
64+
registry := r.Generator.BuildUpstreamRegistry(entries)
65+
if err := r.ConfigMapMgr.UpsertConfigMap(ctx, req.Namespace, registry); err != nil {
66+
ctxLogger.Error(err, "Failed to upsert ConfigMap")
67+
return ctrl.Result{}, err
68+
}
69+
70+
ctxLogger.Info("Successfully reconciled registry export", "entries", len(entries))
71+
return ctrl.Result{}, nil
72+
}
73+
74+
// collectExportableResources collects all MCP resources with registry export annotations.
75+
func (r *RegistryExportReconciler) collectExportableResources(
76+
ctx context.Context,
77+
namespace string,
78+
) ([]upstreamv0.ServerJSON, error) {
79+
var entries []upstreamv0.ServerJSON
80+
81+
// Collect from MCPServers
82+
mcpServers, err := r.collectMCPServers(ctx, namespace)
83+
if err != nil {
84+
return nil, fmt.Errorf("failed to collect MCPServers: %w", err)
85+
}
86+
entries = append(entries, mcpServers...)
87+
88+
// Collect from MCPRemoteProxies
89+
mcpProxies, err := r.collectMCPRemoteProxies(ctx, namespace)
90+
if err != nil {
91+
return nil, fmt.Errorf("failed to collect MCPRemoteProxies: %w", err)
92+
}
93+
entries = append(entries, mcpProxies...)
94+
95+
// Collect from VirtualMCPServers
96+
vmcpServers, err := r.collectVirtualMCPServers(ctx, namespace)
97+
if err != nil {
98+
return nil, fmt.Errorf("failed to collect VirtualMCPServers: %w", err)
99+
}
100+
entries = append(entries, vmcpServers...)
101+
102+
return entries, nil
103+
}
104+
105+
// collectMCPServers collects registry entries from MCPServer resources.
106+
func (r *RegistryExportReconciler) collectMCPServers(
107+
ctx context.Context,
108+
namespace string,
109+
) ([]upstreamv0.ServerJSON, error) {
110+
ctxLogger := log.FromContext(ctx)
111+
112+
var servers mcpv1alpha1.MCPServerList
113+
if err := r.List(ctx, &servers, client.InNamespace(namespace)); err != nil {
114+
return nil, err
115+
}
116+
117+
var entries []upstreamv0.ServerJSON
118+
for i := range servers.Items {
119+
server := &servers.Items[i]
120+
if !registryexport.HasRegistryExportAnnotation(server) {
121+
continue
122+
}
123+
124+
transport := server.Spec.Transport
125+
if transport == "" {
126+
transport = model.TransportTypeSSE
127+
}
128+
129+
entry, err := r.Generator.GenerateServerEntry(registryexport.ExportableResource{
130+
Object: server,
131+
Transport: transport,
132+
})
133+
if err != nil {
134+
ctxLogger.Error(err, "Failed to generate entry for MCPServer", "name", server.Name)
135+
continue
136+
}
137+
if entry != nil {
138+
entries = append(entries, *entry)
139+
}
140+
}
141+
142+
return entries, nil
143+
}
144+
145+
// collectMCPRemoteProxies collects registry entries from MCPRemoteProxy resources.
146+
func (r *RegistryExportReconciler) collectMCPRemoteProxies(
147+
ctx context.Context,
148+
namespace string,
149+
) ([]upstreamv0.ServerJSON, error) {
150+
ctxLogger := log.FromContext(ctx)
151+
152+
var proxies mcpv1alpha1.MCPRemoteProxyList
153+
if err := r.List(ctx, &proxies, client.InNamespace(namespace)); err != nil {
154+
return nil, err
155+
}
156+
157+
var entries []upstreamv0.ServerJSON
158+
for i := range proxies.Items {
159+
proxy := &proxies.Items[i]
160+
if !registryexport.HasRegistryExportAnnotation(proxy) {
161+
continue
162+
}
163+
164+
transport := proxy.Spec.Transport
165+
if transport == "" {
166+
transport = model.TransportTypeSSE
167+
}
168+
169+
entry, err := r.Generator.GenerateServerEntry(registryexport.ExportableResource{
170+
Object: proxy,
171+
Transport: transport,
172+
})
173+
if err != nil {
174+
ctxLogger.Error(err, "Failed to generate entry for MCPRemoteProxy", "name", proxy.Name)
175+
continue
176+
}
177+
if entry != nil {
178+
entries = append(entries, *entry)
179+
}
180+
}
181+
182+
return entries, nil
183+
}
184+
185+
// collectVirtualMCPServers collects registry entries from VirtualMCPServer resources.
186+
func (r *RegistryExportReconciler) collectVirtualMCPServers(
187+
ctx context.Context,
188+
namespace string,
189+
) ([]upstreamv0.ServerJSON, error) {
190+
ctxLogger := log.FromContext(ctx)
191+
192+
var vmcpServers mcpv1alpha1.VirtualMCPServerList
193+
if err := r.List(ctx, &vmcpServers, client.InNamespace(namespace)); err != nil {
194+
return nil, err
195+
}
196+
197+
var entries []upstreamv0.ServerJSON
198+
for i := range vmcpServers.Items {
199+
vmcp := &vmcpServers.Items[i]
200+
if !registryexport.HasRegistryExportAnnotation(vmcp) {
201+
continue
202+
}
203+
204+
// VirtualMCPServer uses streamable-http by default
205+
entry, err := r.Generator.GenerateServerEntry(registryexport.ExportableResource{
206+
Object: vmcp,
207+
Transport: "streamable-http",
208+
})
209+
if err != nil {
210+
ctxLogger.Error(err, "Failed to generate entry for VirtualMCPServer", "name", vmcp.Name)
211+
continue
212+
}
213+
if entry != nil {
214+
entries = append(entries, *entry)
215+
}
216+
}
217+
218+
return entries, nil
219+
}
220+
221+
// mapMCPResourceToNamespace maps any MCP resource to a reconcile request for its namespace.
222+
// This ensures that any change to an MCP resource triggers a full namespace reconciliation.
223+
// We always trigger reconciliation to handle both annotation additions and removals.
224+
func (*RegistryExportReconciler) mapMCPResourceToNamespace(
225+
_ context.Context,
226+
obj client.Object,
227+
) []reconcile.Request {
228+
return []reconcile.Request{{
229+
NamespacedName: client.ObjectKey{
230+
Namespace: obj.GetNamespace(),
231+
Name: obj.GetNamespace(),
232+
},
233+
}}
234+
}
235+
236+
// SetupWithManager sets up the controller with the Manager.
237+
func (r *RegistryExportReconciler) SetupWithManager(mgr ctrl.Manager) error {
238+
// Predicate to filter only registry export ConfigMaps
239+
registryExportConfigMapPredicate := predicate.NewPredicateFuncs(func(obj client.Object) bool {
240+
labels := obj.GetLabels()
241+
return labels != nil && labels[registryexport.LabelRegistryExport] == registryexport.LabelRegistryExportValue
242+
})
243+
244+
return ctrl.NewControllerManagedBy(mgr).
245+
// Watch only registry export ConfigMaps as the primary resource
246+
For(&corev1.ConfigMap{}, builder.WithPredicates(registryExportConfigMapPredicate)).
247+
Watches(
248+
&mcpv1alpha1.MCPServer{},
249+
handler.EnqueueRequestsFromMapFunc(r.mapMCPResourceToNamespace),
250+
).
251+
Watches(
252+
&mcpv1alpha1.MCPRemoteProxy{},
253+
handler.EnqueueRequestsFromMapFunc(r.mapMCPResourceToNamespace),
254+
).
255+
Watches(
256+
&mcpv1alpha1.VirtualMCPServer{},
257+
handler.EnqueueRequestsFromMapFunc(r.mapMCPResourceToNamespace),
258+
).
259+
Complete(r)
260+
}
261+
262+
// IsRegistryExportEnabled checks if the registry export feature is enabled via environment variable.
263+
func IsRegistryExportEnabled() bool {
264+
return IsRegistryExportEnabledWithEnv(&env.OSReader{})
265+
}
266+
267+
// IsRegistryExportEnabledWithEnv checks if the registry export feature is enabled
268+
// using a custom environment reader for testing.
269+
func IsRegistryExportEnabledWithEnv(envReader env.Reader) bool {
270+
val := envReader.Getenv(registryexport.EnvEnableRegistryExport)
271+
return val == "true" || val == "1" || val == "yes"
272+
}
273+
274+
// NewRegistryExportReconciler creates a new RegistryExportReconciler.
275+
func NewRegistryExportReconciler(c client.Client) *RegistryExportReconciler {
276+
return &RegistryExportReconciler{
277+
Client: c,
278+
Generator: registryexport.NewGenerator(),
279+
ConfigMapMgr: registryexport.NewConfigMapManager(c),
280+
}
281+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package controllers
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"go.uber.org/mock/gomock"
8+
9+
"github.com/stacklok/toolhive/cmd/thv-operator/pkg/registryexport"
10+
"github.com/stacklok/toolhive/pkg/env/mocks"
11+
)
12+
13+
func TestIsRegistryExportEnabledWithEnv(t *testing.T) {
14+
t.Parallel()
15+
tests := []struct {
16+
name string
17+
envValue string
18+
want bool
19+
}{
20+
{"enabled with true", "true", true},
21+
{"enabled with 1", "1", true},
22+
{"enabled with yes", "yes", true},
23+
{"disabled with false", "false", false},
24+
{"disabled with 0", "0", false},
25+
{"disabled with empty", "", false},
26+
}
27+
28+
for _, tt := range tests {
29+
t.Run(tt.name, func(t *testing.T) {
30+
t.Parallel()
31+
ctrl := gomock.NewController(t)
32+
defer ctrl.Finish()
33+
34+
mockReader := mocks.NewMockReader(ctrl)
35+
mockReader.EXPECT().Getenv(registryexport.EnvEnableRegistryExport).Return(tt.envValue)
36+
37+
got := IsRegistryExportEnabledWithEnv(mockReader)
38+
assert.Equal(t, tt.want, got)
39+
})
40+
}
41+
}

cmd/thv-operator/main.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,13 @@ func setupControllersAndWebhooks(mgr ctrl.Manager) error {
199199
if err := (&mcpv1alpha1.VirtualMCPCompositeToolDefinition{}).SetupWebhookWithManager(mgr); err != nil {
200200
return fmt.Errorf("unable to create webhook VirtualMCPCompositeToolDefinition: %w", err)
201201
}
202+
203+
// Set up RegistryExport controller if enabled
204+
if controllers.IsRegistryExportEnabled() {
205+
if err := controllers.NewRegistryExportReconciler(mgr.GetClient()).SetupWithManager(mgr); err != nil {
206+
return fmt.Errorf("unable to create controller RegistryExport: %w", err)
207+
}
208+
}
202209
//+kubebuilder:scaffold:builder
203210

204211
return nil
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Package registryexport provides functionality for automatically exporting
2+
// MCP resources to the registry based on annotations.
3+
package registryexport
4+
5+
const (
6+
// AnnotationRegistryURL is the external URL for registry export.
7+
// When present on an MCP resource, the resource will be exported to the registry.
8+
AnnotationRegistryURL = "toolhive.stacklok.dev/registry-url"
9+
10+
// AnnotationRegistryDescription is the description for servers not in a source registry.
11+
// Required when creating new registry entries for internal/custom servers.
12+
AnnotationRegistryDescription = "toolhive.stacklok.dev/registry-description"
13+
14+
// AnnotationRegistryName overrides the generated server name in the registry.
15+
// If not specified, a name is generated from namespace/name in reverse-DNS format.
16+
AnnotationRegistryName = "toolhive.stacklok.dev/registry-name"
17+
18+
// ConfigMapSuffix is appended to namespace for the export ConfigMap name.
19+
ConfigMapSuffix = "-registry-export"
20+
21+
// ConfigMapKey is the key in the ConfigMap for registry data.
22+
ConfigMapKey = "registry.json"
23+
24+
// LabelRegistryExport identifies ConfigMaps created by the registry export controller.
25+
LabelRegistryExport = "toolhive.stacklok.dev/registry-export"
26+
27+
// LabelRegistryExportValue is the value for the registry export label.
28+
LabelRegistryExportValue = "true"
29+
30+
// EnvEnableRegistryExport is the environment variable to enable/disable registry export.
31+
EnvEnableRegistryExport = "ENABLE_REGISTRY_EXPORT"
32+
)
33+
34+
// GetConfigMapName returns the ConfigMap name for a given namespace.
35+
func GetConfigMapName(namespace string) string {
36+
return namespace + ConfigMapSuffix
37+
}

0 commit comments

Comments
 (0)