Skip to content

Commit fbbb3ce

Browse files
feat(mcp): implement mcp-hub for hybrid host-cluster visibility (#299)
1 parent be3fde2 commit fbbb3ce

File tree

11 files changed

+817
-4
lines changed

11 files changed

+817
-4
lines changed

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ help:
2424
@echo " make proxy-build - Build and restart the go proxy server"
2525
@echo " make ingestion-build - Build and restart the go ingestion server"
2626
@echo " make mcp-telemetry-build - Build and restart the go mcp telemetry server"
27-
@echo " make all-build - Build and restart all Go services"
27+
@echo " make mcp-pods-build - Build and restart the go mcp pods server"
28+
@echo " make mcp-hub-build - Build and restart the go mcp hub server"
29+
@echo " make all-build - Build and restart all Go services"
2830
@echo ""
2931
@echo "Host Tier (Systemd & Secrets):"
3032
@echo " make install-services - Install all systemd units"

cmd/mcp-hub/main.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"os"
6+
"os/signal"
7+
"syscall"
8+
9+
"github.com/modelcontextprotocol/go-sdk/mcp"
10+
11+
internalmcp "observability-hub/internal/mcp"
12+
"observability-hub/internal/mcp/providers"
13+
"observability-hub/internal/telemetry"
14+
)
15+
16+
const (
17+
serviceName = "mcp-hub"
18+
version = "0.1.0"
19+
)
20+
21+
func main() {
22+
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
23+
defer stop()
24+
25+
// 1. Initialize Telemetry
26+
shutdown, err := telemetry.Init(ctx, serviceName)
27+
if err != nil {
28+
telemetry.Error("failed to initialize telemetry", "error", err)
29+
os.Exit(1)
30+
}
31+
defer shutdown()
32+
33+
// 2. Initialize Hub Provider
34+
hubProvider := providers.NewHubProvider()
35+
36+
// 3. Create MCP Server using established implementation
37+
server := mcp.NewServer(&mcp.Implementation{
38+
Name: serviceName,
39+
Version: version,
40+
}, nil)
41+
42+
// 4. Register Hub Tools
43+
internalmcp.RegisterHubTools(server, hubProvider)
44+
45+
// 5. Run Server (Stdio transport)
46+
telemetry.Info("mcp-hub ready", "tools", []string{"hub_inspect_platform", "hub_inspect_host", "hub_list_host_services", "hub_query_service_logs"})
47+
48+
transport := &mcp.StdioTransport{}
49+
if err := server.Run(ctx, transport); err != nil {
50+
telemetry.Error("mcp-hub execution failed", "error", err)
51+
os.Exit(1)
52+
}
53+
54+
telemetry.Info("shutting down mcp-hub")
55+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# ADR 019: Hybrid Host-MCP Intelligence Layer
2+
3+
- **Status:** Accepted
4+
- **Date:** 2026-03-12
5+
- **Author:** Victoria Cheng
6+
7+
## Context and Problem Statement
8+
9+
The Observability Hub operates in a hybrid environment where high-performance data ingestion pipelines (`ingestion`, `proxy`) and core security infrastructure (`openbao`) reside as systemd services on the physical host, while the telemetry and visualization stack (LGTM) is orchestrated via Kubernetes (K3s).
10+
11+
Existing Model Context Protocol (MCP) implementations, such as `mcp-pods`, provide deep visibility into the cluster but are blind to the host-resident "Pillar Services." This created an intelligence gap where AI agents could not correlate cluster-level telemetry with host-level system state or logs.
12+
13+
## Decision Outcome
14+
15+
Implement a dedicated MCP server, `mcp-hub`, to act as the authoritative "Host-Cluster Bridge."
16+
17+
### Rationale
18+
19+
- **Hybrid Visibility:** Enables agents to diagnose "Dark Matter" failures occurring outside the Kubernetes runtime by wrapping system-native tools (`systemctl`, `journalctl`, `free`, `df`).
20+
- **Semantic Clarity:** Strict namespacing with the `hub_` prefix prevents tool name collisions and provides a clear domain boundary for the agent.
21+
- **Testability & Reliability:** Use of a `CommandRunner` interface allows for 100% unit test coverage of state-parsing logic without side-effects.
22+
- **Architectural Consistency:** Aligns with the `internalmcp` pattern for unified telemetry and signal handling across all MCP services.
23+
24+
## Consequences
25+
26+
### Positive
27+
28+
- **Executive Oversight:** Provides a unified summary (`hub_inspect_platform`) that combines K3s and Host health status.
29+
- **Direct-Path Ingestion:** Enables autonomous log analysis via systemd journal queries, bypassing the telemetry pipeline for real-time investigation.
30+
- **Resource Awareness:** Agents can now correlate host-level resource pressure (Memory/CPU) with pod failures.
31+
32+
### Negative
33+
34+
- **Privileged Access:** Requires host-level execution permissions for systemd and kubectl, increasing the security surface area.
35+
- **Dependency Bloat:** Adds another service to the MCP fleet that requires maintenance and monitoring.
36+
37+
## Verification
38+
39+
- [x] **Unit Tests:** `internal/mcp/providers/hub_test.go` and `internal/mcp/tools/hub_test.go` verify parsing logic and MCP handlers.
40+
- [x] **Registry Check:** Tools are correctly registered with the `hub_` prefix in the implementation.
41+
- [x] **Binary Verification:** `make mcp-hub-build` successfully compiles the service.

docs/decisions/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ This directory serves as the **Institutional Memory** for the Observability Hub.
88

99
| ADR | Title | Status |
1010
| :--- | :--- | :--- |
11+
| **019** | [Hybrid Host-MCP Intelligence Layer](./019-hybrid-host-mcp-intelligence.md) | 🔵 Accepted |
1112
| **018** | [Domain-Isolated MCP Architecture](./018-domain-isolated-mcp-architecture.md) | 🔵 Accepted |
1213
| **017** | [Agentic Interface via MCP](./017-agentic-interface-mcp.md) | 🔵 Accepted |
1314
| **016** | [OpenTofu for K3s Service Management](./016-opentofu-k3s-migration.md) | 🔵 Accepted |

internal/mcp/providers/hub.go

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package providers
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os/exec"
7+
"strings"
8+
"time"
9+
10+
"observability-hub/internal/telemetry"
11+
)
12+
13+
// CommandRunner defines the interface for executing shell commands.
14+
type CommandRunner interface {
15+
Run(ctx context.Context, name string, arg ...string) ([]byte, error)
16+
}
17+
18+
// RealCommandRunner is the production implementation.
19+
type RealCommandRunner struct{}
20+
21+
func (r *RealCommandRunner) Run(ctx context.Context, name string, arg ...string) ([]byte, error) {
22+
cmd := exec.CommandContext(ctx, name, arg...)
23+
return cmd.CombinedOutput()
24+
}
25+
26+
// HubProvider provides tools for host-level introspection and platform status.
27+
type HubProvider struct {
28+
runner CommandRunner
29+
targetServices []string
30+
}
31+
32+
// NewHubProvider creates a new HubProvider.
33+
func NewHubProvider() *HubProvider {
34+
return &HubProvider{
35+
runner: &RealCommandRunner{},
36+
targetServices: []string{
37+
"ingestion.service",
38+
"proxy.service",
39+
"openbao.service",
40+
"tailscale-gate.service",
41+
},
42+
}
43+
}
44+
45+
// HostResource represents physical resource usage on the host.
46+
type HostResource struct {
47+
CPUUsage string `json:"cpu_usage"`
48+
MemoryTotal string `json:"memory_total"`
49+
MemoryUsed string `json:"memory_used"`
50+
DiskUsage string `json:"disk_usage"`
51+
LoadAverage string `json:"load_average"`
52+
}
53+
54+
// ServiceStatus represents the state of a systemd unit.
55+
type ServiceStatus struct {
56+
Name string `json:"name"`
57+
Active string `json:"active"`
58+
Sub string `json:"sub"`
59+
Since string `json:"since"`
60+
}
61+
62+
// ListHostServices returns the status of target systemd services.
63+
func (p *HubProvider) ListHostServices(ctx context.Context) ([]ServiceStatus, error) {
64+
var statuses []ServiceStatus
65+
66+
for _, svc := range p.targetServices {
67+
out, err := p.runner.Run(ctx, "systemctl", "show", svc, "--property=ActiveState,SubState,ActiveEnterTimestamp")
68+
if err != nil {
69+
telemetry.Warn("systemctl_show_failed", "service", svc, "error", err)
70+
continue
71+
}
72+
73+
status := ServiceStatus{Name: svc}
74+
lines := strings.Split(string(out), "\n")
75+
for _, line := range lines {
76+
parts := strings.SplitN(line, "=", 2)
77+
if len(parts) < 2 {
78+
continue
79+
}
80+
val := strings.TrimSpace(parts[1])
81+
switch parts[0] {
82+
case "ActiveState":
83+
status.Active = val
84+
case "SubState":
85+
status.Sub = val
86+
case "ActiveEnterTimestamp":
87+
status.Since = val
88+
}
89+
}
90+
statuses = append(statuses, status)
91+
}
92+
93+
return statuses, nil
94+
}
95+
96+
// QueryServiceLogs retrieves journal logs for a specific service since a relative time.
97+
func (p *HubProvider) QueryServiceLogs(ctx context.Context, service string, since string) (string, error) {
98+
if since == "" {
99+
since = "5m"
100+
}
101+
102+
argSince := fmt.Sprintf("%s ago", since)
103+
out, err := p.runner.Run(ctx, "journalctl", "-u", service, "--since", argSince, "--no-pager", "-n", "50")
104+
if err != nil {
105+
return "", fmt.Errorf("failed to fetch logs for %s: %w", service, err)
106+
}
107+
108+
return string(out), nil
109+
}
110+
111+
// InspectHost retrieves physical resource statistics.
112+
func (p *HubProvider) InspectHost(ctx context.Context) (*HostResource, error) {
113+
res := &HostResource{}
114+
115+
// Load Average
116+
loadOut, _ := p.runner.Run(ctx, "uptime")
117+
res.LoadAverage = strings.TrimSpace(string(loadOut))
118+
119+
// Memory (free -h)
120+
memOut, _ := p.runner.Run(ctx, "free", "-h")
121+
lines := strings.Split(string(memOut), "\n")
122+
if len(lines) > 1 {
123+
fields := strings.Fields(lines[1]) // Mem row
124+
if len(fields) > 2 {
125+
res.MemoryTotal = fields[1]
126+
res.MemoryUsed = fields[2]
127+
}
128+
}
129+
130+
// Disk (df -h /)
131+
diskOut, _ := p.runner.Run(ctx, "df", "-h", "/")
132+
dLines := strings.Split(string(diskOut), "\n")
133+
if len(dLines) > 1 {
134+
dFields := strings.Fields(dLines[1])
135+
if len(dFields) > 4 {
136+
res.DiskUsage = dFields[4]
137+
}
138+
}
139+
140+
return res, nil
141+
}
142+
143+
// InspectPlatform returns an executive summary of the entire hub.
144+
func (p *HubProvider) InspectPlatform(ctx context.Context) (map[string]interface{}, error) {
145+
summary := make(map[string]interface{})
146+
summary["timestamp"] = time.Now().Format(time.RFC3339)
147+
summary["node"] = "server2"
148+
149+
if _, err := p.runner.Run(ctx, "kubectl", "get", "nodes"); err != nil {
150+
summary["k3s_status"] = "unreachable"
151+
} else {
152+
summary["k3s_status"] = "healthy"
153+
}
154+
155+
services, _ := p.ListHostServices(ctx)
156+
runningCount := 0
157+
for _, s := range services {
158+
if s.Active == "active" {
159+
runningCount++
160+
}
161+
}
162+
summary["host_services_running"] = fmt.Sprintf("%d/%d", runningCount, len(p.targetServices))
163+
164+
return summary, nil
165+
}

0 commit comments

Comments
 (0)