Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 183 additions & 0 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,3 +1074,186 @@ func (ctx *e2eTestContext) getPodByOwner(namespace, ownerKind, ownerName string)

return found, nil
}

// loadTestResult represents the result of a single load test request
type loadTestResult struct {
success bool
err error
duration time.Duration
}

// runCodeInterpreterLoadTest executes a load test with the specified configuration.
func runCodeInterpreterLoadTest(
t *testing.T,
env *testEnv,
namespace, name string,
requestsPerSecond int,
testDuration time.Duration,
outputMessageFormat string,
) {
totalRequests := int(testDuration.Seconds() * float64(requestsPerSecond))

t.Logf("Starting load test: %d requests per second for %v (total: %d requests)",
requestsPerSecond, testDuration, totalRequests)

// Create a ticker for rate limiting
ticker := time.NewTicker(time.Second / time.Duration(requestsPerSecond))
defer ticker.Stop()

// Track results
results := make(chan loadTestResult, totalRequests)

startTime := time.Now()
requestsSent := 0

// Send requests at controlled rate
for requestsSent < totalRequests {
<-ticker.C // Wait for next tick

requestNum := requestsSent
requestsSent++

go func(reqNum int) {
reqStart := time.Now()

// Create a new session for this request
sessionID, err := env.createCodeInterpreterSession(namespace, name)
if err != nil {
results <- loadTestResult{success: false, err: err, duration: time.Since(reqStart)}
return
}

// Cleanup session when done
defer func() {
_ = env.deleteCodeInterpreterSession(sessionID)
}()

// Execute command
req := &CodeInterpreterExecuteRequest{
Command: []string{"echo", fmt.Sprintf(outputMessageFormat, reqNum)},
}

resp, err := env.invokeCodeInterpreter(namespace, name, sessionID, req)
if err != nil {
results <- loadTestResult{success: false, err: err, duration: time.Since(reqStart)}
return
}

expectedOutput := fmt.Sprintf(outputMessageFormat, reqNum)
if !strings.Contains(resp.Stdout, expectedOutput) {
err := fmt.Errorf("unexpected output: got '%s', expected to contain '%s'", resp.Stdout, expectedOutput)
results <- loadTestResult{success: false, err: err, duration: time.Since(reqStart)}
return
}

duration := time.Since(reqStart)
results <- loadTestResult{success: true, err: nil, duration: duration}
}(requestNum)
}

// Wait for all results
successCount := 0
failureCount := 0
var totalDuration time.Duration
var maxDuration time.Duration
var minDuration = time.Hour

for i := 0; i < totalRequests; i++ {
res := <-results
if res.success {
successCount++
totalDuration += res.duration
if res.duration > maxDuration {
maxDuration = res.duration
}
if res.duration < minDuration {
minDuration = res.duration
}
} else {
failureCount++
t.Logf("Request failed: %v", res.err)
}
}

elapsedTime := time.Since(startTime)
var avgDuration time.Duration
if successCount > 0 {
avgDuration = totalDuration / time.Duration(successCount)
}

t.Logf("Load test results:")
t.Logf(" Total requests: %d", totalRequests)
t.Logf(" Successful: %d", successCount)
t.Logf(" Failed: %d", failureCount)
t.Logf(" Success rate: %.2f%%", float64(successCount)/float64(totalRequests)*100)
t.Logf(" Total elapsed time: %v", elapsedTime)
if successCount > 0 {
t.Logf(" Average response time: %v", avgDuration)
t.Logf(" Min response time: %v", minDuration)
t.Logf(" Max response time: %v", maxDuration)
}
t.Logf(" Actual rate: %.2f req/sec", float64(totalRequests)/elapsedTime.Seconds())

// Verify that most requests succeeded (allow up to 10% failure for network issues)
require.GreaterOrEqual(t, float64(successCount)/float64(totalRequests), 0.9,
"At least 90%% of requests should succeed")
}

// TestCodeInterpreterWarmPoolLoad tests code interpreter with warmpool under load (10 requests per second)
func TestCodeInterpreterWarmPoolLoad(t *testing.T) {
env := newTestEnv(t)
ctx, err := newE2ETestContext()
require.NoError(t, err)

yamlPath := "e2e_code_interpreter_warmpool.yaml"
codeInterpreter, err := loadCodeInterpreterYAML(yamlPath)
require.NoError(t, err)

namespace := codeInterpreter.Namespace
if namespace == "" {
namespace = "default"
}
name := codeInterpreter.Name
warmPoolSize := 0
if codeInterpreter.Spec.WarmPoolSize != nil {
warmPoolSize = int(*codeInterpreter.Spec.WarmPoolSize)
}

t.Logf("Applying %s...", yamlPath)
require.NoError(t, ctx.applyYamlFile(yamlPath))

defer ctx.cleanupCodeInterpreter(t, namespace, name, yamlPath)

ctx.verifyWarmPoolReady(t, namespace, name, warmPoolSize)

// Load test configuration
const (
requestsPerSecond = 10
testDuration = 10 * time.Second
)

// Run load test with warmpool
runCodeInterpreterLoadTest(t, env, namespace, name, requestsPerSecond, testDuration,
"Load test request %d from warmpool!")

// Verify warmpool still has correct number of pods after load test
ctx.verifyWarmPoolReady(t, namespace, name, warmPoolSize)
}

// TestCodeInterpreterBasicInvocationLoad tests code interpreter without warmpool under load (10 requests per second)
func TestCodeInterpreterBasicInvocationLoad(t *testing.T) {
env := newTestEnv(t)

namespace := agentcubeNamespace
name := "e2e-code-interpreter"

// Load test configuration
const (
requestsPerSecond = 10
testDuration = 10 * time.Second
)

// Run load test with basic invocation
runCodeInterpreterLoadTest(t, env, namespace, name, requestsPerSecond, testDuration,
"Load test request %d!")
}
Loading