Skip to content

Commit d8c9287

Browse files
committed
compress files for export with gzip #451
1 parent 60d00d1 commit d8c9287

File tree

3 files changed

+100
-20
lines changed

3 files changed

+100
-20
lines changed

mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import datetime
22
import os
33
from typing import List
4+
import gzip
5+
import io
46

57
import pandas as pd
68
from psycopg2 import sql
@@ -31,10 +33,18 @@ def write_sql_to_csv(filename: str, sql_query: sql.SQL):
3133
Use the copy statement to write data from postgres to a csv file.
3234
"""
3335

36+
temp_file = "temp.csv"
3437
pg_db = auth.postgresDB()
35-
with open(filename, "w") as f:
38+
with open(temp_file, "w") as f:
3639
pg_db.copy_expert(sql_query, f)
37-
logger.info(f"wrote csv file from sql: {filename}")
40+
41+
with open(temp_file, 'rb') as f_in, gzip.open(filename, 'wb') as f_out:
42+
f_out.writelines(f_in)
43+
44+
# remove temp file
45+
os.remove(temp_file)
46+
47+
logger.info(f"wrote gzipped csv file from sql: {filename}")
3848

3949

4050
def load_df_from_csv(filename: str) -> pd.DataFrame:
@@ -44,7 +54,11 @@ def load_df_from_csv(filename: str) -> pd.DataFrame:
4454
"""
4555
dtype_dict = {"project_id": str, "group_id": str, "task_id": str}
4656

47-
df = pd.read_csv(filename, dtype=dtype_dict)
57+
df = pd.read_csv(
58+
filename,
59+
dtype=dtype_dict,
60+
compression="gzip"
61+
)
4862
logger.info(f"loaded pandas df from {filename}")
4963
return df
5064

@@ -322,11 +336,11 @@ def get_per_project_statistics(project_id: str, project_info: pd.Series) -> dict
322336
"""
323337

324338
# set filenames
325-
results_filename = f"{DATA_PATH}/api/results/results_{project_id}.csv"
326-
tasks_filename = f"{DATA_PATH}/api/tasks/tasks_{project_id}.csv"
327-
groups_filename = f"{DATA_PATH}/api/groups/groups_{project_id}.csv"
328-
agg_results_filename = f"{DATA_PATH}/api/agg_results/agg_results_{project_id}.csv"
329-
agg_results_by_user_id_filename = f"{DATA_PATH}/api/users/users_{project_id}.csv"
339+
results_filename = f"{DATA_PATH}/api/results/results_{project_id}.csv.gz"
340+
tasks_filename = f"{DATA_PATH}/api/tasks/tasks_{project_id}.csv.gz"
341+
groups_filename = f"{DATA_PATH}/api/groups/groups_{project_id}.csv.gz"
342+
agg_results_filename = f"{DATA_PATH}/api/agg_results/agg_results_{project_id}.csv.gz"
343+
agg_results_by_user_id_filename = f"{DATA_PATH}/api/users/users_{project_id}.csv.gz"
330344
project_stats_by_date_filename = f"{DATA_PATH}/api/history/history_{project_id}.csv"
331345

332346
# load data from postgres or local storage if already downloaded
@@ -339,11 +353,23 @@ def get_per_project_statistics(project_id: str, project_info: pd.Series) -> dict
339353
groups_df = get_groups(groups_filename, project_id)
340354
tasks_df = get_tasks(tasks_filename, project_id)
341355

356+
if any("maxar" in s for s in project_info["tile_server_names"]):
357+
add_metadata = True
358+
342359
# aggregate results by task id
343360
agg_results_df = get_agg_results_by_task_id(results_df, tasks_df)
344-
agg_results_df.to_csv(agg_results_filename, index_label="idx")
361+
agg_results_df.to_csv(
362+
agg_results_filename,
363+
index_label="idx",
364+
compression="gzip"
365+
)
366+
367+
geojson_functions.gzipped_csv_to_gzipped_geojson(
368+
filename=agg_results_filename,
369+
geometry_field="geom",
370+
add_metadata=add_metadata
371+
)
345372
logger.info(f"saved agg results for {project_id}: {agg_results_filename}")
346-
geojson_functions.csv_to_geojson(agg_results_filename, "geom")
347373

348374
# aggregate results by user id
349375
# TODO: solve memory issue for agg results by user id
@@ -361,10 +387,6 @@ def get_per_project_statistics(project_id: str, project_info: pd.Series) -> dict
361387
sentry.capture_exception()
362388
logger.info(f"failed to agg results by user id for {project_id}")
363389

364-
if any("maxar" in s for s in project_info["tile_server_names"]):
365-
add_metadata_to_csv(agg_results_filename)
366-
geojson_functions.add_metadata_to_geojson(agg_results_filename)
367-
368390
project_stats_by_date_df = project_stats_by_date.get_project_history(
369391
results_df, groups_df
370392
)
@@ -380,7 +402,10 @@ def get_per_project_statistics(project_id: str, project_info: pd.Series) -> dict
380402
# do not do this for ArbitraryGeometry / BuildingFootprint projects
381403
logger.info(f"do NOT generate tasking manager geometries for {project_id}")
382404
else:
383-
tasking_manager_geometries.generate_tasking_manager_geometries(project_id)
405+
tasking_manager_geometries.generate_tasking_manager_geometries(
406+
project_id=project_id,
407+
agg_results_filename=agg_results_filename
408+
)
384409

385410
# prepare output of function
386411
project_stats_dict = {

mapswipe_workers/mapswipe_workers/generate_stats/tasking_manager_geometries.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import csv
2+
import gzip
23
import threading
34
from queue import Queue
45

@@ -8,15 +9,15 @@
89
from mapswipe_workers.utils import geojson_functions, tile_functions
910

1011

11-
def load_data(project_id: str, csv_file: str) -> list:
12+
def load_data(project_id: str, gzipped_csv_file: str) -> list:
1213
"""
1314
This will load the aggregated results csv file into a list of dictionaries.
1415
For further steps we currently rely on task_x, task_y, task_z and yes_share and
1516
maybe_share and wkt
1617
"""
1718

1819
project_data = []
19-
with open(csv_file, "r") as f:
20+
with gzip.open(gzipped_csv_file, mode="rt") as f:
2021
reader = csv.reader(f, delimiter=",")
2122

2223
for i, row in enumerate(reader):
@@ -416,7 +417,7 @@ def dissolve_project_data(project_data):
416417
return dissolved_geometry
417418

418419

419-
def generate_tasking_manager_geometries(project_id: str):
420+
def generate_tasking_manager_geometries(project_id: str, agg_results_filename):
420421
"""
421422
This function runs the workflow to create a GeoJSON file ready to be used in the
422423
HOT Tasking Manager.
@@ -428,14 +429,13 @@ def generate_tasking_manager_geometries(project_id: str):
428429
Finally, both data sets are saved into GeoJSON files.
429430
"""
430431

431-
raw_data_filename = f"{DATA_PATH}/api/agg_results/agg_results_{project_id}.csv"
432432
filtered_data_filename = f"{DATA_PATH}/api/yes_maybe/yes_maybe_{project_id}.geojson"
433433
tasking_manager_data_filename = (
434434
f"{DATA_PATH}/api/hot_tm/hot_tm_{project_id}.geojson"
435435
)
436436

437437
# load project data from existing files
438-
results = load_data(project_id, raw_data_filename)
438+
results = load_data(project_id, agg_results_filename)
439439

440440
# filter yes and maybe results
441441
filtered_results = filter_data(results)

mapswipe_workers/mapswipe_workers/utils/geojson_functions.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,67 @@
11
import json
22
import os
3+
import gzip
4+
import shutil
35
import subprocess
46

57
from osgeo import ogr, osr
68

79
from mapswipe_workers.definitions import logger
810

911

12+
def gzipped_csv_to_gzipped_geojson(
    filename: str,
    geometry_field: str = "geom",
    add_metadata: bool = False
):
    """Convert a gzipped csv file into a gzipped GeoJSON file using ogr2ogr.

    The csv is decompressed into a private temporary directory, converted with
    ogr2ogr, post-processed and finally written gzip-compressed next to the
    input file. The output name is derived from ``filename`` by replacing
    ``.csv`` with ``_{geometry_field}.geojson`` (so ``x.csv.gz`` becomes
    ``x_{geometry_field}.geojson.gz``).

    Args:
        filename: Path of the gzipped csv file to convert.
        geometry_field: Name of the csv column that is cast to a geometry in
            the ogr2ogr sql statement.
        add_metadata: If true, ``add_metadata_to_geojson`` is called on the
            intermediate GeoJSON file before it is compressed.

    Raises:
        subprocess.CalledProcessError: If ogr2ogr exits with a non-zero status.
    """
    # Imported locally so this function does not depend on a module level
    # import that may not exist in the file.
    import tempfile

    outfile = filename.replace(".csv", f"_{geometry_field}.geojson")

    # A dedicated temporary directory avoids clashes with other functions or
    # processes that write "temp.csv" into the working directory, and it is
    # removed even if one of the steps below raises.
    with tempfile.TemporaryDirectory() as temp_dir:
        csv_file = os.path.join(temp_dir, "temp.csv")
        geojson_file = os.path.join(temp_dir, "temp.geojson")
        # The sql statement below addresses the csv by its file name without
        # directory and extension.
        layer_name = os.path.splitext(os.path.basename(csv_file))[0]

        with gzip.open(filename, "rb") as f_in, open(csv_file, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)

        # No need to delete an old GeoJSON file first (ogr2ogr can't overwrite
        # when choosing GeoJSON): the temporary directory is always fresh.
        # TODO: remove geom column from normal attributes in sql query
        subprocess.run(
            [
                "ogr2ogr",
                "-f",
                "GeoJSON",
                geojson_file,
                csv_file,
                "-sql",
                f'SELECT *, CAST({geometry_field} as geometry) FROM "{layer_name}"',  # noqa E501
            ],
            check=True,
        )

        if add_metadata:
            add_metadata_to_geojson(geojson_file)

        cast_datatypes_for_geojson(geojson_file)

        # Stream the bytes into the gzip file instead of parsing and
        # re-serializing the JSON, so the whole file is never held in memory.
        with open(geojson_file, "rb") as f_in, gzip.open(outfile, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)

    # Log only after the output file has actually been written.
    logger.info(f"converted {filename} to {outfile}.")
63+
64+
1065
def csv_to_geojson(filename: str, geometry_field: str = "geom"):
1166
"""
1267
Use ogr2ogr to convert csv file to GeoJSON

0 commit comments

Comments
 (0)