diff --git a/data-loader/performance-test/README.md b/data-loader/performance-test/README.md
new file mode 100644
index 0000000000..0e1426f9f1
--- /dev/null
+++ b/data-loader/performance-test/README.md
@@ -0,0 +1,88 @@
+# Performance test execution for ScalarDB Data Loader
+
+## Instructions to run the script
+
+Execute the e2e_test.sh script from the `performance-test` folder of the repository:
+
+```
+./e2e_test.sh [options]
+```
+
+## Available command-line arguments
+
+```
+./e2e_test.sh [--memory=mem1,mem2,...] [--cpu=cpu1,cpu2,...] [--data-size=size] [--num-rows=number] [--image-tag=tag] [--import-args=args] [--export-args=args] [--network=network-name] [--skip-data-gen] [--disable-import] [--disable-export] [--no-clean-data] [--database-dir=path] [--use-jar] [--jar-path=path]
+```
+
+Options:
+
+- `--memory=mem1,mem2,...`: Comma-separated list of memory limits for Docker containers (e.g., 1g,2g,4g)
+- `--cpu=cpu1,cpu2,...`: Comma-separated list of CPU limits for Docker containers (e.g., 1,2,4)
+- `--data-size=size`: Size of data to generate (e.g., 1MB, 2GB)
+- `--num-rows=number`: Number of rows to generate (e.g., 1000, 10000)
+
+Note: Specify either `--data-size` or `--num-rows`, but not both. If neither is provided, the script falls back to a default data size of 1MB.
+
+- `--image-tag=tag`: Docker image tag to use (default: 4.0.0-SNAPSHOT)
+- `--import-args=args`: Arguments for the import command
+- `--export-args=args`: Arguments for the export command
+- `--network=network-name`: Docker network name (default: my-network)
+- `--skip-data-gen`: Skip the data generation step
+- `--disable-import`: Skip the import test
+- `--disable-export`: Skip the export test
+- `--no-clean-data`: Don't clean up generated files after the test
+- `--database-dir=path`: Path to the database directory
+- `--use-jar`: Use a JAR file instead of the Docker container
+- `--jar-path=path`: Path to the JAR file (when using --use-jar)
+
+Examples:
+
+```
+# Using data size
+./e2e_test.sh --memory=1g,2g,4g --cpu=1,2,4 --data-size=2MB --image-tag=4.0.0-SNAPSHOT
+```
+
+```
+# Using number of rows
+./e2e_test.sh --memory=1g,2g,4g --cpu=1,2,4 --num-rows=10000 --image-tag=4.0.0-SNAPSHOT
+```
+
+Example with JAR:
+
+```
+./e2e_test.sh --use-jar --jar-path=./scalardb-data-loader-cli.jar --import-args="--format csv --import-mode insert --mode transaction --transaction-size 10 --data-chunk-size 500 --max-threads 16" --export-args="--format csv --max-threads 8 --data-chunk-size 500"
+```
+
+### Import-Only Examples
+
+To run only the import test (skipping export):
+
+```
+# Using data size
+./e2e_test.sh --disable-export --memory=2g --cpu=2 --data-size=1MB --import-args="--format csv --import-mode insert --mode transaction --transaction-size 10 --max-threads 16"
+```
+
+```
+# Using number of rows
+./e2e_test.sh --disable-export --memory=2g --cpu=2 --num-rows=10000 --import-args="--format csv --import-mode insert --mode transaction --transaction-size 10 --max-threads 16"
+```
+
+With JAR:
+
+```
+./e2e_test.sh --disable-export --use-jar --jar-path=./scalardb-data-loader-cli.jar --import-args="--format csv --import-mode insert --mode transaction --transaction-size 10 --max-threads 16"
+```
+
+### Export-Only Examples
+
+To run only the export test (skipping import):
+
+```
+./e2e_test.sh --disable-import --memory=2g --cpu=2 --export-args="--format csv --max-threads 8 --data-chunk-size 500"
+```
+
+With JAR:
+
+```
+./e2e_test.sh --disable-import --use-jar --jar-path=./scalardb-data-loader-cli.jar --export-args="--format csv --max-threads 8 --data-chunk-size 500"
+```
diff --git a/data-loader/performance-test/database/db_setup.sh b/data-loader/performance-test/database/db_setup.sh
new file mode 100755
index 0000000000..8d7115ceca
--- /dev/null
+++ b/data-loader/performance-test/database/db_setup.sh
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+# Set variables
+NETWORK_NAME="my-network"
+POSTGRES_CONTAINER="postgres-db"
+SCALARDB_PROPERTIES="$(pwd)/scalardb.properties"
+SCHEMA_JSON="$(pwd)/schema.json"
+
+# Step 1: Create a Docker network (if not exists)
+docker network inspect $NETWORK_NAME >/dev/null 2>&1 || \
+  docker network create $NETWORK_NAME
+
+# Step 2: Start PostgreSQL container
+docker run -d --name $POSTGRES_CONTAINER \
+  --network $NETWORK_NAME \
+  -e POSTGRES_USER=myuser \
+  -e POSTGRES_PASSWORD=mypassword \
+  -e POSTGRES_DB=mydatabase \
+  -p 5432:5432 \
+  postgres:16
+
+# Wait for PostgreSQL to be ready
+echo "Waiting for PostgreSQL to start..."
+sleep 10
+
+# Step 3: Create 'test' schema
+docker exec -i $POSTGRES_CONTAINER psql -U myuser -d mydatabase -c "CREATE SCHEMA IF NOT EXISTS test;"
+
+# Step 4: Run ScalarDB Schema Loader
+docker run --rm --network $NETWORK_NAME \
+  -v "$SCHEMA_JSON:/schema.json" \
+  -v "$SCALARDB_PROPERTIES:/scalardb.properties" \
+  ghcr.io/scalar-labs/scalardb-schema-loader:3.15.2-SNAPSHOT \
+  -f /schema.json --config /scalardb.properties --coordinator
+
+# Step 5: Verify schema creation
+docker exec -i $POSTGRES_CONTAINER psql -U myuser -d mydatabase -c "\dn"
+
+echo "✅ Schema Loader execution completed."
+
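The setup script above waits a fixed 10 seconds for PostgreSQL, which can be flaky on slower hosts. A minimal alternative (a sketch only, not part of the patch) is to poll the container with `pg_isready`, reusing the container name and credentials the script already defines:

```
# Hypothetical replacement for the fixed `sleep 10` in db_setup.sh:
# poll until PostgreSQL accepts connections, for up to ~60 seconds.
echo "Waiting for PostgreSQL to start..."
for _ in $(seq 1 60); do
  if docker exec "$POSTGRES_CONTAINER" pg_isready -U myuser -d mydatabase >/dev/null 2>&1; then
    break
  fi
  sleep 1
done
```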
diff --git a/data-loader/performance-test/database/scalardb.properties b/data-loader/performance-test/database/scalardb.properties
new file mode 100644
index 0000000000..78baaee01f
--- /dev/null
+++ b/data-loader/performance-test/database/scalardb.properties
@@ -0,0 +1,6 @@
+scalar.db.storage=jdbc
+scalar.db.contact_points=jdbc:postgresql://postgres-db:5432/mydatabase
+scalar.db.username=myuser
+scalar.db.password=mypassword
+scalar.db.cross_partition_scan.enabled=true
+scalar.db.transaction_manager=single-crud-operation
diff --git a/data-loader/performance-test/database/schema.json b/data-loader/performance-test/database/schema.json
new file mode 100644
index 0000000000..ddf91a14e4
--- /dev/null
+++ b/data-loader/performance-test/database/schema.json
@@ -0,0 +1,25 @@
+{
+  "test.all_columns": {
+    "transaction": true,
+    "partition-key": [
+      "col1"
+    ],
+    "clustering-key": [
+      "col2",
+      "col3"
+    ],
+    "columns": {
+      "col1": "BIGINT",
+      "col2": "INT",
+      "col3": "BOOLEAN",
+      "col4": "FLOAT",
+      "col5": "DOUBLE",
+      "col6": "TEXT",
+      "col7": "BLOB",
+      "col8": "DATE",
+      "col9": "TIME",
+      "col10": "TIMESTAMP",
+      "col11": "TIMESTAMPTZ"
+    }
+  }
+}
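For orientation, the import file that `scripts/import-data-generator.py` (added below) produces for this schema is a plain CSV whose header matches the column names above. The row below is an illustrative sketch of the value formats the generator emits, not actual test output:

```
col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11
1234567890,-42,true,123.456789,9876.543210,aB3dE6gH9jK1,9f86d081884c7d659a2feaa0c55ad015,2024-05-17,13:45:12.345,2024-05-17T13:45:12.345,2024-05-17T13:45:12.345Z
```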
diff --git a/data-loader/performance-test/e2e_test.sh b/data-loader/performance-test/e2e_test.sh
new file mode 100755
index 0000000000..ef2dad7a96
--- /dev/null
+++ b/data-loader/performance-test/e2e_test.sh
@@ -0,0 +1,415 @@
+#!/bin/bash
+
+set -e
+
+# === FUNCTIONS ===
+
+info() { echo -e "\033[1;34m▶ $1\033[0m"; }
+success() { echo -e "\033[1;32m✅ $1\033[0m"; }
+warn() { echo -e "\033[1;33m⚠️ $1\033[0m"; }
+error() { echo -e "\033[1;31m❌ $1\033[0m"; }
+
+# Timestamp and elapsed time functions
+timestamp() {
+  date +"%Y-%m-%d %H:%M:%S"
+}
+
+start_timer() {
+  TIMER_START=$(date +%s)
+  echo -e "\033[1;36m[$(timestamp)] Starting: $1\033[0m"
+}
+
+end_timer() {
+  local end_time=$(date +%s)
+  local elapsed=$((end_time - TIMER_START))
+  local hours=$((elapsed / 3600))
+  local minutes=$(( (elapsed % 3600) / 60 ))
+  local seconds=$((elapsed % 60))
+
+  if [[ $hours -gt 0 ]]; then
+    echo -e "\033[1;36m[$(timestamp)] Completed: $1 (Elapsed time: ${hours}h ${minutes}m ${seconds}s)\033[0m"
+  elif [[ $minutes -gt 0 ]]; then
+    echo -e "\033[1;36m[$(timestamp)] Completed: $1 (Elapsed time: ${minutes}m ${seconds}s)\033[0m"
+  else
+    echo -e "\033[1;36m[$(timestamp)] Completed: $1 (Elapsed time: ${seconds}s)\033[0m"
+  fi
+}
+
+cleanup() {
+  info "Cleaning up database container..."
+  docker kill "$DATABASE_CONTAINER" &>/dev/null || true
+  docker rm -v "$DATABASE_CONTAINER" &>/dev/null || true
+  success "Removed database container"
+}
+
+setup_database() {
+  info "Starting database Docker container..."
+  start_timer "Setting up database and load schema"
+  pushd "$DATABASE_ROOT_PATH" >/dev/null
+  bash "$DATABASE_SETUP_SCRIPT"
+  popd >/dev/null
+  end_timer "Setting up database and load schema"
+  success "Database container is running"
+}
+
+ensure_docker_network() {
+  if ! docker network inspect $DOCKER_NETWORK >/dev/null 2>&1; then
+    info "Creating Docker network '$DOCKER_NETWORK'"
+    docker network create $DOCKER_NETWORK
+  fi
+}
+
+run_import_container() {
+  local mem=$1
+  local cpu=$2
+  local container_name="import-mem${mem}-cpu${cpu}"
+
+  info "Running Import Docker container: $container_name with --memory=$mem --cpus=$cpu"
+  start_timer "Import container $container_name"
+
+  docker run --rm \
+    --name "$container_name" \
+    --network $DOCKER_NETWORK \
+    --memory="$mem" \
+    --cpus="$cpu" \
+    -v "$PWD/$INPUT_SOURCE_FILE":"$CONTAINER_INPUT_FILE" \
+    -v "$PWD/$PROPERTIES_PATH":"$CONTAINER_PROPERTIES_PATH" \
+    -v "$PWD/$LOG_DIR_HOST":"$CONTAINER_LOG_DIR" \
+    "$IMAGE_NAME" \
+    import --config "$CONTAINER_PROPERTIES_PATH" \
+    --file "$CONTAINER_INPUT_FILE" \
+    --namespace test \
+    --table all_columns \
+    $IMPORT_ARGS \
+    --log-dir "$CONTAINER_LOG_DIR_PATH"
+
+  end_timer "Import container $container_name"
+  success "Finished Import: $container_name"
+  echo "----------------------------------------"
+}
+
+run_export_container() {
+  local mem=$1
+  local cpu=$2
+  local container_name="export-mem${mem}-cpu${cpu}"
+
+  mkdir -p dumps
+
+  info "Running Export Docker container: $container_name with --memory=$mem --cpus=$cpu"
+  start_timer "Export container $container_name"
+
+  docker run --rm \
+    --name "$container_name" \
+    --network $DOCKER_NETWORK \
+    --memory="$mem" \
+    --cpus="$cpu" \
+    -v "$PWD/$PROPERTIES_PATH":"$CONTAINER_PROPERTIES_PATH" \
+    -v "$PWD/$OUTPUT_DIR_HOST":"$CONTAINER_OUTPUT_DIR" \
+    -v "$PWD/dumps":/tmp \
+    --entrypoint java \
+    "$IMAGE_NAME" \
+    -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/heapdump.hprof -XX:+UseContainerSupport -XX:MaxRAMPercentage=80.0 -jar /app.jar \
+    export --config "$CONTAINER_PROPERTIES_PATH" \
+    --namespace test \
+    --table all_columns \
+    --output-dir "$CONTAINER_OUTPUT_DIR_PATH" \
+    $EXPORT_ARGS
+
+  end_timer "Export container $container_name"
+  success "Finished Export: $container_name"
+  echo "----------------------------------------"
+}
+
+run_import_jar() {
+  info "Running Import using JAR file"
+  start_timer "Import using JAR"
+
+  java -jar "$JAR_PATH" \
+    import --config "$PROPERTIES_PATH" \
+    --file "$INPUT_SOURCE_FILE" \
+    --namespace test \
+    --table all_columns \
+    $IMPORT_ARGS \
+    --log-dir "$LOG_DIR_HOST"
+
+  end_timer "Import using JAR"
+  success "Finished Import using JAR"
+  echo "----------------------------------------"
+}
+
+run_export_jar() {
+  info "Running Export using JAR file"
+  start_timer "Export using JAR"
+
+  java -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/heapdump.hprof -XX:MaxRAMPercentage=80.0 -jar "$JAR_PATH" \
+    export --config "$PROPERTIES_PATH" \
+    --namespace test \
+    --table all_columns \
+    --output-dir "$OUTPUT_DIR_HOST" \
+    $EXPORT_ARGS
+
+  end_timer "Export using JAR"
+  success "Finished Export using JAR"
+  echo "----------------------------------------"
+}
+
+# === PREREQUISITE CHECK ===
+
+command -v docker >/dev/null 2>&1 || { error "Docker is not installed or not in PATH"; exit 1; }
+# Java is checked when --use-jar is specified in the argument parsing section
+
+# === CONFIGURATION VARIABLES ===
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+IMAGE_BASE="ghcr.io/scalar-labs/scalardb-data-loader-cli"
+IMAGE_TAG="4.0.0-SNAPSHOT"
+IMAGE_NAME="${IMAGE_BASE}:${IMAGE_TAG}"
+DATABASE_DIR="database"
+DATABASE_ROOT_PATH="$SCRIPT_DIR/$DATABASE_DIR"
+DATABASE_SETUP_SCRIPT="$DATABASE_ROOT_PATH/db_setup.sh"
+DATABASE_CONTAINER="postgres-db"
+SCHEMA_PATH="$DATABASE_ROOT_PATH/schema.json"
+PROPERTIES_PATH="$DATABASE_DIR/scalardb.properties"
+PYTHON_SCRIPT_PATH="scripts/import-data-generator.py"
+DATA_SIZE=""
+NUM_ROWS=""
+PYTHON_ARGUMENTS=""
+INPUT_SOURCE_FILE="./generated-imports.csv"
+CONTAINER_INPUT_FILE="/app/generated-imports.csv"
+CONTAINER_PROPERTIES_PATH="/app/scalardb.properties"
+LOG_DIR_HOST="import-logs"
+CONTAINER_LOG_DIR="/app/logs"
+CONTAINER_LOG_DIR_PATH="/app/logs/"
+OUTPUT_DIR_HOST="export-output"
+CONTAINER_OUTPUT_DIR="/app/export-output"
+CONTAINER_OUTPUT_DIR_PATH="/app/export-output/"
+DOCKER_NETWORK="my-network"
+JAR_PATH="./scalardb-data-loader-cli-4.0.0-SNAPSHOT.jar"
+
+# Feature flags
+SKIP_DATA_GEN=false
+DISABLE_IMPORT=false
+DISABLE_EXPORT=false
+NO_CLEAN_DATA=false
+USE_JAR=false
+
+MEMORY_CONFIGS=("2g")
+CPU_CONFIGS=("2")
+IMPORT_ARGS="--format csv --import-mode insert --mode transaction --transaction-size 10 --data-chunk-size 500 --max-threads 8"
+EXPORT_ARGS="--format csv --max-threads 8 --data-chunk-size 500"
+
+# Parse command line arguments
+while [[ $# -gt 0 ]]; do
+  case $1 in
+    --memory=*)
+      IFS=',' read -r -a MEMORY_CONFIGS <<< "${1#*=}"
+      shift
+      ;;
+    --cpu=*)
+      IFS=',' read -r -a CPU_CONFIGS <<< "${1#*=}"
+      shift
+      ;;
+    --data-size=*)
+      if [[ -n "$NUM_ROWS" ]]; then
+        error "Cannot specify both --data-size and --num-rows"
+        echo "Usage: $0 [--memory=mem1,mem2,...] [--cpu=cpu1,cpu2,...] [--data-size=size] [--num-rows=number] [--image-tag=tag] [--import-args=args] [--export-args=args] [--network=network-name] [--skip-data-gen] [--disable-import] [--disable-export] [--no-clean-data] [--database-dir=path] [--use-jar] [--jar-path=path]"
+        echo "Note: Either --data-size or --num-rows must be provided, but not both."
+        exit 1
+      fi
+      DATA_SIZE=${1#*=}
+      PYTHON_ARGUMENTS="-s $DATA_SIZE -o /app/generated-imports.csv /app/$DATABASE_DIR/schema.json test.all_columns"
+      shift
+      ;;
+    --num-rows=*)
+      if [[ -n "$DATA_SIZE" ]]; then
+        error "Cannot specify both --data-size and --num-rows"
+        echo "Usage: $0 [--memory=mem1,mem2,...] [--cpu=cpu1,cpu2,...] [--data-size=size] [--num-rows=number] [--image-tag=tag] [--import-args=args] [--export-args=args] [--network=network-name] [--skip-data-gen] [--disable-import] [--disable-export] [--no-clean-data] [--database-dir=path] [--use-jar] [--jar-path=path]"
+        echo "Note: Either --data-size or --num-rows must be provided, but not both."
+        exit 1
+      fi
+      NUM_ROWS=${1#*=}
+      PYTHON_ARGUMENTS="-n $NUM_ROWS -o /app/generated-imports.csv /app/$DATABASE_DIR/schema.json test.all_columns"
+      shift
+      ;;
+    --image-tag=*)
+      IMAGE_TAG=${1#*=}
+      IMAGE_NAME="${IMAGE_BASE}:${IMAGE_TAG}"
+      shift
+      ;;
+    --import-args=*)
+      IMPORT_ARGS=${1#*=}
+      shift
+      ;;
+    --export-args=*)
+      EXPORT_ARGS=${1#*=}
+      shift
+      ;;
+    --network=*)
+      DOCKER_NETWORK=${1#*=}
+      shift
+      ;;
+    --skip-data-gen)
+      SKIP_DATA_GEN=true
+      shift
+      ;;
+    --disable-import)
+      DISABLE_IMPORT=true
+      shift
+      ;;
+    --disable-export)
+      DISABLE_EXPORT=true
+      shift
+      ;;
+    --no-clean-data)
+      NO_CLEAN_DATA=true
+      shift
+      ;;
+    --database-dir=*)
+      DATABASE_DIR=${1#*=}
+      DATABASE_ROOT_PATH="$SCRIPT_DIR/$DATABASE_DIR"
+      DATABASE_SETUP_SCRIPT="$DATABASE_ROOT_PATH/db_setup.sh"
+      SCHEMA_PATH="$DATABASE_ROOT_PATH/schema.json"
+      PROPERTIES_PATH="$DATABASE_DIR/scalardb.properties"
+      # Update Python arguments based on whether data-size or num-rows is specified
+      if [[ -n "$DATA_SIZE" ]]; then
+        PYTHON_ARGUMENTS="-s $DATA_SIZE -o /app/generated-imports.csv /app/$DATABASE_DIR/schema.json test.all_columns"
+      elif [[ -n "$NUM_ROWS" ]]; then
+        PYTHON_ARGUMENTS="-n $NUM_ROWS -o /app/generated-imports.csv /app/$DATABASE_DIR/schema.json test.all_columns"
+      fi
+      shift
+      ;;
+    --use-jar)
+      USE_JAR=true
+      command -v java >/dev/null 2>&1 || { error "Java is not installed or not in PATH"; exit 1; }
+      shift
+      ;;
+    --jar-path=*)
+      JAR_PATH=${1#*=}
+      shift
+      ;;
+    *)
+      echo "Unknown option: $1"
+      echo "Usage: $0 [--memory=mem1,mem2,...] [--cpu=cpu1,cpu2,...] [--data-size=size] [--num-rows=number] [--image-tag=tag] [--import-args=args] [--export-args=args] [--network=network-name] [--skip-data-gen] [--disable-import] [--disable-export] [--no-clean-data] [--database-dir=path] [--use-jar] [--jar-path=path]"
+      echo "Note: Either --data-size or --num-rows must be provided, but not both."
+      echo "Example: $0 --memory=1g,2g,4g --cpu=1,2,4 --data-size=2MB --image-tag=4.0.0-SNAPSHOT --import-args=\"--format csv --import-mode insert --mode transaction --transaction-size 10 --data-chunk-size 50 --max-threads 16\" --export-args=\"--format csv --max-threads 8 --data-chunk-size 10\" --network=my-custom-network"
+      echo "Example with num-rows: $0 --memory=1g,2g,4g --cpu=1,2,4 --num-rows=1000 --image-tag=4.0.0-SNAPSHOT"
+      echo "Example with JAR: $0 --use-jar --jar-path=./scalardb-data-loader-cli.jar --import-args=\"--format csv --import-mode insert --mode transaction --transaction-size 10 --data-chunk-size 50 --max-threads 16\" --export-args=\"--format csv --max-threads 8 --data-chunk-size 10\""
+      exit 1
+      ;;
+  esac
+done
+ start_timer "Generate import data" + docker run --rm \ + -v "$PWD":/app \ + python:alpine \ + python3 /app/"$PYTHON_SCRIPT_PATH" $PYTHON_ARGUMENTS + end_timer "Generate import data" + success "Input file generated." +else + info "Skipping data generation as requested" +fi + +# === ENSURE DOCKER NETWORK === +start_timer "Ensure Docker network" +ensure_docker_network +end_timer "Ensure Docker network" + +# === SETTING UP DATABASE AND LOAD SCHEMA === +setup_database + +# === RUN IMPORT AND EXPORT TESTS === +info "Running tests..." +start_timer "All tests" + +if [[ "$USE_JAR" == true ]]; then + # Check if JAR file exists + if [[ ! -f "$JAR_PATH" ]]; then + error "JAR file not found at $JAR_PATH" + exit 1 + fi + + info "Using JAR file for tests: $JAR_PATH" + + # Run import and export using JAR + if [[ "$DISABLE_IMPORT" != true ]]; then + run_import_jar + else + info "Skipping import test as requested" + fi + + if [[ "$DISABLE_EXPORT" != true ]]; then + run_export_jar + else + info "Skipping export test as requested" + fi +else + # Run tests using Docker containers with different memory and CPU configurations + first_iteration=true + for mem in "${MEMORY_CONFIGS[@]}"; do + for cpu in "${CPU_CONFIGS[@]}"; do + # Clean up and restart the database for each test iteration, except the first one + if [ "$first_iteration" = true ]; then + first_iteration=false + else + cleanup + setup_database + fi + + if [[ "$DISABLE_IMPORT" != true ]]; then + run_import_container "$mem" "$cpu" + else + info "Skipping import test as requested" + fi + + if [[ "$DISABLE_EXPORT" != true ]]; then + run_export_container "$mem" "$cpu" + else + info "Skipping export test as requested" + fi + done + done +fi +end_timer "All tests" + +# === CLEANUP === +if [[ "$NO_CLEAN_DATA" != true ]]; then + info "Cleaning up generated files..." 
+ start_timer "Cleanup generated files" + rm -rf "$LOG_DIR_HOST" "$INPUT_SOURCE_FILE" "$OUTPUT_DIR_HOST" + end_timer "Cleanup generated files" +else + info "Skipping cleanup of generated files as requested" +fi + +end_timer "End-to-end test script" +success "End-to-end test completed successfully 🎉" diff --git a/data-loader/performance-test/scripts/import-data-generator.py b/data-loader/performance-test/scripts/import-data-generator.py new file mode 100755 index 0000000000..41186c6836 --- /dev/null +++ b/data-loader/performance-test/scripts/import-data-generator.py @@ -0,0 +1,275 @@ +#!/usr/bin/env python3 + +import argparse +import binascii +import datetime +import json +import multiprocessing +import os +import random +import string +import sys +from datetime import timedelta +from typing import Dict, List, Any + +# --- Configuration --- +DEFAULT_OUTPUT_FILE = "output.csv" +RANDOM_STRING_LENGTH = 12 +RANDOM_INT_MIN = -2 ** 31 +RANDOM_INT_MAX = 2 ** 31 - 1 +RANDOM_FLOAT_MAX = 10000.0 +RANDOM_FLOAT_PRECISION = 6 +RANDOM_DATE_MAX_DAYS_AGO = 3650 # Approx 10 years +RANDOM_BLOB_LENGTH = 16 + + +def parse_arguments(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + description='Generate random CSV data based on a ScalarDB JSON schema using multiple cores.', + formatter_class=argparse.RawTextHelpFormatter + ) + parser.add_argument('schema_file', help='Path to the ScalarDB JSON schema file') + parser.add_argument('table_name', help='Name of the table within the schema (e.g., "namespace.tablename")') + + # Create a mutually exclusive group for num_rows or target_size + size_group = parser.add_mutually_exclusive_group(required=True) + size_group.add_argument('-n', '--num-rows', type=int, help='Number of data rows to generate') + size_group.add_argument('-s', '--target-size', type=str, + help='Target file size (e.g., "10MB", "1GB"). Supports B, KB, MB, GB suffixes.') + + parser.add_argument('-o', '--output-file', default=DEFAULT_OUTPUT_FILE, + help=f'Name of the output CSV file (default: {DEFAULT_OUTPUT_FILE})') + + args = parser.parse_args() + + # Basic validation + if not os.path.isfile(args.schema_file): + parser.error(f"Schema file not found: {args.schema_file}") + + if args.num_rows is not None and args.num_rows <= 0: + parser.error("Number of rows must be a positive integer") + + # Parse target size if provided + if args.target_size: + try: + args.target_size_bytes = parse_size(args.target_size) + except ValueError as e: + parser.error(str(e)) + + return args + + +def parse_size(size_str): + """Parse a size string like '10MB' and return the size in bytes.""" + size_str = size_str.upper().strip() + + # Define size multipliers + multipliers = { + 'KB': 1024, + 'MB': 1024 * 1024, + 'GB': 1024 * 1024 * 1024, + } + + # Try to match with a suffix + for suffix, multiplier in multipliers.items(): + if size_str.endswith(suffix): + try: + # Extract the numeric part without the suffix + value = float(size_str[:-len(suffix)]) + if value <= 0: + raise ValueError("Size value must be positive") + return int(value * multiplier) + except ValueError as e: + print(e) + raise ValueError(f"Invalid size format: {size_str}") + + # Try parsing as a plain number (bytes) + try: + value = float(size_str) + if value <= 0: + raise ValueError("Size value must be positive") + return int(value) + except ValueError: + raise ValueError(f"Invalid size format: {size_str}. 
+
+
+def generate_random_data(data_type: str) -> str:
+    """Generate random data based on ScalarDB type."""
+    # Convert type to uppercase for case-insensitive matching
+    upper_type = data_type.upper()
+
+    if upper_type in ("INT", "BIGINT"):
+        return str(random.randint(RANDOM_INT_MIN, RANDOM_INT_MAX))
+
+    elif upper_type == "TEXT":
+        return ''.join(random.choices(string.ascii_letters + string.digits, k=RANDOM_STRING_LENGTH))
+
+    elif upper_type in ("FLOAT", "DOUBLE"):
+        return f"{random.uniform(0, RANDOM_FLOAT_MAX):.{RANDOM_FLOAT_PRECISION}f}"
+
+    elif upper_type == "DATE":
+        random_days_ago = random.randint(0, RANDOM_DATE_MAX_DAYS_AGO)
+        return (datetime.datetime.now() - timedelta(days=random_days_ago)).strftime('%Y-%m-%d')
+
+    elif upper_type == "TIME":
+        hour = random.randint(0, 23)
+        minute = random.randint(0, 59)
+        second = random.randint(0, 59)
+        millisecond = random.randint(0, 999)
+        return f"{hour:02d}:{minute:02d}:{second:02d}.{millisecond:03d}"
+
+    elif upper_type == "TIMESTAMP":
+        random_days_ago = random.randint(0, RANDOM_DATE_MAX_DAYS_AGO)
+        random_seconds_ago = random.randint(0, 86400)  # Seconds in a day
+        timestamp = datetime.datetime.now() - timedelta(days=random_days_ago, seconds=random_seconds_ago)
+        return timestamp.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]  # Trim microseconds to milliseconds
+
+    elif upper_type == "TIMESTAMPTZ":
+        random_days_ago = random.randint(0, RANDOM_DATE_MAX_DAYS_AGO)
+        random_seconds_ago = random.randint(0, 86400)  # Seconds in a day
+        timestamp = datetime.datetime.now() - timedelta(days=random_days_ago, seconds=random_seconds_ago)
+        return timestamp.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'  # Add Z for UTC
+
+    elif upper_type == "BOOLEAN":
+        return random.choice(["true", "false"])
+
+    elif upper_type == "BLOB":
+        random_bytes = os.urandom(RANDOM_BLOB_LENGTH)
+        return binascii.hexlify(random_bytes).decode('ascii')
+
+    else:
+        return f"UNSUPPORTED_TYPE({data_type})"
+
+
+def generate_row(column_types: List[str]) -> str:
+    """Generate a single CSV row based on the schema."""
+    row_values = [generate_random_data(col_type) for col_type in column_types]
+    return ','.join(row_values)
+
+
+def estimate_row_size(column_types: List[str], sample_size: int = 10) -> int:
+    """Estimate the average size of a row in bytes by generating sample rows."""
+    total_size = 0
+    for _ in range(sample_size):
+        row = generate_row(column_types)
+        total_size += len(row.encode('utf-8')) + 1  # +1 for newline character
+    return total_size // sample_size
+
+
+def worker_task(args):
+    """Worker function for multiprocessing."""
+    _, column_types = args
+    return generate_row(column_types)
+
+
+def format_size(size_bytes):
+    """Format size in bytes to a human-readable string."""
+    if size_bytes < 1024:
+        return f"{size_bytes}B"
+    elif size_bytes < pow(1024, 2):
+        return f"{round(size_bytes / 1024, 2)}KB"
+    elif size_bytes < pow(1024, 3):
+        return f"{round(size_bytes / (pow(1024, 2)), 2)}MB"
+    else:
+        return f"{round(size_bytes / (pow(1024, 3)), 2)}GB"
+
+
+def main():
+    args = parse_arguments()
+
+    try:
+        # Read schema file
+        with open(args.schema_file, 'r') as f:
+            schema = json.load(f)
+
+        # Validate table existence in schema
+        if args.table_name not in schema:
+            print(f"Error: Table '{args.table_name}' not found in schema file '{args.schema_file}'")
+            sys.exit(1)
+
+        # Extract column information
+        columns = schema[args.table_name].get('columns', {})
+        if not columns:
+            print(f"Error: No columns found for table '{args.table_name}' in schema file")
+            sys.exit(1)
+
+        # Get column names in order (keys of the columns dictionary)
+        column_names = list(columns.keys())
+        column_header = ','.join(column_names)
+
+        # Get corresponding column types
+        column_types = [columns[name] for name in column_names]
+
+        # Calculate the number of rows if target size is specified
+        if hasattr(args, 'target_size_bytes'):
+            # Estimate size with a sample
+            avg_row_size = estimate_row_size(column_types)
+            header_size = len(column_header.encode('utf-8')) + 1  # +1 for newline
+            num_rows = max(1, (args.target_size_bytes - header_size) // avg_row_size)
+            print(f"Targeting ~{args.target_size} file size. Estimated row size: {avg_row_size} bytes")
+            print(f"Will generate {num_rows} rows to approximate the target size")
+        else:
+            num_rows = args.num_rows
+
+        # Set up multiprocessing
+        num_cores = multiprocessing.cpu_count()
+        print(f"Generating {num_rows} rows for table '{args.table_name}' using {num_cores} cores...")
+        print(f"Schema File: {args.schema_file}")
+        print(f"Output File: {args.output_file}")
+
+        # Write header to output file
+        with open(args.output_file, 'w') as f:
+            f.write(column_header + '\n')
+
+        # Generate data using multiprocessing
+        with multiprocessing.Pool(processes=num_cores) as pool:
+            # Process chunks of data to avoid memory issues with very large datasets
+            chunk_size = min(1000, max(1, num_rows // (num_cores * 10)))
+
+            with open(args.output_file, 'a') as f:
+                # Process in chunks to avoid storing all tasks in memory
+                for chunk_start in range(0, num_rows, chunk_size):
+                    # Calculate the actual size of this chunk (might be smaller for the last chunk)
+                    current_chunk_size = min(chunk_size, num_rows - chunk_start)
+
+                    # Use imap_unordered with a generator expression and appropriate chunksize for better memory efficiency
+                    # imap_unordered processes items as they become available and doesn't wait for order preservation
+                    # This is faster and uses less memory than regular imap
+                    # The chunksize parameter controls how many tasks each worker gets at once
+                    worker_chunksize = max(1, min(100, current_chunk_size // num_cores))
+                    results = pool.imap_unordered(
+                        worker_task,
+                        ((i, column_types) for i in range(chunk_start, chunk_start + current_chunk_size)),
+                        chunksize=worker_chunksize
+                    )
+
+                    # Write results and explicitly flush to ensure data is written to disk
+                    for row in results:
+                        f.write(row + '\n')
+
+                    # Flush after each chunk to ensure data is written to disk
+                    f.flush()
+
+                    # Report progress for large datasets
+                    if num_rows > 10000 and (chunk_start + current_chunk_size) % (num_rows // 10) < chunk_size:
+                        progress = (chunk_start + current_chunk_size) / num_rows * 100
+                        print(f"Progress: {progress:.1f}% ({chunk_start + current_chunk_size}/{num_rows} rows)",
+                              flush=True)
+
+        # Report final file size
+        final_size = os.path.getsize(args.output_file)
+        size_str = format_size(final_size)
+        print(
+            f"Successfully generated {args.output_file} with {num_rows} data rows ({size_str}) for table '{args.table_name}'.")
+
+    except json.JSONDecodeError:
+        print(f"Error: Invalid JSON in schema file '{args.schema_file}'")
+        sys.exit(1)
+    except Exception as e:
+        print(f"Error: {str(e)}")
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()
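The generator can also be run directly, outside the `python:alpine` container that e2e_test.sh wraps it in, when tuning row counts or target sizes. A minimal invocation from the `performance-test` folder, assuming a local Python 3 interpreter (the script uses only the standard library), would look like:

```
python3 scripts/import-data-generator.py -n 1000 -o generated-imports.csv database/schema.json test.all_columns
```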