Skip to content

Commit bce35e0

Browse files
authored
Enable state tracking in status files (#1376)
This switches on the file-based state tracking. It also adds some e2e tests for unhealthy workloads.
1 parent 36a32b8 commit bce35e0

File tree

4 files changed

+255
-10
lines changed

4 files changed

+255
-10
lines changed

pkg/workloads/statuses/status.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,10 @@ func NewStatusManagerFromRuntime(runtime rt.Runtime) StatusManager {
3838
// based on the runtime environment. If running in Kubernetes, it returns the runtime-based
3939
// implementation. Otherwise, it returns the file-based implementation.
4040
func NewStatusManager(runtime rt.Runtime) (StatusManager, error) {
41-
/*
42-
if rt.IsKubernetesRuntime() {
43-
return NewStatusManagerFromRuntime(runtime), nil
44-
}
45-
return NewFileStatusManager(runtime)
46-
*/
47-
// For now, we only support the runtime-based implementation.
48-
return NewStatusManagerFromRuntime(runtime), nil
41+
if rt.IsKubernetesRuntime() {
42+
return NewStatusManagerFromRuntime(runtime), nil
43+
}
44+
return NewFileStatusManager(runtime)
4945
}
5046

5147
// runtimeStatusManager is an implementation of StatusManager that uses the state

pkg/workloads/statuses/status_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -436,9 +436,9 @@ func TestNewStatusManager(t *testing.T) {
436436
expectedType: &runtimeStatusManager{},
437437
},
438438
{
439-
name: "returns runtime status manager outside Kubernetes (temporary behavior)",
439+
name: "returns file status manager outside Kubernetes",
440440
isKubernetes: false,
441-
expectedType: &runtimeStatusManager{}, // Currently always returns runtime manager
441+
expectedType: &fileStatusManager{},
442442
},
443443
}
444444

test/e2e/helpers.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,3 +243,32 @@ func StartDockerCommand(args ...string) *exec.Cmd {
243243
cmd.Env = os.Environ()
244244
return cmd
245245
}
246+
247+
// WaitForWorkloadUnhealthy waits for a workload to be marked as unhealthy
248+
func WaitForWorkloadUnhealthy(config *TestConfig, serverName string, timeout time.Duration) error {
249+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
250+
defer cancel()
251+
252+
ticker := time.NewTicker(2 * time.Second)
253+
defer ticker.Stop()
254+
255+
for {
256+
select {
257+
case <-ctx.Done():
258+
return fmt.Errorf("timeout waiting for workload %s to be marked as unhealthy", serverName)
259+
case <-ticker.C:
260+
stdout, _, err := NewTHVCommand(config, "list", "--all").Run()
261+
if err != nil {
262+
continue
263+
}
264+
265+
// Check if the server is listed and marked as unhealthy
266+
lines := strings.Split(stdout, "\n")
267+
for _, line := range lines {
268+
if strings.Contains(line, serverName) && strings.Contains(line, "unhealthy") {
269+
return nil
270+
}
271+
}
272+
}
273+
}
274+
}

test/e2e/unhealthy_workload_test.go

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package e2e_test
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"os/exec"
7+
"strconv"
8+
"strings"
9+
"syscall"
10+
"time"
11+
12+
. "github.com/onsi/ginkgo/v2"
13+
. "github.com/onsi/gomega"
14+
15+
"github.com/stacklok/toolhive/test/e2e"
16+
)
17+
18+
var _ = Describe("Unhealthy Workload Detection", func() {
19+
var (
20+
config *e2e.TestConfig
21+
serverName string
22+
)
23+
24+
BeforeEach(func() {
25+
config = e2e.NewTestConfig()
26+
serverName = generateUnhealthyTestServerName("unhealthy-test")
27+
28+
// Check if thv binary is available
29+
err := e2e.CheckTHVBinaryAvailable(config)
30+
Expect(err).ToNot(HaveOccurred(), "thv binary should be available")
31+
})
32+
33+
AfterEach(func() {
34+
if config.CleanupAfter {
35+
// Clean up the server if it exists
36+
err := e2e.StopAndRemoveMCPServer(config, serverName)
37+
Expect(err).ToNot(HaveOccurred(), "Should be able to stop and remove server")
38+
}
39+
})
40+
41+
Describe("Detecting unhealthy workloads", func() {
42+
Context("when the proxy process is killed", func() {
43+
It("should mark the workload as unhealthy", func() {
44+
By("Starting an OSV MCP server")
45+
stdout, stderr := e2e.NewTHVCommand(config, "run", "--name", serverName, "osv").ExpectSuccess()
46+
Expect(stdout+stderr).To(ContainSubstring("osv"), "Output should mention the osv server")
47+
48+
By("Waiting for the server to be running")
49+
err := e2e.WaitForMCPServer(config, serverName, 60*time.Second)
50+
Expect(err).ToNot(HaveOccurred(), "Server should be running within 60 seconds")
51+
52+
By("Verifying the server is healthy initially")
53+
stdout, _ = e2e.NewTHVCommand(config, "list").ExpectSuccess()
54+
Expect(stdout).To(ContainSubstring(serverName), "Server should be listed")
55+
Expect(stdout).To(ContainSubstring("running"), "Server should be in running state")
56+
57+
By("Finding and killing the proxy process")
58+
proxyPID, err := findProxyProcess(serverName)
59+
Expect(err).ToNot(HaveOccurred(), "Should be able to find proxy process")
60+
Expect(proxyPID).ToNot(BeZero(), "Proxy PID should not be zero")
61+
62+
// Kill the proxy process
63+
err = killProcess(proxyPID)
64+
Expect(err).ToNot(HaveOccurred(), "Should be able to kill proxy process")
65+
66+
By("Waiting for the workload to be detected as unhealthy")
67+
err = e2e.WaitForWorkloadUnhealthy(config, serverName, 10*time.Second)
68+
Expect(err).ToNot(HaveOccurred(), "Server should be marked as unhealthy within 10 seconds")
69+
70+
By("Verifying the workload shows unhealthy status with context")
71+
stdout, _ = e2e.NewTHVCommand(config, "list", "--all").ExpectSuccess()
72+
Expect(stdout).To(ContainSubstring(serverName), "Server should be listed")
73+
Expect(stdout).To(ContainSubstring("unhealthy"), "Server should be marked as unhealthy")
74+
})
75+
})
76+
77+
Context("when the docker container is killed", func() {
78+
It("should mark the workload as unhealthy", func() {
79+
By("Starting an OSV MCP server")
80+
stdout, stderr := e2e.NewTHVCommand(config, "run", "--name", serverName, "osv").ExpectSuccess()
81+
Expect(stdout+stderr).To(ContainSubstring("osv"), "Output should mention the osv server")
82+
83+
By("Waiting for the server to be running")
84+
err := e2e.WaitForMCPServer(config, serverName, 60*time.Second)
85+
Expect(err).ToNot(HaveOccurred(), "Server should be running within 60 seconds")
86+
87+
By("Verifying the server is healthy initially")
88+
stdout, _ = e2e.NewTHVCommand(config, "list").ExpectSuccess()
89+
Expect(stdout).To(ContainSubstring(serverName), "Server should be listed")
90+
Expect(stdout).To(ContainSubstring("running"), "Server should be in running state")
91+
92+
By("Finding and killing the docker container")
93+
containerName, err := findDockerContainer(serverName)
94+
Expect(err).ToNot(HaveOccurred(), "Should be able to find docker container")
95+
Expect(containerName).ToNot(BeEmpty(), "Container name should not be empty")
96+
97+
// Kill the docker container
98+
err = killDockerContainer(containerName)
99+
Expect(err).ToNot(HaveOccurred(), "Should be able to kill docker container")
100+
101+
By("Waiting for the workload to be detected as unhealthy")
102+
err = e2e.WaitForWorkloadUnhealthy(config, serverName, 10*time.Second)
103+
Expect(err).ToNot(HaveOccurred(), "Server should be marked as unhealthy within 10 seconds")
104+
105+
By("Verifying the workload shows unhealthy status with context")
106+
stdout, _ = e2e.NewTHVCommand(config, "list", "--all").ExpectSuccess()
107+
Expect(stdout).To(ContainSubstring(serverName), "Server should be listed")
108+
Expect(stdout).To(ContainSubstring("unhealthy"), "Server should be marked as unhealthy")
109+
})
110+
})
111+
})
112+
})
113+
114+
// Helper functions for process and container management
115+
116+
// findProxyProcess finds the PID of the proxy process for a given server name
117+
func findProxyProcess(serverName string) (int, error) {
118+
// The proxy process PID should be stored in a file in the temp directory
119+
// following the pattern: toolhive-{serverName}.pid
120+
pidFile := fmt.Sprintf("/tmp/toolhive-%s.pid", serverName)
121+
122+
// Read the PID file
123+
pidBytes, err := os.ReadFile(pidFile)
124+
if err != nil {
125+
return 0, fmt.Errorf("failed to read PID file %s: %w", pidFile, err)
126+
}
127+
128+
pidStr := strings.TrimSpace(string(pidBytes))
129+
pid, err := strconv.Atoi(pidStr)
130+
if err != nil {
131+
return 0, fmt.Errorf("failed to parse PID from file %s: %w", pidFile, err)
132+
}
133+
134+
// Verify the process is actually running
135+
if !isProcessRunning(pid) {
136+
return 0, fmt.Errorf("process with PID %d is not running", pid)
137+
}
138+
139+
return pid, nil
140+
}
141+
142+
// isProcessRunning reports whether signal 0 can be delivered to the process
// with the given PID. Signal 0 performs the existence/permission check
// without actually sending a signal; any failure is reported as "not running".
func isProcessRunning(pid int) bool {
	if proc, err := os.FindProcess(pid); err == nil {
		return proc.Signal(syscall.Signal(0)) == nil
	}
	return false
}
154+
155+
// killProcess kills a process by its PID
156+
func killProcess(pid int) error {
157+
proc, err := os.FindProcess(pid)
158+
if err != nil {
159+
return fmt.Errorf("failed to find process with PID %d: %w", pid, err)
160+
}
161+
162+
// Send SIGTERM first for graceful shutdown
163+
err = proc.Signal(syscall.SIGTERM)
164+
if err != nil {
165+
return fmt.Errorf("failed to send SIGTERM to process %d: %w", pid, err)
166+
}
167+
168+
// Give it a moment to terminate gracefully
169+
time.Sleep(1 * time.Second)
170+
171+
// Check if it's still running, if so use SIGKILL
172+
if isProcessRunning(pid) {
173+
err = proc.Signal(syscall.SIGKILL)
174+
if err != nil {
175+
return fmt.Errorf("failed to send SIGKILL to process %d: %w", pid, err)
176+
}
177+
}
178+
179+
return nil
180+
}
181+
182+
// findDockerContainer returns the name of a running docker container for the
// given server name.
//
// Containers are listed with `docker ps --filter name=<serverName>`. Because
// that filter matches on part of the name, several containers may be
// returned; a container whose name equals serverName exactly is preferred,
// and otherwise the first listed container is used. An error is returned if
// docker cannot be invoked or no container matches.
func findDockerContainer(serverName string) (string, error) {
	// Use docker ps to find the container
	cmd := exec.Command("docker", "ps", "--filter", fmt.Sprintf("name=%s", serverName), "--format", "{{.Names}}")
	output, err := cmd.Output()
	if err != nil {
		return "", fmt.Errorf("failed to list docker containers: %w", err)
	}

	// One container name per line; Fields also drops blank lines and any
	// trailing carriage returns.
	names := strings.Fields(string(output))
	if len(names) == 0 {
		return "", fmt.Errorf("no container found with name pattern %s", serverName)
	}

	for _, name := range names {
		if name == serverName {
			return name, nil
		}
	}

	// No exact match: fall back to the first container returned.
	return names[0], nil
}
204+
205+
// killDockerContainer runs `docker kill` on the named container and wraps any
// failure of that command in the returned error.
func killDockerContainer(containerName string) error {
	if err := exec.Command("docker", "kill", containerName).Run(); err != nil {
		return fmt.Errorf("failed to kill docker container %s: %w", containerName, err)
	}
	return nil
}
216+
217+
// generateUnhealthyTestServerName creates a unique server name for unhealthy workload tests
218+
func generateUnhealthyTestServerName(prefix string) string {
219+
return fmt.Sprintf("%s-%d", prefix, GinkgoRandomSeed())
220+
}

0 commit comments

Comments
 (0)