Skip to content

Commit 769594a

Browse files
committed
feat(operator): add optional kagent integration for MCP server discovery
This commit introduces an optional integration between the ToolHive operator and kagent, enabling kagent agents to discover and use MCP servers managed by ToolHive. - Added new mcpserver_kagent.go with logic to manage kagent ToolServer resources - Implemented ensureKagentToolServer() to create/update kagent ToolServers - Implemented deleteKagentToolServer() for cleanup on MCPServer deletion - Added kagent ToolServer management to main reconciliation loop - Added kagent ToolServer cleanup to finalization logic - Added kagentIntegration.enabled flag to values.yaml (default: false) - Added KAGENT_INTEGRATION_ENABLED environment variable to operator deployment - Added conditional RBAC permissions for kagent.dev/toolservers resources - Automatically creates kagent ToolServer resources for each ToolHive MCPServer - ToolServers reference the ToolHive proxy service URL - Proper ownership chain ensures cleanup when MCPServers are deleted - Maps ToolHive transport types to kagent config types: - sse → sse - streamable-http → streamableHttp - stdio → sse (since ToolHive exposes all via HTTP) - Added comprehensive unit tests for kagent integration logic - Tests cover ToolServer creation, updates, and deletion - Tests validate proper URL generation and transport mapping - Added detailed kagent integration section to operator README - Included configuration examples and usage instructions - Documented requirements and how the integration works Kagent is a framework for building AI agents in Kubernetes environments. By creating kagent ToolServer references, we enable kagent agents to: - Discover available MCP servers managed by ToolHive - Use these MCP servers as tools in their workflows - Benefit from ToolHive's security and management features This integration is optional and backward-compatible, requiring explicit enablement via Helm values. Fixes: Enables kagent to use ToolHive-managed MCP servers Signed-off-by: Juan Antonio Osorio <[email protected]>
1 parent 7138ac9 commit 769594a

File tree

7 files changed

+664
-1
lines changed

7 files changed

+664
-1
lines changed

cmd/thv-operator/README.md

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,84 @@ kubectl describe mcpserver <name>
213213
| `permissionProfile` | Permission profile configuration | No | - |
214214
| `tools` | Allow-list filter on the list of tools | No | - |
215215

216+
### Kagent Integration
217+
218+
The ToolHive operator supports optional integration with [kagent](https://kagent.dev), allowing kagent agents to discover and use MCP servers managed by ToolHive. When enabled, the operator automatically creates kagent ToolServer resources that reference the ToolHive-managed MCP servers.
219+
220+
#### Enabling Kagent Integration
221+
222+
To enable kagent integration, set the following Helm value when installing the operator:
223+
224+
```bash
225+
helm upgrade -i toolhive-operator oci://ghcr.io/stacklok/toolhive/toolhive-operator \
226+
--set kagentIntegration.enabled=true \
227+
-n toolhive-system --create-namespace
228+
```
229+
230+
Or add it to your values file:
231+
232+
```yaml
233+
kagentIntegration:
234+
enabled: true
235+
```
236+
237+
#### How It Works
238+
239+
When kagent integration is enabled:
240+
241+
1. For each ToolHive MCPServer resource created, the operator automatically creates a corresponding kagent ToolServer resource
242+
2. The ToolServer resource references the ToolHive-managed MCP server service URL
243+
3. The ToolServer is owned by the MCPServer, ensuring it's deleted when the MCPServer is removed
244+
4. Kagent agents can then discover and use these ToolServers to access the MCP servers
245+
246+
The kagent ToolServer resources are created with:
247+
- Name: `toolhive-<mcpserver-name>`
248+
- Namespace: Same as the MCPServer
249+
- Transport configuration: Mapped from ToolHive transport types (sse → sse, streamable-http → streamableHttp, stdio → sse)
250+
- Service URL: Points to the ToolHive proxy service
251+
252+
#### Requirements
253+
254+
- Kagent must be installed in your cluster
255+
- The operator needs permissions to manage kagent ToolServer resources (automatically configured when integration is enabled)
256+
257+
#### Example
258+
259+
When you create a ToolHive MCPServer:
260+
261+
```yaml
262+
apiVersion: toolhive.stacklok.dev/v1alpha1
263+
kind: MCPServer
264+
metadata:
265+
name: github
266+
namespace: toolhive-system
267+
spec:
268+
image: ghcr.io/github/github-mcp-server
269+
transport: sse
270+
port: 8080
271+
```
272+
273+
With kagent integration enabled, the operator automatically creates:
274+
275+
```yaml
276+
apiVersion: kagent.dev/v1alpha1
277+
kind: ToolServer
278+
metadata:
279+
name: toolhive-github
280+
namespace: toolhive-system
281+
labels:
282+
toolhive.stacklok.dev/managed-by: toolhive-operator
283+
toolhive.stacklok.dev/mcpserver: github
284+
spec:
285+
description: "ToolHive MCP Server: github"
286+
config:
287+
type: sse
288+
sse:
289+
url: http://mcp-github-proxy.toolhive-system.svc.cluster.local:8080
290+
```
291+
292+
Kagent agents can then reference this ToolServer to use the GitHub MCP server in their workflows.
293+
216294
### Permission Profiles
217295

218296
Permission profiles can be configured in two ways:

cmd/thv-operator/controllers/mcpserver_controller.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,13 @@ func (r *MCPServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
318318
}
319319
}
320320

321+
// Ensure kagent ToolServer resource if integration is enabled
322+
if err := r.ensureKagentToolServer(ctx, mcpServer); err != nil {
323+
ctxLogger.Error(err, "Failed to ensure kagent ToolServer")
324+
// Log the error but don't fail the reconciliation
325+
// This allows the ToolHive MCPServer to work even if kagent integration fails
326+
}
327+
321328
// Check if the deployment spec changed
322329
if r.deploymentNeedsUpdate(deployment, mcpServer) {
323330
// Update the deployment
@@ -1248,6 +1255,13 @@ func (r *MCPServerReconciler) finalizeMCPServer(ctx context.Context, m *mcpv1alp
12481255
return fmt.Errorf("failed to check RunConfig ConfigMap %s: %w", runConfigName, err)
12491256
}
12501257

1258+
// Step 5: Delete associated kagent ToolServer if it exists
1259+
// This should be handled by owner references, but we explicitly delete for safety
1260+
if err := r.deleteKagentToolServer(ctx, m); err != nil {
1261+
// Log the error but don't fail the finalization
1262+
ctxLogger.Error(err, "Failed to delete kagent ToolServer during finalization")
1263+
}
1264+
12511265
// The owner references will automatically delete the deployment and service
12521266
// when the MCPServer is deleted, so we don't need to do anything here.
12531267
return nil
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
package controllers
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"strconv"
8+
9+
"k8s.io/apimachinery/pkg/api/equality"
10+
"k8s.io/apimachinery/pkg/api/errors"
11+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
12+
"k8s.io/apimachinery/pkg/runtime/schema"
13+
"k8s.io/apimachinery/pkg/types"
14+
"sigs.k8s.io/controller-runtime/pkg/log"
15+
16+
mcpv1alpha1 "github.com/stacklok/toolhive/cmd/thv-operator/api/v1alpha1"
17+
)
18+
19+
// kagentToolServerGVK defines the GroupVersionKind for kagent ToolServer
20+
var kagentToolServerGVK = schema.GroupVersionKind{
21+
Group: "kagent.dev",
22+
Version: "v1alpha1",
23+
Kind: "ToolServer",
24+
}
25+
26+
// Constants for kagent config types
27+
const (
28+
kagentConfigTypeSSE = "sse"
29+
kagentConfigTypeStreamableHTTP = "streamableHttp"
30+
)
31+
32+
// isKagentIntegrationEnabled checks if kagent integration is enabled via environment variable
33+
func isKagentIntegrationEnabled() bool {
34+
enabled := os.Getenv("KAGENT_INTEGRATION_ENABLED")
35+
if enabled == "" {
36+
return false
37+
}
38+
result, err := strconv.ParseBool(enabled)
39+
if err != nil {
40+
return false
41+
}
42+
return result
43+
}
44+
45+
// ensureKagentToolServer ensures a kagent ToolServer resource exists for the ToolHive MCPServer
46+
func (r *MCPServerReconciler) ensureKagentToolServer(ctx context.Context, mcpServer *mcpv1alpha1.MCPServer) error {
47+
logger := log.FromContext(ctx)
48+
49+
// Check if kagent integration is enabled
50+
if !isKagentIntegrationEnabled() {
51+
// If not enabled, ensure any existing kagent ToolServer is deleted
52+
return r.deleteKagentToolServer(ctx, mcpServer)
53+
}
54+
55+
// Create the kagent ToolServer object
56+
kagentToolServer := r.createKagentToolServerObject(mcpServer)
57+
58+
// Check if the kagent ToolServer already exists
59+
existing := &unstructured.Unstructured{}
60+
existing.SetGroupVersionKind(kagentToolServerGVK)
61+
err := r.Get(ctx, types.NamespacedName{
62+
Name: kagentToolServer.GetName(),
63+
Namespace: kagentToolServer.GetNamespace(),
64+
}, existing)
65+
66+
if errors.IsNotFound(err) {
67+
// Create the kagent ToolServer
68+
logger.Info("Creating kagent ToolServer",
69+
"name", kagentToolServer.GetName(),
70+
"namespace", kagentToolServer.GetNamespace())
71+
if err := r.Create(ctx, kagentToolServer); err != nil {
72+
return fmt.Errorf("failed to create kagent ToolServer: %w", err)
73+
}
74+
return nil
75+
} else if err != nil {
76+
return fmt.Errorf("failed to get kagent ToolServer: %w", err)
77+
}
78+
79+
// Update the kagent ToolServer if needed
80+
existingSpec, _, _ := unstructured.NestedMap(existing.Object, "spec")
81+
desiredSpec, _, _ := unstructured.NestedMap(kagentToolServer.Object, "spec")
82+
83+
if !equality.Semantic.DeepEqual(existingSpec, desiredSpec) {
84+
logger.Info("Updating kagent ToolServer",
85+
"name", kagentToolServer.GetName(),
86+
"namespace", kagentToolServer.GetNamespace())
87+
existing.Object["spec"] = kagentToolServer.Object["spec"]
88+
if err := r.Update(ctx, existing); err != nil {
89+
return fmt.Errorf("failed to update kagent ToolServer: %w", err)
90+
}
91+
}
92+
93+
return nil
94+
}
95+
96+
// deleteKagentToolServer deletes the kagent ToolServer if it exists
97+
func (r *MCPServerReconciler) deleteKagentToolServer(ctx context.Context, mcpServer *mcpv1alpha1.MCPServer) error {
98+
logger := log.FromContext(ctx)
99+
100+
kagentToolServer := &unstructured.Unstructured{}
101+
kagentToolServer.SetGroupVersionKind(kagentToolServerGVK)
102+
kagentToolServer.SetName(fmt.Sprintf("toolhive-%s", mcpServer.Name))
103+
kagentToolServer.SetNamespace(mcpServer.Namespace)
104+
105+
err := r.Get(ctx, types.NamespacedName{
106+
Name: kagentToolServer.GetName(),
107+
Namespace: kagentToolServer.GetNamespace(),
108+
}, kagentToolServer)
109+
110+
if errors.IsNotFound(err) {
111+
// Already deleted
112+
return nil
113+
} else if err != nil {
114+
return fmt.Errorf("failed to get kagent ToolServer for deletion: %w", err)
115+
}
116+
117+
logger.Info("Deleting kagent ToolServer",
118+
"name", kagentToolServer.GetName(),
119+
"namespace", kagentToolServer.GetNamespace())
120+
if err := r.Delete(ctx, kagentToolServer); err != nil && !errors.IsNotFound(err) {
121+
return fmt.Errorf("failed to delete kagent ToolServer: %w", err)
122+
}
123+
124+
return nil
125+
}
126+
127+
// createKagentToolServerObject creates an unstructured kagent ToolServer object
128+
func (*MCPServerReconciler) createKagentToolServerObject(mcpServer *mcpv1alpha1.MCPServer) *unstructured.Unstructured {
129+
kagentToolServer := &unstructured.Unstructured{}
130+
kagentToolServer.SetGroupVersionKind(kagentToolServerGVK)
131+
kagentToolServer.SetName(fmt.Sprintf("toolhive-%s", mcpServer.Name))
132+
kagentToolServer.SetNamespace(mcpServer.Namespace)
133+
134+
// Build the service URL for the ToolHive MCP server
135+
serviceName := createServiceName(mcpServer.Name)
136+
serviceURL := fmt.Sprintf("http://%s.%s.svc.cluster.local:%d",
137+
serviceName, mcpServer.Namespace, mcpServer.Spec.Port)
138+
139+
// Determine the config type based on ToolHive transport
140+
var configType string
141+
var config map[string]interface{}
142+
143+
switch mcpServer.Spec.Transport {
144+
case "sse":
145+
configType = kagentConfigTypeSSE
146+
config = map[string]interface{}{
147+
kagentConfigTypeSSE: map[string]interface{}{
148+
"url": serviceURL,
149+
},
150+
}
151+
case "streamable-http":
152+
configType = kagentConfigTypeStreamableHTTP
153+
config = map[string]interface{}{
154+
kagentConfigTypeStreamableHTTP: map[string]interface{}{
155+
"url": serviceURL,
156+
},
157+
}
158+
default:
159+
// For stdio or any other transport, default to SSE
160+
// since ToolHive exposes everything via HTTP
161+
configType = kagentConfigTypeSSE
162+
config = map[string]interface{}{
163+
kagentConfigTypeSSE: map[string]interface{}{
164+
"url": serviceURL,
165+
},
166+
}
167+
}
168+
169+
config["type"] = configType
170+
171+
// Build the spec
172+
spec := map[string]interface{}{
173+
"description": fmt.Sprintf("ToolHive MCP Server: %s", mcpServer.Name),
174+
"config": config,
175+
}
176+
177+
kagentToolServer.Object = map[string]interface{}{
178+
"apiVersion": "kagent.dev/v1alpha1",
179+
"kind": "ToolServer",
180+
"metadata": map[string]interface{}{
181+
"name": kagentToolServer.GetName(),
182+
"namespace": kagentToolServer.GetNamespace(),
183+
"labels": map[string]interface{}{
184+
"toolhive.stacklok.dev/managed-by": "toolhive-operator",
185+
"toolhive.stacklok.dev/mcpserver": mcpServer.Name,
186+
},
187+
"ownerReferences": []interface{}{
188+
map[string]interface{}{
189+
"apiVersion": "toolhive.stacklok.dev/v1alpha1",
190+
"kind": "MCPServer",
191+
"name": mcpServer.Name,
192+
"uid": string(mcpServer.UID),
193+
"controller": true,
194+
"blockOwnerDeletion": true,
195+
},
196+
},
197+
},
198+
"spec": spec,
199+
}
200+
201+
return kagentToolServer
202+
}

0 commit comments

Comments
 (0)