Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,5 @@ infra/tests/*

# mise-en-place
.mise.toml

recordings/
1 change: 1 addition & 0 deletions server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/onkernel/kernel-images/server
go 1.25.0

require (
github.com/avast/retry-go/v5 v5.0.0
github.com/coder/websocket v1.8.14
github.com/fsnotify/fsnotify v1.9.0
github.com/getkin/kin-openapi v0.132.0
Expand Down
2 changes: 2 additions & 0 deletions server/go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ=
github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk=
github.com/avast/retry-go/v5 v5.0.0 h1:kf1Qc2UsTZ4qq8elDymqfbISvkyMuhgRxuJqX2NHP7k=
github.com/avast/retry-go/v5 v5.0.0/go.mod h1://d+usmKWio1agtZfS1H/ltTqwtIfBnRq9zEwjc3eH8=
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g=
github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg=
Expand Down
241 changes: 241 additions & 0 deletions server/scripts/concurrent_stop_test/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Tool to reproduce Stop concurrency behavior: start a recording, trigger concurrent stops,
// then download and validate the resulting video with ffprobe.
// Usage: go run main.go -url http://localhost:10001 -duration 3 -concurrency 2
package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"os/exec"
"sync"
"time"

retry "github.com/avast/retry-go/v5"
"github.com/nrednav/cuid2"
oapi "github.com/onkernel/kernel-images/server/lib/oapi"
)

func main() {
baseURL := flag.String("url", "http://localhost:444", "Base URL of the kernel-images API")
duration := flag.Int("duration", 3, "Recording duration in seconds before stopping")
concurrency := flag.Int("concurrency", 2, "Number of concurrent stop calls")
iterations := flag.Int("iterations", 5, "Number of test iterations")
flag.Parse()

fmt.Printf("Testing concurrent stop race condition\n")
fmt.Printf(" URL: %s\n", *baseURL)
fmt.Printf(" Duration: %ds\n", *duration)
fmt.Printf(" Concurrency: %d\n", *concurrency)
fmt.Printf(" Iterations: %d\n", *iterations)

passed := 0
failed := 0

for i := 0; i < *iterations; i++ {
testID := fmt.Sprintf("race-test-%s-%d", cuid2.Generate(), i)

fmt.Printf("=== Iteration %d/%d (id=%s) ===\n", i+1, *iterations, testID)

err := runTest(*baseURL, testID, *duration, *concurrency)
if err != nil {
fmt.Printf("❌ FAILED: %v\n\n", err)
failed++
} else {
fmt.Printf("✅ PASSED\n\n")
passed++
}
}

fmt.Printf("=== RESULTS: %d passed, %d failed ===\n", passed, failed)
if failed > 0 {
os.Exit(1)
}
}

func runTest(baseURL, replayID string, duration, concurrency int) error {
ctx := context.Background()

client, err := oapi.NewClientWithResponses(baseURL)
if err != nil {
return fmt.Errorf("failed to create client: %w", err)
}

fmt.Printf(" Starting recording...\n")
if err := startRecording(ctx, client, replayID); err != nil {
return fmt.Errorf("failed to start recording: %w", err)
}

fmt.Printf(" Recording for %d seconds...\n", duration)
time.Sleep(time.Duration(duration) * time.Second)

fmt.Printf(" Calling stop %d times concurrently...\n", concurrency)
stopResults := make(chan error, concurrency)
var wg sync.WaitGroup

for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(goroutineID int) {
defer wg.Done()
err := stopRecording(ctx, client, replayID)
if err != nil {
stopResults <- fmt.Errorf("goroutine %d: %w", goroutineID, err)
} else {
stopResults <- nil
}
}(i)
}

wg.Wait()
close(stopResults)

var stopErrors []error
for err := range stopResults {
if err != nil {
stopErrors = append(stopErrors, err)
}
}
if len(stopErrors) > 0 {
fmt.Printf(" Stop errors: %v\n", stopErrors)
}

fmt.Printf(" Downloading recording...\n")
data, err := downloadRecording(ctx, client, replayID)
if err != nil {
return fmt.Errorf("failed to download recording: %w", err)
}
fmt.Printf(" Downloaded %d bytes\n", len(data))

tmpFile, err := os.CreateTemp("", "race-test-*.mp4")
if err != nil {
return fmt.Errorf("failed to create temp file: %w", err)
}
defer os.Remove(tmpFile.Name())

if _, err := tmpFile.Write(data); err != nil {
tmpFile.Close()
return fmt.Errorf("failed to write temp file: %w", err)
}
tmpFile.Close()

fmt.Printf(" Validating with ffprobe...\n")
if err := validateMP4(tmpFile.Name()); err != nil {
return fmt.Errorf("validation failed: %w", err)
}

fmt.Printf(" Cleaning up...\n")
_ = deleteRecording(ctx, client, replayID)

return nil
}

func startRecording(ctx context.Context, client *oapi.ClientWithResponses, replayID string) error {
resp, err := client.StartRecordingWithResponse(ctx, oapi.StartRecordingJSONRequestBody{
Id: &replayID,
})
if err != nil {
return err
}

if resp.StatusCode() != http.StatusCreated && resp.StatusCode() != http.StatusConflict {
return fmt.Errorf("unexpected status %d: %s", resp.StatusCode(), string(resp.Body))
}

return nil
}

func stopRecording(ctx context.Context, client *oapi.ClientWithResponses, replayID string) error {
resp, err := client.StopRecordingWithResponse(ctx, oapi.StopRecordingJSONRequestBody{
Id: &replayID,
})
if err != nil {
return err
}

if resp.StatusCode() != http.StatusOK {
return fmt.Errorf("unexpected status %d: %s", resp.StatusCode(), string(resp.Body))
}

return nil
}

func downloadRecording(ctx context.Context, client *oapi.ClientWithResponses, replayID string) ([]byte, error) {
var data []byte
err := retry.New(
retry.Attempts(10),
retry.Delay(500*time.Millisecond),
retry.DelayType(retry.FixedDelay),
retry.LastErrorOnly(true),
retry.Context(ctx),
).Do(func() error {
resp, err := client.DownloadRecordingWithResponse(ctx, &oapi.DownloadRecordingParams{
Id: &replayID,
})
if err != nil {
return err
}

if resp.StatusCode() == http.StatusAccepted {
return fmt.Errorf("recording not ready yet")
}

if resp.StatusCode() != http.StatusOK {
return fmt.Errorf("unexpected status %d: %s", resp.StatusCode(), string(resp.Body))
}

data = resp.Body
return nil
})
if err != nil {
return nil, fmt.Errorf("failed after retries: %w", err)
}
return data, nil
}

func deleteRecording(ctx context.Context, client *oapi.ClientWithResponses, replayID string) error {
resp, err := client.DeleteRecordingWithResponse(ctx, oapi.DeleteRecordingJSONRequestBody{
Id: &replayID,
})
if err != nil {
return err
}

if resp.StatusCode() != http.StatusOK && resp.StatusCode() != http.StatusNotFound {
return fmt.Errorf("unexpected status %d: %s", resp.StatusCode(), string(resp.Body))
}

return nil
}

func validateMP4(filePath string) error {
cmd := exec.Command("ffprobe",
"-v", "error",
"-show_format",
"-show_streams",
"-output_format", "json",
filePath)

output, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("ffprobe failed: %w\nOutput: %s", err, string(output))
}

var result struct {
Format struct {
Duration string `json:"duration"`
} `json:"format"`
}
if err := json.Unmarshal(output, &result); err != nil {
return fmt.Errorf("failed to parse ffprobe output: %w", err)
}

if result.Format.Duration == "" {
return fmt.Errorf("no duration found in video - file may be corrupt")
}

fmt.Printf(" Video duration: %s seconds\n", result.Format.Duration)
return nil
}
Loading