diff --git a/Cargo.lock b/Cargo.lock index 5cc606f2a..79faa52de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1310,7 +1310,7 @@ dependencies = [ "crossterm_winapi", "mio", "parking_lot", - "rustix", + "rustix 0.38.44", "signal-hook", "signal-hook-mio", "winapi", @@ -2928,6 +2928,12 @@ version = "0.4.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" +[[package]] +name = "linux-raw-sys" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" + [[package]] name = "litemap" version = "0.7.4" @@ -3515,6 +3521,7 @@ dependencies = [ "static-files", "sysinfo", "temp-dir", + "tempfile", "thiserror 2.0.11", "tokio", "tokio-stream", @@ -3730,7 +3737,7 @@ dependencies = [ "hex", "lazy_static", "procfs-core", - "rustix", + "rustix 0.38.44", ] [[package]] @@ -4251,7 +4258,20 @@ dependencies = [ "bitflags 2.8.0", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.15", + "windows-sys 0.59.0", +] + +[[package]] +name = "rustix" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" +dependencies = [ + "bitflags 2.8.0", + "errno", + "libc", + "linux-raw-sys 0.9.4", "windows-sys 0.59.0", ] @@ -4848,15 +4868,14 @@ checksum = "bc1ee6eef34f12f765cb94725905c6312b6610ab2b0940889cfe58dae7bc3c72" [[package]] name = "tempfile" -version = "3.16.0" +version = "3.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38c246215d7d24f48ae091a2902398798e05d978b24315d6efbc00ede9a8bb91" +checksum = "e8a64e3985349f2441a1a9ef0b853f869006c3855f2cda6862a94d26ebb9d6a1" dependencies = [ - "cfg-if", "fastrand 2.3.0", "getrandom 0.3.1", "once_cell", - "rustix", + "rustix 1.0.7", "windows-sys 0.59.0", ] diff --git a/Cargo.toml b/Cargo.toml index aa2d2de11..191640b06 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -122,6 +122,7 @@ thiserror = "2.0" ulid = { version = "1.0", features = ["serde"] } xxhash-rust = { version = "0.8", features = ["xxh3"] } futures-core = "0.3.31" +tempfile = "3.20.0" [build-dependencies] cargo_toml = "0.21" diff --git a/resources/ingest_demo_data.sh b/resources/ingest_demo_data.sh new file mode 100755 index 000000000..94216ad72 --- /dev/null +++ b/resources/ingest_demo_data.sh @@ -0,0 +1,918 @@ +#!/usr/bin/env bash + +# Configuration +P_URL=${P_URL:-"http://localhost:8000"} +P_USERNAME=${P_USERNAME:-"admin"} +P_PASSWORD=${P_PASSWORD:-"admin"} +P_STREAM=${P_STREAM:-"demodata"} +ACTION=${ACTION:-"ingest"} +TARGET_RECORDS=10000 +BATCH_SIZE=1000 + +# Pre-compute auth header +AUTH_HEADER="Authorization: Basic $(echo -n "$P_USERNAME:$P_PASSWORD" | base64)" + +# Logging functions +log_error() { + echo "$@" >&2 +} + +# Common curl function with retry logic +curl_with_retry() { + local url="$1" + local method="$2" + local data="$3" + local content_type="${4:-application/json}" + local max_retries="${5:-3}" + local data_file="$6" + local retry_count=0 + + local temp_file="" + # Create temp file if data is provided (either as string or file) + if [[ -n "$data_file" ]]; then + temp_file="$data_file" + elif [[ -n "$data" ]]; then + temp_file=$(mktemp) + if [[ $? 
-ne 0 ]]; then
+            return 1
+        fi
+        printf "%s" "$data" > "$temp_file"
+    fi
+
+    while [[ $retry_count -lt $max_retries ]]; do
+        local max_time=$((10 + (retry_count * 10)))
+        local connect_timeout=5
+
+        local curl_args=(
+            -s
+            --max-time "$max_time"
+            --connect-timeout "$connect_timeout"
+            -H "Content-Type: $content_type"
+            -H "$AUTH_HEADER"
+        )
+
+        # Add stream header for ingestion requests
+        if [[ "$url" == *"/ingest"* ]]; then
+            curl_args+=(-H "X-P-STREAM: $P_STREAM")
+        fi
+
+        # Add method and data
+        if [[ "$method" == "POST" ]]; then
+            curl_args+=(-X POST)
+            if [[ -n "$temp_file" ]]; then
+                curl_args+=(--data-binary "@$temp_file")
+            elif [[ -n "$data" ]]; then
+                curl_args+=(-d "$data")
+            fi
+        elif [[ "$method" == "PUT" ]]; then
+            curl_args+=(-X PUT)
+            if [[ -n "$temp_file" ]]; then
+                curl_args+=(--data-binary "@$temp_file")
+            elif [[ -n "$data" ]]; then
+                curl_args+=(-d "$data")
+            fi
+        fi
+
+        # Add URL
+        curl_args+=("$url")
+
+        # Create temporary files for response body and stderr
+        local response_file
+        response_file=$(mktemp) || { log_error "Failed to create temporary file"; return 1; }
+        local stderr_file
+        stderr_file=$(mktemp) || { log_error "Failed to create temporary file"; rm -f "$response_file"; return 1; }
+
+        # Add options to capture status code separately
+        curl_args+=("-w" "%{http_code}" "-o" "$response_file")
+
+        # Execute curl and capture its exit code before stripping newlines from the status code
+        local status_code
+        status_code=$(curl "${curl_args[@]}" 2>"$stderr_file")
+        local curl_exit_code=$?
+        status_code=$(printf '%s' "$status_code" | tr -d '\n')
+
+        # Check curl exit code
+        if [[ $curl_exit_code -eq 0 ]]; then
+            local response_body
+            response_body=$(cat "$response_file" 2>/dev/null)
+
+            # Clean up temporary files
+            rm -f "$response_file" "$stderr_file"
+
+            # Clean up temp data file (only if we created it)
+            if [[ -n "$temp_file" && -z "$data_file" ]]; then
+                rm -f "$temp_file"
+            fi
+
+            if [[ "$status_code" =~ ^2[0-9][0-9]$ ]]; then
+                echo "$response_body"
+                return 0
+            else
+                log_error "HTTP $status_code: Request failed"
+                return 1
+            fi
+        elif [[ $curl_exit_code -eq 28 ]]; then
+            # Timeout - retry
+            rm -f "$response_file" "$stderr_file"
+            retry_count=$((retry_count + 1))
+            sleep 1
+        else
+            # Other curl error - cleanup and exit
+            rm -f "$response_file" "$stderr_file"
+            break
+        fi
+    done
+
+    # Clean up temp file on failure (only if we created it)
+    if [[ -n "$temp_file" && -z "$data_file" ]]; then
+        rm -f "$temp_file"
+    fi
+
+    return 1
+}
+
+# ==================== INGEST FUNCTIONALITY ====================
+
+# Pre-compute static data for ingestion
+init_ingest_data() {
+    TRACE_IDS=()
+    SPAN_IDS=()
+    IP_ADDRESSES=()
+    TIMESTAMPS=()
+    UNIX_NANOS=()
+
+    # Generate 100 of each for cycling through
+    for i in {1..100}; do
+        TRACE_IDS+=("$(printf '%032x' $((RANDOM * RANDOM)))")
+        SPAN_IDS+=("$(printf '%016x' $((RANDOM * RANDOM)))")
+        IP_ADDRESSES+=("192.168.$((RANDOM % 256)).$((RANDOM % 256))")
+        TIMESTAMPS+=("$(date -u +%Y-%m-%dT%H:%M:%S.%3NZ -d "+$((RANDOM % 3600)) seconds")")
+        UNIX_NANOS+=("$(date +%s)$(printf '%09d' $((RANDOM % 1000000000)))")
+    done
+
+    # Static arrays
+    METHODS=("GET" "GET" "GET" "GET" "POST" "PUT")
+    STATUS_CODES=(200 200 200 201 400 500)
+    SERVICES=("frontend" "api" "auth" "cart" "payment")
+    ENDPOINTS=("/products" "/cart" "/login" "/checkout" "/search")
+    USER_AGENTS=("curl/7.88.1" "python-requests/2.32.3" "Mozilla/5.0")
+    CLUSTERS=("web" "api" "db")
+}
+
+# Generate batch data
+generate_batch() {
+    batch_size=$1
+
+    if [[ -z "$batch_size" || "$batch_size" -eq 0 ]]; then
+        return 1
+    fi
+
+    if [[ ${#TRACE_IDS[@]} -eq 0 ]];
then + return 1 + fi + + batch_data='[' + + for ((i=0; i BATCH_SIZE ? BATCH_SIZE : remaining)) + + # Progress tracking + progress=$((RECORDS_SENT * 100 / TARGET_RECORDS)) + elapsed=$(($(date +%s) - START_TIME)) + rate=$((RECORDS_SENT / (elapsed == 0 ? 1 : elapsed))) + + echo "Progress: $progress% ($RECORDS_SENT/$TARGET_RECORDS) - Rate: $rate records/sec" + + # Generate and send batch + batch_data=$(generate_batch $current_batch_size) + + if [[ -z "$batch_data" ]]; then + echo "Failed to generate batch data" + exit 1 + fi + + if send_batch "$batch_data"; then + RECORDS_SENT=$((RECORDS_SENT + current_batch_size)) + else + echo "Failed to send batch" + exit 1 + fi + + sleep 0.1 + done + + # Final statistics + TOTAL_TIME=$(($(date +%s) - START_TIME)) + FINAL_RATE=$((TARGET_RECORDS / (TOTAL_TIME == 0 ? 1 : TOTAL_TIME))) + + echo "Ingestion completed: $TARGET_RECORDS records in $TOTAL_TIME seconds (avg: $FINAL_RATE records/sec)" +} + +# ==================== FILTERS FUNCTIONALITY ==================== + +# Create SQL filters +create_sql_filters() { + echo "Creating SQL filters..." + + sql_filters=( + "error_logs|Monitor all ERROR and FATAL severity events|SELECT * FROM $P_STREAM WHERE severity_text IN ('ERROR', 'FATAL') ORDER BY time_unix_nano DESC LIMIT 100" + "high_response_time|Identify requests with extended response times|SELECT \"service.name\", \"url.path\", body FROM $P_STREAM WHERE body LIKE '%duration%' ORDER BY time_unix_nano DESC LIMIT 50" + "service_health_summary|Service health metrics by severity|SELECT \"service.name\", severity_text, COUNT(*) as count FROM $P_STREAM GROUP BY \"service.name\", severity_text ORDER BY count DESC" + "api_endpoint_performance|API endpoint request patterns|SELECT \"url.path\", COUNT(*) as request_count, \"service.name\" FROM $P_STREAM GROUP BY \"url.path\", \"service.name\" ORDER BY request_count DESC LIMIT 20" + "authentication_failures|Monitor auth-related warnings and errors|SELECT * FROM $P_STREAM WHERE \"url.path\" LIKE '%login%' AND severity_text IN ('WARN', 'ERROR') ORDER BY time_unix_nano DESC LIMIT 100" + "upstream_cluster_analysis|Request distribution across clusters|SELECT \"upstream.cluster\", COUNT(*) as request_count, \"service.name\" FROM $P_STREAM GROUP BY \"upstream.cluster\", \"service.name\" ORDER BY request_count DESC" + "trace_analysis|Multi-span traces for distributed tracking|SELECT trace_id, COUNT(*) as span_count, \"service.name\" FROM $P_STREAM GROUP BY trace_id, \"service.name\" HAVING span_count > 1 ORDER BY span_count DESC LIMIT 10" + "user_agent_distribution|Client types and user agent patterns|SELECT \"user_agent.original\", COUNT(*) as usage_count FROM $P_STREAM GROUP BY \"user_agent.original\" ORDER BY usage_count DESC LIMIT 15" + "source_address_analysis|Request distribution by source IP|SELECT \"source.address\", COUNT(*) as request_count, COUNT(DISTINCT \"service.name\") as services_accessed FROM $P_STREAM GROUP BY \"source.address\" ORDER BY request_count DESC LIMIT 20" + "severity_timeline|Severity trends over time|SELECT \"severity_text\", COUNT(*) as count, \"service.name\" FROM $P_STREAM GROUP BY \"severity_text\", \"service.name\" ORDER BY count DESC" + ) + + sql_success_count=0 + + for filter_config in "${sql_filters[@]}"; do + IFS='|' read -r name description query <<< "$filter_config" + + # Escape quotes for JSON + escaped_query=$(echo "$query" | sed 's/"/\\"/g') + escaped_desc=$(echo "$description" | sed 's/"/\\"/g') + + 
json="{\"stream_name\":\"sql\",\"filter_name\":\"$name\",\"filter_description\":\"$escaped_desc\",\"query\":{\"filter_type\":\"sql\",\"filter_query\":\"$escaped_query\"},\"time_filter\":null}" + + if curl_with_retry "$P_URL/api/v1/filters" "POST" "$json" "application/json" 3; then + sql_success_count=$((sql_success_count + 1)) + echo "Created SQL filter: $name" + else + echo "Failed to create SQL filter: $name" + fi + + sleep 0.5 + done + + echo "Created $sql_success_count SQL filters" + sleep 3 +} + +# Create saved filters +create_saved_filters() { + echo "Creating saved filters..." + + saved_filters=( + "service_errors|Monitor service errors and failures|SELECT * FROM $P_STREAM WHERE severity_text IN ('ERROR', 'FATAL') LIMIT 500|Ingestion Time,Data,service.name,severity_text,url.path|service.name" + "auth_security_events|Authentication and authorization monitoring|SELECT * FROM $P_STREAM WHERE url.path LIKE '%login%' AND severity_text IN ('WARN', 'ERROR', 'FATAL') LIMIT 500|Ingestion Time,Data,service.name,severity_text,source.address,user_agent.original|severity_text" + "high_latency_requests|High response time requests|SELECT * FROM $P_STREAM WHERE body LIKE '%duration%' LIMIT 500|Ingestion Time,Data,service.name,url.path,upstream.cluster,body|service.name" + "upstream_cluster_health|Upstream cluster performance|SELECT * FROM $P_STREAM WHERE upstream.cluster IS NOT NULL LIMIT 500|Ingestion Time,Data,upstream.cluster,service.name,severity_text,destination.address|upstream.cluster" + "api_endpoint_monitoring|API endpoint usage patterns|SELECT * FROM $P_STREAM WHERE url.path IS NOT NULL LIMIT 500|Ingestion Time,Data,url.path,service.name,severity_text,source.address|url.path" + "trace_correlation_view|Correlated traces for distributed tracking|SELECT * FROM $P_STREAM WHERE trace_id IS NOT NULL AND span_id IS NOT NULL LIMIT 500|Ingestion Time,Data,trace_id,span_id,service.name,url.path|trace_id" + "user_agent_analysis|Client types and patterns|SELECT * FROM $P_STREAM WHERE user_agent.original IS NOT NULL LIMIT 500|Ingestion Time,Data,user_agent.original,source.address,url.path,service.name|user_agent.original" + "network_monitoring|Network traffic and server interactions|SELECT * FROM $P_STREAM WHERE source.address IS NOT NULL LIMIT 500|Ingestion Time,Data,source.address,destination.address,service.name,severity_text,url.path|source.address" + "service_overview|Comprehensive service activity view|SELECT * FROM $P_STREAM LIMIT 500|Ingestion Time,Data,service.name,url.path,source.address,destination.address,upstream.cluster|service.name" + "recent_activity|Most recent system activity|SELECT * FROM $P_STREAM ORDER BY time_unix_nano DESC LIMIT 500|Ingestion Time,Data,service.name,severity_text,url.path,source.address|severity_text" + ) + + saved_success_count=0 + + for filter_config in "${saved_filters[@]}"; do + IFS='|' read -r name description query visible_columns group_by <<< "$filter_config" + + # Escape quotes + escaped_query=$(echo "$query" | sed 's/"/\\"/g') + escaped_desc=$(echo "$description" | sed 's/"/\\"/g') + + # Convert visible columns to JSON array + IFS=',' read -ra col_array <<< "$visible_columns" + visible_cols_json="" + for i in "${!col_array[@]}"; do + [[ $i -gt 0 ]] && visible_cols_json+="," + visible_cols_json+="\"${col_array[$i]}\"" + done + + 
json="{\"stream_name\":\"$P_STREAM\",\"filter_name\":\"$name\",\"filter_description\":\"$escaped_desc\",\"query\":{\"filter_type\":\"filter\",\"filter_query\":\"$escaped_query\"},\"time_filter\":null,\"tableConfig\":{\"visibleColumns\":[$visible_cols_json],\"pinnedColumns\":[]},\"groupBy\":\"$group_by\"}" + + if curl_with_retry "$P_URL/api/v1/filters" "POST" "$json" "application/json" 3; then + saved_success_count=$((saved_success_count + 1)) + echo "Created saved filter: $name" + else + echo "Failed to create saved filter: $name" + fi + + sleep 0.5 + done + + echo "Created $saved_success_count saved filters" +} + +# Main filters function +run_filters() { + echo "Starting filter creation..." + create_sql_filters + create_saved_filters + echo "Filter creation completed" +} + +# ==================== ALERTS FUNCTIONALITY ==================== + +# Create webhook target +create_target() { + echo "Creating webhook target..." >&2 + + response=$(curl -s -H "Content-Type: application/json" -H "$AUTH_HEADER" -X POST "$P_URL/api/v1/targets" -d @- << EOF +{"type":"webhook","endpoint":"https://webhook.site/8e1f26bd-2f5b-47a2-9d0b-3b3dabb30710","name":"Test Webhook","auth":{"username":"","password":""},"skipTlsCheck":false,"notificationConfig":{"interval":1,"times":1}} +EOF +) + + curl_exit_code=$? + + if [[ $curl_exit_code -eq 0 && -n "$response" ]]; then + # Extract target ID from response + target_id=$(echo "$response" | jq -r '.id // empty') + if [[ -n "$target_id" ]]; then + echo "Target created successfully with ID: $target_id" >&2 + echo "$target_id" + return 0 + else + echo "Failed to extract target ID from response" >&2 + echo "Response: $response" >&2 + return 1 + fi + else + echo "Failed to create target" >&2 + echo "Curl exit code: $curl_exit_code" >&2 + echo "Response: $response" >&2 + return 1 + fi +} + +# Create alerts +create_alerts() { + local target_id="$1" + + if [[ -z "$target_id" ]]; then + echo "Target ID is required to create alerts" + return 1 + fi + + echo "Creating alerts with target ID: $target_id" + + # Alert 1: Error Count (severity_number = 18) + alert1_json="{\"severity\":\"high\",\"title\":\"error count\",\"stream\":\"$P_STREAM\",\"alertType\":\"threshold\",\"aggregates\":{\"aggregateConfig\":[{\"aggregateFunction\":\"count\",\"conditions\":{\"operator\":null,\"conditionConfig\":[{\"column\":\"severity_number\",\"operator\":\"=\",\"value\":\"18\"}]},\"column\":\"severity_number\",\"operator\":\">\",\"value\":1000}]},\"evalConfig\":{\"rollingWindow\":{\"evalStart\":\"5h\",\"evalEnd\":\"now\",\"evalFrequency\":1}},\"targets\":[\"$target_id\"]}" + + response1=$(curl_with_retry "$P_URL/api/v1/alerts" "POST" "$alert1_json" "application/json" 3) + if [[ $? -eq 0 ]]; then + echo "Alert 1 (Error Count) created successfully" + else + echo "Failed to create Alert 1 (Error Count)" + echo "Response: $response1" + fi + + # Alert 2: 400 Errors + alert2_json="{\"severity\":\"critical\",\"title\":\"400 Errors\",\"stream\":\"$P_STREAM\",\"alertType\":\"threshold\",\"aggregates\":{\"aggregateConfig\":[{\"aggregateFunction\":\"count\",\"conditions\":{\"operator\":null,\"conditionConfig\":[{\"column\":\"body\",\"operator\":\"contains\",\"value\":\"400\"}]},\"column\":\"body\",\"operator\":\">\",\"value\":10}]},\"evalConfig\":{\"rollingWindow\":{\"evalStart\":\"5h\",\"evalEnd\":\"now\",\"evalFrequency\":1}},\"targets\":[\"$target_id\"]}" + + response2=$(curl_with_retry "$P_URL/api/v1/alerts" "POST" "$alert2_json" "application/json" 3) + if [[ $? 
-eq 0 ]]; then + echo "Alert 2 (400 Errors) created successfully" + else + echo "Failed to create Alert 2 (400 Errors)" + echo "Response: $response2" + fi + + # Alert 3: Trace ID or Span ID null + alert3_json="{\"severity\":\"high\",\"title\":\"Trace ID or Span ID null\",\"stream\":\"$P_STREAM\",\"alertType\":\"threshold\",\"aggregates\":{\"aggregateConfig\":[{\"aggregateFunction\":\"count\",\"conditions\":{\"operator\":null,\"conditionConfig\":[{\"column\":\"trace_id\",\"operator\":\"is null\",\"value\":\"\"}]},\"column\":\"trace_id\",\"operator\":\">\",\"value\":0}]},\"evalConfig\":{\"rollingWindow\":{\"evalStart\":\"5h\",\"evalEnd\":\"now\",\"evalFrequency\":1}},\"targets\":[\"$target_id\"]}" + + response3=$(curl_with_retry "$P_URL/api/v1/alerts" "POST" "$alert3_json" "application/json" 3) + if [[ $? -eq 0 ]]; then + echo "Alert 3 (Trace ID null) created successfully" + else + echo "Failed to create Alert 3 (Trace ID null)" + echo "Response: $response3" + fi + + sleep 1 +} + +# Main alerts function +run_alerts() { + echo "Starting alert creation..." + + # Create target and get ID + target_id=$(create_target) + + if [[ $? -eq 0 && -n "$target_id" ]]; then + echo "Target creation successful, proceeding with alerts..." + sleep 2 + + # Create alerts using the target ID + create_alerts "$target_id" + echo "Alert creation completed" + else + echo "Failed to create target, cannot proceed with alerts" + return 1 + fi +} + +# ==================== DASHBOARDS FUNCTIONALITY ==================== + +# Create dashboard +create_dashboard() { + echo "Creating dashboard..." >&2 + + response=$(curl -s -H "Content-Type: application/json" -H "$AUTH_HEADER" -X POST "$P_URL/api/v1/dashboards" -d @- << EOF +{ + "title": "Demo Dashboard", + "tags": [ + "demo", + "oss" + ] +} +EOF +) + + curl_exit_code=$? 
+ + if [[ $curl_exit_code -eq 0 && -n "$response" ]]; then + # Extract dashboard ID from response + dashboard_id=$(echo "$response" | jq -r '.dashboardId // empty') + if [[ -n "$dashboard_id" ]]; then + echo "Dashboard created successfully with ID: $dashboard_id" >&2 + echo "$dashboard_id" + return 0 + else + echo "Failed to extract dashboard ID from response" >&2 + echo "Response: $response" >&2 + return 1 + fi + else + echo "Failed to create dashboard" >&2 + echo "Curl exit code: $curl_exit_code" >&2 + echo "Response: $response" >&2 + return 1 + fi +} + +# Update dashboard with tiles +update_dashboard() { + local dashboard_id="$1" + + if [[ -z "$dashboard_id" ]]; then + echo "Dashboard ID is required to update dashboard" + return 1 + fi + + echo "Updating dashboard with ID: $dashboard_id" + + # Create the dashboard configuration with updated tiles + dashboard_config=$(cat << EOF +{ + "title": "Demo Dashboard", + "dashboardId": "$dashboard_id", + "tags": [ + "demo", + "oss" + ], + "isFavorite": true, + "dashboardType": "Dashboard", + "tiles": [ + { + "tile_id": "01K017X5NG2SZ20PJ0EEYG9376", + "title": "Service Error Rate Over Time", + "chartQuery": { + "x": { + "fields": [ + { + "name": "p_timestamp", + "type": "time" + } + ], + "granularity": "minute" + }, + "y": { + "fields": [ + { + "name": "severity_number", + "aggregate": "COUNT" + } + ], + "groupBy": [ + "severity_text" + ] + }, + "filters": [] + }, + "dbName": "$P_STREAM", + "chartType": "timeseries", + "config": { + "type": "bar", + "colourScheme": "forest", + "layout": { + "legendPosition": "bottom" + }, + "axes": { + "x": { + "field": "time_bucket", + "title": "Time" + }, + "y": { + "field": "COUNT_severity_number", + "title": "Event Count" + } + }, + "advanced": { + "dataLabels": { + "enabled": false + }, + "tooltip": { + "enabled": true, + "mode": "index", + "intersect": false + } + } + }, + "layout": { + "w": 12, + "h": 8, + "x": 0, + "y": 0, + "i": "01K017X5NG2SZ20PJ0EEYG9376", + "moved": false, + "static": false + } + }, + { + "tile_id": "01K027HTD413T9MP39KYEE42GS", + "title": "Request Count by Service", + "chartQuery": { + "x": { + "fields": [ + { + "name": "service.name", + "aggregate": "COUNT" + } + ], + "groupBy": [ + "service.name" + ] + }, + "y": { + "fields": [ + { + "name": "url.path", + "aggregate": "COUNT" + } + ], + "groupBy": [ + "url.path" + ] + }, + "filters": [] + }, + "dbName": "$P_STREAM", + "chartType": "line", + "config": { + "type": "line", + "colourScheme": "cyber", + "layout": { + "legendPosition": "bottom" + }, + "axes": { + "x": { + "field": "service.name", + "title": "Service" + }, + "y": { + "field": "COUNT_url.path", + "title": "Request Count" + } + }, + "advanced": { + "dataLabels": { + "enabled": false + }, + "tooltip": { + "enabled": true, + "mode": "index", + "intersect": false + } + } + }, + "layout": { + "w": 4, + "h": 8, + "x": 0, + "y": 8, + "i": "01K027HTD413T9MP39KYEE42GS", + "moved": false, + "static": false + } + }, + { + "tile_id": "01K027MQ5K75VSCFGVVN86MBMJ", + "title": "Response Status Distribution by Upstream Cluster", + "chartQuery": { + "x": { + "fields": [ + { + "name": "upstream.cluster", + "aggregate": "COUNT" + } + ], + "groupBy": [ + "upstream.cluster" + ] + }, + "y": { + "fields": [ + { + "name": "severity_text", + "aggregate": "COUNT" + } + ], + "groupBy": [ + "severity_text" + ] + }, + "filters": [] + }, + "dbName": "$P_STREAM", + "chartType": "bar", + "config": { + "type": "bar", + "colourScheme": "dusk", + "layout": { + "legendPosition": "bottom" + }, + "axes": { + 
"x": { + "field": "upstream.cluster", + "title": "Upstream Cluster" + }, + "y": { + "field": "COUNT_severity_text", + "title": "Response Count" + } + }, + "advanced": { + "dataLabels": { + "enabled": false + }, + "tooltip": { + "enabled": true, + "mode": "index", + "intersect": false + } + } + }, + "layout": { + "w": 8, + "h": 8, + "x": 4, + "y": 8, + "i": "01K027MQ5K75VSCFGVVN86MBMJ", + "moved": false, + "static": false + } + }, + { + "tile_id": "01K027RM6R3EQ6K960ECSKP5PX", + "title": "User Agent Distribution by Source Address", + "chartQuery": { + "x": { + "fields": [ + { + "name": "source.address", + "aggregate": "COUNT" + } + ], + "groupBy": [ + "source.address" + ] + }, + "y": { + "fields": [ + { + "name": "user_agent.original", + "aggregate": "COUNT" + } + ], + "groupBy": [ + "user_agent.original" + ] + }, + "filters": [] + }, + "dbName": "$P_STREAM", + "chartType": "area", + "config": { + "type": "area", + "colourScheme": "forest", + "layout": { + "legendPosition": "bottom" + }, + "axes": { + "x": { + "field": "source.address", + "title": "Source IP Address" + }, + "y": { + "field": "COUNT_user_agent.original", + "title": "User Agent Count" + } + }, + "advanced": { + "dataLabels": { + "enabled": false + }, + "tooltip": { + "enabled": true, + "mode": "index", + "intersect": false + } + } + }, + "layout": { + "w": 7, + "h": 7, + "x": 0, + "y": 16, + "i": "01K027RM6R3EQ6K960ECSKP5PX", + "moved": false, + "static": false + } + } + ] +} +EOF +) + + response=$(curl_with_retry "$P_URL/api/v1/dashboards/$dashboard_id" "PUT" "$dashboard_config" "application/json" 3) + if [[ $? -eq 0 ]]; then + echo "Dashboard updated successfully" + return 0 + else + echo "Failed to update dashboard" + echo "Response: $response" + return 1 + fi +} + +# Main dashboards function +run_dashboards() { + echo "Starting dashboard creation..." + + # Create dashboard and get ID + dashboard_id=$(create_dashboard) + + if [[ $? -eq 0 && -n "$dashboard_id" ]]; then + echo "Dashboard creation successful, proceeding with tiles..." + sleep 2 + + # Update dashboard with tiles + update_dashboard "$dashboard_id" + echo "Dashboard creation completed" + else + echo "Failed to create dashboard, cannot proceed with tiles" + return 1 + fi +} + +# ==================== MAIN EXECUTION ==================== + +# Display usage +show_usage() { + echo "Usage: $0 [ACTION=ingest|filters|alerts|dashboards|all]" + echo "" + echo "Environment variables:" + echo " P_URL - API URL (default: http://localhost:8000)" + echo " P_USERNAME - Username (default: admin)" + echo " P_PASSWORD - Password (default: admin)" + echo " P_STREAM - Stream name (default: demodata)" + echo " ACTION - Action to perform (default: ingest)" + echo "" + echo "Actions:" + echo " ingest - Ingest demo log data" + echo " filters - Create SQL and saved filters" + echo " alerts - Create alerts and webhook targets" + echo " dashboards - Create demo dashboard with tiles" + echo " all - Run all actions in sequence" + echo "" + echo "Examples:" + echo " ACTION=ingest ./script.sh" + echo " ACTION=filters P_STREAM=mystream ./script.sh" + echo " ACTION=dashboards ./script.sh" + echo " ACTION=all ./script.sh" +} + +# Main execution logic +main() { + case "$ACTION" in + "ingest") + run_ingest + ;; + "filters") + run_filters + ;; + "alerts") + run_alerts + ;; + "dashboards") + run_dashboards + ;; + "all") + echo "Running all actions..." + run_ingest + echo "Waiting before creating filters..." + sleep 5 + run_filters + echo "Waiting before creating alerts..." 
+            sleep 5
+            run_alerts
+            echo "Waiting before creating dashboards..."
+            sleep 5
+            run_dashboards
+            echo "All actions completed"
+            ;;
+        "help"|"--help"|"-h")
+            show_usage
+            exit 0
+            ;;
+        *)
+            echo "Unknown action: $ACTION"
+            show_usage
+            exit 1
+            ;;
+    esac
+}
+
+# Execute main function
+main "$@"
+exit $?
\ No newline at end of file
diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index 6eab244bc..39cc4ded9 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -156,6 +156,62 @@ pub async fn sync_streams_with_ingestors(
     ).await
 }
 
+// forward the demo data request to one of the live ingestors
+pub async fn get_demo_data_from_ingestor(action: &str) -> Result<(), PostError> {
+    let ingestor_infos: Vec<NodeMetadata> =
+        get_node_info(NodeType::Ingestor).await.map_err(|err| {
+            error!("Fatal: failed to get ingestor info: {:?}", err);
+            PostError::Invalid(err)
+        })?;
+
+    let mut live_ingestors: Vec<NodeMetadata> = Vec::new();
+    for ingestor in ingestor_infos {
+        if utils::check_liveness(&ingestor.domain_name).await {
+            live_ingestors.push(ingestor);
+            break;
+        }
+    }
+
+    if live_ingestors.is_empty() {
+        return Err(PostError::Invalid(anyhow::anyhow!(
+            "No live ingestors found"
+        )));
+    }
+
+    // Pick the first live ingestor
+    let ingestor = &live_ingestors[0];
+
+    let url = format!(
+        "{}{}/demodata?action={action}",
+        ingestor.domain_name,
+        base_path_without_preceding_slash()
+    );
+
+    let res = INTRA_CLUSTER_CLIENT
+        .get(url)
+        .header(header::AUTHORIZATION, &ingestor.token)
+        .header(header::CONTENT_TYPE, "application/json")
+        .send()
+        .await
+        .map_err(|err| {
+            error!(
+                "Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
+                ingestor.domain_name, err
+            );
+            PostError::Invalid(err.into())
+        })?;
+
+    if !res.status().is_success() {
+        return Err(PostError::Invalid(anyhow::anyhow!(
+            "failed to forward request to ingestor: {}\nResponse status: {}",
+            ingestor.domain_name,
+            res.status()
+        )));
+    }
+
+    Ok(())
+}
+
 // forward the role update request to all ingestors to keep them in sync
 pub async fn sync_users_with_roles_with_ingestors(
     username: &str,
@@ -919,7 +975,7 @@ where
     for result in results {
         match result {
             Ok(Some(node_metrics)) => metrics.push(node_metrics),
-            Ok(None) => {} // node was not live or metrics couldn't be fetched
+            Ok(_) => {} // node was not live or metrics couldn't be fetched
             Err(err) => return Err(err),
         }
     }
diff --git a/src/handlers/http/demo_data.rs b/src/handlers/http/demo_data.rs
new file mode 100644
index 000000000..db74d5d70
--- /dev/null
+++ b/src/handlers/http/demo_data.rs
@@ -0,0 +1,139 @@
+/*
+ * Parseable Server (C) 2022 - 2024 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ */
+
+use crate::{
+    handlers::http::{cluster::get_demo_data_from_ingestor, ingest::PostError},
+    option::Mode,
+    parseable::PARSEABLE,
+};
+use actix_web::{web, HttpRequest, HttpResponse};
+use std::{collections::HashMap, fs, process::Command};
+
+#[cfg(unix)]
+use std::os::unix::fs::PermissionsExt;
+
+// Embed the script at compile time
+const DEMO_SCRIPT: &str = include_str!("../../../resources/ingest_demo_data.sh");
+
+pub async fn get_demo_data(req: HttpRequest) -> Result<HttpResponse, PostError> {
+    let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
+        .map_err(|_| PostError::InvalidQueryParameter)?;
+
+    if query_map.is_empty() {
+        return Err(PostError::MissingQueryParameter);
+    }
+
+    let action = query_map
+        .get("action")
+        .cloned()
+        .ok_or(PostError::MissingQueryParameter)?;
+
+    let url = &PARSEABLE.options.address;
+    let username = &PARSEABLE.options.username;
+    let password = &PARSEABLE.options.password;
+    let scheme = PARSEABLE.options.get_scheme();
+    let url = format!("{scheme}://{url}");
+
+    match action.as_str() {
+        "ingest" => match PARSEABLE.options.mode {
+            Mode::Ingest | Mode::All => {
+                // Fire the script execution asynchronously
+                tokio::spawn(async move {
+                    execute_demo_script(&action, &url, username, password).await
+                });
+
+                Ok(HttpResponse::Accepted().finish())
+            }
+            Mode::Query | Mode::Prism => {
+                // Forward the request to an ingestor
+                match get_demo_data_from_ingestor(&action).await {
+                    Ok(()) => Ok(HttpResponse::Accepted().finish()),
+                    Err(e) => Err(PostError::Invalid(anyhow::anyhow!(e))),
+                }
+            }
+            _ => Err(PostError::Invalid(anyhow::anyhow!(
+                "Demo data is not available in this mode"
+            ))),
+        },
+        "filters" | "alerts" | "dashboards" => {
+            // Fire the script execution asynchronously
+            tokio::spawn(
+                async move { execute_demo_script(&action, &url, username, password).await },
+            );
+
+            Ok(HttpResponse::Accepted().finish())
+        }
+        _ => Err(PostError::InvalidQueryParameter),
+    }
+}
+
+async fn execute_demo_script(
+    action: &str,
+    url: &str,
+    username: &str,
+    password: &str,
+) -> Result<(), anyhow::Error> {
+    // Create a temporary file to write the script
+    let temp_file = tempfile::NamedTempFile::new()
+        .map_err(|e| anyhow::anyhow!("Failed to create temporary file: {}", e))?;
+
+    let temp_path = temp_file.path();
+    // Write the script content to the temporary file
+    fs::write(temp_path, DEMO_SCRIPT)
+        .map_err(|e| anyhow::anyhow!("Failed to write script to temp file: {}", e))?;
+
+    // Make the temporary file executable (Unix only)
+    #[cfg(unix)]
+    {
+        let mut permissions = fs::metadata(temp_path)
+            .map_err(|e| anyhow::anyhow!("Failed to read temp file metadata: {}", e))?
+            .permissions();
+        permissions.set_mode(0o755);
+        fs::set_permissions(temp_path, permissions)
+            .map_err(|e| anyhow::anyhow!("Failed to set temp file permissions: {}", e))?;
+    }
+
+    let output = Command::new("bash")
+        .arg(temp_path)
+        .env("P_URL", url)
+        .env("P_USERNAME", username)
+        .env("P_PASSWORD", password)
+        .env("ACTION", action)
+        .output()
+        .map_err(|e| {
+            anyhow::anyhow!(
+                "Failed to execute script: {}. Make sure bash is available.",
+                e
+            )
+        })?;
+
+    drop(temp_file);
+
+    if !output.status.success() {
+        let stderr = String::from_utf8_lossy(&output.stderr);
+        let stdout = String::from_utf8_lossy(&output.stdout);
+        return Err(anyhow::anyhow!(
+            "Script execution failed. 
Exit code: {:?}, stdout: {}, stderr: {}", + output.status.code(), + stdout, + stderr + )); + } + + Ok(()) +} diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 34b832f6a..78a0f4525 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -493,6 +493,10 @@ pub enum PostError { IncorrectLogFormat(String), #[error("Failed to ingest events in dataset {0}. Total number of fields {1} exceeds the permissible limit of {2}. We recommend creating a new dataset beyond {2} for better query performance.")] FieldsCountLimitExceeded(String, usize, usize), + #[error("Invalid query parameter")] + InvalidQueryParameter, + #[error("Missing query parameter")] + MissingQueryParameter, } impl actix_web::ResponseError for PostError { @@ -522,6 +526,8 @@ impl actix_web::ResponseError for PostError { PostError::KnownFormat(_) => StatusCode::BAD_REQUEST, PostError::IncorrectLogFormat(_) => StatusCode::BAD_REQUEST, PostError::FieldsCountLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST, + PostError::InvalidQueryParameter => StatusCode::BAD_REQUEST, + PostError::MissingQueryParameter => StatusCode::BAD_REQUEST, } } diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index 1d6c78246..f56b156e5 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -34,6 +34,7 @@ pub mod alerts; mod audit; pub mod cluster; pub mod correlation; +pub mod demo_data; pub mod health_check; pub mod ingest; mod kinesis; diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index 1ecc5dff8..a0bf9ed99 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -78,7 +78,8 @@ impl ParseableServer for IngestServer { .service(Self::get_user_webscope()) .service(Self::get_user_role_webscope()) .service(Server::get_metrics_webscope()) - .service(Server::get_readiness_factory()), + .service(Server::get_readiness_factory()) + .service(Server::get_demo_data_webscope()), ) .service(Server::get_ingest_otel_factory().wrap(from_fn( resource_check::check_resource_utilization_middleware, diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 335e29896..78c5d5383 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -75,7 +75,8 @@ impl ParseableServer for QueryServer { .service(Server::get_metrics_webscope()) .service(Server::get_alerts_webscope()) .service(Server::get_targets_webscope()) - .service(Self::get_cluster_web_scope()), + .service(Self::get_cluster_web_scope()) + .service(Server::get_demo_data_webscope()), ) .service( web::scope(&prism_base_path()) diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 615af1d8e..6de0c5413 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -23,6 +23,7 @@ use crate::handlers; use crate::handlers::http::about; use crate::handlers::http::alerts; use crate::handlers::http::base_path; +use crate::handlers::http::demo_data::get_demo_data; use crate::handlers::http::health_check; use crate::handlers::http::prism_base_path; use crate::handlers::http::query; @@ -97,7 +98,8 @@ impl ParseableServer for Server { ))) .service(Self::get_alerts_webscope()) .service(Self::get_targets_webscope()) - .service(Self::get_metrics_webscope()), + .service(Self::get_metrics_webscope()) + .service(Self::get_demo_data_webscope()), ) .service( web::scope(&prism_base_path()) @@ -201,6 +203,10 @@ impl Server { ) } + pub 
fn get_demo_data_webscope() -> Scope { + web::scope("/demodata").service(web::resource("").route(web::get().to(get_demo_data))) + } + pub fn get_metrics_webscope() -> Scope { web::scope("/metrics").service( web::resource("").route(web::get().to(metrics::get).authorize(Action::Metrics)),