diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2a717f07..20266b70 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -86,9 +86,6 @@ jobs: - name: Run notification tests run: ./eoapi-cli test notification - - name: Run autoscaling tests - run: ./eoapi-cli test autoscaling - - name: Debug failed deployment if: failure() run: ./eoapi-cli deployment debug diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 6d88b4f0..2a5321d6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -34,15 +34,34 @@ repos: hooks: - id: actionlint - # Python type checking for test files + # Python linting and formatting + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.8.4 + hooks: + # Linter + - id: ruff + name: ruff lint + args: ['--fix'] + files: ^tests\/.*\.py$ + # Formatter + - id: ruff-format + name: ruff format + files: ^tests\/.*\.py$ + + # Python type checking for all test files - repo: https://github.com/pre-commit/mirrors-mypy rev: v1.11.2 hooks: - id: mypy - name: mypy strict mode for tests - args: ['--strict', '--ignore-missing-imports'] - files: ^tests\/integration\/.*\.py$ - additional_dependencies: ['types-psycopg2', 'httpx', 'pytest', 'types-requests'] + name: mypy type checking for tests + args: ['--ignore-missing-imports', '--no-strict-optional'] + files: ^tests\/.*\.py$ + additional_dependencies: + - 'types-psycopg2' + - 'types-requests' + - 'httpx' + - 'pytest' + - 'kubernetes' # Fast Helm syntax check only - repo: local diff --git a/CHANGELOG.md b/CHANGELOG.md index d0dbc188..cb20c52f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Added support for annotations on the PgSTAC bootstrap job via `pgstacBootstrap.jobAnnotations` in values.yaml [#381](https://github.com/developmentseed/eoapi-k8s/pull/381) +- Added load testing scripts [#373](https://github.com/developmentseed/eoapi-k8s/pull/373) ### Fixed diff --git a/eoapi-cli b/eoapi-cli index 3c91115e..5d1ac594 100755 --- a/eoapi-cli +++ b/eoapi-cli @@ -16,6 +16,7 @@ readonly COMMANDS=( "cluster" "deployment" "test" + "load" "ingest" "docs" ) @@ -38,6 +39,7 @@ COMMANDS: cluster Manage local Kubernetes clusters for development deployment Deploy and manage eoAPI instances test Run tests (helm, integration, autoscaling) + load Run load testing scenarios ingest Load sample data into eoAPI services docs Generate and serve documentation @@ -56,8 +58,11 @@ EXAMPLES: # Run integration tests only eoapi-cli test integration - # Run autoscaling tests only - eoapi-cli test autoscaling + # Run load tests + eoapi-cli load all + + # Run autoscaling load tests only + eoapi-cli load autoscaling # Ingest sample data eoapi-cli ingest sample-data @@ -99,6 +104,9 @@ get_command_script() { test) echo "${SCRIPTS_DIR}/test.sh" ;; + load) + echo "${SCRIPTS_DIR}/load.sh" + ;; ingest) echo "${SCRIPTS_DIR}/ingest.sh" ;; diff --git a/scripts/lib/common.sh b/scripts/lib/common.sh index 1e4e5a19..7db86c2b 100755 --- a/scripts/lib/common.sh +++ b/scripts/lib/common.sh @@ -270,11 +270,151 @@ cleanup_on_exit() { trap cleanup_on_exit EXIT -# Export functions +get_base_url() { + local namespace="${1:-eoapi}" + + # Try localhost first (most common in local dev) + if curl -s -f -m 3 "http://localhost/stac" >/dev/null 2>&1; then + echo "http://localhost" + return 0 + fi + + # Try ingress if configured + local host + host=$(kubectl get ingress -n "$namespace" -o 
jsonpath='{.items[0].spec.rules[0].host}' 2>/dev/null || echo "") + if [[ -n "$host" ]] && curl -s -f -m 3 "http://$host/stac" >/dev/null 2>&1; then + echo "http://$host" + return 0 + fi + + return 1 +} + +validate_autoscaling_environment() { + local namespace="$1" + + validate_cluster || return 1 + validate_namespace "$namespace" || return 1 + + # Check HPA exists + if ! kubectl get hpa -n "$namespace" >/dev/null 2>&1 || [[ $(kubectl get hpa -n "$namespace" --no-headers 2>/dev/null | wc -l) -eq 0 ]]; then + log_error "No HPA resources found. Deploy with autoscaling enabled." + return 1 + fi + + # Check metrics server + if ! kubectl get deployment -A | grep -q metrics-server; then + log_error "metrics-server required for autoscaling tests" + return 1 + fi + + return 0 +} + +validate_python_environment() { + if ! command_exists python3; then + log_error "python3 is required but not found" + log_info "Install python3 to continue" + return 1 + fi + + log_debug "Python3 environment validated" + return 0 +} + +install_python_requirements() { + local requirements_file="$1" + local project_root="${2:-}" + + local full_path="$requirements_file" + [[ -n "$project_root" ]] && full_path="$project_root/$requirements_file" + + [[ ! -f "$full_path" ]] && { log_error "Requirements file not found: $full_path"; return 1; } + + # Already in a venv? Just install + if [[ -n "${VIRTUAL_ENV:-}" ]]; then + log_debug "Using existing virtual environment: $VIRTUAL_ENV" + if python3 -m pip install -q -r "$full_path"; then + return 0 + else + log_error "Failed to install requirements in existing venv" + return 1 + fi + fi + + # Check if .venv exists and activate it + local venv_dir="${project_root:-.}/.venv" + if [[ -d "$venv_dir" ]]; then + log_debug "Activating existing venv: $venv_dir" + # shellcheck source=/dev/null + if source "$venv_dir/bin/activate"; then + if python3 -m pip install -q -r "$full_path"; then + return 0 + else + log_warn "Failed to install in existing venv, will recreate" + deactivate 2>/dev/null || true + rm -rf "$venv_dir" + fi + else + log_warn "Failed to activate venv, will recreate" + rm -rf "$venv_dir" + fi + fi + + # Create new venv (prefer uv for speed) + log_info "Creating virtual environment at $venv_dir..." + if command_exists uv; then + if uv venv "$venv_dir" >/dev/null 2>&1; then + # shellcheck source=/dev/null + source "$venv_dir/bin/activate" || { log_error "Failed to activate uv venv"; return 1; } + if uv pip install -q -r "$full_path"; then + return 0 + else + log_error "uv pip install failed" + return 1 + fi + fi + log_warn "uv venv creation failed, falling back to python3 -m venv" + fi + + if python3 -m venv "$venv_dir" 2>&1; then + # shellcheck source=/dev/null + source "$venv_dir/bin/activate" || { log_error "Failed to activate venv"; return 1; } + if python3 -m pip install -q -r "$full_path"; then + return 0 + else + log_error "pip install failed" + return 1 + fi + fi + + log_error "Failed to create virtual environment" + log_info "Try manually: python3 -m venv .venv && source .venv/bin/activate && pip install -r $requirements_file" + return 1 +} + +validate_python_with_requirements() { + local requirements_file="${1:-}" + local project_root="${2:-}" + + validate_python_environment || return 1 + + if [[ -n "$requirements_file" ]]; then + install_python_requirements "$requirements_file" "$project_root" || { + log_warn "Python requirements installation failed, but continuing..." 
+ return 0 # Don't fail the entire operation + } + fi + + return 0 +} + +# Export all functions for use in other scripts export -f log_info log_success log_warn log_error log_debug export -f command_exists validate_tools check_requirements validate_cluster -export -f is_ci validate_namespace +export -f is_ci validate_namespace get_base_url export -f detect_release_name detect_namespace -export -f wait_for_pods validate_eoapi_deployment +export -f wait_for_pods validate_eoapi_deployment validate_autoscaling_environment export -f preflight_deploy preflight_ingest preflight_test +export -f validate_python_environment install_python_requirements validate_python_with_requirements export -f show_standard_options diff --git a/scripts/load.sh b/scripts/load.sh new file mode 100755 index 00000000..86e5e252 --- /dev/null +++ b/scripts/load.sh @@ -0,0 +1,375 @@ +#!/usr/bin/env bash + +# eoAPI Scripts - Load Testing Management +# Run various load testing scenarios for eoAPI + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +source "${SCRIPT_DIR}/lib/common.sh" + +NAMESPACE="${NAMESPACE:-eoapi}" +RELEASE_NAME="${RELEASE_NAME:-eoapi}" + +show_help() { + cat < [ARGS] + +COMMANDS: + baseline Low load, verify monitoring works + autoscaling Delegate to autoscaling.sh for HPA tests + normal Realistic scenario + stress Find breaking points + chaos Kill pods during load, test resilience + all Run all load tests + +OPTIONS: + -h, --help Show this help message + -d, --debug Enable debug mode + -n, --namespace Set Kubernetes namespace + --release NAME Helm release name (default: ${RELEASE_NAME}) + --report-json FILE Export metrics to JSON file + --prometheus-url URL Prometheus URL for infrastructure metrics + --collect-infra-metrics Collect infrastructure metrics from Prometheus + +EXAMPLES: + # Run baseline load test + $(basename "$0") baseline + + # Test autoscaling behavior + $(basename "$0") autoscaling --debug + + # Find breaking points with metrics export + $(basename "$0") stress --debug --report-json stress-results.json + + # Run with Prometheus integration + $(basename "$0") normal --prometheus-url http://prometheus:9090 --collect-infra-metrics + + # Run all load tests + $(basename "$0") all +EOF +} + + +wait_for_services() { + local base_url="$1" + + # Wait for deployments to be available + for service in stac raster vector; do + kubectl wait --for=condition=Available deployment/"${RELEASE_NAME}-${service}" -n "$NAMESPACE" --timeout=60s >/dev/null 2>&1 || \ + log_warn "Service $service may not be ready" + done + + # Test basic connectivity + for endpoint in "$base_url/stac" "$base_url/raster/healthz" "$base_url/vector/healthz"; do + if ! curl -s -f -m 5 "$endpoint" >/dev/null 2>&1; then + log_warn "Endpoint not responding: $endpoint" + fi + done +} + +test_endpoint() { + local url="$1" + local duration="${2:-30}" + local concurrency="${3:-2}" + + if ! command_exists hey; then + log_error "hey not found. Install with: go install github.com/rakyll/hey@latest" + return 1 + fi + + log_info "Testing $url (${duration}s, ${concurrency}c)" + hey -z "${duration}s" -c "$concurrency" "$url" 2>/dev/null | grep -E "(Total:|Requests/sec:|Average:|Status code)" +} + +monitor_during_test() { + local duration="$1" + log_info "Monitor with: watch kubectl get pods -n $NAMESPACE" + sleep "$duration" & + local sleep_pid=$! 
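+    # Note: sleep runs in the background so the HPA snapshot below prints
+    # immediately; `wait` at the end of this function blocks until the
+    # monitoring window elapses.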
+ + # Show initial state + kubectl get hpa -n "$NAMESPACE" 2>/dev/null | head -2 || true + + wait $sleep_pid +} + +# Common setup for load tests +setup_load_test_environment() { + local namespace="$1" + local skip_python="${2:-false}" + + validate_cluster || return 1 + validate_namespace "$namespace" || return 1 + + if [[ "$skip_python" != "true" ]]; then + validate_python_with_requirements "tests/requirements.txt" "${SCRIPT_DIR}/.." || return 1 + fi + + local base_url + if ! base_url=$(get_base_url "$namespace"); then + log_error "Cannot reach eoAPI endpoints" + return 1 + fi + + log_info "Using base URL: $base_url" + wait_for_services "$base_url" + + echo "$base_url" # Return base_url for caller +} + +load_baseline() { + log_info "Running baseline load test..." + + local base_url + base_url=$(setup_load_test_environment "$NAMESPACE" "true") || return 1 + + log_info "Running light load tests..." + log_info "Monitor pods: kubectl get pods -n $NAMESPACE -w" + + # STAC collections (30s, 2 concurrent) + test_endpoint "$base_url/stac/collections" & + monitor_during_test 30 + wait + + # STAC search (60s, 3 concurrent) + if command_exists curl && command_exists hey; then + log_info "Testing STAC search (60s, 3c)" + hey -z 60s -c 3 -m POST -H "Content-Type: application/json" -d '{"limit":10}' "$base_url/stac/search" 2>/dev/null | \ + grep -E "(Total:|Requests/sec:|Average:|Status code)" + fi + + # Health checks + test_endpoint "$base_url/raster/healthz" + test_endpoint "$base_url/vector/healthz" + + log_success "Baseline load test completed" +} + +load_services() { + log_info "Running service-specific load tests..." + log_warn "Service-specific load tests not yet implemented" + log_info "Use 'load_baseline', 'load_normal', or 'load_stress' instead" + return 0 +} + +load_autoscaling() { + log_info "Running autoscaling tests..." + + validate_autoscaling_environment "$NAMESPACE" || return 1 + + validate_python_with_requirements "tests/requirements.txt" "${SCRIPT_DIR}/.." || return 1 + + # Wait for deployments + for service in stac raster vector; do + kubectl wait --for=condition=Available deployment/"${RELEASE_NAME}-${service}" -n "$NAMESPACE" --timeout=90s || return 1 + done + + # Get ingress host + local ingress_host + ingress_host=$(kubectl get ingress -n "$NAMESPACE" -o jsonpath='{.items[0].spec.rules[0].host}' 2>/dev/null || echo "localhost") + + # Set environment for Python tests + export STAC_ENDPOINT="http://$ingress_host/stac" + export RASTER_ENDPOINT="http://$ingress_host/raster" + export VECTOR_ENDPOINT="http://$ingress_host/vector" + + log_info "Running Python autoscaling tests..." + cd "${SCRIPT_DIR}/.." + + local cmd="python3 -m pytest tests/autoscaling" + [[ "$DEBUG_MODE" == "true" ]] && cmd="$cmd -v --tb=short" + + if eval "$cmd"; then + log_success "Autoscaling tests passed" + else + log_error "Autoscaling tests failed" + return 1 + fi +} + +load_normal() { + log_info "Running normal load test scenario..." + + local base_url + base_url=$(setup_load_test_environment "$NAMESPACE") || return 1 + + log_info "Running Python normal load test..." + cd "${SCRIPT_DIR}/.." 
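+    # Run from the repository root so `tests.load.load_tester` resolves as a module.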
+ + local cmd="python3 -m tests.load.load_tester normal --base-url $base_url --namespace $NAMESPACE" + [[ "$DEBUG_MODE" == "true" ]] && cmd="$cmd --duration 30 --users 5" + [[ -n "${REPORT_JSON:-}" ]] && cmd="$cmd --report-json $REPORT_JSON" + [[ -n "${PROMETHEUS_URL:-}" ]] && cmd="$cmd --prometheus-url $PROMETHEUS_URL" + [[ "${COLLECT_INFRA_METRICS:-false}" == "true" ]] && cmd="$cmd --collect-infra-metrics" + + log_debug "Running: $cmd" + + if eval "$cmd"; then + log_success "Normal load test completed" + else + log_error "Normal load test failed" + return 1 + fi +} + +load_stress() { + log_info "Running stress test to find breaking points..." + + local base_url + base_url=$(setup_load_test_environment "$NAMESPACE") || return 1 + + log_info "Running Python stress test module..." + cd "${SCRIPT_DIR}/.." + + local cmd="python3 -m tests.load.load_tester stress --base-url $base_url --namespace $NAMESPACE" + [[ "$DEBUG_MODE" == "true" ]] && cmd="$cmd --test-duration 5 --max-workers 20" + [[ -n "${REPORT_JSON:-}" ]] && cmd="$cmd --report-json $REPORT_JSON" + [[ -n "${PROMETHEUS_URL:-}" ]] && cmd="$cmd --prometheus-url $PROMETHEUS_URL" + [[ "${COLLECT_INFRA_METRICS:-false}" == "true" ]] && cmd="$cmd --collect-infra-metrics" + + log_debug "Running: $cmd" + + if eval "$cmd"; then + log_success "Stress test completed" + else + log_error "Stress test failed" + return 1 + fi +} + +load_chaos() { + log_info "Running chaos testing with pod failures..." + + if ! command_exists kubectl; then + log_error "kubectl required for chaos testing" + return 1 + fi + + local base_url + base_url=$(setup_load_test_environment "$NAMESPACE") || return 1 + + log_info "Running Python chaos test..." + cd "${SCRIPT_DIR}/.." + + local cmd="python3 -m tests.load.load_tester chaos --base-url $base_url --namespace $NAMESPACE" + [[ "$DEBUG_MODE" == "true" ]] && cmd="$cmd --duration 60 --kill-interval 30" + [[ -n "${REPORT_JSON:-}" ]] && cmd="$cmd --report-json $REPORT_JSON" + [[ -n "${PROMETHEUS_URL:-}" ]] && cmd="$cmd --prometheus-url $PROMETHEUS_URL" + + log_debug "Running: $cmd" + + if eval "$cmd"; then + log_success "Chaos test completed" + else + log_error "Chaos test failed" + return 1 + fi +} + +load_all() { + local failed=0 + + log_info "Running all load tests..." 
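+    # Each suite still runs when an earlier one fails; failures are only tallied
+    # and reported at the end.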
+ + load_baseline || ((failed++)) + load_services || ((failed++)) + load_autoscaling || ((failed++)) + load_normal || ((failed++)) + load_stress || ((failed++)) + load_chaos || ((failed++)) + + if [[ $failed -eq 0 ]]; then + log_success "All load tests passed" + return 0 + else + log_error "$failed load test suites failed" + return 1 + fi +} + +main() { + local command="" + + # Parse options + while [[ $# -gt 0 ]]; do + case $1 in + -h|--help) + show_help + exit 0 + ;; + -d|--debug) + export DEBUG_MODE=true + shift + ;; + -n|--namespace) + NAMESPACE="$2" + shift 2 + ;; + --release) + RELEASE_NAME="$2" + shift 2 + ;; + --report-json) + export REPORT_JSON="$2" + shift 2 + ;; + --prometheus-url) + export PROMETHEUS_URL="$2" + shift 2 + ;; + --collect-infra-metrics) + export COLLECT_INFRA_METRICS=true + shift + ;; + baseline|services|autoscaling|normal|stress|chaos|all) + command="$1" + shift + break + ;; + *) + log_error "Unknown option: $1" + show_help + exit 1 + ;; + esac + done + + [[ -z "$command" ]] && command="all" + + case "$command" in + baseline) + load_baseline + ;; + services) + load_services + ;; + autoscaling) + load_autoscaling + ;; + normal) + load_normal + ;; + stress) + load_stress + ;; + chaos) + load_chaos + ;; + all) + load_all + ;; + *) + log_error "Unknown command: $command" + exit 1 + ;; + esac +} + +if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then + main "$@" +fi diff --git a/scripts/test.sh b/scripts/test.sh index 97e8dd9c..32249124 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -27,7 +27,6 @@ COMMANDS: unit Run Helm unit tests integration Run integration tests with pytest notification Run notification tests with database access - autoscaling Run autoscaling tests with pytest all Run all tests OPTIONS: @@ -50,9 +49,6 @@ EXAMPLES: # Run integration tests with debug $(basename "$0") integration --debug - # Run autoscaling tests with debug - $(basename "$0") autoscaling --debug - # Run all tests $(basename "$0") all EOF @@ -124,13 +120,7 @@ test_integration() { "${SCRIPT_DIR}/test/integration.sh" "$pytest_args" } -test_autoscaling() { - local pytest_args="${1:-}" - export NAMESPACE="$NAMESPACE" - export RELEASE_NAME="$RELEASE_NAME" - export DEBUG_MODE="$DEBUG_MODE" - "${SCRIPT_DIR}/test/autoscaling.sh" "$pytest_args" -} + test_notification() { local pytest_args="${1:-}" @@ -151,7 +141,6 @@ test_all() { if validate_cluster 2>/dev/null; then test_integration || ((failed++)) - test_autoscaling || ((failed++)) test_notification || ((failed++)) else log_warn "Skipping integration tests - no cluster connection" @@ -193,7 +182,7 @@ main() { pytest_args="$2" shift 2 ;; - schema|lint|unit|notification|integration|autoscaling|all) + schema|lint|unit|notification|integration|all) command="$1" shift break @@ -224,9 +213,6 @@ main() { notification) test_notification "$pytest_args" ;; - autoscaling) - test_autoscaling "$pytest_args" - ;; all) test_all ;; diff --git a/scripts/test/autoscaling.sh b/scripts/test/autoscaling.sh deleted file mode 100755 index 52d5d4fb..00000000 --- a/scripts/test/autoscaling.sh +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/env bash - -# eoAPI Autoscaling Tests Script - -set -euo pipefail - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -PROJECT_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" - -source "${SCRIPT_DIR}/../lib/common.sh" - -NAMESPACE="${NAMESPACE:-eoapi}" -RELEASE_NAME="${RELEASE_NAME:-eoapi}" - -run_autoscaling_tests() { - local pytest_args="${1:-}" - - log_info "Running autoscaling tests..." 
- - check_requirements python3 kubectl || return 1 - validate_cluster || return 1 - - log_info "Installing Python test dependencies..." - python3 -m pip install --user -r "${PROJECT_ROOT}/tests/requirements.txt" >/dev/null 2>&1 || { - log_warn "Could not install test dependencies automatically" - log_info "Try manually: pip install -r tests/requirements.txt" - } - - if ! kubectl get deployment -n "$NAMESPACE" -l "app.kubernetes.io/instance=$RELEASE_NAME" &>/dev/null; then - log_error "eoAPI deployment not found (release: $RELEASE_NAME, namespace: $NAMESPACE)" - log_info "Deploy first with: eoapi deployment run" - return 1 - fi - - if ! kubectl get hpa -n "$NAMESPACE" &>/dev/null || [[ $(kubectl get hpa -n "$NAMESPACE" --no-headers 2>/dev/null | wc -l) -eq 0 ]]; then - log_error "No HPA resources found in namespace $NAMESPACE" - log_info "Autoscaling tests require HPA resources. Deploy with autoscaling enabled." - return 1 - fi - - if ! kubectl get deployment metrics-server -n kube-system &>/dev/null; then - log_warn "metrics-server not found in kube-system, checking other namespaces..." - if ! kubectl get deployment -A | grep -q metrics-server; then - log_error "metrics-server is not deployed - required for autoscaling tests" - return 1 - fi - fi - - cd "$PROJECT_ROOT" - - export RELEASE_NAME="$RELEASE_NAME" - export NAMESPACE="$NAMESPACE" - - log_info "Setting up test environment for autoscaling tests..." - - local ingress_host - ingress_host=$(kubectl get ingress -n "$NAMESPACE" -o jsonpath='{.items[0].spec.rules[0].host}' 2>/dev/null || echo "localhost") - log_info "Using ingress host: $ingress_host" - - log_info "Verifying services are ready for load testing..." - local service_ready=false - local retries=15 # More retries for autoscaling tests - while [ $retries -gt 0 ]; do - if curl -s -f http://"$ingress_host"/stac >/dev/null 2>&1 && \ - curl -s -f http://"$ingress_host"/raster/healthz >/dev/null 2>&1 && \ - curl -s -f http://"$ingress_host"/vector/healthz >/dev/null 2>&1; then - service_ready=true - log_info "All services are responding correctly" - break - fi - retries=$((retries - 1)) - if [ $retries -gt 0 ]; then - log_debug "Waiting for services to be ready... (retries left: $retries)" - sleep 3 - fi - done - - if [ "$service_ready" = false ]; then - log_error "Services are not ready for autoscaling tests" - return 1 - fi - - log_info "Ensuring all pods are ready for load testing..." - for service in stac raster vector; do - local deployment="${RELEASE_NAME}-${service}" - if ! kubectl wait --for=condition=available deployment/"${deployment}" -n "$NAMESPACE" --timeout=90s 2>/dev/null; then - log_error "Deployment ${deployment} is not ready for autoscaling tests" - return 1 - fi - done - - log_info "Allowing services to stabilize before load testing..." - sleep 10 - - export STAC_ENDPOINT="${STAC_ENDPOINT:-http://$ingress_host/stac}" - export RASTER_ENDPOINT="${RASTER_ENDPOINT:-http://$ingress_host/raster}" - export VECTOR_ENDPOINT="${VECTOR_ENDPOINT:-http://$ingress_host/vector}" - - log_info "Test endpoints configured:" - log_info " STAC: $STAC_ENDPOINT" - log_info " Raster: $RASTER_ENDPOINT" - log_info " Vector: $VECTOR_ENDPOINT" - - log_info "Checking HPA metrics availability..." 
- local hpa_ready=false - local hpa_retries=5 - while [ $hpa_retries -gt 0 ]; do - if kubectl get hpa -n "$NAMESPACE" -o json | grep -q "currentCPUUtilizationPercentage\|currentMetrics"; then - hpa_ready=true - log_info "HPA metrics are available" - break - fi - hpa_retries=$((hpa_retries - 1)) - if [ $hpa_retries -gt 0 ]; then - log_debug "Waiting for HPA metrics... (retries left: $hpa_retries)" - sleep 5 - fi - done - - if [ "$hpa_ready" = false ]; then - log_warn "HPA metrics may not be fully available - tests might be flaky" - fi - - log_info "Running extended warmup for load testing..." - for round in {1..3}; do - log_debug "Warmup round $round/3" - for endpoint in "$STAC_ENDPOINT/collections" "$RASTER_ENDPOINT/healthz" "$VECTOR_ENDPOINT/healthz"; do - for _ in {1..5}; do - curl -s -f "$endpoint" >/dev/null 2>&1 || true - sleep 0.2 - done - done - sleep 2 - done - - log_info "Current HPA status before autoscaling tests:" - kubectl get hpa -n "$NAMESPACE" || true - - local cmd="python3 -m pytest tests/autoscaling" - [[ "$DEBUG_MODE" == "true" ]] && cmd="$cmd -v --tb=short" - [[ -n "$pytest_args" ]] && cmd="$cmd $pytest_args" - - log_debug "Running: $cmd" - - if eval "$cmd"; then - log_success "Autoscaling tests passed" - - # Log final HPA status after tests - log_info "Final HPA status after autoscaling tests:" - kubectl get hpa -n "$NAMESPACE" || true - - return 0 - else - log_error "Autoscaling tests failed" - - log_info "HPA status after failed autoscaling tests:" - kubectl get hpa -n "$NAMESPACE" || true - - return 1 - fi -} - -run_autoscaling_tests "$@" diff --git a/scripts/test/integration.sh b/scripts/test/integration.sh index f1a2ecb6..d8d41e38 100755 --- a/scripts/test/integration.sh +++ b/scripts/test/integration.sh @@ -17,14 +17,9 @@ run_integration_tests() { log_info "Running integration tests..." - check_requirements python3 kubectl || return 1 + check_requirements kubectl || return 1 validate_cluster || return 1 - - log_info "Installing Python test dependencies..." - python3 -m pip install --user -r "${PROJECT_ROOT}/tests/requirements.txt" >/dev/null 2>&1 || { - log_warn "Could not install test dependencies automatically" - log_info "Try manually: pip install -r tests/requirements.txt" - } + validate_python_with_requirements "tests/requirements.txt" "$PROJECT_ROOT" || return 1 if ! 
kubectl get deployment -n "$NAMESPACE" -l "app.kubernetes.io/instance=$RELEASE_NAME" &>/dev/null; then log_error "eoAPI deployment not found (release: $RELEASE_NAME, namespace: $NAMESPACE)" diff --git a/tests/autoscaling/test_autoscaling.py b/tests/autoscaling/test_autoscaling.py index 57870bda..66723c1a 100644 --- a/tests/autoscaling/test_autoscaling.py +++ b/tests/autoscaling/test_autoscaling.py @@ -30,7 +30,7 @@ def generate_load( success_count = 0 error_count = 0 scaling_errors = 0 - error_details = {} # Track specific error types + error_details: dict[str, int] = {} # Track specific error types def worker() -> None: nonlocal success_count, error_count, scaling_errors, error_details @@ -45,20 +45,14 @@ def worker() -> None: # These are expected during scaling scaling_errors += 1 error_key = f"HTTP_{response.status_code}" - error_details[error_key] = ( - error_details.get(error_key, 0) + 1 - ) + error_details[error_key] = error_details.get(error_key, 0) + 1 else: error_count += 1 error_key = f"HTTP_{response.status_code}" - error_details[error_key] = ( - error_details.get(error_key, 0) + 1 - ) + error_details[error_key] = error_details.get(error_key, 0) + 1 except requests.Timeout: scaling_errors += 1 - error_details["Timeout"] = ( - error_details.get("Timeout", 0) + 1 - ) + error_details["Timeout"] = error_details.get("Timeout", 0) + 1 except requests.ConnectionError: scaling_errors += 1 error_details["ConnectionError"] = ( @@ -67,9 +61,7 @@ def worker() -> None: except requests.RequestException as e: scaling_errors += 1 error_key = type(e).__name__ - error_details[error_key] = ( - error_details.get(error_key, 0) + 1 - ) + error_details[error_key] = error_details.get(error_key, 0) + 1 time.sleep(delay) # Start concurrent workers @@ -112,21 +104,17 @@ def test_hpa_resources_properly_configured(self) -> None: spec = hpa["spec"] hpa_name = hpa["metadata"]["name"] - assert "scaleTargetRef" in spec, ( - f"HPA {hpa_name} missing scaleTargetRef" - ) + assert "scaleTargetRef" in spec, f"HPA {hpa_name} missing scaleTargetRef" assert "minReplicas" in spec, f"HPA {hpa_name} missing minReplicas" assert "maxReplicas" in spec, f"HPA {hpa_name} missing maxReplicas" - assert "metrics" in spec, ( - f"HPA {hpa_name} missing metrics configuration" - ) + assert "metrics" in spec, f"HPA {hpa_name} missing metrics configuration" min_replicas = spec["minReplicas"] max_replicas = spec["maxReplicas"] assert min_replicas > 0, f"HPA {hpa_name} minReplicas must be > 0" - assert max_replicas > min_replicas, ( - f"HPA {hpa_name} maxReplicas must be > minReplicas" - ) + assert ( + max_replicas > min_replicas + ), f"HPA {hpa_name} maxReplicas must be > minReplicas" metrics = spec["metrics"] assert len(metrics) > 0, f"HPA {hpa_name} has no metrics configured" @@ -137,9 +125,9 @@ def test_hpa_resources_properly_configured(self) -> None: if m.get("type") == "Resource" and m.get("resource", {}).get("name") == "cpu" ] - assert len(cpu_metrics) > 0, ( - f"HPA {hpa_name} must have CPU metric configured" - ) + assert ( + len(cpu_metrics) > 0 + ), f"HPA {hpa_name} must have CPU metric configured" print( f"✅ HPA {hpa_name}: {min_replicas}-{max_replicas} replicas, {len(metrics)} metrics" @@ -175,16 +163,16 @@ def test_target_deployments_exist(self) -> None: None, ) - assert target_deployment is not None, ( - f"HPA {hpa_name} target deployment {target_name} not found" - ) + assert ( + target_deployment is not None + ), f"HPA {hpa_name} target deployment {target_name} not found" # Check deployment has ready replicas status = 
target_deployment.get("status", {}) ready_replicas = status.get("readyReplicas", 0) - assert ready_replicas > 0, ( - f"Target deployment {target_name} has no ready replicas" - ) + assert ( + ready_replicas > 0 + ), f"Target deployment {target_name} has no ready replicas" print( f"✅ HPA {hpa_name} target deployment {target_name} is ready ({ready_replicas} replicas)" @@ -211,9 +199,7 @@ def test_cpu_metrics_collection(self) -> None: except Exception as e: print(f"⚠️ Cannot get metrics for {service}: {e}") - assert len(metrics_available) > 0, ( - "No CPU metrics available for any service" - ) + assert len(metrics_available) > 0, "No CPU metrics available for any service" def test_hpa_cpu_utilization_calculation(self) -> None: """Verify HPA calculates CPU utilization correctly.""" @@ -243,12 +229,10 @@ def test_hpa_cpu_utilization_calculation(self) -> None: "averageUtilization" ) if cpu_utilization is not None: - assert 0 <= cpu_utilization <= 1000, ( - f"Invalid CPU utilization: {cpu_utilization}%" - ) - print( - f"✅ HPA {hpa_name} CPU utilization: {cpu_utilization}%" - ) + assert ( + 0 <= cpu_utilization <= 1000 + ), f"Invalid CPU utilization: {cpu_utilization}%" + print(f"✅ HPA {hpa_name} CPU utilization: {cpu_utilization}%") else: print( f"⚠️ HPA {hpa_name} CPU metric exists but no utilization value" @@ -295,9 +279,7 @@ def test_cpu_resource_requests_alignment(self) -> None: pod = running_pods[0] # Check first running pod containers = pod["spec"]["containers"] - main_container = next( - (c for c in containers if c["name"] == service), None - ) + main_container = next((c for c in containers if c["name"] == service), None) if not main_container: continue @@ -316,12 +298,10 @@ def test_cpu_resource_requests_alignment(self) -> None: # Parse CPU request to verify it's reasonable if cpu_request.endswith("m"): cpu_millicores = int(cpu_request[:-1]) - assert cpu_millicores > 0, ( - f"Service {service} has zero CPU request" - ) - assert cpu_millicores <= 2000, ( - f"Service {service} has very high CPU request: {cpu_millicores}m" - ) + assert cpu_millicores > 0, f"Service {service} has zero CPU request" + assert ( + cpu_millicores <= 2000 + ), f"Service {service} has very high CPU request: {cpu_millicores}m" class TestScalingBehavior: @@ -337,8 +317,8 @@ def test_load_response_scaling(self) -> None: # Use simple GET endpoints that are guaranteed to work load_endpoints = [ "/stac/collections", - "/stac", # Root endpoint - "/raster/", # Raster root endpoint (no /collections endpoint) + "/stac", + "/raster/", "/vector/collections", ] @@ -423,9 +403,9 @@ def test_load_response_scaling(self) -> None: f"Load test had low success rate: {load_stats['success_rate']:.2%} " f"(scaling errors: {load_stats.get('scaling_errors', 0)})" ) - assert load_stats["total_requests"] > 100, ( - "Load test generated insufficient requests" - ) + assert ( + load_stats["total_requests"] > 100 + ), "Load test generated insufficient requests" # Note: In CI environments with limited resources, actual scaling may not occur # The important thing is that the system handled the load successfully @@ -485,8 +465,6 @@ class TestRequestRateScaling: def test_custom_metrics_for_request_rate(self) -> None: """Check if custom metrics for request rate scaling are available.""" - namespace = get_namespace() - # Check if custom metrics API has request rate metrics result = subprocess.run( ["kubectl", "get", "--raw", "/apis/custom.metrics.k8s.io/v1beta1"], @@ -548,9 +526,7 @@ def test_hpa_request_rate_metrics(self) -> None: # Check if it's 
configured but not yet available spec_metrics = hpa["spec"]["metrics"] configured_custom = [ - m - for m in spec_metrics - if m.get("type") in ["Pods", "Object"] + m for m in spec_metrics if m.get("type") in ["Pods", "Object"] ] if configured_custom: diff --git a/tests/integration/test_observability.py b/tests/integration/test_observability.py index 56229109..54b6f725 100644 --- a/tests/integration/test_observability.py +++ b/tests/integration/test_observability.py @@ -32,9 +32,9 @@ def test_prometheus_server_deployment(self) -> None: ready_replicas = deployment["status"].get("readyReplicas", 0) desired_replicas = deployment["spec"]["replicas"] - assert ready_replicas == desired_replicas, ( - f"Prometheus not ready: {ready_replicas}/{desired_replicas} replicas" - ) + assert ( + ready_replicas == desired_replicas + ), f"Prometheus not ready: {ready_replicas}/{desired_replicas} replicas" def test_grafana_deployment(self) -> None: namespace = get_namespace() @@ -52,9 +52,9 @@ def test_grafana_deployment(self) -> None: deployment = deployments["items"][0] ready_replicas = deployment["status"].get("readyReplicas", 0) desired_replicas = deployment["spec"]["replicas"] - assert ready_replicas == desired_replicas, ( - f"Grafana not ready: {ready_replicas}/{desired_replicas} replicas" - ) + assert ( + ready_replicas == desired_replicas + ), f"Grafana not ready: {ready_replicas}/{desired_replicas} replicas" def test_prometheus_adapter_deployment(self) -> None: namespace = get_namespace() @@ -64,9 +64,7 @@ def test_prometheus_adapter_deployment(self) -> None: label_selector="app.kubernetes.io/name=prometheus-adapter", ) - assert result.returncode == 0, ( - "Failed to get Prometheus Adapter deployment" - ) + assert result.returncode == 0, "Failed to get Prometheus Adapter deployment" deployments = json.loads(result.stdout) assert deployments["items"], "No Prometheus Adapter deployment found" @@ -74,9 +72,9 @@ def test_prometheus_adapter_deployment(self) -> None: deployment = deployments["items"][0] ready_replicas = deployment["status"].get("readyReplicas", 0) desired_replicas = deployment["spec"]["replicas"] - assert ready_replicas == desired_replicas, ( - f"Prometheus Adapter not ready: {ready_replicas}/{desired_replicas} replicas" - ) + assert ( + ready_replicas == desired_replicas + ), f"Prometheus Adapter not ready: {ready_replicas}/{desired_replicas} replicas" def test_kube_state_metrics_deployment(self) -> None: """Test kube-state-metrics deployment is running.""" @@ -87,9 +85,7 @@ def test_kube_state_metrics_deployment(self) -> None: label_selector="app.kubernetes.io/name=kube-state-metrics", ) - assert result.returncode == 0, ( - "Failed to get kube-state-metrics deployment" - ) + assert result.returncode == 0, "Failed to get kube-state-metrics deployment" deployments = json.loads(result.stdout) assert deployments["items"], "No kube-state-metrics deployment found" @@ -97,9 +93,9 @@ def test_kube_state_metrics_deployment(self) -> None: deployment = deployments["items"][0] ready_replicas = deployment["status"].get("readyReplicas", 0) desired_replicas = deployment["spec"]["replicas"] - assert ready_replicas == desired_replicas, ( - f"kube-state-metrics not ready: {ready_replicas}/{desired_replicas} replicas" - ) + assert ( + ready_replicas == desired_replicas + ), f"kube-state-metrics not ready: {ready_replicas}/{desired_replicas} replicas" def test_node_exporter_deployment(self) -> None: """Test node-exporter DaemonSet is running.""" @@ -119,9 +115,9 @@ def test_node_exporter_deployment(self) -> 
None: ready = daemonset["status"].get("numberReady", 0) desired = daemonset["status"].get("desiredNumberScheduled", 0) assert ready > 0, "No node-exporter pods are ready" - assert ready == desired, ( - f"node-exporter not fully deployed: {ready}/{desired} nodes" - ) + assert ( + ready == desired + ), f"node-exporter not fully deployed: {ready}/{desired} nodes" class TestMetricsCollection: @@ -138,9 +134,9 @@ def test_custom_metrics_api_available(self) -> None: ) api_response = json.loads(result.stdout) - assert api_response["kind"] == "APIResourceList", ( - "Invalid custom metrics API response" - ) + assert ( + api_response["kind"] == "APIResourceList" + ), "Invalid custom metrics API response" assert ( api_response["groupVersion"] == "custom.metrics.k8s.io/v1beta1" ), "Wrong API version" @@ -196,20 +192,14 @@ def test_prometheus_targets_reachable(self) -> None: # Wait for proxy to establish if not wait_for_url(f"{proxy_url}/api/v1/targets"): - pytest.skip( - "Could not establish connection to Prometheus via proxy" - ) + pytest.skip("Could not establish connection to Prometheus via proxy") # Check Prometheus targets response = requests.get(f"{proxy_url}/api/v1/targets") - assert response.status_code == 200, ( - "Failed to get Prometheus targets" - ) + assert response.status_code == 200, "Failed to get Prometheus targets" targets_data = response.json() - assert targets_data["status"] == "success", ( - "Failed to retrieve targets" - ) + assert targets_data["status"] == "success", "Failed to retrieve targets" active_targets = targets_data["data"]["activeTargets"] @@ -224,9 +214,7 @@ def test_prometheus_targets_reachable(self) -> None: "kubernetes-apiservers", } - found_jobs = { - target["labels"].get("job") for target in active_targets - } + found_jobs = {target["labels"].get("job") for target in active_targets} # At least some of the expected jobs should be present common_jobs = expected_jobs.intersection(found_jobs) @@ -242,9 +230,7 @@ def test_prometheus_targets_reachable(self) -> None: # Warning about unhealthy targets but don't fail the test if unhealthy_targets: - print( - f"Warning: {len(unhealthy_targets)} unhealthy targets found" - ) + print(f"Warning: {len(unhealthy_targets)} unhealthy targets found") finally: if process: @@ -279,21 +265,17 @@ def test_hpa_resources_exist(self) -> None: existing_hpas = [hpa for hpa in expected_hpas if hpa in found_hpas] if not existing_hpas: - pytest.skip( - "No eoAPI HPA resources found - autoscaling may be disabled" - ) + pytest.skip("No eoAPI HPA resources found - autoscaling may be disabled") # For each found HPA, check configuration for hpa_name in existing_hpas: hpa = next(h for h in hpas if h["metadata"]["name"] == hpa_name) spec = hpa["spec"] - assert spec["minReplicas"] >= 1, ( - f"HPA {hpa_name} min replicas too low" - ) - assert spec["maxReplicas"] > spec["minReplicas"], ( - f"HPA {hpa_name} max replicas not greater than min" - ) + assert spec["minReplicas"] >= 1, f"HPA {hpa_name} min replicas too low" + assert ( + spec["maxReplicas"] > spec["minReplicas"] + ), f"HPA {hpa_name} max replicas not greater than min" def test_hpa_metrics_available(self) -> None: """Test that HPA can access metrics for scaling decisions.""" @@ -325,16 +307,14 @@ def test_hpa_metrics_available(self) -> None: ) if scaling_active: - assert scaling_active["status"] == "True", ( - f"HPA {name} scaling is not active: {scaling_active.get('message', 'Unknown reason')}" - ) + assert ( + scaling_active["status"] == "True" + ), f"HPA {name} scaling is not active: 
{scaling_active.get('message', 'Unknown reason')}" # If we have been running for a while, we should have metrics # But on fresh deployments, metrics might not be available yet if current_metrics is not None: - assert len(current_metrics) > 0, ( - f"HPA {name} has no current metrics" - ) + assert len(current_metrics) > 0, f"HPA {name} has no current metrics" def test_service_resource_requests_configured(self) -> None: """Verify pods have resource requests for HPA to function.""" @@ -416,9 +396,7 @@ def test_grafana_service_accessibility(self) -> None: assert response.status_code == 200, "Grafana health check failed" health_data = response.json() - assert health_data.get("database") == "ok", ( - "Grafana database not healthy" - ) + assert health_data.get("database") == "ok", "Grafana database not healthy" finally: if process: @@ -448,11 +426,9 @@ def test_grafana_admin_secret_exists(self) -> None: admin_secret = secret break - assert admin_secret is not None, ( - "Grafana admin credentials secret not found" - ) + assert admin_secret is not None, "Grafana admin credentials secret not found" secret_data = admin_secret.get("data", {}) - assert "admin-password" in secret_data, ( - "admin-password not found in Grafana secret" - ) + assert ( + "admin-password" in secret_data + ), "admin-password not found in Grafana secret" diff --git a/tests/integration/test_raster.py b/tests/integration/test_raster.py index b5308cb5..5875e347 100644 --- a/tests/integration/test_raster.py +++ b/tests/integration/test_raster.py @@ -203,10 +203,7 @@ def test_mosaic_search(raster_endpoint: str) -> None: assert len(links) == 2 assert links[0]["rel"] == "self" assert links[1]["rel"] == "next" - assert ( - links[1]["href"] - == f"{raster_endpoint}/searches/list?limit=10&offset=10" - ) + assert links[1]["href"] == f"{raster_endpoint}/searches/list?limit=10&offset=10" resp = client.get( f"{raster_endpoint}/searches/list", params={"limit": 1, "offset": 1} @@ -219,49 +216,33 @@ def test_mosaic_search(raster_endpoint: str) -> None: links = resp.json()["links"] assert len(links) == 3 assert links[0]["rel"] == "self" - assert ( - links[0]["href"] == f"{raster_endpoint}/searches/list?limit=1&offset=1" - ) + assert links[0]["href"] == f"{raster_endpoint}/searches/list?limit=1&offset=1" assert links[1]["rel"] == "next" - assert ( - links[1]["href"] == f"{raster_endpoint}/searches/list?limit=1&offset=2" - ) + assert links[1]["href"] == f"{raster_endpoint}/searches/list?limit=1&offset=2" assert links[2]["rel"] == "prev" - assert ( - links[2]["href"] == f"{raster_endpoint}/searches/list?limit=1&offset=0" - ) + assert links[2]["href"] == f"{raster_endpoint}/searches/list?limit=1&offset=0" # Filter on mosaic metadata - resp = client.get( - f"{raster_endpoint}/searches/list", params={"owner": "vincent"} - ) + resp = client.get(f"{raster_endpoint}/searches/list", params={"owner": "vincent"}) assert resp.status_code == 200 assert resp.json()["context"]["matched"] == 7 assert resp.json()["context"]["limit"] == 10 assert resp.json()["context"]["returned"] == 7 # sortBy - resp = client.get( - f"{raster_endpoint}/searches/list", params={"sortby": "lastused"} - ) + resp = client.get(f"{raster_endpoint}/searches/list", params={"sortby": "lastused"}) assert resp.status_code == 200 - resp = client.get( - f"{raster_endpoint}/searches/list", params={"sortby": "usecount"} - ) + resp = client.get(f"{raster_endpoint}/searches/list", params={"sortby": "usecount"}) assert resp.status_code == 200 - resp = client.get( - 
f"{raster_endpoint}/searches/list", params={"sortby": "-owner"} - ) + resp = client.get(f"{raster_endpoint}/searches/list", params={"sortby": "-owner"}) assert resp.status_code == 200 assert ( "owner" not in resp.json()["searches"][0]["search"]["metadata"] ) # some mosaic don't have owners - resp = client.get( - f"{raster_endpoint}/searches/list", params={"sortby": "owner"} - ) + resp = client.get(f"{raster_endpoint}/searches/list", params={"sortby": "owner"}) assert resp.status_code == 200 assert "owner" in resp.json()["searches"][0]["search"]["metadata"] diff --git a/tests/integration/test_stac.py b/tests/integration/test_stac.py index 15eb07b8..2bbfca5d 100644 --- a/tests/integration/test_stac.py +++ b/tests/integration/test_stac.py @@ -46,9 +46,7 @@ def test_stac_api(stac_endpoint: str) -> None: assert link["href"].startswith(stac_endpoint.split("://")[1]) # items - resp = client.get( - f"{stac_endpoint}/collections/noaa-emergency-response/items" - ) + resp = client.get(f"{stac_endpoint}/collections/noaa-emergency-response/items") assert resp.status_code == 200 items = resp.json() # Verify item links have correct base path @@ -90,9 +88,9 @@ def test_stac_custom_path(stac_endpoint: str) -> None: # All links should use the custom path for link in landing["links"]: if link["href"].startswith("/"): - assert link["href"].startswith(base_path), ( - f"Link {link['href']} doesn't start with {base_path}" - ) + assert link["href"].startswith( + base_path + ), f"Link {link['href']} doesn't start with {base_path}" # Collections should also use the custom path resp = client.get(f"{stac_endpoint}/collections") @@ -102,14 +100,12 @@ def test_stac_custom_path(stac_endpoint: str) -> None: for collection in collections: for link in collection["links"]: if link["href"].startswith("/"): - assert link["href"].startswith(base_path), ( - f"Collection link {link['href']} doesn't start with {base_path}" - ) + assert link["href"].startswith( + base_path + ), f"Collection link {link['href']} doesn't start with {base_path}" # Test a specific item - resp = client.get( - f"{stac_endpoint}/collections/noaa-emergency-response/items" - ) + resp = client.get(f"{stac_endpoint}/collections/noaa-emergency-response/items") assert resp.status_code == 200 items = resp.json() @@ -117,9 +113,9 @@ def test_stac_custom_path(stac_endpoint: str) -> None: for feature in items["features"]: for link in feature["links"]: if link["href"].startswith("/"): - assert link["href"].startswith(base_path), ( - f"Item link {link['href']} doesn't start with {base_path}" - ) + assert link["href"].startswith( + base_path + ), f"Item link {link['href']} doesn't start with {base_path}" # viewer resp = client.get( @@ -156,14 +152,14 @@ def test_stac_queryables(stac_endpoint: str) -> None: # Verify all expected properties are present for prop_name, prop_schema in expected_queryables["properties"].items(): - assert prop_name in actual_queryables["properties"], ( - f"Expected property '{prop_name}' not found in queryables" - ) - assert actual_queryables["properties"][prop_name] == prop_schema, ( - f"Property '{prop_name}' schema doesn't match expected schema" - ) + assert ( + prop_name in actual_queryables["properties"] + ), f"Expected property '{prop_name}' not found in queryables" + assert ( + actual_queryables["properties"][prop_name] == prop_schema + ), f"Property '{prop_name}' schema doesn't match expected schema" # Verify additionalProperties setting - assert actual_queryables.get( + assert actual_queryables.get("additionalProperties") == 
expected_queryables.get( "additionalProperties" - ) == expected_queryables.get("additionalProperties") + ) diff --git a/tests/integration/test_stac_auth.py b/tests/integration/test_stac_auth.py index 00d252e5..bbdd0913 100644 --- a/tests/integration/test_stac_auth.py +++ b/tests/integration/test_stac_auth.py @@ -39,9 +39,10 @@ def test_stac_auth_without_token(stac_endpoint: str) -> None: if resp.status_code in [200, 201]: # Auth proxy should reject requests without tokens - assert resp.status_code in [401, 403], ( - f"Expected auth error, got {resp.status_code}: {resp.text[:100]}" - ) + assert resp.status_code in [ + 401, + 403, + ], f"Expected auth error, got {resp.status_code}: {resp.text[:100]}" def test_stac_auth_with_invalid_token(stac_endpoint: str) -> None: @@ -65,14 +66,12 @@ def test_stac_auth_with_invalid_token(stac_endpoint: str) -> None: }, ) - assert resp.status_code in [401, 403], ( - f"Expected auth error with invalid token, got {resp.status_code}: {resp.text[:100]}" - ) + assert ( + resp.status_code in [401, 403] + ), f"Expected auth error with invalid token, got {resp.status_code}: {resp.text[:100]}" -def test_stac_auth_with_valid_token( - stac_endpoint: str, valid_token: str -) -> None: +def test_stac_auth_with_valid_token(stac_endpoint: str, valid_token: str) -> None: """Test write operation with valid token - tests actual auth proxy behavior.""" resp = client.post( f"{stac_endpoint}/collections/noaa-emergency-response/items", @@ -94,9 +93,10 @@ def test_stac_auth_with_valid_token( ) # With valid token from mock OIDC server, request should succeed - assert resp.status_code in [200, 201], ( - f"Expected success with valid token, got {resp.status_code}: {resp.text[:100]}" - ) + assert resp.status_code in [ + 200, + 201, + ], f"Expected success with valid token, got {resp.status_code}: {resp.text[:100]}" def test_stac_read_operations_work(stac_endpoint: str) -> None: diff --git a/tests/integration/test_vector.py b/tests/integration/test_vector.py index 7fe25eac..6d90cb6a 100644 --- a/tests/integration/test_vector.py +++ b/tests/integration/test_vector.py @@ -57,13 +57,10 @@ def test_vector_api(vector_endpoint: str) -> None: ) if "collections" in collections_data: available_collections = [ - c.get("id", "unknown") - for c in collections_data["collections"] + c.get("id", "unknown") for c in collections_data["collections"] ] print(f"Available collections: {available_collections}") - assert False, ( - f"Expected 7 collections but found {current_count} after {elapsed_time:.1f}s timeout" - ) + assert False, f"Expected 7 collections but found {current_count} after {elapsed_time:.1f}s timeout" time.sleep(10) resp = client.get(f"{vector_endpoint}/collections") @@ -76,9 +73,7 @@ def test_vector_api(vector_endpoint: str) -> None: f"Expected 7 matched collections, got {matched_count}. " f"Available: {[c.get('id', 'unknown') for c in collections_data.get('collections', [])]}" ) - assert returned_count == 7, ( - f"Expected 7 returned collections, got {returned_count}" - ) + assert returned_count == 7, f"Expected 7 returned collections, got {returned_count}" collections = resp.json()["collections"] ids = [c["id"] for c in collections] diff --git a/tests/load/README.md b/tests/load/README.md new file mode 100644 index 00000000..ff175d3f --- /dev/null +++ b/tests/load/README.md @@ -0,0 +1,442 @@ +# eoAPI Load Testing + +This directory contains load testing utilities and scripts for eoAPI services with comprehensive performance metrics. 
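+
+As a quick orientation, the two entry points described below are the pytest suites and the `eoapi-cli load` wrapper; a minimal invocation of each (using the same commands documented later in this README) looks like:
+
+```bash
+# Run the pytest-based load suites from the project root
+pytest tests/load/
+
+# Or drive a scenario through the CLI wrapper
+./eoapi-cli load baseline
+```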
+ +## Features + +- **Response Time Tracking**: Measure p50, p95, p99 latency percentiles +- **Throughput Metrics**: Track requests/second over time +- **Infrastructure Monitoring**: Optional Prometheus integration for pod/HPA metrics +- **Flexible Reporting**: Console output + JSON export for CI/CD +- **Multiple Test Scenarios**: Stress, normal, chaos, and autoscaling tests + +## Components + +### `load_tester.py` +Core module containing the `LoadTester` class and unified CLI for all test types. + +**Usage:** +```bash +# Run stress test with default settings +python3 -m tests.load.load_tester stress + +# Normal load test with metrics export +python3 -m tests.load.load_tester normal \ + --base-url http://my-eoapi.com \ + --duration 60 \ + --users 10 \ + --report-json results.json + +# Stress test with Prometheus integration +python3 -m tests.load.load_tester stress \ + --base-url http://my-eoapi.com \ + --max-workers 100 \ + --prometheus-url http://prometheus:9090 \ + --collect-infra-metrics \ + --report-json stress-results.json + +# Chaos test with pod killing +python3 -m tests.load.load_tester chaos \ + --base-url http://my-eoapi.com \ + --namespace eoapi \ + --duration 300 \ + --kill-interval 60 +``` + +**Common Parameters:** +- `--base-url`: Base URL for eoAPI services +- `--timeout`: Request timeout in seconds (default: 30) +- `--report-json FILE`: Export metrics to JSON file +- `--prometheus-url URL`: Prometheus URL for infrastructure metrics +- `--namespace NAME`: Kubernetes namespace (default: eoapi) +- `--collect-infra-metrics`: Collect Prometheus infrastructure metrics + +**Stress Test Parameters:** +- `--endpoint`: Specific endpoint to test (default: `/stac/collections`) +- `--max-workers`: Maximum concurrent workers (default: 50) +- `--success-threshold`: Minimum success rate % (default: 95.0) +- `--step-size`: Worker increment step (default: 5) +- `--test-duration`: Duration per concurrency level in seconds (default: 10) +- `--cooldown`: Time between test levels in seconds (default: 2) + +**Normal Test Parameters:** +- `--duration`: Test duration in seconds (default: 60) +- `--users`: Concurrent users (default: 10) + +**Chaos Test Parameters:** +- `--duration`: Test duration in seconds (default: 300) +- `--kill-interval`: Seconds between pod kills (default: 60) + +### Test Modules + +#### `test_load.py` +Baseline load tests and shared fixtures for basic functionality verification. + +**Test Classes:** +- `TestLoadBaseline`: Light load tests for basic service functionality +- `TestLoadScalability`: Response time and scalability tests +- `TestLoadIntegration`: Multi-service integration tests + +#### `test_stress.py` +Stress testing to find breaking points and verify resilience under high load. + +**Test Classes:** +- `TestStressBreakingPoints`: Find service breaking points +- `TestStressResilience`: Service recovery and sustained load tests +- `TestStressLimits`: Maximum capacity and error rate tests + +#### `test_normal.py` +Realistic production workload patterns and sustained usage simulation. + +**Test Classes:** +- `TestNormalMixedLoad`: Mixed endpoint realistic traffic patterns +- `TestNormalSustained`: Long-running moderate load tests +- `TestNormalUserPatterns`: User session and interaction simulation + +#### `test_chaos.py` +Chaos engineering tests for infrastructure failure resilience. 
+ +**Test Classes:** +- `TestChaosResilience`: Pod failure and recovery tests +- `TestChaosNetwork`: Network instability and timeout handling +- `TestChaosResource`: Resource exhaustion and constraint tests +- `TestChaosRecovery`: Recovery timing and degradation patterns + +**Running Load Tests:** +```bash +# Run all load tests +pytest tests/load/ + +# Run specific test types +pytest tests/load/test_load.py +pytest tests/load/test_normal.py +pytest tests/load/test_stress.py +pytest tests/load/test_chaos.py + +# Run specific test classes +pytest tests/load/test_stress.py::TestStressBreakingPoints +pytest tests/load/test_normal.py::TestNormalMixedLoad + +### `prometheus_utils.py` +Optional Prometheus integration for collecting infrastructure metrics during load tests. + +**Features:** +- Query pod CPU/memory usage during tests +- Track HPA scaling events and replica counts +- Monitor request rates from ingress controller +- Collect database connection metrics +- Graceful degradation if Prometheus unavailable + +**Usage:** +```python +from tests.load.prometheus_utils import PrometheusClient, collect_test_metrics + +# Create client (automatically checks availability) +client = PrometheusClient("http://prometheus:9090") + +if client.available: + # Collect metrics for test period + metrics = collect_test_metrics( + prometheus_url="http://prometheus:9090", + namespace="eoapi", + start=test_start_time, + end=test_end_time + ) +``` + +## Performance Metrics + +All load tests now collect comprehensive metrics: + +### Response Time Metrics +- **p50 (Median)**: Typical response time +- **p95**: 95th percentile - catches most slow requests +- **p99**: 99th percentile - identifies outliers +- **Min/Max/Avg**: Response time range and average + +### Throughput Metrics +- **Requests/second**: Actual throughput during test +- **Success Rate**: Percentage of successful requests +- **Total Requests**: Count of all requests made + +### Infrastructure Metrics (Optional, via Prometheus) +- **Pod Resources**: CPU/memory usage during test +- **HPA Events**: Autoscaling replica changes +- **Request Rates**: Ingress controller metrics +- **Database**: Connection counts and query times + +### Example Output +``` +============================================================ +Stress Test - Breaking Point at 45 workers +============================================================ +Success Rate: 92.3% (1156/1253) +Latency (ms): p50=45 p95=123 p99=234 (min=12, max=456, avg=67) +Throughput: 41.8 req/s +Duration: 30.0s + +Infrastructure Metrics: + pod_cpu: Collected + pod_memory: Collected + hpa_scaling: Observed + ingress_rate: Collected +============================================================ +``` + +## Integration with Shell Scripts + +The load testing is integrated with the main `eoapi-cli` script: + +```bash +# Run all load tests +./eoapi-cli load all + +# Run stress tests with metrics export +./eoapi-cli load stress --debug --report-json stress.json + +# Run normal load with Prometheus +./eoapi-cli load normal \ + --prometheus-url http://prometheus:9090 \ + --collect-infra-metrics \ + --report-json normal.json + +# Run chaos test +./eoapi-cli load chaos --debug +``` + +The shell script automatically: +- Installs Python dependencies +- Sets up environment variables +- Configures endpoints based on cluster state +- Runs tests with appropriate parameters +- Exports metrics if requested + +## Configuration + +### Environment Variables +- `STAC_ENDPOINT`: STAC service URL +- `RASTER_ENDPOINT`: Raster service URL +- 
`VECTOR_ENDPOINT`: Vector service URL +- `DEBUG_MODE`: Enable debug output + +### Test Parameters +Tests can be configured via pytest markers: +- `@pytest.mark.slow`: Long-running stress tests +- `@pytest.mark.integration`: Multi-service tests + +### Performance Thresholds +Default success rate thresholds: +- Health endpoints: 98% +- API endpoints: 95% +- Stress tests: 90% + +Default latency expectations (normal load): +- p50: < 100ms +- p95: < 500ms +- p99: < 2000ms + +## Best Practices + +### Local Development +```bash +# Quick smoke test with metrics +python3 -m tests.load.load_tester stress \ + --max-workers 10 \ + --test-duration 5 \ + --report-json quick-test.json + +# Baseline verification +pytest tests/load/test_load.py::TestLoadBaseline -v + +# Check specific latency performance +pytest tests/load/test_normal.py::TestNormalSustained::test_consistent_response_times -v +``` + +### CI/CD Integration +```bash +# Fast load tests for CI with JSON export +pytest tests/load/ -m "not slow" --tb=short +./eoapi-cli load normal --report-json ci-metrics.json + +# Full load testing with metrics +./eoapi-cli test all --debug + +# Store metrics as CI artifacts +./eoapi-cli load stress --report-json artifacts/stress-metrics.json +``` + +### Production Validation +```bash +# Conservative stress test with Prometheus +python3 -m tests.load.load_tester stress \ + --base-url https://prod-eoapi.example.com \ + --max-workers 200 \ + --success-threshold 95.0 \ + --test-duration 30 \ + --cooldown 5 \ + --prometheus-url http://prometheus:9090 \ + --collect-infra-metrics \ + --report-json prod-stress-results.json + +# Review metrics +cat prod-stress-results.json | jq '.metrics[] | {workers: .workers, success_rate, latency_p95, throughput}' +``` + +## Monitoring + +### Real-time Monitoring +During load tests, monitor: +- **Test Metrics**: Watch console output for real-time p50/p95/p99 latencies +- **Pod Resources**: `kubectl top pods -n eoapi` +- **HPA Scaling**: `kubectl get hpa -n eoapi -w` +- **Pod Status**: `kubectl get pods -n eoapi -w` + +### Prometheus Integration +If Prometheus is available, enable infrastructure metrics: +```bash +# Set Prometheus URL +export PROMETHEUS_URL=http://prometheus:9090 + +# Run test with infrastructure metrics +./eoapi-cli load stress --collect-infra-metrics --report-json results.json + +# Review infrastructure metrics +cat results.json | jq '.metrics[].infrastructure' +``` + +### Metrics Analysis +```bash +# Extract latency trends +cat results.json | jq '.metrics[] | {workers, p95: .latency_p95, p99: .latency_p99}' + +# Compare success rates across worker counts +cat results.json | jq '.metrics[] | {workers, success_rate, throughput}' + +# Check if HPA scaled +cat results.json | jq '.metrics[].infrastructure.hpa_metrics' +``` + +## Troubleshooting + +### Common Issues + +**ImportError: No module named 'tests.load'** +- Ensure you're running from the project root directory +- Install dependencies: `pip install -r tests/requirements.txt` + +**Connection refused errors** +- Verify services are running: `kubectl get pods -n eoapi` +- Check endpoints are accessible: `curl http://localhost/stac` +- Ensure ingress is configured correctly + +**Low success rates** +- Check resource limits and requests in Helm values +- Verify HPA is configured for autoscaling +- Monitor pod logs for errors: `kubectl logs -f deployment/eoapi-stac -n eoapi` + +### Debug Mode +Enable debug output for detailed information: +```bash +DEBUG_MODE=true python3 -m tests.load.stress_test +./scripts/load.sh 
stress --debug +``` + +## Extending + +### Adding New Test Endpoints +1. Add endpoints to appropriate test modules (`test_load.py`, `test_stress.py`, etc.) +2. Update `load_tester.py` with endpoint-specific logic if needed +3. Add endpoint validation to shell scripts + +### Custom Load Patterns +Create new test classes in the appropriate module: +```python +# In test_stress.py +class TestStressCustom: + def test_my_stress_scenario(self, base_url: str): + # Custom stress testing logic + pass + +# In test_normal.py +class TestNormalCustom: + def test_my_normal_scenario(self, base_url: str): + # Custom normal load testing logic + pass +``` + +### Integration with Monitoring +Extend tests to collect metrics: +```python +from tests.load.load_tester import LoadTester +from tests.load.prometheus_utils import PrometheusClient + +class MonitoringLoadTester(LoadTester): + def __init__(self, base_url, prometheus_url=None, **kwargs): + super().__init__(base_url, prometheus_url=prometheus_url, **kwargs) + + def test_with_metrics(self, url, workers, duration): + # Run test with infrastructure metrics + return self.test_concurrency_level( + url, workers, duration, + collect_infra_metrics=True + ) +``` + +### Custom Prometheus Queries +```python +from tests.load.prometheus_utils import PrometheusClient +from datetime import datetime + +client = PrometheusClient("http://prometheus:9090") + +# Custom query +result = client.query('rate(http_requests_total[5m])') + +# Range query +result = client.query_range( + 'container_cpu_usage_seconds_total{namespace="eoapi"}', + start=datetime.now() - timedelta(minutes=10), + end=datetime.now(), + step="15s" +) +``` + +## Metrics Reference + +### Collected Metrics +Every load test collects: +- `success_count`: Number of successful requests (2xx status) +- `total_requests`: Total requests made +- `success_rate`: Percentage (0-100) +- `duration`: Actual test duration in seconds +- `throughput`: Requests per second +- `latency_min`: Minimum response time (ms) +- `latency_max`: Maximum response time (ms) +- `latency_avg`: Average response time (ms) +- `latency_p50`: 50th percentile (ms) +- `latency_p95`: 95th percentile (ms) +- `latency_p99`: 99th percentile (ms) + +### Optional Infrastructure Metrics +When `--collect-infra-metrics` is enabled: +- `infrastructure.pod_metrics.cpu`: Pod CPU usage over time +- `infrastructure.pod_metrics.memory`: Pod memory usage over time +- `infrastructure.hpa_metrics.current_replicas`: HPA replica count +- `infrastructure.hpa_metrics.desired_replicas`: HPA target replicas +- `infrastructure.request_metrics.request_rate`: Ingress request rate +- `infrastructure.request_metrics.request_latency_p95`: Ingress latency +- `infrastructure.database_metrics.db_connections`: Database connections +- `infrastructure.database_metrics.db_query_duration`: Query duration + +### JSON Export Format +```json +{ + "breaking_point": 45, + "metrics": { + "45": { + "success_count": 1156, + "total_requests": 1253, + "success_rate": 92.3, + "duration": 30.0, + "throughput": 41.8, + " diff --git a/tests/load/__init__.py b/tests/load/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/load/config.py b/tests/load/config.py new file mode 100644 index 00000000..2f65f4f2 --- /dev/null +++ b/tests/load/config.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python3 +""" +Load testing configuration and constants + +Centralized configuration for all load testing parameters, thresholds, +and test profiles. 
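+
+Illustrative usage sketch (the classes are the ones defined below in this
+module; the commented values reflect the defaults declared there):
+
+    from tests.load.config import Endpoints, Profiles, Thresholds
+
+    # Pick a predefined profile and read its worker/timeout pair
+    workers = Profiles.NORMAL.max_workers   # 10
+    timeout = Profiles.NORMAL.timeout       # 15
+
+    # Walk every endpoint and pair it with a suitable success threshold
+    for endpoint in Endpoints.all_endpoints():
+        threshold = (Thresholds.HEALTH_ENDPOINTS
+                     if "healthz" in endpoint
+                     else Thresholds.API_ENDPOINTS)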
+""" + +from dataclasses import dataclass + + +@dataclass(frozen=True) +class TestProfile: + """Test profile with worker and timeout configuration""" + + max_workers: int + timeout: int + + +class Profiles: + """Predefined test profiles for different load scenarios""" + + LIGHT = TestProfile(max_workers=5, timeout=10) + NORMAL = TestProfile(max_workers=10, timeout=15) + STRESS = TestProfile(max_workers=50, timeout=10) + CHAOS = TestProfile(max_workers=20, timeout=8) + + +class Thresholds: + """Success rate thresholds for different test scenarios""" + + HEALTH_ENDPOINTS = 98.0 + API_ENDPOINTS = 95.0 + API_NORMAL = 93.0 + API_SUSTAINED = 90.0 + STRESS_HIGH = 90.0 + STRESS_MODERATE = 80.0 + STRESS_LOW = 70.0 + CHAOS_HIGH = 70.0 + CHAOS_MODERATE = 60.0 + CHAOS_LOW = 50.0 + DEGRADED = 30.0 + RECOVERY = 85.0 + + +class Endpoints: + """Common API endpoints for testing""" + + STAC_COLLECTIONS = "/stac/collections" + STAC_SEARCH = "/stac/search" + RASTER_HEALTH = "/raster/healthz" + VECTOR_HEALTH = "/vector/healthz" + + @classmethod + def all_health(cls) -> list[str]: + """Return all health check endpoints""" + return [cls.RASTER_HEALTH, cls.VECTOR_HEALTH] + + @classmethod + def all_api(cls) -> list[str]: + """Return all API endpoints""" + return [cls.STAC_COLLECTIONS, cls.STAC_SEARCH] + + @classmethod + def all_endpoints(cls) -> list[str]: + """Return all endpoints""" + return cls.all_api() + cls.all_health() + + +class Durations: + """Test duration constants (seconds)""" + + QUICK = 3 + SHORT = 5 + NORMAL = 10 + MODERATE = 30 + LONG = 60 + EXTENDED = 300 + + +class Concurrency: + """Concurrency level constants""" + + SINGLE = 1 + LIGHT = 3 + MODERATE = 5 + NORMAL = 10 + HIGH = 15 + STRESS = 20 + EXTREME = 25 + + +class Latency: + """Latency threshold constants (milliseconds)""" + + P50_FAST = 100 + P50_ACCEPTABLE = 200 + P95_FAST = 500 + P95_ACCEPTABLE = 1000 + P99_FAST = 2000 + P99_ACCEPTABLE = 5000 + + +# Default values +DEFAULT_MAX_WORKERS = 50 +DEFAULT_TIMEOUT = 30 +DEFAULT_SUCCESS_THRESHOLD = 95.0 +DEFAULT_NAMESPACE = "eoapi" + +# Load testing parameters +REQUEST_DELAY = 0.1 # Delay between request submissions +RETRY_TOTAL = 3 +RETRY_BACKOFF_FACTOR = 1 +RETRY_STATUS_CODES = [429, 500, 502, 503, 504] diff --git a/tests/load/conftest.py b/tests/load/conftest.py new file mode 100644 index 00000000..2236388e --- /dev/null +++ b/tests/load/conftest.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +""" +Shared pytest fixtures for load testing + +Enhanced fixtures using centralized configuration and test helpers. 
+""" + +import os +from typing import Dict + +import pytest + +from .config import Endpoints, Profiles, Thresholds +from .load_tester import LoadTester +from .test_helpers import create_tester + + +@pytest.fixture(scope="session") +def base_url() -> str: + """ + Get the base URL for eoAPI services + + Returns base URL from STAC_ENDPOINT env or defaults to http://localhost + """ + stac_endpoint = os.getenv("STAC_ENDPOINT", "http://localhost/stac") + return stac_endpoint.replace("/stac", "") + + +@pytest.fixture(scope="session") +def endpoints() -> type[Endpoints]: + """Provide endpoints configuration""" + return Endpoints + + +@pytest.fixture(scope="session") +def thresholds() -> type[Thresholds]: + """Provide success rate thresholds""" + return Thresholds + + +@pytest.fixture(scope="function") +def load_tester(base_url: str) -> LoadTester: + """ + Create LoadTester for general load testing + + Args: + base_url: Base URL from base_url fixture + + Returns: + LoadTester configured with normal profile + """ + return create_tester( + base_url=base_url, + max_workers=Profiles.NORMAL.max_workers, + timeout=Profiles.NORMAL.timeout, + ) + + +@pytest.fixture(scope="function") +def stress_tester(base_url: str) -> LoadTester: + """ + Create LoadTester optimized for stress testing + + Args: + base_url: Base URL from base_url fixture + + Returns: + LoadTester configured with stress profile + """ + return create_tester( + base_url=base_url, + max_workers=Profiles.STRESS.max_workers, + timeout=Profiles.STRESS.timeout, + ) + + +@pytest.fixture(scope="function") +def chaos_tester(base_url: str) -> LoadTester: + """ + Create LoadTester for chaos testing + + Args: + base_url: Base URL from base_url fixture + + Returns: + LoadTester configured with chaos profile + """ + return create_tester( + base_url=base_url, + max_workers=Profiles.CHAOS.max_workers, + timeout=Profiles.CHAOS.timeout, + ) + + +@pytest.fixture(scope="function") +def light_tester(base_url: str) -> LoadTester: + """ + Create LoadTester for light load testing + + Args: + base_url: Base URL from base_url fixture + + Returns: + LoadTester configured with light profile + """ + return create_tester( + base_url=base_url, + max_workers=Profiles.LIGHT.max_workers, + timeout=Profiles.LIGHT.timeout, + ) + + +@pytest.fixture(scope="session") +def endpoint_thresholds() -> Dict[str, float]: + """ + Map endpoints to appropriate success rate thresholds + + Returns: + Dictionary mapping endpoint patterns to thresholds + """ + return { + "healthz": Thresholds.HEALTH_ENDPOINTS, + "collections": Thresholds.API_ENDPOINTS, + "search": Thresholds.API_ENDPOINTS, + } diff --git a/tests/load/load_tester.py b/tests/load/load_tester.py new file mode 100644 index 00000000..f2611518 --- /dev/null +++ b/tests/load/load_tester.py @@ -0,0 +1,722 @@ +#!/usr/bin/env python3 +""" +eoAPI Load Testing Utility + +This module provides the core LoadTester class and CLI for all types of +load testing: stress, normal, and chaos testing. 
+""" + +import argparse +import concurrent.futures +import json +import logging +import os +import random +import statistics +import subprocess +import sys +import time +from datetime import datetime +from typing import Dict, List, Optional, Tuple + +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + +try: + from .prometheus_utils import ( + PrometheusClient, + collect_test_metrics, + summarize_metrics, + ) + + PROMETHEUS_AVAILABLE = True +except ImportError: + PROMETHEUS_AVAILABLE = False + logger.warning("Prometheus utilities not available") + +# Constants +DEFAULT_MAX_WORKERS = 50 +DEFAULT_TIMEOUT = 30 +DEFAULT_SUCCESS_THRESHOLD = 95.0 +LIGHT_LOAD_WORKERS = 3 +LIGHT_LOAD_DURATION = 5 +MODERATE_LOAD_WORKERS = 10 +STRESS_TEST_WORKERS = 20 +REQUEST_DELAY = 0.1 # Delay between request submissions +RETRY_TOTAL = 3 +RETRY_BACKOFF_FACTOR = 1 +RETRY_STATUS_CODES = [429, 500, 502, 503, 504] + + +class LoadTester: + """Load tester for eoAPI endpoints supporting stress, normal, and chaos testing""" + + def __init__( + self, + base_url: str, + max_workers: int = DEFAULT_MAX_WORKERS, + timeout: int = DEFAULT_TIMEOUT, + prometheus_url: Optional[str] = None, + namespace: str = "eoapi", + ): + """ + Initialize LoadTester with validation + + Args: + base_url: Base URL for eoAPI services + max_workers: Maximum number of concurrent workers + timeout: Request timeout in seconds + prometheus_url: Optional Prometheus URL for infrastructure metrics + namespace: Kubernetes namespace for Prometheus queries + + Raises: + ValueError: If parameters are invalid + """ + # Validate inputs + if not base_url or not isinstance(base_url, str): + raise ValueError(f"Invalid base_url: {base_url}") + if not base_url.startswith(("http://", "https://")): + raise ValueError( + f"base_url must start with http:// or https://: {base_url}" + ) + if not isinstance(max_workers, int) or max_workers <= 0: + raise ValueError(f"max_workers must be a positive integer: {max_workers}") + if not isinstance(timeout, int) or timeout <= 0: + raise ValueError(f"timeout must be a positive integer: {timeout}") + + self.base_url = base_url.rstrip("/") + self.max_workers = max_workers + self.timeout = timeout + self.prometheus_url = prometheus_url + self.namespace = namespace + self.session = self._create_session() + + # Initialize Prometheus client if available and URL provided + self.prometheus = None + if PROMETHEUS_AVAILABLE and prometheus_url: + self.prometheus = PrometheusClient(prometheus_url) + if self.prometheus.available: + logger.info(f"Prometheus integration enabled: {prometheus_url}") + else: + logger.info("Prometheus integration disabled (unavailable)") + self.prometheus = None + + logger.info( + f"LoadTester initialized: base_url={self.base_url}, " + f"max_workers={self.max_workers}, timeout={self.timeout}" + ) + + def _create_session(self) -> requests.Session: + """Create a session with retry strategy""" + session = requests.Session() + + # Retry strategy + retry_strategy = Retry( + total=RETRY_TOTAL, + backoff_factor=RETRY_BACKOFF_FACTOR, + status_forcelist=RETRY_STATUS_CODES, + ) + + adapter = HTTPAdapter(max_retries=retry_strategy) + session.mount("http://", adapter) + session.mount("https://", adapter) + + logger.debug("HTTP session created with retry strategy") + return session + + def make_request(self, url: 
str) -> Tuple[bool, float]: + """ + Make a single request and return success status with latency + + Args: + url: URL to request + + Returns: + Tuple of (success, latency_ms) where success is True if 200 status + """ + start_time = time.time() + try: + response = self.session.get(url, timeout=self.timeout) + latency_ms = (time.time() - start_time) * 1000 + success = response.status_code == 200 + if not success: + logger.debug(f"Request to {url} returned status {response.status_code}") + return success, latency_ms + except requests.exceptions.Timeout: + latency_ms = (time.time() - start_time) * 1000 + logger.debug(f"Request to {url} timed out after {self.timeout}s") + return False, latency_ms + except requests.exceptions.ConnectionError as e: + latency_ms = (time.time() - start_time) * 1000 + logger.debug(f"Connection error for {url}: {e}") + return False, latency_ms + except requests.exceptions.RequestException as e: + latency_ms = (time.time() - start_time) * 1000 + logger.debug(f"Request failed for {url}: {e}") + return False, latency_ms + except Exception as e: + latency_ms = (time.time() - start_time) * 1000 + logger.error(f"Unexpected error in make_request for {url}: {e}") + return False, latency_ms + + def test_concurrency_level( + self, + url: str, + workers: int, + duration: int = 10, + collect_infra_metrics: bool = False, + ) -> Dict: + """ + Test a specific concurrency level for a given duration + + Args: + url: URL to test + workers: Number of concurrent workers + duration: Test duration in seconds + collect_infra_metrics: Whether to collect Prometheus infrastructure metrics + + Returns: + Dict with metrics including success rate, latencies, throughput, and optional infra metrics + """ + logger.info(f"Testing {url} with {workers} concurrent requests for {duration}s") + + test_start = datetime.now() + start_time = time.time() + success_count = 0 + total_requests = 0 + latencies: List[float] = [] + + with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor: + futures = [] + + # Submit requests for the specified duration + while time.time() - start_time < duration: + future = executor.submit(self.make_request, url) + futures.append(future) + total_requests += 1 + time.sleep(REQUEST_DELAY) + + # Collect results and latencies + for future in concurrent.futures.as_completed(futures): + success, latency_ms = future.result() + if success: + success_count += 1 + latencies.append(latency_ms) + + test_end = datetime.now() + actual_duration = time.time() - start_time + success_rate = ( + (success_count / total_requests) * 100 if total_requests > 0 else 0 + ) + + # Calculate latency metrics + metrics: Dict[str, float | Dict] = { + "success_count": success_count, + "total_requests": total_requests, + "success_rate": success_rate, + "duration": actual_duration, + "throughput": total_requests / actual_duration + if actual_duration > 0 + else 0, + } + + if latencies: + sorted_latencies = sorted(latencies) + metrics.update( + { + "latency_min": min(latencies), + "latency_max": max(latencies), + "latency_avg": statistics.mean(latencies), + "latency_p50": statistics.median(sorted_latencies), + "latency_p95": sorted_latencies[int(len(sorted_latencies) * 0.95)] + if len(sorted_latencies) > 1 + else sorted_latencies[0], + "latency_p99": sorted_latencies[int(len(sorted_latencies) * 0.99)] + if len(sorted_latencies) > 1 + else sorted_latencies[0], + } + ) + + logger.info( + f"Workers: {workers}, Success: {success_rate:.1f}% ({success_count}/{total_requests}), " + f"Latency 
p50/p95/p99: {metrics.get('latency_p50', 0):.0f}/{metrics.get('latency_p95', 0):.0f}/{metrics.get('latency_p99', 0):.0f}ms, " + f"Throughput: {metrics['throughput']:.1f} req/s" + ) + + # Collect infrastructure metrics if requested and available + if collect_infra_metrics and self.prometheus: + logger.info("Collecting infrastructure metrics from Prometheus...") + infra_metrics = collect_test_metrics( + self.prometheus_url, self.namespace, test_start, test_end + ) + if infra_metrics: + metrics["infrastructure"] = infra_metrics + summary = summarize_metrics(infra_metrics) + logger.info(f"Infrastructure metrics: {summary}") + + return metrics + + def find_breaking_point( + self, + endpoint: str = "/stac/collections", + success_threshold: float = DEFAULT_SUCCESS_THRESHOLD, + step_size: int = 5, + test_duration: int = 10, + cooldown: int = 2, + ) -> Tuple[int, Dict]: + """ + Find the breaking point by gradually increasing concurrent load + + Args: + endpoint: API endpoint to test (relative to base_url) + success_threshold: Minimum success rate to maintain + step_size: Increment for number of workers + test_duration: Duration to test each concurrency level + cooldown: Time to wait between tests + + Returns: + Tuple of (breaking_point_workers, all_metrics) + """ + url = f"{self.base_url}{endpoint}" + logger.info(f"Starting stress test on {url}") + logger.info( + f"Max workers: {self.max_workers}, Success threshold: {success_threshold}%" + ) + + all_metrics = {} + for workers in range(step_size, self.max_workers + 1, step_size): + metrics = self.test_concurrency_level(url, workers, test_duration) + all_metrics[workers] = metrics + + # Stop if success rate drops below threshold + if metrics["success_rate"] < success_threshold: + logger.info( + f"Breaking point found at {workers} concurrent requests " + f"(success rate: {metrics['success_rate']:.1f}%)" + ) + return workers, all_metrics + + # Cool down between test levels + if cooldown > 0: + time.sleep(cooldown) + + logger.info("Stress test completed - no breaking point found") + return self.max_workers, all_metrics + + def run_normal_load( + self, + endpoints: Optional[List[str]] = None, + duration: int = 60, + concurrent_users: int = MODERATE_LOAD_WORKERS, + ramp_up: int = 30, + ) -> Dict: + """ + Run realistic mixed-workload test + + Args: + endpoints: List of endpoints to test + duration: Total test duration + concurrent_users: Peak concurrent users + ramp_up: Time to reach peak load (currently unused) + + Returns: + Dict with results for each endpoint + """ + if endpoints is None: + endpoints = [ + "/stac/collections", + "/raster/healthz", + "/vector/healthz", + ] + + results = {} + logger.info( + f"Starting normal load test ({duration}s, {concurrent_users} users)" + ) + + for endpoint in endpoints: + url = f"{self.base_url}{endpoint}" + logger.info(f"Testing {endpoint}...") + + # Gradual ramp-up + workers = max(1, concurrent_users // len(endpoints)) + metrics = self.test_concurrency_level( + url, workers, duration // len(endpoints) + ) + + results[endpoint] = metrics + + # Add infrastructure summary if Prometheus is enabled + if self.prometheus: + logger.info( + "Note: Use --collect-infra-metrics for detailed infrastructure data" + ) + + return results + + def run_chaos_test( + self, + namespace: str = "eoapi", + duration: int = 300, + kill_interval: int = 60, + endpoint: str = "/stac/collections", + ) -> Dict: + """ + Run chaos test by killing pods during load + + Args: + namespace: Kubernetes namespace + duration: Test duration + 
kill_interval: Seconds between pod kills + endpoint: Endpoint to test + + Returns: + Test results and pod kill events + """ + url = f"{self.base_url}{endpoint}" + logger.info(f"Starting chaos test on {url} (namespace: {namespace})") + + # Get initial pod list + try: + pods = ( + subprocess.check_output( + [ + "kubectl", + "get", + "pods", + "-n", + namespace, + "-l", + "app.kubernetes.io/component in (stac,raster,vector)", + "-o", + "jsonpath={.items[*].metadata.name}", + ], + text=True, + ) + .strip() + .split() + ) + logger.info(f"Found {len(pods)} pods for chaos testing") + except subprocess.CalledProcessError as e: + logger.warning(f"Could not get pod list, chaos disabled: {e}") + pods = [] + + results: Dict[str, list | float | int] = { + "killed_pods": [], + "success_rate": 0, + } + start_time = time.time() + + # Background load generation + import threading + + load_results: Dict[str, int] = {"success": 0, "total": 0} + + def generate_load(): + while time.time() - start_time < duration: + success, _ = self.make_request(url) + if success: + load_results["success"] += 1 + load_results["total"] += 1 + time.sleep(0.5) + + # Start load generation with daemon thread for cleanup + load_thread = threading.Thread(target=generate_load, daemon=True) + load_thread.start() + + try: + # Kill pods periodically + while time.time() - start_time < duration and pods: + time.sleep(kill_interval) + + if pods: + pod_to_kill = random.choice(pods) + logger.info(f"Killing pod: {pod_to_kill}") + try: + subprocess.run( + [ + "kubectl", + "delete", + "pod", + pod_to_kill, + "-n", + namespace, + ], + check=True, + capture_output=True, + ) + killed_pods = results["killed_pods"] + if isinstance(killed_pods, list): + killed_pods.append(pod_to_kill) + pods.remove(pod_to_kill) + except subprocess.CalledProcessError as e: + logger.error(f"Failed to kill pod {pod_to_kill}: {e}") + finally: + # Ensure we wait for load thread to complete + load_thread.join(timeout=duration + 10) + + if load_results["total"] > 0: + results["success_rate"] = ( + load_results["success"] / load_results["total"] + ) * 100 + results.update(load_results) + + killed_pods_list = results["killed_pods"] + logger.info( + f"Chaos test completed: {results['success_rate']:.1f}% success rate, " + f"killed {len(killed_pods_list) if isinstance(killed_pods_list, list) else 0} pods" + ) + return results + + +def print_metrics_summary(metrics: Dict, title: str = "Test Results"): + """ + Print concise, readable metrics summary + + Args: + metrics: Metrics dictionary from test + title: Title for the summary + """ + print(f"\n{'=' * 60}") + print(f"{title}") + print(f"{'=' * 60}") + print( + f"Success Rate: {metrics.get('success_rate', 0):.1f}% " + f"({metrics.get('success_count', 0)}/{metrics.get('total_requests', 0)})" + ) + + if "latency_p50" in metrics: + print( + f"Latency (ms): p50={metrics['latency_p50']:.0f} " + f"p95={metrics['latency_p95']:.0f} " + f"p99={metrics['latency_p99']:.0f} " + f"(min={metrics['latency_min']:.0f}, max={metrics['latency_max']:.0f}, avg={metrics['latency_avg']:.0f})" + ) + + if "throughput" in metrics: + print(f"Throughput: {metrics['throughput']:.1f} req/s") + + if "duration" in metrics: + print(f"Duration: {metrics['duration']:.1f}s") + + # Infrastructure metrics summary + if "infrastructure" in metrics: + print("\nInfrastructure Metrics:") + summary = summarize_metrics(metrics["infrastructure"]) + for key, value in summary.items(): + print(f" {key}: {value}") + + print(f"{'=' * 60}\n") + + +def export_metrics_json(metrics: 
Dict, filename: str): + """ + Export metrics to JSON file + + Args: + metrics: Metrics dictionary to export + filename: Output filename + """ + try: + with open(filename, "w") as f: + json.dump(metrics, f, indent=2) + logger.info(f"Metrics exported to {filename}") + except Exception as e: + logger.error(f"Failed to export metrics: {e}") + + +def main(): + """Main entry point for eoAPI load testing CLI""" + parser = argparse.ArgumentParser(description="eoAPI Load Testing CLI") + + # Test type selection + parser.add_argument( + "test_type", + choices=["stress", "normal", "chaos"], + default="stress", + nargs="?", + help="Type of test to run (default: stress)", + ) + + # Common arguments + parser.add_argument( + "--base-url", + default=os.getenv("STAC_ENDPOINT", "http://localhost").replace("/stac", ""), + help="Base URL for eoAPI (default: from STAC_ENDPOINT env or http://localhost)", + ) + parser.add_argument( + "--timeout", + type=int, + default=DEFAULT_TIMEOUT, + help=f"Request timeout in seconds (default: {DEFAULT_TIMEOUT})", + ) + parser.add_argument( + "--verbose", + "-v", + action="store_true", + help="Enable verbose debug logging", + ) + parser.add_argument( + "--report-json", + type=str, + help="Export metrics to JSON file", + ) + parser.add_argument( + "--prometheus-url", + type=str, + default=os.getenv("PROMETHEUS_URL"), + help="Prometheus URL for infrastructure metrics (default: from PROMETHEUS_URL env)", + ) + parser.add_argument( + "--namespace", + type=str, + default="eoapi", + help="Kubernetes namespace (default: eoapi)", + ) + parser.add_argument( + "--collect-infra-metrics", + action="store_true", + help="Collect infrastructure metrics from Prometheus during tests", + ) + + # Stress test arguments + stress_group = parser.add_argument_group("stress test options") + stress_group.add_argument("--endpoint", default="/stac/collections") + stress_group.add_argument("--max-workers", type=int, default=DEFAULT_MAX_WORKERS) + stress_group.add_argument( + "--success-threshold", type=float, default=DEFAULT_SUCCESS_THRESHOLD + ) + stress_group.add_argument("--step-size", type=int, default=5) + stress_group.add_argument("--test-duration", type=int, default=10) + stress_group.add_argument("--cooldown", type=int, default=2) + + # Normal test arguments + normal_group = parser.add_argument_group("normal test options") + normal_group.add_argument( + "--duration", type=int, default=60, help="Test duration (default: 60)" + ) + normal_group.add_argument( + "--users", + type=int, + default=MODERATE_LOAD_WORKERS, + help=f"Concurrent users (default: {MODERATE_LOAD_WORKERS})", + ) + + # Chaos test arguments + chaos_group = parser.add_argument_group("chaos test options") + chaos_group.add_argument( + "--kill-interval", + type=int, + default=60, + help="Seconds between pod kills (default: 60)", + ) + + args = parser.parse_args() + + # Set logging level based on verbosity + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + logger.setLevel(logging.DEBUG) + + try: + tester = LoadTester( + base_url=args.base_url, + max_workers=getattr(args, "max_workers", DEFAULT_MAX_WORKERS), + timeout=args.timeout, + prometheus_url=args.prometheus_url, + namespace=args.namespace, + ) + + if args.test_type == "stress": + breaking_point, all_metrics = tester.find_breaking_point( + endpoint=args.endpoint, + success_threshold=args.success_threshold, + step_size=args.step_size, + test_duration=args.test_duration, + cooldown=args.cooldown, + ) + + # Print summary for breaking point + if breaking_point in 
all_metrics: + print_metrics_summary( + all_metrics[breaking_point], + f"Stress Test - Breaking Point at {breaking_point} workers", + ) + + # Export if requested + if args.report_json: + export_metrics_json( + {"breaking_point": breaking_point, "metrics": all_metrics}, + args.report_json, + ) + + logger.info( + f"Stress test completed. Breaking point: {breaking_point} workers" + ) + sys.exit(1 if breaking_point < args.max_workers else 0) + + elif args.test_type == "normal": + results = tester.run_normal_load( + duration=args.duration, + concurrent_users=args.users, + ) + + # Print summary for each endpoint + for endpoint, metrics in results.items(): + print_metrics_summary(metrics, f"Normal Load Test - {endpoint}") + + avg_success = sum(r["success_rate"] for r in results.values()) / len( + results + ) + + # Export if requested + if args.report_json: + export_metrics_json(results, args.report_json) + + logger.info( + f"Normal load test completed. Average success rate: {avg_success:.1f}%" + ) + sys.exit(0 if avg_success >= DEFAULT_SUCCESS_THRESHOLD else 1) + + elif args.test_type == "chaos": + results = tester.run_chaos_test( + namespace=args.namespace, + duration=args.duration, + kill_interval=args.kill_interval, + ) + + # Print summary + print_metrics_summary(results, "Chaos Test Results") + + # Export if requested + if args.report_json: + export_metrics_json(results, args.report_json) + + logger.info( + f"Chaos test completed. Success rate: {results['success_rate']:.1f}%" + ) + sys.exit(0 if results["success_rate"] >= 80 else 1) + + except ValueError as e: + logger.error(f"Invalid configuration: {e}") + sys.exit(1) + except KeyboardInterrupt: + logger.info(f"{args.test_type.title()} test interrupted by user") + sys.exit(2) + except Exception as e: + logger.error( + f"{args.test_type.title()} test failed: {e}", exc_info=args.verbose + ) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/tests/load/prometheus_utils.py b/tests/load/prometheus_utils.py new file mode 100644 index 00000000..56bc78bd --- /dev/null +++ b/tests/load/prometheus_utils.py @@ -0,0 +1,357 @@ +#!/usr/bin/env python3 +""" +Prometheus Integration for Load Testing + +Optional module for querying Prometheus metrics during load tests. +Gracefully degrades if Prometheus is unavailable. 
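+
+Illustrative sketch (PrometheusClient and its methods are defined below; the
+Prometheus URL, namespace, and queries are example values only):
+
+    from datetime import datetime, timedelta
+
+    client = PrometheusClient("http://prometheus:9090")
+    if client.available:
+        instant = client.query('up{namespace="eoapi"}')
+        cpu = client.query_range(
+            'rate(container_cpu_usage_seconds_total{namespace="eoapi"}[1m])',
+            start=datetime.now() - timedelta(minutes=5),
+            end=datetime.now(),
+            step="15s",
+        )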
+""" + +import logging +from datetime import datetime +from typing import Dict, Optional + +import requests + +logger = logging.getLogger(__name__) + +DEFAULT_PROMETHEUS_URL = "http://localhost:9090" +DEFAULT_TIMEOUT = 10 + + +class PrometheusClient: + """Simple Prometheus client for querying metrics during load tests""" + + def __init__( + self, url: str = DEFAULT_PROMETHEUS_URL, timeout: int = DEFAULT_TIMEOUT + ): + """ + Initialize Prometheus client + + Args: + url: Prometheus server URL + timeout: Request timeout in seconds + """ + self.url = url.rstrip("/") + self.timeout = timeout + self.available = self._check_availability() + + def _check_availability(self) -> bool: + """Check if Prometheus is available""" + try: + response = requests.get( + f"{self.url}/api/v1/status/config", + timeout=self.timeout, + ) + if response.status_code == 200: + logger.info(f"Prometheus available at {self.url}") + return True + logger.warning(f"Prometheus returned status {response.status_code}") + return False + except Exception as e: + logger.debug(f"Prometheus not available at {self.url}: {e}") + return False + + def query(self, query: str, time: Optional[datetime] = None) -> Optional[Dict]: + """ + Execute instant Prometheus query + + Args: + query: PromQL query string + time: Optional time for query (defaults to now) + + Returns: + Query result dict or None if unavailable + """ + if not self.available: + return None + + try: + params: dict[str, str | float] = {"query": query} + if time: + params["time"] = time.timestamp() + + response = requests.get( + f"{self.url}/api/v1/query", + params=params, + timeout=self.timeout, + ) + + if response.status_code == 200: + data = response.json() + if data.get("status") == "success": + return data.get("data", {}) + + logger.warning(f"Prometheus query failed: {response.status_code}") + return None + except Exception as e: + logger.debug(f"Prometheus query error: {e}") + return None + + def query_range( + self, + query: str, + start: datetime, + end: datetime, + step: str = "15s", + ) -> Optional[Dict]: + """ + Execute range Prometheus query + + Args: + query: PromQL query string + start: Start time + end: End time + step: Query resolution step + + Returns: + Query result dict or None if unavailable + """ + if not self.available: + return None + + try: + params: dict[str, str | float] = { + "query": query, + "start": start.timestamp(), + "end": end.timestamp(), + "step": step, + } + + response = requests.get( + f"{self.url}/api/v1/query_range", + params=params, + timeout=self.timeout, + ) + + if response.status_code == 200: + data = response.json() + if data.get("status") == "success": + return data.get("data", {}) + + logger.warning(f"Prometheus range query failed: {response.status_code}") + return None + except Exception as e: + logger.debug(f"Prometheus range query error: {e}") + return None + + +def get_pod_metrics( + client: PrometheusClient, + namespace: str, + start: datetime, + end: datetime, +) -> Dict[str, Optional[Dict]]: + """ + Get pod CPU and memory metrics for the test duration + + Args: + client: PrometheusClient instance + namespace: Kubernetes namespace + start: Test start time + end: Test end time + + Returns: + Dict with CPU and memory metrics or empty values if unavailable + """ + metrics = {} + + # CPU usage + cpu_query = ( + f'rate(container_cpu_usage_seconds_total{{namespace="{namespace}",' + f'container!="",container!="POD"}}[1m])' + ) + metrics["cpu"] = client.query_range(cpu_query, start, end) + + # Memory usage + memory_query = ( + 
f'container_memory_working_set_bytes{{namespace="{namespace}",' + f'container!="",container!="POD"}}' + ) + metrics["memory"] = client.query_range(memory_query, start, end) + + return metrics + + +def get_hpa_metrics( + client: PrometheusClient, + namespace: str, + start: datetime, + end: datetime, +) -> Dict[str, Optional[Dict]]: + """ + Get HPA scaling events and replica counts + + Args: + client: PrometheusClient instance + namespace: Kubernetes namespace + start: Test start time + end: Test end time + + Returns: + Dict with HPA metrics or empty values if unavailable + """ + metrics = {} + + # Current replicas + replicas_query = f'kube_horizontalpodautoscaler_status_current_replicas{{namespace="{namespace}"}}' + metrics["current_replicas"] = client.query_range(replicas_query, start, end) + + # Desired replicas + desired_query = f'kube_horizontalpodautoscaler_status_desired_replicas{{namespace="{namespace}"}}' + metrics["desired_replicas"] = client.query_range(desired_query, start, end) + + return metrics + + +def get_request_metrics( + client: PrometheusClient, + namespace: str, + start: datetime, + end: datetime, +) -> Dict[str, Optional[Dict]]: + """ + Get request rate and latency from ingress/service mesh + + Args: + client: PrometheusClient instance + namespace: Kubernetes namespace + start: Test start time + end: Test end time + + Returns: + Dict with request metrics or empty values if unavailable + """ + metrics = {} + + # Request rate (depends on ingress controller) + # Try nginx ingress first + rate_query = ( + f'rate(nginx_ingress_controller_requests{{namespace="{namespace}"}}[1m])' + ) + metrics["request_rate"] = client.query_range(rate_query, start, end) + + # Request duration + latency_query = ( + f"histogram_quantile(0.95, " + f"rate(nginx_ingress_controller_request_duration_seconds_bucket" + f'{{namespace="{namespace}"}}[1m]))' + ) + metrics["request_latency_p95"] = client.query_range(latency_query, start, end) + + return metrics + + +def get_database_metrics( + client: PrometheusClient, + namespace: str, + start: datetime, + end: datetime, +) -> Dict[str, Optional[Dict]]: + """ + Get database connection and query metrics + + Args: + client: PrometheusClient instance + namespace: Kubernetes namespace + start: Test start time + end: Test end time + + Returns: + Dict with database metrics or empty values if unavailable + """ + metrics = {} + + # PostgreSQL connections + connections_query = f'pg_stat_activity_count{{namespace="{namespace}"}}' + metrics["db_connections"] = client.query_range(connections_query, start, end) + + # Query duration + query_duration = ( + f'rate(pg_stat_statements_mean_exec_time{{namespace="{namespace}"}}[1m])' + ) + metrics["db_query_duration"] = client.query_range(query_duration, start, end) + + return metrics + + +def collect_test_metrics( + prometheus_url: Optional[str], + namespace: str, + start: datetime, + end: datetime, +) -> Dict[str, Dict]: + """ + Collect all available infrastructure metrics for a test + + Args: + prometheus_url: Prometheus URL (None to skip) + namespace: Kubernetes namespace + start: Test start time + end: Test end time + + Returns: + Dict with all collected metrics (empty dicts if unavailable) + """ + if not prometheus_url: + logger.info("Prometheus URL not provided, skipping metrics collection") + return {} + + client = PrometheusClient(prometheus_url) + if not client.available: + logger.info("Prometheus unavailable, skipping metrics collection") + return {} + + logger.info(f"Collecting infrastructure metrics from 
{start} to {end}") + + all_metrics = { + "pod_metrics": get_pod_metrics(client, namespace, start, end), + "hpa_metrics": get_hpa_metrics(client, namespace, start, end), + "request_metrics": get_request_metrics(client, namespace, start, end), + "database_metrics": get_database_metrics(client, namespace, start, end), + } + + # Filter out None values + return {k: v for k, v in all_metrics.items() if v and any(v.values())} + + +def summarize_metrics(metrics: Dict) -> Dict[str, str]: + """ + Create human-readable summary of infrastructure metrics + + Args: + metrics: Infrastructure metrics dict + + Returns: + Dict with summarized metrics for display + """ + summary = {} + + # Pod metrics + if "pod_metrics" in metrics: + pod = metrics["pod_metrics"] + if pod.get("cpu"): + summary["pod_cpu"] = "Collected" + if pod.get("memory"): + summary["pod_memory"] = "Collected" + + # HPA metrics + if "hpa_metrics" in metrics: + hpa = metrics["hpa_metrics"] + if hpa.get("current_replicas"): + summary["hpa_scaling"] = "Observed" + + # Request metrics + if "request_metrics" in metrics: + req = metrics["request_metrics"] + if req.get("request_rate"): + summary["ingress_rate"] = "Collected" + if req.get("request_latency_p95"): + summary["ingress_latency"] = "Collected" + + # Database metrics + if "database_metrics" in metrics: + db = metrics["database_metrics"] + if db.get("db_connections"): + summary["db_connections"] = "Collected" + + return summary if summary else {"status": "No metrics available"} diff --git a/tests/load/test_chaos.py b/tests/load/test_chaos.py new file mode 100644 index 00000000..59c64186 --- /dev/null +++ b/tests/load/test_chaos.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python3 +""" +Pytest-based chaos tests for eoAPI services + +Refactored to use parametrization, centralized config, and test helpers +to reduce duplication and improve maintainability. 
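+
+Illustrative sketch of the pattern the tests below follow (the chaos_tester
+fixture comes from conftest.py and run_chaos_test from load_tester.py; the
+durations and threshold are example values only):
+
+    @pytest.mark.slow
+    def test_example_pod_kill_resilience(chaos_tester):
+        results = chaos_tester.run_chaos_test(
+            duration=60, kill_interval=20, endpoint="/stac/collections"
+        )
+        assert results["success_rate"] >= 60.0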
+""" + +import subprocess +import time + +import pytest + +from .config import Concurrency, Durations, Endpoints, Thresholds +from .test_helpers import ( + assert_recovery, + assert_success_rate, + build_url, + create_tester, +) + + +def check_kubectl_available() -> bool: + """Check if kubectl is available""" + try: + subprocess.run( + ["kubectl", "version", "--client"], + check=True, + capture_output=True, + ) + return True + except (subprocess.CalledProcessError, FileNotFoundError): + return False + + +class TestChaosResilience: + """Tests for service resilience during infrastructure chaos""" + + @pytest.mark.slow + def test_pod_failure_resilience(self, chaos_tester): + """Test service resilience during pod failures""" + if not check_kubectl_available(): + pytest.skip("kubectl not available") + + results = chaos_tester.run_chaos_test( + duration=Durations.LONG // 5, + kill_interval=30, + endpoint=Endpoints.STAC_COLLECTIONS, + ) + + assert_success_rate(results, Thresholds.CHAOS_MODERATE, "pod failure chaos") + + @pytest.mark.slow + @pytest.mark.parametrize( + "endpoint,threshold", + [ + (Endpoints.STAC_COLLECTIONS, Thresholds.CHAOS_MODERATE), + (Endpoints.RASTER_HEALTH, Thresholds.CHAOS_HIGH), + (Endpoints.VECTOR_HEALTH, Thresholds.CHAOS_HIGH), + ], + ) + def test_multiple_service_failures( + self, chaos_tester, endpoint: str, threshold: float + ): + """Test resilience when services experience chaos""" + if not check_kubectl_available(): + pytest.skip("kubectl not available") + + results = chaos_tester.run_chaos_test( + duration=45, kill_interval=20, endpoint=endpoint + ) + + assert_success_rate(results, threshold, f"chaos: {endpoint}") + + def test_gradual_failure_recovery(self, load_tester): + """Test service recovery after gradual failure introduction""" + url = build_url(load_tester.base_url, Endpoints.STAC_COLLECTIONS) + + # Phase 1: Normal operation + normal_metrics = load_tester.test_concurrency_level( + url, Concurrency.LIGHT, Durations.NORMAL + ) + + # Phase 2: Introduce failures (aggressive timeout) + aggressive = create_tester( + base_url=load_tester.base_url, max_workers=10, timeout=1 + ) + degraded_metrics = aggressive.test_concurrency_level( + url, Concurrency.MODERATE, Durations.MODERATE // 2 + ) + + # Phase 3: Recovery + time.sleep(5) + recovery_metrics = load_tester.test_concurrency_level( + url, Concurrency.LIGHT, Durations.NORMAL + ) + + assert_success_rate(normal_metrics, Thresholds.API_SUSTAINED, "baseline") + assert_recovery(degraded_metrics, recovery_metrics, context="gradual") + + +class TestChaosNetwork: + """Tests for network-related chaos scenarios""" + + def test_network_instability(self, load_tester): + """Test behavior under network instability""" + tester = create_tester(base_url=load_tester.base_url, max_workers=5, timeout=2) + url = build_url(tester.base_url, Endpoints.STAC_COLLECTIONS) + + metrics = tester.test_concurrency_level( + url, Concurrency.LIGHT, Durations.NORMAL + ) + + assert_success_rate(metrics, Thresholds.CHAOS_LOW, "network instability") + assert metrics["total_requests"] > 0, "No requests made" + + def test_timeout_cascade_prevention(self, load_tester): + """Test that timeout issues don't cascade across requests""" + timeouts = [5, 3, 1, 2, 4] # Recovery pattern + url = build_url(load_tester.base_url, Endpoints.STAC_COLLECTIONS) + + results = [] + for timeout in timeouts: + tester = create_tester( + base_url=load_tester.base_url, max_workers=3, timeout=timeout + ) + metrics = tester.test_concurrency_level( + url, Concurrency.SINGLE + 1, 
Durations.SHORT + ) + results.append(metrics) + time.sleep(1) + + recovery_rate = results[-1]["success_rate"] + assert recovery_rate >= Thresholds.STRESS_MODERATE, ( + f"No recovery from timeout cascade: {recovery_rate:.1f}% " + f"< {Thresholds.STRESS_MODERATE:.1f}%" + ) + + @pytest.mark.parametrize( + "endpoint,threshold", + [ + (Endpoints.STAC_COLLECTIONS, Thresholds.CHAOS_MODERATE), + (Endpoints.RASTER_HEALTH, Thresholds.CHAOS_MODERATE), + (Endpoints.VECTOR_HEALTH, Thresholds.CHAOS_MODERATE), + ], + ) + def test_concurrent_failure_modes( + self, load_tester, endpoint: str, threshold: float + ): + """Test multiple failure modes occurring simultaneously""" + metrics = load_tester.test_concurrency_level( + build_url(load_tester.base_url, endpoint), + Concurrency.MODERATE - 1, + Durations.MODERATE // 2 + 2, + ) + + assert_success_rate(metrics, threshold, f"concurrent failure: {endpoint}") + + +class TestChaosResource: + """Tests for resource constraint chaos scenarios""" + + def test_resource_exhaustion_simulation(self, load_tester): + """Test behavior when resources are constrained""" + tester = create_tester(base_url=load_tester.base_url, max_workers=25, timeout=5) + url = build_url(tester.base_url, Endpoints.STAC_COLLECTIONS) + + metrics = tester.test_concurrency_level( + url, Concurrency.STRESS, Durations.MODERATE // 2 + ) + + assert_success_rate(metrics, Thresholds.DEGRADED, "resource exhaustion") + assert ( + metrics["total_requests"] >= 50 + ), f"Insufficient load: {metrics['total_requests']} < 50" + + def test_memory_pressure_resilience(self, load_tester): + """Test resilience under simulated memory pressure""" + tester = create_tester(base_url=load_tester.base_url, max_workers=30, timeout=8) + url = build_url(tester.base_url, Endpoints.RASTER_HEALTH) + + metrics = tester.test_concurrency_level( + url, Concurrency.HIGH, Durations.MODERATE - 10 + ) + + assert_success_rate(metrics, Thresholds.CHAOS_LOW, "memory pressure") + + def test_connection_pool_exhaustion(self, load_tester): + """Test behavior when connection pools are exhausted""" + testers = [ + create_tester(base_url=load_tester.base_url, max_workers=10, timeout=3) + for _ in range(3) + ] + + url = build_url(load_tester.base_url, Endpoints.STAC_COLLECTIONS) + results = [] + + for tester in testers: + metrics = tester.test_concurrency_level( + url, Concurrency.MODERATE + 1, Durations.NORMAL - 2 + ) + results.append(metrics["success_rate"]) + + max_rate = max(results) + assert max_rate >= Thresholds.CHAOS_MODERATE - 20, ( + f"All pools exhausted: max {max_rate:.1f}% " + f"< {Thresholds.CHAOS_MODERATE - 20:.1f}%" + ) + + +class TestChaosRecovery: + """Tests for service recovery patterns after chaos events""" + + def test_automatic_recovery_timing(self, load_tester): + """Test automatic service recovery after failures""" + url = build_url(load_tester.base_url, Endpoints.STAC_COLLECTIONS) + + # Induce failures + failure_tester = create_tester( + base_url=load_tester.base_url, max_workers=20, timeout=1 + ) + # Induce failures (don't need metrics, just stress the system) + _ = failure_tester.test_concurrency_level( + url, Concurrency.HIGH, Durations.NORMAL + ) + + # Monitor recovery + recovery_times = [5, 10, 15] + recovery_rates = [] + + for wait_time in recovery_times: + time.sleep(wait_time) + metrics = load_tester.test_concurrency_level( + url, Concurrency.LIGHT, Durations.SHORT + ) + recovery_rates.append(metrics["success_rate"]) + + final_rate = recovery_rates[-1] + assert final_rate >= Thresholds.STRESS_MODERATE, ( + f"No 
recovery after {recovery_times[-1]}s: {final_rate:.1f}% " + f"< {Thresholds.STRESS_MODERATE:.1f}%" + ) + + def test_service_degradation_levels(self, load_tester): + """Test graceful degradation under increasing chaos""" + url = build_url(load_tester.base_url, Endpoints.STAC_COLLECTIONS) + + chaos_levels = [ + (5, Concurrency.NORMAL, Durations.SHORT), # Light + (3, Concurrency.HIGH, Durations.NORMAL - 2), # Medium + (1, Concurrency.STRESS, Durations.MODERATE // 2 + 2), # Heavy + ] + + degradation_rates = [] + for timeout, workers, duration in chaos_levels: + tester = create_tester( + base_url=load_tester.base_url, max_workers=25, timeout=timeout + ) + metrics = tester.test_concurrency_level(url, workers, duration) + degradation_rates.append(metrics["success_rate"]) + time.sleep(3) + + assert degradation_rates[0] >= Thresholds.STRESS_LOW, ( + f"Failed at low chaos: {degradation_rates[0]:.1f}% " + f"< {Thresholds.STRESS_LOW:.1f}%" + ) + assert min(degradation_rates) >= Thresholds.DEGRADED - 10, ( + f"Complete failure: {min(degradation_rates):.1f}% " + f"< {Thresholds.DEGRADED - 10:.1f}%" + ) diff --git a/tests/load/test_helpers.py b/tests/load/test_helpers.py new file mode 100644 index 00000000..ed9764eb --- /dev/null +++ b/tests/load/test_helpers.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python3 +""" +Load testing helper utilities + +Common utilities and assertions for load tests to reduce duplication +and improve test consistency. +""" + +from typing import Dict, Optional + +from .config import Thresholds +from .load_tester import LoadTester + + +def build_url(base_url: str, endpoint: str) -> str: + """ + Build full URL from base and endpoint + + Args: + base_url: Base URL (with or without trailing slash) + endpoint: Endpoint path (with or without leading slash) + + Returns: + Complete URL + """ + base = base_url.rstrip("/") + endpoint = endpoint if endpoint.startswith("/") else f"/{endpoint}" + return f"{base}{endpoint}" + + +def create_tester( + base_url: str, + max_workers: int = 10, + timeout: int = 10, + prometheus_url: Optional[str] = None, + namespace: str = "eoapi", +) -> LoadTester: + """ + Factory for creating LoadTester instances + + Args: + base_url: Base URL for testing + max_workers: Maximum concurrent workers + timeout: Request timeout + prometheus_url: Optional Prometheus URL + namespace: Kubernetes namespace + + Returns: + Configured LoadTester instance + """ + return LoadTester( + base_url=base_url, + max_workers=max_workers, + timeout=timeout, + prometheus_url=prometheus_url, + namespace=namespace, + ) + + +def assert_success_rate( + metrics: Dict, + min_rate: float, + context: str = "Test", +) -> None: + """ + Assert success rate meets minimum threshold + + Args: + metrics: Metrics dictionary with success_rate + min_rate: Minimum acceptable success rate (%) + context: Context description for assertion message + + Raises: + AssertionError: If success rate below threshold + """ + success_rate = metrics.get("success_rate", 0) + success_count = metrics.get("success_count", 0) + total = metrics.get("total_requests", 0) + + assert success_rate >= min_rate, ( + f"{context}: {success_rate:.1f}% < {min_rate:.1f}% " + f"({success_count}/{total} successful)" + ) + + +def assert_min_requests( + metrics: Dict, + min_count: int, + context: str = "Test", +) -> None: + """ + Assert minimum number of requests were made + + Args: + metrics: Metrics dictionary with total_requests + min_count: Minimum expected request count + context: Context description for assertion message + + Raises: + 
AssertionError: If request count below minimum + """ + total = metrics.get("total_requests", 0) + assert total >= min_count, f"{context}: {total} requests < {min_count} expected" + + +def assert_has_latency_metrics(metrics: Dict) -> None: + """ + Assert that latency metrics are present + + Args: + metrics: Metrics dictionary + + Raises: + AssertionError: If latency metrics missing + """ + required = ["latency_p50", "latency_p95", "latency_p99"] + missing = [k for k in required if k not in metrics] + assert not missing, f"Missing latency metrics: {missing}" + + +def assert_latency_bounds( + metrics: Dict, + p50_max: Optional[float] = None, + p95_max: Optional[float] = None, + p99_max: Optional[float] = None, + context: str = "Test", +) -> None: + """ + Assert latency percentiles within bounds + + Args: + metrics: Metrics dictionary with latency_p50, latency_p95, latency_p99 + p50_max: Maximum p50 latency (ms), None to skip + p95_max: Maximum p95 latency (ms), None to skip + p99_max: Maximum p99 latency (ms), None to skip + context: Context description for assertion message + + Raises: + AssertionError: If any latency exceeds bounds + """ + assert_has_latency_metrics(metrics) + + if p50_max is not None: + p50 = metrics["latency_p50"] + assert p50 <= p50_max, f"{context}: p50={p50:.0f}ms > {p50_max:.0f}ms" + + if p95_max is not None: + p95 = metrics["latency_p95"] + assert p95 <= p95_max, f"{context}: p95={p95:.0f}ms > {p95_max:.0f}ms" + + if p99_max is not None: + p99 = metrics["latency_p99"] + assert p99 <= p99_max, f"{context}: p99={p99:.0f}ms > {p99_max:.0f}ms" + + +def assert_has_throughput(metrics: Dict) -> None: + """ + Assert that throughput metric is present + + Args: + metrics: Metrics dictionary + + Raises: + AssertionError: If throughput metric missing + """ + assert "throughput" in metrics, "Missing throughput metric" + + +def assert_recovery( + before_metrics: Dict, + after_metrics: Dict, + min_improvement: float = 0, + context: str = "Recovery", +) -> None: + """ + Assert that service recovered after stress/chaos + + Args: + before_metrics: Metrics before recovery + after_metrics: Metrics after recovery + min_improvement: Minimum improvement required (percentage points) + context: Context description for assertion message + + Raises: + AssertionError: If recovery insufficient + """ + before_rate = before_metrics.get("success_rate", 0) + after_rate = after_metrics.get("success_rate", 0) + improvement = after_rate - before_rate + + assert after_rate >= Thresholds.RECOVERY, ( + f"{context}: {after_rate:.1f}% < {Thresholds.RECOVERY:.1f}% " + f"(recovered from {before_rate:.1f}%)" + ) + + if min_improvement > 0: + assert ( + improvement >= min_improvement + ), f"{context}: improvement {improvement:.1f}% < {min_improvement:.1f}%" + + +def validate_metrics_structure(metrics: Dict) -> None: + """ + Validate that metrics dictionary has required structure + + Args: + metrics: Metrics dictionary to validate + + Raises: + AssertionError: If required fields missing + """ + required_fields = [ + "success_count", + "total_requests", + "success_rate", + "duration", + "throughput", + ] + + missing = [f for f in required_fields if f not in metrics] + assert not missing, f"Metrics missing required fields: {missing}" + + +def run_and_assert( + tester: LoadTester, + endpoint: str, + workers: int, + duration: int, + min_success_rate: float, + min_requests: int = 1, +) -> Dict: + """ + Run load test and assert basic requirements + + Args: + tester: LoadTester instance + endpoint: Endpoint to test 
(relative path) + workers: Number of concurrent workers + duration: Test duration in seconds + min_success_rate: Minimum acceptable success rate + min_requests: Minimum expected request count + + Returns: + Metrics dictionary + + Raises: + AssertionError: If assertions fail + """ + url = build_url(tester.base_url, endpoint) + metrics = tester.test_concurrency_level(url, workers, duration) + + validate_metrics_structure(metrics) + assert_success_rate(metrics, min_success_rate, endpoint) + assert_min_requests(metrics, min_requests, endpoint) + + return metrics diff --git a/tests/load/test_load.py b/tests/load/test_load.py new file mode 100644 index 00000000..7d88108e --- /dev/null +++ b/tests/load/test_load.py @@ -0,0 +1,128 @@ +#!/usr/bin/env python3 +""" +Pytest-based baseline load tests for eoAPI services + +Refactored to use parametrization, centralized config, and test helpers +to reduce duplication and improve maintainability. +""" + +import time + +import pytest +import requests + +from .config import Concurrency, Durations, Endpoints, Thresholds +from .test_helpers import ( + assert_min_requests, + assert_success_rate, + build_url, + run_and_assert, +) + + +class TestLoadBaseline: + """Basic load tests to verify service functionality under light load""" + + @pytest.mark.parametrize( + "endpoint,threshold", + [ + (Endpoints.STAC_COLLECTIONS, Thresholds.API_ENDPOINTS), + (Endpoints.RASTER_HEALTH, Thresholds.HEALTH_ENDPOINTS), + (Endpoints.VECTOR_HEALTH, Thresholds.HEALTH_ENDPOINTS), + ], + ) + def test_endpoint_light_load(self, light_tester, endpoint: str, threshold: float): + """Test endpoints with light concurrent load""" + metrics = run_and_assert( + light_tester, + endpoint, + workers=Concurrency.LIGHT, + duration=Durations.SHORT, + min_success_rate=threshold, + min_requests=1, + ) + + assert_min_requests(metrics, 1, endpoint) + + +class TestLoadScalability: + """Tests for service scalability characteristics""" + + def test_response_time_under_load(self, base_url: str): + """Test that response times remain reasonable under moderate load""" + url = build_url(base_url, Endpoints.STAC_COLLECTIONS) + + # Baseline measurement + start = time.time() + response = requests.get(url, timeout=10) + baseline_time = time.time() - start + + assert response.status_code == 200, "Baseline request failed" + + # Concurrent load measurement + session = requests.Session() + times = [] + + for _ in range(5): + start = time.time() + response = session.get(url, timeout=10) + times.append(time.time() - start) + assert response.status_code == 200, "Request under load failed" + + avg_load_time = sum(times) / len(times) + + # Response time shouldn't degrade more than 5x + max_allowed = max(baseline_time * 5, 0.1) + assert avg_load_time <= max_allowed, ( + f"Response degraded: {avg_load_time:.2f}s vs " + f"{baseline_time:.2f}s baseline (max: {max_allowed:.2f}s)" + ) + + @pytest.mark.parametrize( + "endpoint,threshold,min_reqs", + [ + (Endpoints.STAC_COLLECTIONS, Thresholds.API_ENDPOINTS, 10), + (Endpoints.RASTER_HEALTH, Thresholds.HEALTH_ENDPOINTS, 10), + (Endpoints.VECTOR_HEALTH, Thresholds.HEALTH_ENDPOINTS, 10), + ], + ) + def test_endpoint_availability( + self, light_tester, endpoint: str, threshold: float, min_reqs: int + ): + """Test that endpoints remain available under light load""" + metrics = run_and_assert( + light_tester, + endpoint, + workers=Concurrency.SINGLE + 1, + duration=Durations.QUICK, + min_success_rate=threshold, + min_requests=min_reqs, + ) + + assert_success_rate(metrics, threshold, 
endpoint) + + +@pytest.mark.integration +class TestLoadIntegration: + """Integration load tests across multiple services""" + + @pytest.mark.parametrize( + "endpoint,threshold", + [ + (Endpoints.STAC_COLLECTIONS, Thresholds.API_NORMAL), + (Endpoints.RASTER_HEALTH, Thresholds.HEALTH_ENDPOINTS), + (Endpoints.VECTOR_HEALTH, Thresholds.HEALTH_ENDPOINTS), + ], + ) + def test_mixed_endpoint_load(self, light_tester, endpoint: str, threshold: float): + """Test load across multiple endpoints simultaneously""" + metrics = run_and_assert( + light_tester, + endpoint, + workers=Concurrency.SINGLE + 1, + duration=Durations.QUICK, + min_success_rate=threshold, + ) + + assert_success_rate(metrics, threshold, f"mixed load: {endpoint}") + assert_min_requests(metrics, 1, endpoint) diff --git a/tests/load/test_normal.py b/tests/load/test_normal.py new file mode 100644 index 00000000..a4418b62 --- /dev/null +++ b/tests/load/test_normal.py @@ -0,0 +1,220 @@ +#!/usr/bin/env python3 +""" +Pytest-based normal load tests for eoAPI services + +Refactored to use parametrization, centralized config, and test helpers +to reduce duplication and improve maintainability. +""" + +import time + +from .config import Concurrency, Durations, Endpoints, Thresholds +from .test_helpers import ( + assert_has_latency_metrics, + assert_has_throughput, + assert_min_requests, + assert_success_rate, + build_url, + run_and_assert, +) + + +class TestNormalMixedLoad: + """Tests with realistic mixed workload patterns""" + + def test_mixed_endpoint_load(self, load_tester): + """Test normal load with mixed endpoints simultaneously""" + results = load_tester.run_normal_load( + duration=Durations.MODERATE, concurrent_users=8, ramp_up=10 + ) + + for endpoint, metrics in results.items(): + assert_success_rate(metrics, Thresholds.API_SUSTAINED, endpoint) + assert_min_requests(metrics, 1, endpoint) + assert_has_latency_metrics(metrics) + assert_has_throughput(metrics) + + def test_stac_workflow_simulation(self, load_tester): + """Simulate typical STAC API workflow""" + workflow = [ + ( + Endpoints.STAC_COLLECTIONS, + Concurrency.LIGHT, + Durations.NORMAL - 2, + ), + (Endpoints.STAC_SEARCH, Concurrency.LIGHT, Durations.NORMAL - 2), + ( + Endpoints.STAC_COLLECTIONS, + Concurrency.LIGHT, + Durations.NORMAL - 2, + ), + ] + + total_success = 0 + total_requests = 0 + + for endpoint, workers, duration in workflow: + metrics = run_and_assert( + load_tester, + endpoint, + workers, + duration, + min_success_rate=Thresholds.API_NORMAL, + ) + total_success += metrics["success_count"] + total_requests += metrics["total_requests"] + time.sleep(1) + + workflow_success_rate = (total_success / total_requests) * 100 + assert workflow_success_rate >= Thresholds.API_NORMAL, ( + f"STAC workflow: {workflow_success_rate:.1f}% < {Thresholds.API_NORMAL:.1f}% " + f"({total_success}/{total_requests})" + ) + + def test_realistic_traffic_pattern(self, load_tester): + """Test with realistic traffic pattern variations""" + traffic_pattern = [ + (Concurrency.SINGLE + 1, Durations.SHORT), # Low morning + (Concurrency.MODERATE, Durations.NORMAL - 2), # Moderate midday + (Concurrency.LIGHT, Durations.SHORT), # Afternoon dip + (Concurrency.MODERATE + 1, Durations.NORMAL), # Peak evening + ] + + results = [] + for workers, duration in traffic_pattern: + metrics = run_and_assert( + load_tester, + Endpoints.STAC_COLLECTIONS, + workers, + duration, + min_success_rate=Thresholds.API_ENDPOINTS, + ) + results.append(metrics["success_rate"]) + time.sleep(2) + + avg_performance = 
sum(results) / len(results) + assert ( + avg_performance >= Thresholds.API_ENDPOINTS + ), f"Traffic pattern handling: {avg_performance:.1f}% < {Thresholds.API_ENDPOINTS:.1f}%" + + +class TestNormalSustained: + """Tests for sustained normal load over extended periods""" + + def test_sustained_moderate_load(self, load_tester): + """Test sustained moderate load over time""" + metrics = run_and_assert( + load_tester, + Endpoints.STAC_COLLECTIONS, + workers=Concurrency.MODERATE, + duration=Durations.MODERATE + 15, + min_success_rate=Thresholds.API_ENDPOINTS, + min_requests=200, + ) + + assert_success_rate(metrics, Thresholds.API_ENDPOINTS, "sustained load") + assert_min_requests(metrics, 200, "sustained load") + + def test_consistent_response_times(self, load_tester): + """Test that response times remain consistent under normal load""" + url = build_url(load_tester.base_url, Endpoints.STAC_COLLECTIONS) + + response_times = [] + for _ in range(10): + success, latency_ms = load_tester.make_request(url) + if success: + response_times.append(latency_ms / 1000) + time.sleep(0.5) + + assert response_times, "No successful requests collected" + + avg_time = sum(response_times) / len(response_times) + max_time = max(response_times) + + assert avg_time <= 2.0, f"Avg response time {avg_time:.2f}s > 2.0s" + assert max_time <= 5.0, f"Max response time {max_time:.2f}s > 5.0s" + + def test_memory_stability_under_load(self, load_tester): + """Test that service remains stable under prolonged normal load""" + metrics = run_and_assert( + load_tester, + Endpoints.RASTER_HEALTH, + workers=Concurrency.MODERATE - 1, + duration=Durations.LONG, + min_success_rate=Thresholds.HEALTH_ENDPOINTS, + ) + + assert_success_rate(metrics, Thresholds.HEALTH_ENDPOINTS, "60s stability test") + + +class TestNormalUserPatterns: + """Tests simulating realistic user interaction patterns""" + + def test_concurrent_user_sessions(self, load_tester): + """Test multiple concurrent user sessions""" + metrics = run_and_assert( + load_tester, + Endpoints.STAC_COLLECTIONS, + workers=Concurrency.MODERATE + 1, + duration=Durations.MODERATE - 5, + min_success_rate=Thresholds.API_NORMAL, + min_requests=100, + ) + + assert_success_rate(metrics, Thresholds.API_NORMAL, "concurrent users") + assert_min_requests(metrics, 100, "concurrent users") + + def test_user_session_duration(self, load_tester): + """Test typical user session duration patterns""" + session_patterns = [ + ( + Endpoints.STAC_COLLECTIONS, + Concurrency.LIGHT, + Durations.NORMAL - 2, + ), + ( + Endpoints.STAC_SEARCH, + Concurrency.SINGLE + 1, + Durations.MODERATE // 2 + 2, + ), + (Endpoints.VECTOR_HEALTH, Concurrency.SINGLE, Durations.SHORT), + ] + + total_success_rate = 0 + for endpoint, workers, duration in session_patterns: + metrics = run_and_assert( + load_tester, + endpoint, + workers, + duration, + min_success_rate=Thresholds.API_NORMAL, + ) + total_success_rate += metrics["success_rate"] + + avg_session_success = total_success_rate / len(session_patterns) + assert ( + avg_session_success >= Thresholds.API_NORMAL + 1 + ), f"User sessions: {avg_session_success:.1f}% < {Thresholds.API_NORMAL + 1:.1f}%" + + def test_api_usage_distribution(self, load_tester): + """Test realistic API endpoint usage distribution""" + usage_pattern = [ + ( + Endpoints.STAC_COLLECTIONS, + Concurrency.MODERATE - 1, + Durations.MODERATE // 2, + ), + (Endpoints.STAC_SEARCH, Concurrency.SINGLE + 1, Durations.NORMAL), + (Endpoints.RASTER_HEALTH, Concurrency.SINGLE, Durations.SHORT), + (Endpoints.VECTOR_HEALTH, 
Concurrency.SINGLE, Durations.SHORT), + ] + + for endpoint, workers, duration in usage_pattern: + metrics = run_and_assert( + load_tester, + endpoint, + workers, + duration, + min_success_rate=Thresholds.API_SUSTAINED, + ) + assert_success_rate(metrics, Thresholds.API_SUSTAINED, endpoint) diff --git a/tests/load/test_stress.py b/tests/load/test_stress.py new file mode 100644 index 00000000..bbb51029 --- /dev/null +++ b/tests/load/test_stress.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 +""" +Pytest-based stress tests for eoAPI services + +Refactored to use parametrization, centralized config, and test helpers +to reduce duplication and improve maintainability. +""" + +import time + +import pytest + +from .config import Concurrency, Durations, Endpoints, Thresholds +from .test_helpers import ( + assert_recovery, + assert_success_rate, + build_url, + run_and_assert, +) + + +class TestStressBreakingPoints: + """Tests to find service breaking points under increasing load""" + + @pytest.mark.slow + @pytest.mark.parametrize( + "endpoint,threshold,step,duration", + [ + (Endpoints.STAC_COLLECTIONS, Thresholds.STRESS_HIGH, 3, 5), + (Endpoints.STAC_SEARCH, Thresholds.RECOVERY, 2, 8), + ], + ) + def test_api_endpoint_stress( + self, + stress_tester, + endpoint: str, + threshold: float, + step: int, + duration: int, + ): + """Find breaking point for API endpoints""" + breaking_point, all_metrics = stress_tester.find_breaking_point( + endpoint=endpoint, + success_threshold=threshold, + step_size=step, + test_duration=duration, + cooldown=1, + ) + + min_workers = 4 if "search" in endpoint else 6 + assert ( + breaking_point >= min_workers + ), f"{endpoint} breaking point {breaking_point} < {min_workers} workers" + + @pytest.mark.parametrize( + "endpoint,threshold", + [ + (Endpoints.RASTER_HEALTH, Thresholds.API_ENDPOINTS), + (Endpoints.VECTOR_HEALTH, Thresholds.API_ENDPOINTS), + ], + ) + def test_health_endpoints_stress( + self, stress_tester, endpoint: str, threshold: float + ): + """Test health endpoints under stress - should handle high load""" + breaking_point, _ = stress_tester.find_breaking_point( + endpoint=endpoint, + success_threshold=threshold, + step_size=5, + test_duration=3, + cooldown=1, + ) + + assert breaking_point >= 10, ( + f"{endpoint} breaking point {breaking_point} < 10 workers " + f"(health endpoints should handle higher load)" + ) + + +class TestStressResilience: + """Tests for service resilience and recovery under stress""" + + @pytest.mark.slow + def test_service_recovery_after_stress(self, load_tester): + """Test that services recover properly after high stress""" + url = build_url(load_tester.base_url, Endpoints.STAC_COLLECTIONS) + + # Apply high stress + stress_metrics = load_tester.test_concurrency_level( + url, workers=Concurrency.HIGH, duration=Durations.SHORT + ) + + # Recovery period + time.sleep(3) + + # Test normal load + recovery_metrics = load_tester.test_concurrency_level( + url, workers=Concurrency.SINGLE + 1, duration=Durations.SHORT + ) + + assert_recovery(stress_metrics, recovery_metrics, context="stress recovery") + + def test_sustained_high_load(self, load_tester): + """Test service behavior under sustained high load""" + metrics = run_and_assert( + load_tester, + Endpoints.STAC_COLLECTIONS, + workers=Concurrency.NORMAL - 2, + duration=Durations.MODERATE, + min_success_rate=Thresholds.STRESS_MODERATE, + ) + + assert_success_rate(metrics, Thresholds.STRESS_MODERATE, "sustained load") + + def test_burst_load_handling(self, load_tester): + """Test handling of 
burst traffic patterns""" + url = build_url(load_tester.base_url, Endpoints.STAC_COLLECTIONS) + + # Burst pattern: low -> high -> low -> high + burst_pattern = [ + (Concurrency.SINGLE, Durations.QUICK), + (Concurrency.MODERATE + 7, Durations.SHORT), + (Concurrency.SINGLE + 1, Durations.QUICK), + (Concurrency.MODERATE + 10, Durations.SHORT), + ] + + results = [] + for workers, duration in burst_pattern: + metrics = load_tester.test_concurrency_level(url, workers, duration) + results.append(metrics["success_rate"]) + time.sleep(1) + + avg_performance = sum(results) / len(results) + assert avg_performance >= Thresholds.RECOVERY, ( + f"Burst handling failed: {avg_performance:.1f}% avg " + f"< {Thresholds.RECOVERY:.1f}%" + ) + + +class TestStressLimits: + """Tests to verify service limits and thresholds""" + + @pytest.mark.slow + def test_maximum_concurrent_users(self, stress_tester): + """Test behavior at maximum designed concurrent user limit""" + metrics = run_and_assert( + stress_tester, + Endpoints.STAC_COLLECTIONS, + workers=Concurrency.EXTREME, + duration=Durations.NORMAL, + min_success_rate=Thresholds.STRESS_LOW, + ) + + assert_success_rate(metrics, Thresholds.STRESS_LOW, "high concurrency") + + def test_timeout_behavior_under_load(self, load_tester): + """Test timeout behavior when system is under stress""" + # Use short timeout to trigger timeouts + from .test_helpers import create_tester + + tester = create_tester(base_url=load_tester.base_url, max_workers=20, timeout=2) + + url = build_url(tester.base_url, Endpoints.STAC_COLLECTIONS) + metrics = tester.test_concurrency_level( + url, workers=Concurrency.NORMAL, duration=Durations.NORMAL - 2 + ) + + # Should make reasonable attempts despite timeouts + assert metrics["total_requests"] >= 30, ( + f"Too few requests under stress with short timeout: " + f"{metrics['total_requests']} < 30" + ) + + def test_error_rate_under_stress(self, load_tester): + """Test that error rates remain within acceptable bounds under stress""" + url = build_url(load_tester.base_url, Endpoints.STAC_COLLECTIONS) + + metrics = load_tester.test_concurrency_level( + url, workers=Concurrency.STRESS, duration=Durations.MODERATE // 2 + ) + + error_count = metrics["total_requests"] - metrics["success_count"] + total = metrics["total_requests"] + error_rate = (error_count / total * 100) if total > 0 else 0 + + assert error_rate <= 30.0, ( + f"Error rate too high: {error_rate:.1f}% " + f"({error_count}/{total} failures)" + ) diff --git a/tests/notification/test_notifications.py b/tests/notification/test_notifications.py index 5fb60ef8..76874902 100644 --- a/tests/notification/test_notifications.py +++ b/tests/notification/test_notifications.py @@ -28,14 +28,12 @@ def test_eoapi_notifier_deployment() -> None: text=True, ) - assert result.returncode == 0, ( - "eoapi-notifier deployment not found - notifications not enabled" - ) + assert ( + result.returncode == 0 + ), "eoapi-notifier deployment not found - notifications not enabled" ready_replicas = result.stdout.strip() - assert ready_replicas == "1", ( - f"Expected 1 ready replica, got {ready_replicas}" - ) + assert ready_replicas == "1", f"Expected 1 ready replica, got {ready_replicas}" def test_cloudevents_sink_exists() -> None: @@ -57,13 +55,11 @@ def test_cloudevents_sink_exists() -> None: text=True, ) - assert result.returncode == 0 and result.stdout.strip(), ( - "Knative CloudEvents sink not found - notifications not configured" - ) + assert ( + result.returncode == 0 and result.stdout.strip() + ), "Knative 
CloudEvents sink not found - notifications not configured" - assert "cloudevents-sink" in result.stdout, ( - "Knative CloudEvents sink should exist" - ) + assert "cloudevents-sink" in result.stdout, "Knative CloudEvents sink should exist" def test_notification_configuration() -> None: @@ -90,12 +86,10 @@ def test_notification_configuration() -> None: config_yaml = result.stdout.strip() assert "pgstac" in config_yaml, "Should have pgstac configured" - assert "cloudevents" in config_yaml, ( - "Should have cloudevents output configured" - ) - assert "pgstac_items_change" in config_yaml or "pgstac" in config_yaml, ( - "Should have pgstac configuration" - ) + assert "cloudevents" in config_yaml, "Should have cloudevents output configured" + assert ( + "pgstac_items_change" in config_yaml or "pgstac" in config_yaml + ), "Should have pgstac configuration" def test_cloudevents_sink_logs_show_startup() -> None: @@ -122,9 +116,7 @@ def test_cloudevents_sink_logs_show_startup() -> None: # CloudEvents sink can be either a real sink or the helloworld sample container assert ( "listening on port" in logs or "helloworld: received a request" in logs - ), ( - "Knative CloudEvents sink should be running (either real sink or helloworld sample)" - ) + ), "Knative CloudEvents sink should be running (either real sink or helloworld sample)" def test_eoapi_notifier_logs_show_connection() -> None: @@ -174,9 +166,9 @@ def test_database_notification_triggers_exist() -> None: text=True, ) - assert result.stdout.strip(), ( - "eoapi-notifier not deployed - notifications not enabled" - ) + assert ( + result.stdout.strip() + ), "eoapi-notifier not deployed - notifications not enabled" # Check that the notifier pod is ready result = subprocess.run( @@ -221,7 +213,6 @@ def test_end_to_end_notification_flow(auth_token: str) -> None: # Use the ingress endpoint by default (tests run from outside cluster) stac_endpoint = os.getenv("STAC_ENDPOINT", "http://localhost/stac") namespace = os.getenv("NAMESPACE", "eoapi") - release_name = os.getenv("RELEASE_NAME", "eoapi") test_item = { "id": f"e2e-test-{int(time.time())}", @@ -246,8 +237,8 @@ def test_end_to_end_notification_flow(auth_token: str) -> None: ], } - # Get notifier logs before the operation - before_logs = subprocess.run( + # Get notifier logs before the operation (baseline) + _ = subprocess.run( [ "kubectl", "logs", @@ -272,15 +263,14 @@ def test_end_to_end_notification_flow(auth_token: str) -> None: timeout=10, ) - assert response.status_code in [200, 201], ( - f"Failed to create item: {response.text}" - ) + assert response.status_code in [200, 201], f"Failed to create item: {response.text}" # Wait briefly for notification to propagate time.sleep(3) # Get notifier logs after the operation - after_logs = subprocess.run( + # Get logs after the operation + after_logs: str = subprocess.run( [ "kubectl", "logs", @@ -295,8 +285,9 @@ def test_end_to_end_notification_flow(auth_token: str) -> None: ).stdout # Clean up + item_id = str(test_item.get("id", "")) # type: ignore[union-attr] requests.delete( - f"{stac_endpoint}/collections/noaa-emergency-response/items/{test_item['id']}", + f"{stac_endpoint}/collections/noaa-emergency-response/items/{item_id}", headers={ "Content-Type": "application/json", "Authorization": auth_token, @@ -306,10 +297,10 @@ def test_end_to_end_notification_flow(auth_token: str) -> None: # Verify notification was processed # Check if the new event appears in the after_logs + keywords: list[str] = ["pgstac_items_change", item_id, "INSERT"] assert any( - 
keyword in after_logs - for keyword in ["pgstac_items_change", test_item["id"], "INSERT"] - ), f"Notification for item {test_item['id']} should be in logs" + keyword in after_logs for keyword in keywords + ), f"Notification for item {item_id} should be in logs" # Check Knative CloudEvents sink logs for any CloudEvents result = subprocess.run( @@ -368,9 +359,9 @@ def test_k_sink_injection() -> None: k_sink_value = result.stdout.strip() if k_sink_value: - assert "cloudevents-sink" in k_sink_value, ( - f"K_SINK should point to CloudEvents sink service, got: {k_sink_value}" - ) + assert ( + "cloudevents-sink" in k_sink_value + ), f"K_SINK should point to CloudEvents sink service, got: {k_sink_value}" print(f"✅ K_SINK properly injected: {k_sink_value}") else: # Check if SinkBinding exists - it may take time to inject @@ -389,10 +380,7 @@ def test_k_sink_injection() -> None: text=True, ) - if ( - sinkbinding_result.returncode == 0 - and sinkbinding_result.stdout.strip() - ): + if sinkbinding_result.returncode == 0 and sinkbinding_result.stdout.strip(): pytest.fail( "SinkBinding exists but K_SINK not yet injected - may need more time" ) diff --git a/tests/notification/test_pgstac_notifications.py b/tests/notification/test_pgstac_notifications.py index 752f3486..65ae6cd8 100644 --- a/tests/notification/test_pgstac_notifications.py +++ b/tests/notification/test_pgstac_notifications.py @@ -89,9 +89,7 @@ def test_notification_triggers_exist( stac_client: Dict[str, Any], notifications_enabled: bool ) -> None: """Test that notification system is working by performing a simple operation.""" - assert notifications_enabled, ( - "PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test" - ) + assert notifications_enabled, "PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test" namespace = os.getenv("NAMESPACE", "eoapi") result = subprocess.run( @@ -133,9 +131,10 @@ def test_notification_triggers_exist( timeout=stac_client["timeout"], ) - assert response.status_code in [200, 201], ( - f"Failed to create test item: {response.text}" - ) + assert response.status_code in [ + 200, + 201, + ], f"Failed to create test item: {response.text}" time.sleep(2) logs = get_notifier_logs_since(before_time) @@ -147,9 +146,7 @@ def test_notification_triggers_exist( ) assert ( - "pgstac_items_change" in logs - or "INSERT" in logs - or test_item_id in logs + "pgstac_items_change" in logs or "INSERT" in logs or test_item_id in logs ), "Notification system should process item changes" @@ -157,9 +154,7 @@ def test_insert_notification( stac_client: Dict[str, Any], notifications_enabled: bool ) -> None: """Test that INSERT operations trigger notifications.""" - assert notifications_enabled, ( - "PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test" - ) + assert notifications_enabled, "PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test" test_item_id = f"test-insert-{int(time.time())}" test_item = { @@ -183,9 +178,7 @@ def test_insert_notification( timeout=stac_client["timeout"], ) - assert response.status_code in [200, 201], ( - f"Failed to create item: {response.text}" - ) + assert response.status_code in [200, 201], f"Failed to create item: {response.text}" time.sleep(2) logs = get_notifier_logs_since(before_time) @@ -206,9 +199,7 @@ def test_update_notification( stac_client: Dict[str, Any], notifications_enabled: bool ) -> None: """Test that UPDATE operations trigger notifications.""" - assert notifications_enabled, ( - 
"PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test" - ) + assert notifications_enabled, "PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test" test_item_id = f"test-update-{int(time.time())}" test_item = { @@ -233,14 +224,12 @@ def test_update_notification( timeout=stac_client["timeout"], ) - assert response.status_code in [200, 201], ( - f"Failed to create item: {response.text}" - ) + assert response.status_code in [200, 201], f"Failed to create item: {response.text}" before_time = time.time() - test_item["properties"]["description"] = "Updated for notification test" - test_item["properties"]["test_version"] = "v2" + test_item["properties"]["description"] = "Updated for notification test" # type: ignore[index] + test_item["properties"]["test_version"] = "v2" # type: ignore[index] response = requests.put( f"{stac_client['base_url']}/collections/noaa-emergency-response/items/{test_item_id}", @@ -249,9 +238,7 @@ def test_update_notification( timeout=stac_client["timeout"], ) - assert response.status_code in [200, 201], ( - f"Failed to update item: {response.text}" - ) + assert response.status_code in [200, 201], f"Failed to update item: {response.text}" time.sleep(2) logs = get_notifier_logs_since(before_time) @@ -273,9 +260,7 @@ def test_delete_notification( stac_client: Dict[str, Any], notifications_enabled: bool ) -> None: """Test that DELETE operations trigger notifications.""" - assert notifications_enabled, ( - "PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test" - ) + assert notifications_enabled, "PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test" test_item_id = f"test-delete-{int(time.time())}" test_item = { @@ -297,9 +282,7 @@ def test_delete_notification( timeout=stac_client["timeout"], ) - assert response.status_code in [200, 201], ( - f"Failed to create item: {response.text}" - ) + assert response.status_code in [200, 201], f"Failed to create item: {response.text}" before_time = time.time() @@ -309,9 +292,7 @@ def test_delete_notification( timeout=stac_client["timeout"], ) - assert response.status_code in [200, 204], ( - f"Failed to delete item: {response.text}" - ) + assert response.status_code in [200, 204], f"Failed to delete item: {response.text}" time.sleep(2) logs = get_notifier_logs_since(before_time) @@ -326,9 +307,7 @@ def test_bulk_operations_notification( stac_client: Dict[str, Any], notifications_enabled: bool ) -> None: """Test that bulk operations trigger appropriate notifications.""" - assert notifications_enabled, ( - "PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test" - ) + assert notifications_enabled, "PgSTAC notifications not enabled - set notifications.sources.pgstac=true to test" test_items = [] for i in range(3): @@ -356,16 +335,15 @@ def test_bulk_operations_notification( timeout=stac_client["timeout"], ) - assert response.status_code in [200, 201], ( - f"Failed to create item: {response.text}" - ) + assert response.status_code in [ + 200, + 201, + ], f"Failed to create item: {response.text}" time.sleep(3) logs = get_notifier_logs_since(before_time) - found_count = sum( - 1 for item in test_items if f"item_id='{item['id']}'" in logs - ) + found_count = sum(1 for item in test_items if f"item_id='{item['id']}'" in logs) for item in test_items: requests.delete( @@ -374,6 +352,4 @@ def test_bulk_operations_notification( timeout=stac_client["timeout"], ) - assert found_count >= 2, ( - f"Expected at least 2 
notifications, found {found_count}"
-    )
+    assert found_count >= 2, f"Expected at least 2 notifications, found {found_count}"
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 19142e14..5b03d347 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -2,6 +2,10 @@
 httpx==0.27.0
 requests==2.31.0
+urllib3==2.0.7
 pytest==8.3.2
 pytest-timeout==2.3.1
+
+# Optional: Prometheus integration for load testing metrics
+prometheus-client==0.20.0
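
Note for reviewers: the light_tester / load_tester / stress_tester fixtures and the tester class they wrap are not part of this patch, so the contract the new load tests depend on is only visible from the call sites: test_concurrency_level(url, workers, duration) returning a dict with total_requests, success_count, success_rate and some latency figures (the exact latency keys are checked by assert_has_latency_metrics, which is also defined elsewhere). Below is a minimal sketch of a tester that would satisfy that contract, assuming plain requests plus a thread pool; the class name, avg_latency_ms key and other details are illustrative, not the repository's implementation.

import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Tuple

import requests


class SketchLoadTester:
    """Illustrative stand-in for the tester fixture used by the new load tests."""

    def __init__(self, base_url: str, timeout: int = 10) -> None:
        self.base_url = base_url
        self.timeout = timeout

    def make_request(self, url: str) -> Tuple[bool, float]:
        """Issue one GET and return (success, latency_ms)."""
        start = time.time()
        try:
            response = requests.get(url, timeout=self.timeout)
            return response.status_code == 200, (time.time() - start) * 1000
        except requests.RequestException:
            return False, (time.time() - start) * 1000

    def test_concurrency_level(self, url: str, workers: int, duration: int) -> Dict[str, Any]:
        """Run `workers` threads against `url` for `duration` seconds and aggregate metrics."""
        deadline = time.time() + duration

        def loop(_: int) -> Tuple[int, int, List[float]]:
            ok, count, latencies = 0, 0, []
            while time.time() < deadline:
                success, latency_ms = self.make_request(url)
                count += 1
                ok += int(success)
                latencies.append(latency_ms)
            return ok, count, latencies

        successes, total, all_latencies = 0, 0, []
        with ThreadPoolExecutor(max_workers=workers) as pool:
            for ok, count, latencies in pool.map(loop, range(workers)):
                successes += ok
                total += count
                all_latencies.extend(latencies)

        return {
            "total_requests": total,
            "success_count": successes,
            "success_rate": (successes / total * 100) if total else 0.0,
            # Placeholder latency key; the real helpers check their own field names.
            "avg_latency_ms": sum(all_latencies) / len(all_latencies) if all_latencies else 0.0,
        }

Anything exposing the same method signature and metrics keys would plug into run_and_assert and the parametrized tests above unchanged.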
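
Similarly, find_breaking_point(endpoint=..., success_threshold=..., step_size=..., test_duration=..., cooldown=...) is only referenced by TestStressBreakingPoints, not defined in this patch. Conceptually it ramps the worker count in step_size increments until the measured success rate drops below success_threshold and reports the concurrency level at which that happened. A rough sketch under those assumptions (the return convention is inferred from the assertions; the real implementation may differ):

import time
from typing import Any, Dict, Tuple


def find_breaking_point(
    tester: Any,  # any object exposing base_url and test_concurrency_level(url, workers, duration)
    endpoint: str,
    success_threshold: float,
    step_size: int = 2,
    test_duration: int = 5,
    cooldown: int = 1,
    max_workers: int = 50,
) -> Tuple[int, Dict[int, Dict[str, Any]]]:
    """Ramp concurrency until the success rate drops below the threshold.

    Returns (workers_at_break, metrics_per_level); if the threshold is never
    violated, the highest tested level is returned instead.
    """
    url = f"{tester.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
    per_level: Dict[int, Dict[str, Any]] = {}
    workers = step_size
    while workers <= max_workers:
        metrics = tester.test_concurrency_level(url, workers, test_duration)
        per_level[workers] = metrics
        if metrics["success_rate"] < success_threshold:
            return workers, per_level  # first level where the service degraded
        workers += step_size
        time.sleep(cooldown)  # give the service (and HPA) room to recover between steps
    return max_workers, per_level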
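
tests/requirements.txt now pins prometheus-client as an "optional" dependency for load-testing metrics, but nothing in this diff imports it. If the intent is the usual prometheus_client batch-job pattern, exporting the metrics dictionaries produced by the tests could look roughly like the following; the metric names, labels and Pushgateway address are placeholders, not anything defined by eoapi-k8s:

# Hypothetical illustration of the optional prometheus-client integration
# hinted at in tests/requirements.txt; nothing here comes from the repository.
from prometheus_client import CollectorRegistry, Counter, Gauge, push_to_gateway

registry = CollectorRegistry()
requests_total = Counter(
    "loadtest_requests_total", "Requests issued by the load test",
    ["endpoint"], registry=registry,
)
success_rate = Gauge(
    "loadtest_success_rate_percent", "Success rate per endpoint",
    ["endpoint"], registry=registry,
)


def record(endpoint: str, metrics: dict) -> None:
    """Translate a tester metrics dict into Prometheus samples."""
    requests_total.labels(endpoint=endpoint).inc(metrics["total_requests"])
    success_rate.labels(endpoint=endpoint).set(metrics["success_rate"])


if __name__ == "__main__":
    record("/stac/collections", {"total_requests": 120, "success_count": 118, "success_rate": 98.3})
    # Push to a Pushgateway if one is available (address is a placeholder).
    push_to_gateway("localhost:9091", job="eoapi-load-test", registry=registry)

If a pull-based setup is preferred instead of a Pushgateway, prometheus_client.start_http_server(port) on the test runner would expose the same registry for scraping.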