Skip to content

Commit e9ad3cb

Browse files
committed
Create mod_cdr_main.py
1 parent b87216d commit e9ad3cb

File tree

1 file changed

+217
-0
lines changed

1 file changed

+217
-0
lines changed

mod_cdr_main.py

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
import csv, logging, os, shutil, time
2+
import mod_conf, mod_cdr_decode
3+
4+
from datetime import datetime
5+
from elasticsearch import Elasticsearch
6+
7+
logger = logging.getLogger(__name__)
8+
logger.setLevel(logging.DEBUG)
9+
10+
#Logger Console Handler
11+
ch = logging.StreamHandler() #StreamHandler logs to console
12+
ch.setLevel(logging.DEBUG)
13+
ch_format = logging.Formatter('%(asctime)s - %(message)s')
14+
ch.setFormatter(ch_format)
15+
logger.addHandler(ch)
16+
17+
#Logger File Handler
18+
fh = logging.FileHandler('/var/log/cdr_parser/{0}.log'.format(__name__))
19+
fh.setLevel(logging.INFO)
20+
fh_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)-8s - %(message)s')
21+
fh.setFormatter(fh_format)
22+
logger.addHandler(fh)
23+
24+
def initial_program_setup():
25+
print "initial_program_setup"
26+
27+
def do_main_program():
28+
29+
# by default connect to localhost:9200
30+
es = Elasticsearch()
31+
32+
for file in os.listdir(mod_conf.cdr_path):
33+
# Get pathes from config file
34+
src_file = os.path.join(mod_conf.cdr_path, file)
35+
dest_file = os.path.join(mod_conf.archive_path,file)
36+
37+
logger.debug("src_file = %s" % src_file)
38+
logger.debug("dest_file = %s" % dest_file)
39+
40+
# Check for configured type, if none use filename
41+
if mod_conf.es_type == "":
42+
es_type = file
43+
else:
44+
es_type = mod_conf.es_type
45+
46+
logger.debug("es_type = %s" % es_type)
47+
48+
try:
49+
shutil.move(src_file, mod_conf.archive_path)
50+
except:
51+
logger.warning("Error moving file to archive.")
52+
53+
time.sleep(.1)
54+
55+
if os.access(dest_file, os.W_OK):
56+
# Open CDR Log File
57+
csv_file = open(dest_file, 'rb')
58+
csv_read = csv.reader(csv_file, delimiter=',', quotechar='"')
59+
60+
# Read CDR header row
61+
csv_keys = next(csv_read)
62+
63+
# Skip CDR types row
64+
csv_types = next(csv_read)
65+
66+
# Process actual CDR rows
67+
for csv_line in csv_read:
68+
# Combine headers with data
69+
csv_zip = zip(csv_keys, csv_line)
70+
71+
# Check for empty lines
72+
if csv_zip:
73+
74+
es_body = {}
75+
76+
# Call Sequence Variables
77+
origNum = ""
78+
finNum = ""
79+
80+
for csv_data in csv_zip:
81+
# Decode cdrRecordType
82+
if csv_data[0] == "cdrRecordType":
83+
es_body[csv_data[0]] = mod_cdr_decode.decode_RecordType(val=csv_data[1])
84+
# Check for configured type, if none use cdr/cmr-YYYY.MM.DD
85+
if mod_conf.es_index == "":
86+
if csv_data[1] == "1":
87+
es_index = "cdr-%s" % datetime.utcnow().strftime("%Y.%m.%d")
88+
elif csv_data[1] == "2":
89+
es_index = "cmr-%s" % datetime.utcnow().strftime("%Y.%m.%d")
90+
else:
91+
es_index = "index_err"
92+
else:
93+
es_index = mod_conf.es_index
94+
# Decode dateTimeStamp
95+
elif csv_data[0] == "dateTimeStamp":
96+
es_body[csv_data[0]] = mod_cdr_decode.decode_Time(val=csv_data[1])
97+
# Build @timestamp
98+
es_body['@timestamp'] = mod_cdr_decode.decode_Time(val=csv_data[1])
99+
# Decode dateTimeOrigination
100+
elif csv_data[0] == "dateTimeOrigination":
101+
es_body[csv_data[0]] = mod_cdr_decode.decode_Time(val=csv_data[1])
102+
# Build @timestamp
103+
es_body['@timestamp'] = mod_cdr_decode.decode_Time(val=csv_data[1])
104+
# Decode origIpAddr
105+
elif csv_data[0] == "origIpAddr":
106+
es_body[csv_data[0]] = mod_cdr_decode.decode_IP(val=csv_data[1])
107+
# Decode origCause_location
108+
elif csv_data[0] == "origCause_location":
109+
es_body[csv_data[0]] = mod_cdr_decode.decode_TermCauseCode(val=csv_data[1])
110+
# Decode origCause_value
111+
elif csv_data[0] == "origCause_value":
112+
es_body[csv_data[0]] = mod_cdr_decode.decode_TermCauseCode(val=csv_data[1])
113+
# Decode origPrecedenceLevel
114+
elif csv_data[0] == "origPrecedenceLevel":
115+
es_body[csv_data[0]] = mod_cdr_decode.decode_PrecedenceLevel(val=csv_data[1])
116+
# Decode origMediaTransportAddress_IP
117+
elif csv_data[0] == "origMediaTransportAddress_IP":
118+
es_body[csv_data[0]] = mod_cdr_decode.decode_IP(val=csv_data[1])
119+
# Decode origMediaCap_payloadCapability
120+
elif csv_data[0] == "origMediaCap_payloadCapability":
121+
es_body[csv_data[0]] = mod_cdr_decode.decode_CodecType(val=csv_data[1])
122+
# Decode origVideoCap_Codec
123+
elif csv_data[0] == "origVideoCap_Codec":
124+
es_body[csv_data[0]] = mod_cdr_decode.decode_CodecType(val=csv_data[1])
125+
# Decode origVideoCap_Resolution
126+
elif csv_data[0] == "origVideoCap_Resolution":
127+
es_body[csv_data[0]] = mod_cdr_decode.decode_VideoRes(val=csv_data[1])
128+
# Decode origRSVPAudioStat
129+
elif csv_data[0] == "origRSVPAudioStat":
130+
es_body[csv_data[0]] = mod_cdr_decode.decode_RSVPStat(val=csv_data[1])
131+
# Decode origRSVPVideoStat
132+
elif csv_data[0] == "origRSVPVideoStat":
133+
es_body[csv_data[0]] = mod_cdr_decode.decode_RSVPStat(val=csv_data[1])
134+
# Decode destIpAddr
135+
elif csv_data[0] == "destIpAddr":
136+
es_body[csv_data[0]] = mod_cdr_decode.decode_IP(val=csv_data[1])
137+
# Decode destCause_location
138+
elif csv_data[0] == "destCause_location":
139+
es_body[csv_data[0]] = mod_cdr_decode.decode_TermCauseCode(val=csv_data[1])
140+
# Decode destCause_value
141+
elif csv_data[0] == "destCause_value":
142+
es_body[csv_data[0]] = mod_cdr_decode.decode_TermCauseCode(val=csv_data[1])
143+
# Decode destPrecedenceLevel
144+
elif csv_data[0] == "destCause_value":
145+
es_body[csv_data[0]] = mod_cdr_decode.decode_PrecedenceLevel(val=csv_data[1])
146+
# Decode destMediaTransportAddress_IP
147+
elif csv_data[0] == "destMediaTransportAddress_IP":
148+
es_body[csv_data[0]] = mod_cdr_decode.decode_IP(val=csv_data[1])
149+
# Decode destMediaCap_payloadCapability
150+
elif csv_data[0] == "destMediaCap_payloadCapability":
151+
es_body[csv_data[0]] = mod_cdr_decode.decode_CodecType(val=csv_data[1])
152+
# Decode destVideoCap_Codec
153+
elif csv_data[0] == "destVideoCap_Codec":
154+
es_body[csv_data[0]] = mod_cdr_decode.decode_CodecType(val=csv_data[1])
155+
# Decode destVideoCap_Resolution
156+
elif csv_data[0] == "destVideoCap_Resolution":
157+
es_body[csv_data[0]] = mod_cdr_decode.decode_VideoRes(val=csv_data[1])
158+
# Decode destVideoTransportAddressdest_IP
159+
elif csv_data[0] == "destVideoTransportAddress_IP":
160+
es_body[csv_data[0]] = mod_cdr_decode.decode_IP(val=csv_data[1])
161+
# Decode destRSVPAudioStat
162+
elif csv_data[0] == "destRSVPAudioStat":
163+
es_body[csv_data[0]] = mod_cdr_decode.decode_RSVPStat(val=csv_data[1])
164+
# Decode destRSVPVideoStat
165+
elif csv_data[0] == "destRSVPVideoStat":
166+
es_body[csv_data[0]] = mod_cdr_decode.decode_RSVPStat(val=csv_data[1])
167+
# Decode dateTimeConnect
168+
elif csv_data[0] == "dateTimeConnect":
169+
es_body[csv_data[0]] = mod_cdr_decode.decode_Time(val=csv_data[1])
170+
# Decode dateTimeDisconnect
171+
elif csv_data[0] == "dateTimeDisconnect":
172+
es_body[csv_data[0]] = mod_cdr_decode.decode_Time(val=csv_data[1])
173+
# Decode origDTMFMethod
174+
elif csv_data[0] == "origDTMFMethod":
175+
es_body[csv_data[0]] = mod_cdr_decode.decode_DTMFMethod(val=csv_data[1])
176+
# Decode destDTMFMethod
177+
elif csv_data[0] == "destDTMFMethod":
178+
es_body[csv_data[0]] = mod_cdr_decode.decode_DTMFMethod(val=csv_data[1])
179+
# Decode callSecuredStatus
180+
elif csv_data[0] == "callSecuredStatus":
181+
es_body[csv_data[0]] = mod_cdr_decode.decode_SecuredStatus(val=csv_data[1])
182+
# Link origLegCallIdentifier to callIdentifier
183+
elif csv_data[0] == "origLegCallIdentifier":
184+
es_body[csv_data[0]] = csv_data[1]
185+
es_body["callIdentifier"] = csv_data[1]
186+
# Decode duration
187+
elif csv_data[0] == "duration":
188+
es_body[csv_data[0]] = mod_cdr_decode.decode_duration(val=csv_data[1])
189+
# Save originalCalledPartyNumber for call sequence summary
190+
elif csv_data[0] == "originalCalledPartyNumber":
191+
origNum = csv_data[1]
192+
es_body[csv_data[0]] = csv_data[1]
193+
# Save finalCalledPartyNumber for call sequence summary
194+
elif csv_data[0] == "finalCalledPartyNumber":
195+
finNum = csv_data[1]
196+
es_body[csv_data[0]] = csv_data[1]
197+
# Write non-decoded values as is
198+
else:
199+
es_body[csv_data[0]] = csv_data[1]
200+
201+
# Build call sequence summary if data is present
202+
if not origNum == "":
203+
if origNum == finNum:
204+
es_body["Call Sequence"] = "Call to: " + origNum
205+
else:
206+
es_body["Call Sequence"] = "Call to: " + origNum + " Forwarded to: " + finNum
207+
208+
# Send CDR to ElasticSearch
209+
logger.debug(es.index(index=es_index,doc_type=es_type,body=es_body))
210+
211+
time.sleep(.1)
212+
213+
def program_cleanup():
214+
print "program_cleanup - not implemented"
215+
216+
def reload_program_config():
217+
print "reload_program_config - not implemented"

0 commit comments

Comments
 (0)