|
#!/bin/bash

# set -e

# Bulk-import .json.gz files from a directory into a Hologres table via
# psql \COPY, caching the cleaned intermediate files under <directory>/cleaned.
#
# Required environment: HOLOGRES_PSQL — the psql command (binary plus any
# connection flags) for the target Hologres instance.

# Check if the required arguments are provided
if [[ $# -lt 6 ]]; then
    echo "Usage: $0 <directory> <database_name> <table_name> <max_files> <success_log> <error_log>" >&2
    exit 1
fi

# Arguments
DIRECTORY="$1"
# Resolve to an absolute path; quoted so directories with spaces work, and
# fail fast if the directory does not exist.
DIRECTORY=$(realpath "$DIRECTORY") || { echo "Error: cannot resolve directory '$1'." >&2; exit 1; }
DB_NAME="$2"
TABLE_NAME="$3"
MAX_FILES="$4"
SUCCESS_LOG="$5"
ERROR_LOG="$6"

# Fail fast with a clear message instead of letting an empty PSQL_CMD
# silently mangle the later \COPY invocation.
if [[ -z "${HOLOGRES_PSQL:-}" ]]; then
    echo "Error: HOLOGRES_PSQL environment variable is not set." >&2
    exit 1
fi
PSQL_CMD="$HOLOGRES_PSQL -d $DB_NAME"

FORCE_REPROCESS=0    # set to 1 to re-clean files even when a cached copy exists
SAVE_INTO_CACHE=1    # set to 0 to skip saving cleaned files into the cache
CACHE_DIR="${DIRECTORY}/cleaned"

# Validate that MAX_FILES is a number
if ! [[ "$MAX_FILES" =~ ^[0-9]+$ ]]; then
    echo "Error: <max_files> must be a positive integer." >&2
    exit 1
fi
| 30 | + |
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $(basename "$0") START"

# Ensure the log files exist
touch "$SUCCESS_LOG" "$ERROR_LOG"
echo "SUCCESS_LOG $SUCCESS_LOG"
echo "ERROR_LOG $ERROR_LOG"

echo "---------------------------"
echo "FORCE_REPROCESS=$FORCE_REPROCESS"
echo "SAVE_INTO_CACHE=$SAVE_INTO_CACHE"
echo "CACHE_DIR=$CACHE_DIR"
echo "---------------------------"

# Create a temporary directory in /var/tmp; abort if mktemp fails rather than
# letting chmod/trap act on an empty path.
TEMP_DIR=$(mktemp -d /var/tmp/cleaned_files.XXXXXX) || { echo "Error: mktemp failed." >&2; exit 1; }
chmod 777 "$TEMP_DIR"  # the server-side postgres user must be able to read files here
# Single-quoted so $TEMP_DIR is expanded (quoted) when the trap fires, not
# unquoted at definition time — safe for paths with spaces or glob chars.
trap 'rm -rf -- "$TEMP_DIR"' EXIT  # ensure cleanup on every exit path

# Counter to track processed files
counter=0
| 51 | + |
# Collect the .json.gz files up front. A bare glob (unlike parsing `ls`) is
# safe for filenames containing spaces, and glob expansion is already sorted
# lexicographically, matching the old `ls | sort`. With nullglob, an empty
# directory yields an empty array instead of the literal pattern.
shopt -s nullglob
input_files=("$DIRECTORY"/*.json.gz)
shopt -u nullglob

if (( ${#input_files[@]} == 0 )); then
    echo "No .json.gz files found in the directory."
fi

for file in "${input_files[@]}"; do
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] Processing $file ..."
    counter=$((counter + 1))

    # Reset per-iteration temp paths. Without this, a cache hit left stale
    # values from the previous iteration, and the success-path cleanup below
    # would rm files belonging to an earlier file.
    uncompressed_file=""
    cleaned_file_realpath=""

    filename=$(basename "$file" .gz)                   # e.g., data.json
    cleaned_basename="${filename%.json}_cleaned.json"  # e.g., data_cleaned.json

    # Final cache location for the cleaned file. `realpath -m` resolves the
    # path even before CACHE_DIR exists (plain realpath fails on the first
    # run, before mkdir -p has created the cache directory).
    cached_file=$(realpath -m "$CACHE_DIR/$cleaned_basename")

    # Skip re-cleaning when a cached copy already exists.
    if [[ -f "$cached_file" && "$FORCE_REPROCESS" == 0 ]]; then
        echo "[$(date '+%Y-%m-%d %H:%M:%S')] Cached file exists: $cached_file - skipping processing."
        cleaned_file="$cached_file"
    else
        # Uncompress the file into the temporary directory
        uncompressed_file="$TEMP_DIR/$filename"
        echo "[$(date '+%Y-%m-%d %H:%M:%S')] gunzip: $file ..."
        if ! gunzip -c "$file" > "$uncompressed_file"; then
            echo "[$(date '+%Y-%m-%d %H:%M:%S')] Failed to uncompress $file." | tee -a "$ERROR_LOG"
            continue
        fi
        echo "[$(date '+%Y-%m-%d %H:%M:%S')] gunzip done: $uncompressed_file"

        # Strip escaped NUL sequences (\u0000 breaks COPY), then merge JSON
        # records that span two physical lines back onto one line — this
        # pushes the import success rate above 99%.
        cleaned_file="$TEMP_DIR/$(basename "${uncompressed_file%.json}_cleaned.json")"
        sed 's/\\u0000//g' "$uncompressed_file" \
            | awk 'NR == 1 { printf "%s", $0; next } /^{/ { printf "\n%s", $0; next } { printf "%s", $0 } END { print "" }' \
            > "$cleaned_file"
        cleaned_file_realpath=$(realpath "$cleaned_file")

        # Grant read permissions for the postgres user
        chmod 644 "$cleaned_file"

        if [[ "$SAVE_INTO_CACHE" != 0 ]]; then
            # Save the cleaned file into the cache directory for future runs.
            mkdir -p "$CACHE_DIR"
            cp "$cleaned_file" "$cached_file"
            echo "[$(date '+%Y-%m-%d %H:%M:%S')] Saved cleaned file to cache: $cached_file"
        fi
    fi

    wc -l -- "$cleaned_file"

    echo "[$(date '+%Y-%m-%d %H:%M:%S')] Start importing $cleaned_file into Hologres." | tee -a "$SUCCESS_LOG"

    max_retries=3
    timeout_seconds=90
    attempt=1

    # Import the cleaned JSON file into Hologres. Control characters are used
    # as quote/delimiter/escape so each JSON line passes through \COPY as a
    # single verbatim field. Retry only when timeout(1) kills psql (status
    # 124); any other status — success or a real error — is final.
    # NOTE: $PSQL_CMD is intentionally unquoted — it is a command plus flags.
    until [[ $attempt -gt $max_retries ]]; do
        echo "($attempt) Try to copy data ..."
        timeout "$timeout_seconds" $PSQL_CMD -c "\COPY $TABLE_NAME FROM '$cleaned_file' WITH (format csv, quote e'\x01', delimiter e'\x02', escape e'\x01');"
        import_status=$?

        if [[ $import_status -ne 124 ]]; then
            break
        fi

        attempt=$((attempt + 1))
        sleep 1
    done

    # Check if the import was successful
    if [[ $import_status -eq 0 ]]; then
        echo "[$(date '+%Y-%m-%d %H:%M:%S')] Successfully imported $cleaned_file into Hologres." | tee -a "$SUCCESS_LOG"
        # Delete only this iteration's temporary intermediates — never the
        # cached copy. On a cache hit both variables are empty and nothing
        # is removed.
        [[ -n "$uncompressed_file" ]] && rm -f -- "$uncompressed_file"
        [[ -n "$cleaned_file_realpath" ]] && rm -f -- "$cleaned_file_realpath"
    else
        echo "[$(date '+%Y-%m-%d %H:%M:%S')] Failed to import $cleaned_file. See errors above." | tee -a "$ERROR_LOG"
        # Keep the files for debugging purposes
    fi

    # Stop processing if the max number of files is reached
    if [[ $counter -ge $MAX_FILES ]]; then
        echo "Processed maximum number of files: $MAX_FILES"
        break
    fi
done

echo "[$(date '+%Y-%m-%d %H:%M:%S')] $(basename "$0") DONE"
0 commit comments