1+ """
2+ * @file data_parser.py
3+ * @brief Mavlink-Router Data parser,
4+ * This Program is writing Metrics for Promepheus / Grafana in a textfile
5+ * The Grafana-agent installed on the system redirects the periodic Poll to the Textfile generated by this program
6+ * @author Oemer Yilmaz <yilmaz@consider-it.de>
7+ * @copyright (c) Consider-IT, 2022
8+ """
9+
10+ import sys
11+ import time
12+ import re
13+ from enum import Enum
14+
15+ # ============== Metric definitions for grafana ==================
16+ METRIC_REC_CRCERR_CNT = "mavlinkrouter_receive_crcerror_count"
17+ METRIC_REC_CRCERR_PCT = "mavlinkrouter_receive_crcerror_percent"
18+ METRIC_REC_CRCERR_KB = "mavlinkrouter_receive_crcerror_kilo_byte"
19+ METRIC_REC_SEQLOST_CNT = "mavlinkrouter_receive_seqlost_count"
20+ METRIC_REC_SEQLOST_PCT = "mavlinkrouter_receive_seqlost_percent"
21+ METRIC_REC_HANDLED_CNT = "mavlinkrouter_receive_handled_count"
22+ METRIC_REC_HANDLED_KB = "mavlinkrouter_receive_handled_kilo_byte"
23+ METRIC_REC_TOTAL_CNT = "mavlinkrouter_receive_total_count"
24+ METRIC_TRANSM_TOTAL_CNT = "mavlinkrouter_transmit_total_count"
25+ METRIC_TRANSM_TOTAL_KB = "mavlinkrouter_transmit_total_kilo_byte"
26+
27+
28+ # ================== File paths ======================
29+ PROM_FILE_PATH = "/var/local/exporter.prom"
30+ CACHE_FILE_PATH = "/var/local/cache.prom"
31+
32+
33+ # =============== State machine states ==========================
34+ class state (Enum ):
35+ IDLE = 0
36+ EVAL_RECEIVED = 1
37+ EVAL_RECEIVED_CRC_ERR = 2
38+ EVAL_RECEIVED_SEQ_LOST = 3
39+ EVAL_RECEIVED_HANDLED = 4
40+ EVAL_RECEIVED_TOTAL = 5
41+ EVAL_TRANSMITTED = 6
42+ EVAL_TRANSMITTED_TOTAL = 7
43+ SEND_INFO = 8
44+
45+
46+ # initialize states
47+ currentState = state .IDLE
48+ nextState = state .IDLE
49+ # timestamp (in seconds.. as float). initially set to float max,
50+ # because lastUpdateWasOverASecondAgo() shouldnt be triggered in the first iterataiton
51+ lastTimeStamp = sys .float_info .max
52+ deviceID = "DEFAULT"
53+ deviceName = "DEFAULT"
54+ deviceConnType = "DEFAULT"
55+ # this cache file is a textfile where the single lines (data) are written successively.
56+ cacheFile = open (CACHE_FILE_PATH , 'w+' )
57+
58+
59+ # Writes a line to the cache textfile
60+ def writeDataToCacheTextfile (dataAsString ):
61+ cacheFile .write (dataAsString )
62+
63+
64+ # The prom file needs to be complete in order to ensure integrity.
65+ # thats why single lines where written to the cache file successively and
66+ # only once the data is complete, the content is written to the prom file as a whole
67+ def updatePROMTextFile ():
68+ global cacheFile
69+ cacheFile .close () # close it for safety
70+ cacheFile = open (CACHE_FILE_PATH , 'r' ) # reopen to (only) read
71+ promFile = open (PROM_FILE_PATH , 'w+' ) # open prom file
72+ for line in cacheFile .readlines (): # transfer all data from cache to prom file
73+ promFile .write (line )
74+ promFile .close () # save/close prom file
75+ cacheFile .close () # close cache file
76+ open (CACHE_FILE_PATH , 'w' ).close () # clear cache file
77+ # reopens file, because its needed to be written again
78+ cacheFile = open (CACHE_FILE_PATH , 'w+' )
79+
80+
81+ # Checks, if the last data / update was over 0.9 sec ago
82+ def lastUpdateWasOverASecondAgo (timeStamp ):
83+ if (timeStamp < (time .time ()- 0.9 )):
84+ return True
85+ else :
86+ return False
87+
88+
89+ # Reads device id out of the string
90+ def readDevID (inputStr ):
91+ start = inputStr .find ("[" ) + 1
92+ end = inputStr .find ("]" , start , len (inputStr ))
93+ devID = inputStr [start :end ]
94+ return devID
95+
96+
97+ # Reads device name out of the string
98+ def readDevName (inputStr ):
99+ start = inputStr .find ("]" ) + 1
100+ end = inputStr .find ("{" , start , len (inputStr )) - 1
101+ devName = inputStr [start :end ]
102+ return devName
103+
104+
105+ # Writes a metric to the cache file with the according device name, conn type and so on
106+ def writeMetric (metricStr , value ):
107+ writeDataToCacheTextfile (
108+ metricStr + "{endpoint_name=\" " + deviceName + "\" ,conn_type=\" " + deviceConnType + "\" ,endpoint_id=\" " + deviceID + "\" } " + str (value ) + "\n " )
109+
110+
111+ # inf. loop
112+ while True :
113+ for line in sys .stdin : # goes through the input line by line in an infinite loop
114+
115+ sys .stdout .write (line )
116+
117+ if (line .find ("TCP Endpoint" ) != - 1 ):
118+ deviceConnType = "TCP"
119+ deviceName = readDevName (line )
120+ deviceID = readDevID (line )
121+ if (lastUpdateWasOverASecondAgo (lastTimeStamp )):
122+ updatePROMTextFile ()
123+ nextState = state .EVAL_RECEIVED
124+
125+ elif (line .find ("UDP Endpoint" ) != - 1 ):
126+ deviceConnType = "UDP"
127+ deviceName = readDevName (line )
128+ deviceID = readDevID (line )
129+ if (lastUpdateWasOverASecondAgo (lastTimeStamp )):
130+ updatePROMTextFile ()
131+ nextState = state .EVAL_RECEIVED
132+
133+ elif (line .find ("UART Endpoint" ) != - 1 ):
134+ deviceConnType = "UART"
135+ deviceName = readDevName (line )
136+ deviceID = readDevID (line )
137+ if (lastUpdateWasOverASecondAgo (lastTimeStamp )):
138+ updatePROMTextFile ()
139+ nextState = state .EVAL_RECEIVED
140+
141+ # ========= pseudo Switch case for state machine ==========
142+ if (currentState == state .IDLE ):
143+ pass
144+ elif (currentState == state .EVAL_RECEIVED ):
145+ nextState = state .EVAL_RECEIVED_CRC_ERR
146+
147+ elif (currentState == state .EVAL_RECEIVED_CRC_ERR ):
148+ if (line .find ("CRC error" ) != - 1 ):
149+ # regex to find the numbers within the line
150+ digits = re .findall (r"\d+" , line )
151+ writeMetric (METRIC_REC_CRCERR_CNT , digits [0 ])
152+ writeMetric (METRIC_REC_CRCERR_PCT , digits [1 ])
153+ writeMetric (METRIC_REC_CRCERR_KB , digits [2 ])
154+ nextState = state .EVAL_RECEIVED_SEQ_LOST
155+ else :
156+ pass
157+
158+ elif (currentState == state .EVAL_RECEIVED_SEQ_LOST ):
159+ if (line .find ("Sequence lost" ) != - 1 ):
160+ digits = re .findall (r"\d+" , line )
161+ writeMetric (METRIC_REC_SEQLOST_CNT , digits [0 ])
162+ writeMetric (METRIC_REC_SEQLOST_PCT , digits [1 ])
163+ nextState = state .EVAL_RECEIVED_HANDLED
164+ else :
165+ pass
166+
167+ elif (currentState == state .EVAL_RECEIVED_HANDLED ):
168+ if (line .find ("Handled" ) != - 1 ):
169+ digits = re .findall (r"\d+" , line )
170+ writeMetric (METRIC_REC_HANDLED_CNT , digits [0 ])
171+ writeMetric (METRIC_REC_HANDLED_KB , digits [1 ])
172+ nextState = state .EVAL_RECEIVED_TOTAL
173+ else :
174+ pass
175+
176+ elif (currentState == state .EVAL_RECEIVED_TOTAL ):
177+ if (line .find ("Total" ) != - 1 ):
178+ digits = re .findall (r"\d+" , line )
179+ writeMetric (METRIC_REC_TOTAL_CNT , digits [0 ])
180+ nextState = state .EVAL_TRANSMITTED
181+ else :
182+ pass
183+
184+ elif (currentState == state .EVAL_TRANSMITTED ):
185+ nextState = state .EVAL_TRANSMITTED_TOTAL
186+
187+ elif (currentState == state .EVAL_TRANSMITTED_TOTAL ):
188+ if (line .find ("Total" ) != - 1 ):
189+ digits = re .findall (r"\d+" , line )
190+ writeMetric (METRIC_TRANSM_TOTAL_CNT , digits [0 ])
191+ writeMetric (METRIC_TRANSM_TOTAL_KB , digits [1 ])
192+ nextState = state .SEND_INFO
193+ else :
194+ pass
195+
196+ elif (currentState == state .SEND_INFO ):
197+ nextState = state .IDLE
198+
199+ else :
200+ nextState = state .IDLE
201+
202+ currentState = nextState # update next state for the state machine
203+ lastTimeStamp = time .time () # update timestamp
0 commit comments