Commit 11a3e4b

Merge pull request #453 from mapswipe/zip-files
compress files for export with gzip #451
2 parents 4c8592b + f0c5eb1 commit 11a3e4b

File tree: 3 files changed (+103 -25 lines)

mapswipe_workers/mapswipe_workers/generate_stats/project_stats.py
Lines changed: 43 additions & 20 deletions

@@ -1,6 +1,8 @@
 import datetime
 import os
 from typing import List
+import gzip
+import tempfile
 
 import pandas as pd
 from psycopg2 import sql
@@ -26,15 +28,21 @@ def add_metadata_to_csv(filename: str):
     logger.info(f"added metadata to {filename}.")
 
 
-def write_sql_to_csv(filename: str, sql_query: sql.SQL):
+def write_sql_to_gzipped_csv(filename: str, sql_query: sql.SQL):
     """
     Use the copy statement to write data from postgres to a csv file.
     """
 
+    # generate temporary file which will be automatically deleted at the end
+    tmp_csv_file = os.path.join(tempfile._get_default_tempdir(), 'tmp.csv')
     pg_db = auth.postgresDB()
-    with open(filename, "w") as f:
+    with open(tmp_csv_file, "w") as f:
         pg_db.copy_expert(sql_query, f)
-    logger.info(f"wrote csv file from sql: {filename}")
+
+    with open(tmp_csv_file, 'rb') as f_in, gzip.open(filename, 'wb') as f_out:
+        f_out.writelines(f_in)
+
+    logger.info(f"wrote gzipped csv file from sql: {filename}")
 
 
 def load_df_from_csv(filename: str) -> pd.DataFrame:
@@ -44,7 +52,11 @@ def load_df_from_csv(filename: str) -> pd.DataFrame:
     """
     dtype_dict = {"project_id": str, "group_id": str, "task_id": str}
 
-    df = pd.read_csv(filename, dtype=dtype_dict)
+    df = pd.read_csv(
+        filename,
+        dtype=dtype_dict,
+        compression="gzip"
+    )
     logger.info(f"loaded pandas df from {filename}")
     return df
 
@@ -73,7 +85,7 @@ def get_results(filename: str, project_id: str) -> pd.DataFrame:
         ) TO STDOUT WITH CSV HEADER
         """
     ).format(sql.Literal(project_id))
-    write_sql_to_csv(filename, sql_query)
+    write_sql_to_gzipped_csv(filename, sql_query)
 
     df = load_df_from_csv(filename)
 
@@ -117,7 +129,7 @@ def get_tasks(filename: str, project_id: str) -> pd.DataFrame:
         ) TO STDOUT WITH CSV HEADER
         """
     ).format(sql.Literal(project_id))
-    write_sql_to_csv(filename, sql_query)
+    write_sql_to_gzipped_csv(filename, sql_query)
 
     df = load_df_from_csv(filename)
     return df
@@ -152,7 +164,7 @@ def get_groups(filename: str, project_id: str) -> pd.DataFrame:
         ) TO STDOUT WITH CSV HEADER
         """
     ).format(sql.Literal(project_id))
-    write_sql_to_csv(filename, sql_query)
+    write_sql_to_gzipped_csv(filename, sql_query)
 
     df = load_df_from_csv(filename)
     return df
@@ -322,11 +334,11 @@ def get_per_project_statistics(project_id: str, project_info: pd.Series) -> dict
     """
 
     # set filenames
-    results_filename = f"{DATA_PATH}/api/results/results_{project_id}.csv"
-    tasks_filename = f"{DATA_PATH}/api/tasks/tasks_{project_id}.csv"
-    groups_filename = f"{DATA_PATH}/api/groups/groups_{project_id}.csv"
-    agg_results_filename = f"{DATA_PATH}/api/agg_results/agg_results_{project_id}.csv"
-    agg_results_by_user_id_filename = f"{DATA_PATH}/api/users/users_{project_id}.csv"
+    results_filename = f"{DATA_PATH}/api/results/results_{project_id}.csv.gz"
+    tasks_filename = f"{DATA_PATH}/api/tasks/tasks_{project_id}.csv.gz"
+    groups_filename = f"{DATA_PATH}/api/groups/groups_{project_id}.csv.gz"
+    agg_results_filename = f"{DATA_PATH}/api/agg_results/agg_results_{project_id}.csv.gz"
+    agg_results_by_user_id_filename = f"{DATA_PATH}/api/users/users_{project_id}.csv.gz"
     project_stats_by_date_filename = f"{DATA_PATH}/api/history/history_{project_id}.csv"
 
     # load data from postgres or local storage if already downloaded
@@ -339,11 +351,22 @@ def get_per_project_statistics(project_id: str, project_info: pd.Series) -> dict
     groups_df = get_groups(groups_filename, project_id)
     tasks_df = get_tasks(tasks_filename, project_id)
 
+    if any("maxar" in s for s in project_info["tile_server_names"]):
+        add_metadata = True
+
     # aggregate results by task id
     agg_results_df = get_agg_results_by_task_id(results_df, tasks_df)
-    agg_results_df.to_csv(agg_results_filename, index_label="idx")
+    agg_results_df.to_csv(
+        agg_results_filename,
+        index_label="idx"
+    )
+
+    geojson_functions.gzipped_csv_to_gzipped_geojson(
+        filename=agg_results_filename,
+        geometry_field="geom",
+        add_metadata=add_metadata
+    )
     logger.info(f"saved agg results for {project_id}: {agg_results_filename}")
-    geojson_functions.csv_to_geojson(agg_results_filename, "geom")
 
     # aggregate results by user id
     # TODO: solve memory issue for agg results by user id
@@ -352,7 +375,8 @@ def get_per_project_statistics(project_id: str, project_info: pd.Series) -> dict
         results_df, agg_results_df
     )
     agg_results_by_user_id_df.to_csv(
-        agg_results_by_user_id_filename, index_label="idx"
+        agg_results_by_user_id_filename,
+        index_label="idx"
     )
     logger.info(
         f"saved agg results for {project_id}: {agg_results_by_user_id_filename}"
@@ -361,10 +385,6 @@ def get_per_project_statistics(project_id: str, project_info: pd.Series) -> dict
         sentry.capture_exception()
         logger.info(f"failed to agg results by user id for {project_id}")
 
-    if any("maxar" in s for s in project_info["tile_server_names"]):
-        add_metadata_to_csv(agg_results_filename)
-        geojson_functions.add_metadata_to_geojson(agg_results_filename)
-
    project_stats_by_date_df = project_stats_by_date.get_project_history(
         results_df, groups_df
     )
@@ -380,7 +400,10 @@ def get_per_project_statistics(project_id: str, project_info: pd.Series) -> dict
         # do not do this for ArbitraryGeometry / BuildingFootprint projects
         logger.info(f"do NOT generate tasking manager geometries for {project_id}")
     else:
-        tasking_manager_geometries.generate_tasking_manager_geometries(project_id)
+        tasking_manager_geometries.generate_tasking_manager_geometries(
+            project_id=project_id,
+            agg_results_filename=agg_results_filename
+        )
 
     # prepare output of function
     project_stats_dict = {
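
The export round trip introduced here — COPY the query result into a plain csv, gzip it, then have pandas decompress on read — follows the standard-library pattern sketched below. This is a minimal illustration, not repo code: the file names and sample rows are invented, the public tempfile.gettempdir() stands in for the private tempfile._get_default_tempdir() helper the commit calls, and shutil.copyfileobj is equivalent to the f_out.writelines(f_in) used in write_sql_to_gzipped_csv. One thing a reviewer might flag in the hunks above: add_metadata is only bound inside the maxar branch, so a project without a "maxar" tile server would presumably need a default (e.g. add_metadata = False) before the gzipped_csv_to_gzipped_geojson call.

import gzip
import os
import shutil
import tempfile

import pandas as pd

# stand-in for the csv that postgres' COPY ... TO STDOUT would produce
tmp_csv_file = os.path.join(tempfile.gettempdir(), "tmp.csv")
with open(tmp_csv_file, "w") as f:
    f.write("project_id,group_id,task_id\n123,g100,t1\n")

# compress the csv byte for byte, as write_sql_to_gzipped_csv does
gzipped_csv = os.path.join(tempfile.gettempdir(), "results_123.csv.gz")
with open(tmp_csv_file, "rb") as f_in, gzip.open(gzipped_csv, "wb") as f_out:
    shutil.copyfileobj(f_in, f_out)

# read it back, as load_df_from_csv does; with the default
# compression="infer", pandas would also detect gzip from the ".gz" suffix
df = pd.read_csv(gzipped_csv, dtype={"project_id": str}, compression="gzip")
print(df.dtypes)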

mapswipe_workers/mapswipe_workers/generate_stats/tasking_manager_geometries.py
Lines changed: 5 additions & 5 deletions

@@ -1,4 +1,5 @@
 import csv
+import gzip
 import threading
 from queue import Queue
 
@@ -8,15 +9,15 @@
 from mapswipe_workers.utils import geojson_functions, tile_functions
 
 
-def load_data(project_id: str, csv_file: str) -> list:
+def load_data(project_id: str, gzipped_csv_file: str) -> list:
     """
     This will load the aggregated results csv file into a list of dictionaries.
     For further steps we currently rely on task_x, task_y, task_z and yes_share and
     maybe_share and wkt
     """
 
     project_data = []
-    with open(csv_file, "r") as f:
+    with gzip.open(gzipped_csv_file, mode="rt") as f:
         reader = csv.reader(f, delimiter=",")
 
         for i, row in enumerate(reader):
@@ -416,7 +417,7 @@ def dissolve_project_data(project_data):
     return dissolved_geometry
 
 
-def generate_tasking_manager_geometries(project_id: str):
+def generate_tasking_manager_geometries(project_id: str, agg_results_filename):
     """
     This function runs the workflow to create a GeoJSON file ready to be used in the
     HOT Tasking Manager.
@@ -428,14 +429,13 @@ def generate_tasking_manager_geometries(project_id: str):
     Finally, both data sets are saved into GeoJSON files.
     """
 
-    raw_data_filename = f"{DATA_PATH}/api/agg_results/agg_results_{project_id}.csv"
     filtered_data_filename = f"{DATA_PATH}/api/yes_maybe/yes_maybe_{project_id}.geojson"
     tasking_manager_data_filename = (
         f"{DATA_PATH}/api/hot_tm/hot_tm_{project_id}.geojson"
     )
 
     # load project data from existing files
-    results = load_data(project_id, raw_data_filename)
+    results = load_data(project_id, agg_results_filename)
 
     # filter yes and maybe results
     filtered_results = filter_data(results)
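
For reference, gzip.open with mode="rt" returns a decoded text stream, so csv.reader can consume it exactly like a plain file handle. A minimal sketch of the pattern load_data now uses (the file name and columns are illustrative, not from the repo):

import csv
import gzip

# text mode ("rt") decompresses and decodes on the fly,
# so the csv module sees ordinary strings
with gzip.open("agg_results_123.csv.gz", mode="rt") as f:
    reader = csv.reader(f, delimiter=",")
    header = next(reader)
    for row in reader:
        print(dict(zip(header, row)))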

mapswipe_workers/mapswipe_workers/utils/geojson_functions.py
Lines changed: 55 additions & 0 deletions

@@ -1,12 +1,67 @@
 import json
 import os
+import gzip
+import shutil
 import subprocess
+import tempfile
 
 from osgeo import ogr, osr
 
 from mapswipe_workers.definitions import logger
 
 
+def gzipped_csv_to_gzipped_geojson(
+    filename: str,
+    geometry_field: str = "geom",
+    add_metadata: bool = False
+):
+    """Convert gzipped csv file to gzipped GeoJSON.
+
+    First the gzipped files are unzipped and stored in temporary csv and geojson files.
+    Then the unzipped csv file is converted into a geojson file with ogr2ogr.
+    Last, the generated geojson file is again compressed using gzip.
+    """
+    # generate temporary files which will be automatically deleted at the end
+    tmp_csv_file = os.path.join(tempfile._get_default_tempdir(), 'tmp.csv')
+    tmp_geojson_file = os.path.join(tempfile._get_default_tempdir(), 'tmp.geojson')
+
+    outfile = filename.replace(".csv", f"_{geometry_field}.geojson")
+
+    # uncompress content of zipped csv file and save to csv file
+    with gzip.open(filename, 'rb') as f_in:
+        with open(tmp_csv_file, "wb") as f_out:
+            shutil.copyfileobj(f_in, f_out)
+
+    # use ogr2ogr to transform csv file into geojson file
+    # TODO: remove geom column from normal attributes in sql query
+    subprocess.run(
+        [
+            "ogr2ogr",
+            "-f",
+            "GeoJSON",
+            tmp_geojson_file,
+            tmp_csv_file,
+            "-sql",
+            f'SELECT *, CAST({geometry_field} as geometry) FROM "tmp"',  # noqa E501
+        ],
+        check=True,
+    )
+
+    if add_metadata:
+        add_metadata_to_geojson(tmp_geojson_file)
+
+    cast_datatypes_for_geojson(tmp_geojson_file)
+
+    # compress geojson file with gzip
+    with open(tmp_geojson_file, "r") as f:
+        json_data = json.load(f)
+
+    with gzip.open(outfile, 'wt') as fout:
+        json.dump(json_data, fout)
+
+    logger.info(f"converted {filename} to {outfile} with ogr2ogr.")
+
+
 def csv_to_geojson(filename: str, geometry_field: str = "geom"):
     """
     Use ogr2ogr to convert csv file to GeoJSON
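
Two details of the new function are worth spelling out. Because the incoming filename ends in ".csv.gz", the str.replace(".csv", ...) call yields an output ending in "_geom.geojson.gz", so the result stays gzipped. And the final compression step goes through json.load/json.dump, which re-serializes (and thereby validates) the ogr2ogr output; a plain byte copy would work as well. A minimal sketch with invented file names:

import gzip
import json
import shutil

# re-serialize through json, as gzipped_csv_to_gzipped_geojson does;
# this fails loudly if ogr2ogr produced malformed json
with open("tmp.geojson", "r") as f:
    json_data = json.load(f)
with gzip.open("agg_results_123_geom.geojson.gz", "wt") as f_out:
    json.dump(json_data, f_out)

# equivalent byte-for-byte copy that never parses the document
with open("tmp.geojson", "rb") as f_in:
    with gzip.open("copy.geojson.gz", "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)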
