-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathimport_flow.sh
More file actions
executable file
·136 lines (124 loc) · 4.25 KB
/
import_flow.sh
File metadata and controls
executable file
·136 lines (124 loc) · 4.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/bin/bash
# Author: Joshua Chen <iesugrace@gmail.com>
# Date: 2016-05-19
# Location: Shenzhen
# Desc: Temporary solution for the import monitor,
# it shall be retired when the new Access
# Statistics System is online.
#
# log the report state
# Append one line describing a send attempt to the local message log.
# Arguments:
#   $1 - exit status of the send attempt (wget)
#   $2 - chat group name the message was addressed to
#   $3 - push API URL used
#   $4 - message body; runs of whitespace (incl. newlines) are
#        collapsed to single spaces so the entry stays on one line
logReport() {
    local stat=$1 group=$2 api=$3 msg=$4 wd localLog ts
    local -a words
    # Collapse whitespace via word splitting. 'read -a' is used instead
    # of xargs, which chokes on quote characters in the message.
    read -r -d '' -a words <<< "$msg" || true
    msg="${words[*]}"
    wd=$(cd -P "$(dirname "$0")" && pwd)
    localLog="$wd/data/sent_messages.log"
    mkdir -p "$wd/data"    # first run: the log directory may not exist yet
    ts=$(date '+%F %T')
    echo "$ts stat=$stat group=$group api=$api msg=$msg" >> "$localLog"
}
# Push a message to a chat group through the HTTP push API and record
# the attempt via logReport.
# Arguments:
#   $1 - severity: "warn" goes to the support group, anything else
#        to the status group
#   $2 - message body; a "Flow import:" header and a timestamp footer
#        are wrapped around it
report() {
    local groupName apiUrl msg rc
    case "$1" in
        warn) groupName="PLCDN-SUPPORT" ;;
        *)    groupName="PLCDN-STATUS" ;;
    esac
    apiUrl="http://push.plcdn.net:7890/20160128"
    printf -v msg 'Flow import:\n%s\nTime: %s' "$2" "$(date +'%F %T')"
    wget -q --header="To: $groupName" \
        --post-data="$msg" "$apiUrl" \
        -O /dev/null
    rc=$?
    logReport "$rc" "$groupName" "$apiUrl" "$msg"
}
# Warn the support group when the importer binary exited non-zero.
# Arguments:
#   $1 - exit status of the importer run
checkImporterStat() {
    local msg    # was a global leak in the original
    if test "$1" -ne 0; then
        msg="importer failed, code=$1"
        report warn "$msg"
    fi
}
# When a failure occurred due to a primary-key conflict, the record
# files are placed into $failureDir; warn about any file that appeared
# there since the previous run.
# Globals (read): failureDir, lastTime
checkImportFailure() {
    local failureList n amount msg    # keep work vars out of global scope
    failureList=$(find "$failureDir" -newerct "$lastTime" -name "countfile.*" -type f)
    if test -n "$failureList"; then
        n=$(wc -l <<< "$failureList")
        # field 5 of each record is the byte count; sum and convert to GB
        amount=$(echo "$failureList" | xargs cat | awk '{a+=$5}END{print a/1024/1024/1024}')
        msg=$'import failed:\n'
        msg+="Number of files: $n"$'\n'
        msg+="Amount of flow: ${amount}GB"$'\n'
        msg+="File list:"$'\n'
        msg+="$failureList"
        report warn "$msg"
    fi
}
# We check for the flow records only for now: notify the status group
# about records whose upload was delayed (found in the importer's
# "missed" marker files created since the previous run).
# Globals (read): importerLogDir, lastTime
checkDelayedUpload() {
    local missedNamePat='.TempFlowUploadInfo.??????????????'
    local missedList missedRecordList
    local n msg
    missedList=$(find "$importerLogDir" -newerct "$lastTime" \
        -name "$missedNamePat" \! -empty)
    if test -n "$missedList"; then
        missedRecordList=$(echo "$missedList" | xargs -r cat | sort -u)
        missedRecordList=$(filterMissedRecord "$missedRecordList")
        # Every missed record may have been uploaded in the meantime;
        # without this guard an empty report (with count 1 from wc -l
        # on an empty string) would still be sent.
        test -n "$missedRecordList" || return 0
        n=$(wc -l <<< "$missedRecordList")
        msg=$'delayed upload:\n'
        msg+="Number of records: $n"$'\n'
        msg+="Record list:"$'\n'
        msg+="$missedRecordList"
        report info "$msg"
    fi
}
# Remove the already-uploaded records from the missed-record list:
# any record that also appears in a recent backup countfile under
# $prefix has in fact been uploaded and is filtered out.
# Arguments:
#   $1 - missed-record list, one record per line; field 2 is the node
#        number, fields 2+3 identify the record
# Outputs: the filtered list on stdout
filterMissedRecord() {
    local prefix="/data/back/flow"
    local input=$1
    local period1=1500 # 25 minutes, determine directories
    local period2=1800 # 30 minutes, filter files
    local today today_ts now_ts yesterday dirs timearg node_nums node tmpfile result
    local -a find_args=()
    # 1. select the directories to search from:
    # today only, or yesterday as well if today began < 25 min ago.
    today=$(date '+%Y%m%d')
    today_ts=$(date +%s -d "$today")
    now_ts=$(date +%s)
    if test $((now_ts - today_ts)) -lt "$period1"; then
        yesterday=$(date '+%Y%m%d' -d "$today -1day")
        dirs="$prefix/$yesterday $prefix/$today"
    else
        dirs="$prefix/$today"
    fi
    # 2. build the find predicates as an array — the original built a
    # string and ran it through eval, which is fragile and injectable.
    timearg=$(date '+%Y-%m-%d %H:%M' -d "-$period2 seconds")
    node_nums=$(awk '{print $2}' <<< "$input" | sort -u)
    while read -r node; do
        find_args+=(-name "countfile.*.$node" -o)
    done <<< "$node_nums"
    unset "find_args[$(( ${#find_args[@]} - 1 ))]"   # drop trailing -o
    # 3. match file name pattern and time for each node,
    # concatenate all found files' field 2 and field 3.
    tmpfile=$(mktemp)
    # $dirs is intentionally unquoted: it may hold two directories.
    find $dirs \( "${find_args[@]}" \) -newerct "$timearg" \
        | xargs -r awk '{print $2, $3}' | sort -u > "$tmpfile"
    # 4. remove all records that are in the 'uploaded' list
    result=$(diff <(sort <<< "$input") "$tmpfile" | sed -r -n '/^</s/^..//p')
    rm -f "$tmpfile"
    echo "$result"
}
# --- main -------------------------------------------------------------
# Run the importer, then look for failures and delayed uploads that
# appeared since the previous run (timestamp persisted in time.log).
progDir=$(cd -P "$(dirname "$0")" && pwd)
bin_importer="$progDir/bin_importer"
bin_importer_conf="$progDir/bin_importer.conf"
timeLog="$progDir/time.log"
failureDir="/data/import_failure"
importerLogDir="/data/program/flow/new_importer/log"
importType=1 # 1 is for importing flow records
# Capture the window start BEFORE the importer/checks run; writing the
# end-of-run time (as the original did) left a gap in which files
# created during the run would never be examined.
runStart=$(date '+%Y-%m-%d %H:%M:%S')
"$bin_importer" "$bin_importer_conf" "$importType"
stat=$?
lastTime=$(cat "$timeLog" 2>/dev/null)   # first run: file may not exist
test -z "$lastTime" && lastTime="1970-01-01"
checkImporterStat "$stat"
checkImportFailure
checkDelayedUpload
echo "$runStart" > "$timeLog"