     def typechecked(func=None):  # noqa: WPS440
         """Skip runtime type checking of the function arguments."""
         return func
+else:
+    from types import MappingProxyType  # noqa: WPS433 pylint: disable=unused-import
+
 
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
-logger.addHandler(logging.StreamHandler(sys.stdout))
+logs = logging.StreamHandler(sys.stdout)
+logger.addHandler(logs)
+
 
 #######################################################################
 #                          F U N C T I O N S                          #
 #######################################################################
 
 
 @typechecked
-def process(dataframe: pd.core.frame.DataFrame, filename: str, sensor: str):
+def process_chunk_rows(
+    dataframe: pd.core.frame.DataFrame,
+    filename: str,
+    sensor: str,
+):
     """Split sensor data into a separate file and sort it.
 
     Args:
@@ -72,11 +81,29 @@ def process(dataframe: pd.core.frame.DataFrame, filename: str, sensor: str):
     )
 
 
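A minimal call sketch for the renamed helper, mirroring how main() drives it further down (the filename, sensor key, and chunk size here are hypothetical):

    # Hypothetical usage: stream a large CSV in chunks and split out one sensor.
    for chunk in pd.read_csv('data/csv/2021-01.csv', chunksize=100000, dtype=str):
        process_chunk_rows(chunk, '2021-01-P1', 'P1')
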
+@typechecked
+def remove_duplicate_rows(filename: str, extension: str = '.csv'):
+    """Remove duplicate rows from the provided file.
+
+    Args:
+        filename: (str) Filename of the processed file.
+        extension: (str) File extension. Defaults to ".csv".
+    """
+    with open(f'data/csv/{filename}{extension}', 'r+') as csv_file:
+        # Get unique rows
+        lines = set(csv_file.readlines())
+        # Clean up the file
+        csv_file.seek(0)
+        csv_file.truncate()
+        # Write unique rows
+        csv_file.writelines(lines)
+
+
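A quick sketch of the new helper in isolation (the data/csv path is assumed from the surrounding code); note that `set()` deduplicates but does not preserve the original row order:

    # Hypothetical usage: rewrites data/csv/2021-01-P1.csv in place.
    remove_duplicate_rows('2021-01-P1')
    # A non-default extension can be passed explicitly.
    remove_duplicate_rows('2021-01-P1', extension='.txt')
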
 @typechecked  # noqa: WPS211
 def write_influx_data(  # pylint: disable=too-many-arguments
     filename: str,
     sensor_name_for_user: str,
-    date,
+    date: int,
     concentration: float,
     device_id: str,
     aqi: int = None,
@@ -92,8 +119,6 @@ def write_influx_data(  # pylint: disable=too-many-arguments
         aqi: (int) Air Quality Index. Defaults to None.
     """
     with open(f'data/influx/{filename}.influx', mode='a') as influx_file:
-        date = datetime.strptime(date, '%Y-%m-%d %H:%M:%S').timetuple()  # noqa: WPS323
-        date = int(time.mktime(date) * 10 ** 9)
 
         if aqi is None:
             influx_file.write(
@@ -121,6 +146,56 @@ def find_csv_filenames(path_to_dir: str, suffix: str = '.csv'):
     return [filename for filename in filenames if filename.endswith(suffix)]
 
 
+@typechecked
+def calculate_aqi(aqi: 'MappingProxyType[str, dict]', sensor: str, concentration: float) -> int:
+    """Calculate the Air Quality Index.
+
+    Calculations are based on:
+    https://www.airnow.gov/sites/default/files/2018-05/aqi-technical-assistance-document-may2016.pdf
+
+    Args:
+        aqi: (MappingProxyType[str, dict]) Nested dictionary with values for the AQI calculation.
+        sensor: (str) Sensor name for which the AQI is calculated.
+        concentration: (float) Raw data from the sensor.
+
+    Returns:
+        int: Air Quality Index value.
+    """
+    for upper_bound, breakpoints in aqi[sensor].items():
+        if concentration < float(upper_bound):
+            aqi_value = (
+                (breakpoints['aqi_high'] - breakpoints['aqi_low'])
+                / (breakpoints['pollutant_high'] - breakpoints['pollutant_low'])
+                * (concentration - breakpoints['pollutant_low'])
+                + breakpoints['aqi_low']
+            )
+            break
+
+    return round(aqi_value)
+
+
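A worked example of the interpolation above, using two published PM2.5 breakpoints from the linked AirNow document (the sensor key 'P2' and the exact mapping layout are assumptions based on how the loop reads it):

    from types import MappingProxyType

    # Hypothetical breakpoint table: keys are each band's exclusive upper bound.
    aqi_table = MappingProxyType({
        'P2': {
            '12.1': {'pollutant_low': 0.0, 'pollutant_high': 12.0, 'aqi_low': 0, 'aqi_high': 50},
            '35.5': {'pollutant_low': 12.1, 'pollutant_high': 35.4, 'aqi_low': 51, 'aqi_high': 100},
        },
    })

    # (100 - 51) / (35.4 - 12.1) * (30.0 - 12.1) + 51 = 88.6... -> 89
    calculate_aqi(aqi_table, 'P2', 30.0)

One edge case worth noting: a concentration at or above the last upper bound never sets aqi_value, so the final round() would raise UnboundLocalError.
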
+@typechecked
+def transform_date_to_nanoseconds(date: str) -> int:
+    """Parse a datetime string and return it as UNIX time in nanoseconds.
+
+    Args:
+        date: (str) Datetime string in `%Y-%m-%d %H:%M:%S` format.
+
+    Returns:
+        int: Date in UNIX nanoseconds.
+    """
+    date = datetime.strptime(date, '%Y-%m-%d %H:%M:%S').timetuple()  # noqa: WPS323
+    date = time.mktime(date) * 10 ** 9
+
+    return int(date)
+
+
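A brief sketch of the extracted helper. Since time.mktime() interprets the parsed struct_time in the machine's local timezone, the result is timezone-dependent:

    transform_date_to_nanoseconds('2021-01-01 00:00:00')
    # -> 1609459200000000000 on a machine running in UTC
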
+#######################################################################
+#                               M A I N                               #
+#######################################################################
+
+
 @typechecked
 def main() -> None:
     """Split raw CSV data per sensor and transform it for InfluxDB."""
@@ -138,11 +213,22 @@ def main() -> None:
     for filename in files:
 
         for sensor, human_readable_sensor_name in SENSORS.items():
-            sensor_file = f'{filename}-{sensor}'
+
+            logs.setFormatter(
+                logging.Formatter(
+                    '\n{asctime} - {message}', datefmt='%H:%M:%S', style='{',
+                ),
+            )
             logger.info(
-                f'\n{time.strftime("%H:%M:%S")} - ' +
                 f'Start work on "{human_readable_sensor_name}" sensor data from {filename}',
             )
+            logs.setFormatter(
+                logging.Formatter(
+                    '{asctime} ----- {message}', datefmt='%H:%M:%S', style='{',
+                ),
+            )
+
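The formatter swap gives each sensor a blank-line-separated header entry, then reverts to the dashed style for progress messages; the output looks roughly like this (timestamps, sensor name, and chunk size illustrative):

    12:00:00 - Start work on "PM2.5" sensor data from 2021-01.csv
    12:00:01 ----- Process chunk rows: 100000
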
+            sensor_file = f'{filename}-{sensor}'
 
             #
             # Split sensor data into a separate file and sort it
@@ -158,20 +244,16 @@ def main() -> None:
                 dtype=str,
             )
             for chunk in pandas_csv:
-                logger.info(f'{time.strftime("%H:%M:%S")} ----- Proccess chunk rows: {CHUNKSIZE}')
-                process(chunk, sensor_file, sensor)
+                logger.info(f'Process chunk rows: {CHUNKSIZE}')
+                process_chunk_rows(chunk, sensor_file, sensor)
 
-            # Save uniq rows
-            logger.info(f'{time.strftime("%H:%M:%S")} ----- Get unique rows')
-            with open(f'data/csv/{sensor_file}.csv', 'r') as csv_file:
-                lines = set(csv_file.readlines())
-            with open(f'data/csv/{sensor_file}.csv', 'w') as csv_file:  # noqa: WPS440
-                csv_file.writelines(lines)  # noqa: WPS441
+            logger.info('Get unique rows')
+            remove_duplicate_rows(sensor_file)
 
             #
             # Get data for Influx
             #
-            logger.info(f'{time.strftime("%H:%M:%S")} ----- Transform data for Database format')
+            logger.info('Transform data for database format')
 
             # Clean up previous data
             with open(f'data/influx/{sensor_file}.influx', 'w') as influx_file:
@@ -184,46 +266,38 @@ def main() -> None:
 
 """)
 
-            with open(f'data/csv/{sensor_file}.csv', mode='r') as csv_file:  # noqa: WPS440
-                csv_reader = csv.reader(csv_file, delimiter=',')  # noqa: WPS441
+            with open(f'data/csv/{sensor_file}.csv', mode='r') as csv_file:
+                csv_reader = csv.reader(csv_file, delimiter=',')
 
-                for row in csv_reader:
-                    device_id = row[0]
-                    date = row[1]
-                    concentration = round(float(row[2]), 1)
+                if sensor not in AQI:
+                    for row in csv_reader:
+                        device_id = row[0]
+                        date = transform_date_to_nanoseconds(row[1])
+                        concentration = round(float(row[2]), 1)
 
-                    if sensor not in AQI:
                         write_influx_data(
                             sensor_file,
                             human_readable_sensor_name,
                             date,
                             concentration,
                             device_id,
                         )
-                        continue
-
-                    #
-                    # CALCULATING THE AQI
-                    #
-
-                    for upper_bound, _ in AQI[sensor].items():
-                        if concentration < float(upper_bound):
-                            aqi_value = (
-                                (_['aqi_high'] - _['aqi_low'])
-                                / (_['pollutant_high'] - _['pollutant_low'])
-                                * (concentration - _['pollutant_low'])
-                                + _['aqi_low']
-                            )
-                            aqi_value = round(aqi_value)
-                            break
+                    continue
+
+                for row in csv_reader:  # noqa: WPS440
+                    device_id = row[0]  # noqa: WPS441
+                    date = transform_date_to_nanoseconds(row[1])  # noqa: WPS441
+                    concentration = round(float(row[2]), 1)  # noqa: WPS441
+
+                    aqi = calculate_aqi(AQI, sensor, concentration)
 
                     write_influx_data(
                         sensor_file,
                         human_readable_sensor_name,
                         date,
                         concentration,
                         device_id,
-                        aqi_value,
+                        aqi,
                     )
 
 