#!/bin/bash
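
# Bulk-load gzipped JSON (*.json.gz) files into <DB_NAME>.<TABLE_NAME> through
# the Stream Load HTTP endpoint on 127.0.0.1:8030: each file is uncompressed
# into a temporary directory, uploaded with curl (retried up to 10 times),
# and the outcome is appended to SUCCESS_LOG or ERROR_LOG.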

# Check that the required arguments are provided
if [[ $# -lt 6 ]]; then
    echo "Usage: $0 <DATA_DIRECTORY> <DB_NAME> <TABLE_NAME> <MAX_FILES> <SUCCESS_LOG> <ERROR_LOG>"
    exit 1
fi
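
# Example invocation (all paths and names below are illustrative only):
#   ./load_json.sh /data/exports my_db my_table 100 load_success.log load_error.log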

# Arguments
DATA_DIRECTORY="$1"
DB_NAME="$2"
TABLE_NAME="$3"
MAX_FILES="$4"
SUCCESS_LOG="$5"
ERROR_LOG="$6"

# Validate arguments
[[ ! -d "$DATA_DIRECTORY" ]] && { echo "Error: Data directory '$DATA_DIRECTORY' does not exist."; exit 1; }
[[ ! "$MAX_FILES" =~ ^[1-9][0-9]*$ ]] && { echo "Error: MAX_FILES must be a positive integer."; exit 1; }

# Create a temporary directory for the uncompressed files
TEMP_DIR=$(mktemp -d /var/tmp/json_files.XXXXXX) || { echo "Error: Failed to create temporary directory."; exit 1; }
trap 'rm -rf "$TEMP_DIR"' EXIT  # Clean up the temp directory on script exit

# Load data
shopt -s nullglob   # If no *.json.gz files match, the loop body is skipped entirely
counter=0
start=0             # Files whose numeric index is <= start are skipped (simple resume point)
for file in "$DATA_DIRECTORY"/*.json.gz; do
    echo "Processing file: $file"

    # Extract the first run of digits in the file name as its sequence number
    num=$(basename "$file" | sed -n 's/[^0-9]*\([0-9]\+\).*/\1/p')
    if [[ -n "$num" && "$num" -le "$start" ]]; then
        continue
    fi

    # Uncompress the file into TEMP_DIR
    uncompressed_file="$TEMP_DIR/$(basename "${file%.gz}")"
    if ! gunzip -c "$file" > "$uncompressed_file"; then
        echo "Error: Failed to uncompress $file" >> "$ERROR_LOG"
        continue
    fi

    MAX_ATTEMPT=10
    attempt=0
    while [[ "$attempt" -lt "$MAX_ATTEMPT" ]]; do
        # Attempt the import
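        # Stream Load request headers: "columns: data" maps each input line into
        # the table's 'data' column, and "max_filter_ratio: 0.1" allows up to 10%
        # of rows to be filtered out before the load is reported as failed.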
        response_file="$TEMP_DIR/curl_body"
        http_code=$(curl -s -w "%{http_code}" -o "$response_file" --location-trusted -u root: \
            -H "max_filter_ratio: 0.1" -H "Expect:100-continue" -H "columns: data" \
            -T "$uncompressed_file" -XPUT \
            "http://127.0.0.1:8030/api/$DB_NAME/$TABLE_NAME/_stream_load")
        response_body=$(cat "$response_file")
        response_status=$(jq -r '.Status' "$response_file")
        echo "Stream load status: $response_status"

        if [[ "$http_code" -ge 200 && "$http_code" -lt 300 && "$response_status" = "Success" ]]; then
            echo "[$(date '+%Y-%m-%d %H:%M:%S')] Successfully imported $file. Response: $response_body" >> "$SUCCESS_LOG"
            rm -f "$uncompressed_file"  # Delete the uncompressed file after a successful load
            break
        else
            echo "[$(date '+%Y-%m-%d %H:%M:%S')] Attempt $((attempt + 1)) failed for $file with status code $http_code. Response: $response_body" >> "$ERROR_LOG"
            attempt=$((attempt + 1))
            sleep 2
        fi
    done

    counter=$((counter + 1))
    if [[ "$counter" -ge "$MAX_FILES" ]]; then
        break
    fi
done