
Commit 338e8ba

Speedup data-transformer-app and refactor
Now passes (or skips) all existing pre-commit tests.
1 parent 96bb64a commit 338e8ba

File tree

2 files changed: +38 −71 lines


README.md

Lines changed: 4 additions & 4 deletions
```diff
@@ -65,9 +65,9 @@ docker run -v "$PWD"/data/influx/:/influx-data/ --rm --network=eco-data-visualiz
 <!-- markdownlint-disable no-inline-html -->
 ><sup>Depending on your internet bandwidth, CPU, Storage I/O, CSV file size and number of processed files `First Init` may take different times.
 For example, in laptop with `100Mbit/s` bandwidth, `Intel Core i7-8550U` (max clock speed `4Ghz`), SSD disk and:</sup>
-<sup> - 2 CSV files (together: 620MB) it takes `11m47s` (`9m39s` to transform data)</sup>
-<sup> - 1 CSV file (513MB) - `6m16s` (`4m18s` to transform data)</sup>
-<sup> - 1 CSV file (107MB) - `6m35s` (`4m32s` to transform data)</sup>
+<sup> - 2 CSV files (together: 620MB) it takes `8m11s` (`6m09s` to transform data)</sup>
+<sup> - 1 CSV file (513MB) - `5m27s` (`3m47s` to transform data)</sup>
+<sup> - 1 CSV file (107MB) - `3m42s` (`2m` to transform data)</sup>
 <!-- markdownlint-enable no-inline-html -->

 5. Open [http://localhost/](http://localhost/) for see visualizations!
@@ -160,7 +160,7 @@ docker build --build-arg ENV=dev -t data-transformer ./data-transformer-app
 * [ ] Add AQI support for all specified in [doc](https://www.airnow.gov/sites/default/files/2018-05/aqi-technical-assistance-document-may2016.pdf)
 * [ ] Optimize `data-transformer-app`
   * [ ] Parallel sensors operation execution
-  * [ ] Use less Disk I/O operations
+  * [x] Use less Disk I/O operations

 <!-- markdownlint-disable no-trailing-punctuation -->
 ## Want help?
```
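The updated README timings imply the commit cuts end-to-end `First Init` time by roughly 13-44%, depending on input size. A throwaway snippet (not part of the repository) to check that arithmetic against the numbers above:

```python
# Sanity-check the speedup implied by the README timings above.
def seconds(minutes: int, secs: int = 0) -> int:
    return minutes * 60 + secs

# (before, after) end-to-end "First Init" durations from the README diff.
timings = {
    '2 files, 620MB': (seconds(11, 47), seconds(8, 11)),
    '1 file, 513MB': (seconds(6, 16), seconds(5, 27)),
    '1 file, 107MB': (seconds(6, 35), seconds(3, 42)),
}

for label, (before, after) in timings.items():
    print(f'{label}: {1 - after / before:.0%} faster')
# 2 files, 620MB: 31% faster
# 1 file, 513MB: 13% faster
# 1 file, 107MB: 44% faster
```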

data-transformer-app/main.py

Lines changed: 34 additions & 67 deletions
```diff
@@ -21,11 +21,8 @@
 from datetime import datetime
 from os import listdir

+import configs as conf
 import pandas as pd
-from configs import AQI
-from configs import CHUNKSIZE
-from configs import PATH
-from configs import SENSORS

 try:
     from typeguard import typechecked  # noqa: WPS433
@@ -99,37 +96,16 @@ def remove_duplicate_rows(filename: str, extention: str = '.csv'):
         csv_file.writelines(lines)


-@typechecked  # noqa: WPS211
-def write_influx_data(  # pylint: disable=too-many-arguments
-    filename: str,
-    sensor_name_for_user: str,
-    date: int,
-    concentration: float,
-    device_id: str,
-    aqi: int = None,
-):
+@typechecked
+def write_influx_data(filename: str, collection: set):
     """Append file with data in InfluxDB format.

     Args:
         filename: (str) Filename.
-        sensor_name_for_user: (str) Human readable sensor name.
-        date: (str) Datetime string in `%Y-%m-%d %H:%M:%S` format.
-        concentration: (float) Sensor value at `date`.
-        device_id: (str) SaveEcoBot Device ID where this sensor installed.
-        aqi: (int) Air Quality Index. Default to None.
+        collection: (set) Data for file append.
     """
     with open(f'data/influx/{filename}.influx', mode='a') as influx_file:
-
-        if aqi is None:
-            influx_file.write(
-                f'{sensor_name_for_user},device_id={device_id},have_aqi=false '
-                + f'concentration={concentration} {date}\n',
-            )
-        else:
-            influx_file.write(
-                f'{sensor_name_for_user},device_id={device_id},have_aqi=true '
-                + f'aqi={aqi},concentration={concentration} {date}\n',
-            )
+        influx_file.writelines(element for element in collection)


 @typechecked
```
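The new `write_influx_data` signature moves string formatting out of the writer: callers pre-build the InfluxDB line-protocol strings, and the function opens the file once and flushes the whole collection with `writelines`, instead of re-opening and appending once per row. A minimal self-contained sketch of how the new version is meant to be called (the `pm25` filename and row values are invented for illustration):

```python
import os


def write_influx_data(filename: str, collection: set):
    """Append file with data in InfluxDB format (as in the diff above)."""
    with open(f'data/influx/{filename}.influx', mode='a') as influx_file:
        influx_file.writelines(element for element in collection)


os.makedirs('data/influx', exist_ok=True)  # the repo's pipeline pre-creates this dir

# Illustrative line-protocol rows; device IDs, values, and timestamps are made up.
influx_data = {
    'PM2.5,device_id=dev-1,have_aqi=true aqi=57,concentration=15.1 1580000000000000000\n',
    'PM2.5,device_id=dev-2,have_aqi=false concentration=10.2 1580000001000000000\n',
}

write_influx_data('pm25', influx_data)  # one open() and one writelines() per sensor
```

Note that `writelines` accepts any iterable of strings, so passing the set directly (without the generator wrapper) would also work; either way, no intermediate joined string is built in memory.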
```diff
@@ -196,10 +172,10 @@ def transform_date_to_nanoseconds(date) -> int:
 #######################################################################


-@typechecked
-def main() -> None:
+@typechecked  # noqa: WPS210, WPS213, WPS231
+def main() -> None:  # pylint: disable=R0914
     """Logic."""
-    files = find_csv_filenames(PATH)
+    files = find_csv_filenames(conf.PATH)

     if not files:
         logger.error(  # pylint: disable=logging-not-lazy
@@ -211,8 +187,7 @@ def main() -> None:
     logger.info(f'Found next files: {files}')

     for filename in files:
-
-        for sensor, human_readable_sensor_name in SENSORS.items():
+        for sensor, human_readable_sensor_name in conf.SENSORS.items():

             logs.setFormatter(
                 logging.Formatter(
@@ -238,13 +213,13 @@ def main() -> None:
             open(f'data/csv/{sensor_file}.csv', 'w').close()  # noqa: WPS515

             pandas_csv = pd.read_csv(
-                f'{PATH}/{filename}',
-                chunksize=CHUNKSIZE,
+                f'{conf.PATH}/{filename}',
+                chunksize=conf.CHUNKSIZE,
                 delimiter=',',
                 dtype=str,
             )
             for chunk in pandas_csv:
-                logger.info(f'Proccess chunk rows: {CHUNKSIZE}')
+                logger.info(f'Proccess chunk rows: {conf.CHUNKSIZE}')
                 process_chunk_rows(chunk, sensor_file, sensor)

             logger.info('Get unique rows')
@@ -253,7 +228,7 @@ def main() -> None:
             #
             # Get data for Influx
             #
-            logger.info('Transform data for Database format')
+            logger.info('Transform data to Database format')

             # Cleanup previous data
             with open(f'data/influx/{sensor_file}.influx', 'w') as influx_file:
@@ -266,39 +241,31 @@ def main() -> None:

             """)

+            influx_data = set()
+            can_calculate_aqi = sensor in conf.AQI
+
             with open(f'data/csv/{sensor_file}.csv', mode='r') as csv_file:
                 csv_reader = csv.reader(csv_file, delimiter=',')

-                if sensor not in AQI:
-                    for row in csv_reader:
-                        device_id = row[0]
-                        date = transform_date_to_nanoseconds(row[1])
-                        concentration = round(float(row[2]), 1)
-
-                        write_influx_data(
-                            sensor_file,
-                            human_readable_sensor_name,
-                            date,
-                            concentration,
-                            device_id,
+                for row in csv_reader:
+                    device_id = row[0]
+                    date = transform_date_to_nanoseconds(row[1])
+                    concentration = round(float(row[2]), 1)
+
+                    if can_calculate_aqi:
+                        aqi = calculate_aqi(conf.AQI, sensor, concentration)  # noqa: WPS220
+
+                        influx_data.add(  # noqa: WPS220
+                            f'{human_readable_sensor_name},device_id={device_id},have_aqi=true '
+                            + f'aqi={aqi},concentration={concentration} {date}\n',
+                        )
+                    else:
+                        influx_data.add(  # noqa: WPS220
+                            f'{human_readable_sensor_name},device_id={device_id},have_aqi=false '
+                            + f'concentration={concentration} {date}\n',
                         )
-                    continue
-
-                for row in csv_reader:  # noqa: WPS440
-                    device_id = row[0]  # noqa: WPS441
-                    date = transform_date_to_nanoseconds(row[1])  # noqa: WPS441
-                    concentration = round(float(row[2]), 1)  # noqa: WPS441
-
-                    aqi = calculate_aqi(AQI, sensor, concentration)
-
-                    write_influx_data(
-                        sensor_file,
-                        human_readable_sensor_name,
-                        date,
-                        concentration,
-                        device_id,
-                        aqi,
-                    )
+
+            write_influx_data(sensor_file, influx_data)


 if __name__ == '__main__':
```
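Two properties of the `set` accumulator introduced above are worth noting. First, `set.add` silently drops rows that are byte-for-byte identical, so duplicate measurements never reach the output file. Second, iteration order over a set is arbitrary, which is acceptable here because every InfluxDB line-protocol entry carries its own timestamp. A small demonstration of the dedup behavior (row values invented for illustration):

```python
# A set collapses identical line-protocol rows before any disk I/O happens.
influx_data = set()

rows = [
    'PM2.5,device_id=dev-1,have_aqi=false concentration=15.1 1580000000000000000\n',
    'PM2.5,device_id=dev-1,have_aqi=false concentration=15.1 1580000000000000000\n',  # duplicate
    'PM2.5,device_id=dev-2,have_aqi=false concentration=10.2 1580000001000000000\n',
]

for row in rows:
    influx_data.add(row)

print(len(influx_data))  # 2 -- the duplicate row was dropped
```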
