Skip to content

Commit bd49aff

Browse files
authored
Sample script to test replay concurrency safety (#101)
## Overview Noticed some issues with our replay stop + download flow. Adding a script to help with testing scenarios / repeatability. I ran the docker container locally and then hit it directly. Even on local we can reasonably repro the failures ## Testing Ran locally and confirmed we can detect the failures. Example run: ``` Testing concurrent stop race condition URL: http://localhost:444 Duration: 3s Concurrency: 2 Iterations: 5 === Iteration 1/5 (id=race-test-1764972766315168000-0) === Starting recording... Recording for 3 seconds... Calling stop 2 times concurrently... Downloading recording... Downloaded 36 bytes Validating with ffprobe... ❌ FAILED: validation failed: ffprobe failed: exit status 1 Output: [mov,mp4,m4a,3gp,3g2,mj2 @ 0x11f606420] moov atom not found /var/folders/km/8rkz_b8s4dj0djvhk0p82bm80000gn/T/race-test-812359799.mp4: Invalid data found when processing input { } === Iteration 2/5 (id=race-test-1764972770128817000-1) === Starting recording... Recording for 3 seconds... Calling stop 2 times concurrently... Downloading recording... Downloaded 36 bytes Validating with ffprobe... ❌ FAILED: validation failed: ffprobe failed: exit status 1 Output: [mov,mp4,m4a,3gp,3g2,mj2 @ 0x150804820] moov atom not found /var/folders/km/8rkz_b8s4dj0djvhk0p82bm80000gn/T/race-test-877575510.mp4: Invalid data found when processing input { } === Iteration 3/5 (id=race-test-1764972773977277000-2) === Starting recording... Recording for 3 seconds... Calling stop 2 times concurrently... Downloading recording... Downloaded 20968 bytes Validating with ffprobe... Video duration: 3.600000 seconds Cleaning up... ✅ PASSED === Iteration 4/5 (id=race-test-1764972777850324000-3) === Starting recording... Recording for 3 seconds... Calling stop 2 times concurrently... Downloading recording... Downloaded 21007 bytes Validating with ffprobe... Video duration: 3.600000 seconds Cleaning up... ✅ PASSED === Iteration 5/5 (id=race-test-1764972781717936000-4) === Starting recording... 
Recording for 3 seconds... Calling stop 2 times concurrently... Downloading recording... Downloaded 20978 bytes Validating with ffprobe... Video duration: 3.600000 seconds Cleaning up... ✅ PASSED === RESULTS: 3 passed, 2 failed === ``` <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Introduce a Go script to test concurrent recording stop behavior, add retry dependency, and ignore local recordings. > > - **Scripts/Tooling**: > - Add `server/scripts/concurrent_stop_test/main.go` to reproduce and validate concurrent recording `stop` behavior (start, concurrent stop calls, retrying download, ffprobe validation, cleanup; configurable via flags). > - **Dependencies**: > - Add `github.com/avast/retry-go/v5` (updated `go.mod`/`go.sum`). > - **Repo**: > - Ignore `recordings/` in `.gitignore`. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit 7808c6c. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY -->
1 parent 3dfcec2 commit bd49aff

File tree

4 files changed

+246
-0
lines changed

4 files changed

+246
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,3 +188,5 @@ infra/tests/*
188188

189189
# mise-en-place
190190
.mise.toml
191+
192+
recordings/

server/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/onkernel/kernel-images/server
33
go 1.25.0
44

55
require (
6+
github.com/avast/retry-go/v5 v5.0.0
67
github.com/coder/websocket v1.8.14
78
github.com/fsnotify/fsnotify v1.9.0
89
github.com/getkin/kin-openapi v0.132.0

server/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
22
github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ=
33
github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk=
4+
github.com/avast/retry-go/v5 v5.0.0 h1:kf1Qc2UsTZ4qq8elDymqfbISvkyMuhgRxuJqX2NHP7k=
5+
github.com/avast/retry-go/v5 v5.0.0/go.mod h1://d+usmKWio1agtZfS1H/ltTqwtIfBnRq9zEwjc3eH8=
46
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
57
github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g=
68
github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg=
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
// Tool to reproduce Stop concurrency behavior: start a recording, trigger concurrent stops,
2+
// then download and validate the resulting video with ffprobe.
3+
// Usage: go run main.go -url http://localhost:10001 -duration 3 -concurrency 2 -iterations 5 (requires ffprobe on PATH)
4+
package main
5+
6+
import (
7+
"context"
8+
"encoding/json"
9+
"flag"
10+
"fmt"
11+
"net/http"
12+
"os"
13+
"os/exec"
14+
"sync"
15+
"time"
16+
17+
retry "github.com/avast/retry-go/v5"
18+
"github.com/nrednav/cuid2"
19+
oapi "github.com/onkernel/kernel-images/server/lib/oapi"
20+
)
21+
22+
func main() {
23+
baseURL := flag.String("url", "http://localhost:444", "Base URL of the kernel-images API")
24+
duration := flag.Int("duration", 3, "Recording duration in seconds before stopping")
25+
concurrency := flag.Int("concurrency", 2, "Number of concurrent stop calls")
26+
iterations := flag.Int("iterations", 5, "Number of test iterations")
27+
flag.Parse()
28+
29+
fmt.Printf("Testing concurrent stop race condition\n")
30+
fmt.Printf(" URL: %s\n", *baseURL)
31+
fmt.Printf(" Duration: %ds\n", *duration)
32+
fmt.Printf(" Concurrency: %d\n", *concurrency)
33+
fmt.Printf(" Iterations: %d\n", *iterations)
34+
35+
passed := 0
36+
failed := 0
37+
38+
for i := 0; i < *iterations; i++ {
39+
testID := fmt.Sprintf("race-test-%s-%d", cuid2.Generate(), i)
40+
41+
fmt.Printf("=== Iteration %d/%d (id=%s) ===\n", i+1, *iterations, testID)
42+
43+
err := runTest(*baseURL, testID, *duration, *concurrency)
44+
if err != nil {
45+
fmt.Printf("❌ FAILED: %v\n\n", err)
46+
failed++
47+
} else {
48+
fmt.Printf("✅ PASSED\n\n")
49+
passed++
50+
}
51+
}
52+
53+
fmt.Printf("=== RESULTS: %d passed, %d failed ===\n", passed, failed)
54+
if failed > 0 {
55+
os.Exit(1)
56+
}
57+
}
58+
59+
func runTest(baseURL, replayID string, duration, concurrency int) error {
60+
ctx := context.Background()
61+
62+
client, err := oapi.NewClientWithResponses(baseURL)
63+
if err != nil {
64+
return fmt.Errorf("failed to create client: %w", err)
65+
}
66+
67+
fmt.Printf(" Starting recording...\n")
68+
if err := startRecording(ctx, client, replayID); err != nil {
69+
return fmt.Errorf("failed to start recording: %w", err)
70+
}
71+
72+
fmt.Printf(" Recording for %d seconds...\n", duration)
73+
time.Sleep(time.Duration(duration) * time.Second)
74+
75+
fmt.Printf(" Calling stop %d times concurrently...\n", concurrency)
76+
stopResults := make(chan error, concurrency)
77+
var wg sync.WaitGroup
78+
79+
for i := 0; i < concurrency; i++ {
80+
wg.Add(1)
81+
go func(goroutineID int) {
82+
defer wg.Done()
83+
err := stopRecording(ctx, client, replayID)
84+
if err != nil {
85+
stopResults <- fmt.Errorf("goroutine %d: %w", goroutineID, err)
86+
} else {
87+
stopResults <- nil
88+
}
89+
}(i)
90+
}
91+
92+
wg.Wait()
93+
close(stopResults)
94+
95+
var stopErrors []error
96+
for err := range stopResults {
97+
if err != nil {
98+
stopErrors = append(stopErrors, err)
99+
}
100+
}
101+
if len(stopErrors) > 0 {
102+
fmt.Printf(" Stop errors: %v\n", stopErrors)
103+
}
104+
105+
fmt.Printf(" Downloading recording...\n")
106+
data, err := downloadRecording(ctx, client, replayID)
107+
if err != nil {
108+
return fmt.Errorf("failed to download recording: %w", err)
109+
}
110+
fmt.Printf(" Downloaded %d bytes\n", len(data))
111+
112+
tmpFile, err := os.CreateTemp("", "race-test-*.mp4")
113+
if err != nil {
114+
return fmt.Errorf("failed to create temp file: %w", err)
115+
}
116+
defer os.Remove(tmpFile.Name())
117+
118+
if _, err := tmpFile.Write(data); err != nil {
119+
tmpFile.Close()
120+
return fmt.Errorf("failed to write temp file: %w", err)
121+
}
122+
tmpFile.Close()
123+
124+
fmt.Printf(" Validating with ffprobe...\n")
125+
if err := validateMP4(tmpFile.Name()); err != nil {
126+
return fmt.Errorf("validation failed: %w", err)
127+
}
128+
129+
fmt.Printf(" Cleaning up...\n")
130+
_ = deleteRecording(ctx, client, replayID)
131+
132+
return nil
133+
}
134+
135+
func startRecording(ctx context.Context, client *oapi.ClientWithResponses, replayID string) error {
136+
resp, err := client.StartRecordingWithResponse(ctx, oapi.StartRecordingJSONRequestBody{
137+
Id: &replayID,
138+
})
139+
if err != nil {
140+
return err
141+
}
142+
143+
if resp.StatusCode() != http.StatusCreated && resp.StatusCode() != http.StatusConflict {
144+
return fmt.Errorf("unexpected status %d: %s", resp.StatusCode(), string(resp.Body))
145+
}
146+
147+
return nil
148+
}
149+
150+
func stopRecording(ctx context.Context, client *oapi.ClientWithResponses, replayID string) error {
151+
resp, err := client.StopRecordingWithResponse(ctx, oapi.StopRecordingJSONRequestBody{
152+
Id: &replayID,
153+
})
154+
if err != nil {
155+
return err
156+
}
157+
158+
if resp.StatusCode() != http.StatusOK {
159+
return fmt.Errorf("unexpected status %d: %s", resp.StatusCode(), string(resp.Body))
160+
}
161+
162+
return nil
163+
}
164+
165+
func downloadRecording(ctx context.Context, client *oapi.ClientWithResponses, replayID string) ([]byte, error) {
166+
var data []byte
167+
err := retry.New(
168+
retry.Attempts(10),
169+
retry.Delay(500*time.Millisecond),
170+
retry.DelayType(retry.FixedDelay),
171+
retry.LastErrorOnly(true),
172+
retry.Context(ctx),
173+
).Do(func() error {
174+
resp, err := client.DownloadRecordingWithResponse(ctx, &oapi.DownloadRecordingParams{
175+
Id: &replayID,
176+
})
177+
if err != nil {
178+
return err
179+
}
180+
181+
if resp.StatusCode() == http.StatusAccepted {
182+
return fmt.Errorf("recording not ready yet")
183+
}
184+
185+
if resp.StatusCode() != http.StatusOK {
186+
return fmt.Errorf("unexpected status %d: %s", resp.StatusCode(), string(resp.Body))
187+
}
188+
189+
data = resp.Body
190+
return nil
191+
})
192+
if err != nil {
193+
return nil, fmt.Errorf("failed after retries: %w", err)
194+
}
195+
return data, nil
196+
}
197+
198+
func deleteRecording(ctx context.Context, client *oapi.ClientWithResponses, replayID string) error {
199+
resp, err := client.DeleteRecordingWithResponse(ctx, oapi.DeleteRecordingJSONRequestBody{
200+
Id: &replayID,
201+
})
202+
if err != nil {
203+
return err
204+
}
205+
206+
if resp.StatusCode() != http.StatusOK && resp.StatusCode() != http.StatusNotFound {
207+
return fmt.Errorf("unexpected status %d: %s", resp.StatusCode(), string(resp.Body))
208+
}
209+
210+
return nil
211+
}
212+
213+
// validateMP4 runs ffprobe on filePath and checks that the container reports
// a duration.
//
// It returns an error when ffprobe cannot be started or exits non-zero (the
// error includes ffprobe's stderr and stdout), when its JSON output cannot be
// parsed, or when the format section carries no duration. On success the
// duration is printed and nil is returned.
func validateMP4(filePath string) error {
	// "-of" is the long-standing spelling of the output-format option; it is
	// used here so older ffprobe builds are not rejected on the flag itself.
	cmd := exec.Command("ffprobe",
		"-v", "error",
		"-show_format",
		"-show_streams",
		"-of", "json",
		filePath)

	// Capture stdout only: at "-v error" ffprobe may still write diagnostics
	// to stderr while exiting 0, and mixing them into stdout would corrupt the
	// JSON document parsed below.
	output, err := cmd.Output()
	if err != nil {
		// Output stores the child's stderr on the ExitError when the command
		// ran but exited non-zero; surface it alongside stdout for debugging.
		var stderr []byte
		if exitErr, ok := err.(*exec.ExitError); ok {
			stderr = exitErr.Stderr
		}
		return fmt.Errorf("ffprobe failed: %w\nOutput: %s%s", err, stderr, output)
	}

	var result struct {
		Format struct {
			Duration string `json:"duration"`
		} `json:"format"`
	}
	if err := json.Unmarshal(output, &result); err != nil {
		return fmt.Errorf("failed to parse ffprobe output: %w", err)
	}

	if result.Format.Duration == "" {
		return fmt.Errorf("no duration found in video - file may be corrupt")
	}

	fmt.Printf(" Video duration: %s seconds\n", result.Format.Duration)
	return nil
}

0 commit comments

Comments
 (0)